/* Copyright 2021. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "errors" "fmt" "strings" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "git.zero-downtime.net/forseti/pkg/providers" "github.com/aws/aws-sdk-go/service/ec2" ) const ( instanceSyncInterval = 60 * time.Second ) // NodeReconciler reconciles a Node object type NodeReconciler struct { client.Client Scheme *runtime.Scheme DryRun bool Instances map[string]InstanceCache Provider *ec2.EC2 } // stores instances data from cloud provider type InstanceCache struct { lastCheck time.Time } //+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch //+kubebuilder:rbac:groups=core,resources=nodes/finalizers,verbs=update // IsNodeReady returns true if node.Status.Conditions{KubeletReady: True} func isNodeReady(conditions []corev1.NodeCondition) bool { for _, condition := range conditions { if condition.Type == corev1.NodeReady { return condition.Status == corev1.ConditionTrue } } return false } // parse node.ProviderID, return provider, id // eg. AWS: 'aws:///' func parseProviderID(providerID string) (string, string, error) { instanceID := "" provider := "" parsedProvider := strings.Split(providerID, "://") if len(parsedProvider) == 2 { provider = parsedProvider[0] if provider == "aws" { parsedID := strings.Split(strings.Trim(parsedProvider[1], "/"), "/") if len(parsedID) == 2 { instanceID = parsedID[1] } } else { return "", "", errors.New("Unknown provider") } } else { return "", "", errors.New("Cannot parse providerID") } return provider, instanceID, nil } // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) var node corev1.Node if err := r.Get(ctx, req.NamespacedName, &node); err != nil { if apierrors.IsNotFound(err) { logger.Info("Node already deleted.") return ctrl.Result{}, nil } logger.Error(err, "unable to fetch Node") return ctrl.Result{}, err } if isNodeReady(node.Status.Conditions) { logger.V(2).Info("Node ready") return ctrl.Result{}, nil } logger.V(2).Info("Node not ready") instance := r.Instances[node.Name] retryTime := instance.lastCheck.Add(instanceSyncInterval) requeueDelay := instanceSyncInterval if time.Now().After(retryTime) { provider, instanceID, err := parseProviderID(node.Spec.ProviderID) if err != nil { logger.Error(err, "Error parsing ProviderID from node") return ctrl.Result{}, err } logger.V(2).Info("got node.Spec.ProviderID", "provider", provider, "id", instanceID) //TODO. convert provider aws into interface to call getNode exists, err := aws.GetNode(r.Provider, instanceID) if err != nil { logger.Error(err, "Error calling cloud provider") return ctrl.Result{}, err } instance.lastCheck = time.Now() r.Instances[node.Name] = instance // Delete node if exists == false { logger.Info(fmt.Sprintf("Instance %s for node %s does not exist", instanceID, node.Name)) deleteOptions := []client.DeleteOption{client.PropagationPolicy(metav1.DeletePropagationBackground)} if r.DryRun { logger.Info("DryRun enabled") deleteOptions = append(deleteOptions, client.DryRunAll) } if err := r.Delete(ctx, &node, deleteOptions...); client.IgnoreNotFound(err) != nil { logger.Error(err, "unable to delete node") return ctrl.Result{}, err } // Also remove node from cache delete(r.Instances, node.Name) logger.Info("node deleted") return ctrl.Result{}, nil } } else { // Retry if node still exists requeueDelay = retryTime.Sub(time.Now()) } // Requeue at retryTime logger.V(2).Info(fmt.Sprintf("Requeue event after %s", requeueDelay)) return ctrl.Result{RequeueAfter: requeueDelay}, nil } // SetupWithManager sets up the controller with the Manager. // skip delete events, keep create to get all events at startup func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&corev1.Node{}). WithEventFilter(predicate.Funcs{ // CreateFunc: func(e event.CreateEvent) bool { // return false // }, DeleteFunc: func(e event.DeleteEvent) bool { return false }, }). Complete(r) }