235 lines
6.7 KiB
Go
235 lines
6.7 KiB
Go
/*
|
|
Copyright 2021.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package controllers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/event"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
|
"github.com/aws/aws-sdk-go/aws/request"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/ec2"
|
|
)
|
|
|
|
// instanceSyncInterval is the minimum spacing between two cloud-provider
// lookups for the same node; Reconcile uses it to throttle and reschedule
// instance checks.
const instanceSyncInterval = time.Minute
|
|
|
|
// NodeReconciler reconciles a Node object: it deletes Nodes that are not
// Ready and whose backing cloud instance can no longer be found.
type NodeReconciler struct {
	// Client is the controller-runtime client used to read and delete Nodes.
	client.Client
	// Scheme is the runtime scheme handed in at setup; it is not referenced
	// by the code shown in this file.
	Scheme *runtime.Scheme
	// DryRun, when true, makes node deletions carry client.DryRunAll so the
	// API server validates the delete without persisting it.
	DryRun bool
	// Instances holds per-node lookup bookkeeping keyed by node name.
	// Reconcile reads and writes it without any locking.
	// NOTE(review): this assumes reconciles for this controller never run
	// concurrently (MaxConcurrentReconciles == 1) — confirm.
	Instances map[string]InstanceCache
	// Provider is the EC2 client used to check whether a node's instance
	// still exists.
	Provider *ec2.EC2
}
|
|
|
|
// InstanceCache stores per-node data obtained from the cloud provider.
// NodeReconciler keeps one entry per node name in its Instances map.
type InstanceCache struct {
	// lastCheck is when the node's instance was last successfully looked up
	// at the cloud provider. The zero value means it has never been checked,
	// so the next reconcile performs a lookup immediately.
	lastCheck time.Time
}
|
|
|
|
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
|
|
//+kubebuilder:rbac:groups=core,resources=nodes/finalizers,verbs=update
|
|
|
|
// IsNodeReady returns true if node.Status.Conditions{KubeletReady: True}
|
|
func isNodeReady(conditions []corev1.NodeCondition) bool {
|
|
for _, condition := range conditions {
|
|
if condition.Type == corev1.NodeReady {
|
|
return condition.Status == corev1.ConditionTrue
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// parseProviderID splits a node's spec.providerID into the cloud provider name
// and the provider-specific instance ID.
//
// Only AWS is supported. Its providerID has the form
// 'aws://<availability-zone>/<instance-id>' (commonly written with three
// slashes, 'aws:///<availability-zone>/<instance-id>'); leading and trailing
// slashes around the "<availability-zone>/<instance-id>" part are ignored.
//
// Errors:
//   - "Cannot parse providerID" when there is not exactly one "://" separator,
//     or when the AWS part is not exactly "<availability-zone>/<instance-id>";
//   - "Unknown provider" when the provider is anything other than "aws".
//
// On success the returned instance ID is never empty.
func parseProviderID(providerID string) (string, string, error) {
	parts := strings.Split(providerID, "://")
	if len(parts) != 2 {
		return "", "", errors.New("Cannot parse providerID")
	}

	provider := parts[0]
	if provider != "aws" {
		return "", "", errors.New("Unknown provider")
	}

	// Require exactly "<az>/<instance-id>". An empty instance ID must never
	// reach the EC2 lookup, where its result could be mistaken for
	// "instance no longer exists".
	location := strings.Split(strings.Trim(parts[1], "/"), "/")
	if len(location) != 2 {
		return "", "", errors.New("Cannot parse providerID")
	}

	return provider, location[1], nil
}
|
|
|
|
func NewProviderEC2() *ec2.EC2 {
|
|
logger := ctrl.Log.WithName("aws")
|
|
|
|
sess := session.Must(session.NewSession())
|
|
sess.Handlers.Build.PushFrontNamed(request.NamedHandler{
|
|
Name: "authenticatorUserAgent",
|
|
Fn: request.MakeAddToUserAgentHandler(
|
|
"kubezero-forseti", "0.0.1"),
|
|
})
|
|
if aws.StringValue(sess.Config.Region) == "" {
|
|
ec2metadata := ec2metadata.New(sess)
|
|
regionFound, err := ec2metadata.Region()
|
|
if err != nil {
|
|
logger.Error(err, "Region not found in shared credentials, environment variable, or instance metadata.")
|
|
return nil
|
|
}
|
|
sess.Config.Region = aws.String(regionFound)
|
|
}
|
|
creds, err := sess.Config.Credentials.Get()
|
|
if err != nil {
|
|
logger.Error(err, "Unable to get AWS credentials")
|
|
}
|
|
logger.Info(fmt.Sprintf("AWS Credentials retrieved from provider: %s", creds.ProviderName))
|
|
|
|
return ec2.New(sess)
|
|
}
|
|
|
|
func getNode(provider *ec2.EC2, instanceID string) (bool, error) {
|
|
logger := ctrl.Log.WithName("aws")
|
|
|
|
result, err := provider.DescribeInstances(&ec2.DescribeInstancesInput{
|
|
InstanceIds: []*string{
|
|
aws.String(instanceID),
|
|
},
|
|
})
|
|
if err != nil {
|
|
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "InvalidInstanceID.NotFound" {
|
|
logger.V(2).Info("Instance not found")
|
|
return false, nil
|
|
}
|
|
return true, err
|
|
}
|
|
if len(result.Reservations) == 0 || len(result.Reservations[0].Instances) == 0 {
|
|
logger.V(2).Info("Instance not found")
|
|
return false, nil
|
|
}
|
|
|
|
logger.V(2).Info(fmt.Sprintf("Instance %s found", instanceID))
|
|
return true, nil
|
|
}
|
|
|
|
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
|
|
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
var node corev1.Node
|
|
if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
logger.Info("Node already deleted.")
|
|
return ctrl.Result{}, nil
|
|
}
|
|
logger.Error(err, "unable to fetch Node")
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
if isNodeReady(node.Status.Conditions) {
|
|
logger.V(2).Info("Node ready")
|
|
return ctrl.Result{}, nil
|
|
|
|
}
|
|
|
|
logger.V(2).Info("Node not ready")
|
|
instance := r.Instances[node.Name]
|
|
retryTime := instance.lastCheck.Add(instanceSyncInterval)
|
|
|
|
if time.Now().After(retryTime) {
|
|
provider, instanceID, err := parseProviderID(node.Spec.ProviderID)
|
|
if err != nil {
|
|
logger.Error(err, "Error parsing ProviderID from node")
|
|
return ctrl.Result{}, err
|
|
}
|
|
logger.V(2).Info("parsed node.Spec.ProviderID", "provider", provider, "id", instanceID)
|
|
|
|
//TODO. convert provider aws into interface to call getNode
|
|
exists, err := getNode(r.Provider, instanceID)
|
|
|
|
if err != nil {
|
|
logger.Error(err, "Error calling cloud provider")
|
|
return ctrl.Result{}, err
|
|
}
|
|
instance.lastCheck = time.Now()
|
|
r.Instances[node.Name] = instance
|
|
|
|
// Delete node
|
|
if exists == false {
|
|
logger.Info(fmt.Sprintf("Instance %s for node %s does not exist", instanceID, node.Name))
|
|
|
|
deleteOptions := []client.DeleteOption{client.PropagationPolicy(metav1.DeletePropagationBackground)}
|
|
if r.DryRun {
|
|
deleteOptions = append(deleteOptions, client.DryRunAll)
|
|
}
|
|
if err := r.Delete(ctx, &node, deleteOptions...); client.IgnoreNotFound(err) != nil {
|
|
logger.Error(err, "unable to delete node")
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
// Also remove node from cache
|
|
delete(r.Instances, node.Name)
|
|
logger.Info("node deleted")
|
|
}
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// Requeue at retryTime
|
|
requeueDelay := retryTime.Sub(time.Now())
|
|
logger.V(2).Info(fmt.Sprintf("Requeue event after %s", requeueDelay))
|
|
return ctrl.Result{RequeueAfter: requeueDelay}, nil
|
|
}
|
|
|
|
// SetupWithManager sets up the controller with the Manager.
|
|
// skip delete events, keep create to get all events at startup
|
|
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(&corev1.Node{}).
|
|
WithEventFilter(predicate.Funcs{
|
|
// CreateFunc: func(e event.CreateEvent) bool {
|
|
// return false
|
|
// },
|
|
DeleteFunc: func(e event.DeleteEvent) bool {
|
|
return false
|
|
},
|
|
}).
|
|
Complete(r)
|
|
}
|