mirror of
https://github.com/ngrok/ngrok-operator.git
synced 2026-05-17 16:50:44 +00:00
72cef66086
* use patches and retries to avoid the object has been modified error we see * add retryOnConflict to driver ingress update as well * fix lint
246 lines
9.4 KiB
Go
246 lines
9.4 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/ngrok/ngrok-api-go/v7"
|
|
"github.com/ngrok/ngrok-operator/internal/drain"
|
|
"github.com/ngrok/ngrok-operator/internal/ngrokapi"
|
|
"github.com/ngrok/ngrok-operator/internal/util"
|
|
v1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/client-go/tools/events"
|
|
"k8s.io/client-go/util/retry"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
)
|
|
|
|
// Re-export drain types for convenience so consumers can use controller.DrainState
|
|
type DrainState = drain.State
|
|
|
|
var IsDraining = drain.IsDraining
|
|
|
|
// BaseControllerOp is an enum for the different operations that can be performed
// by a BaseController. It tells an ErrResult callback which operation produced
// the error it is handling.
type BaseControllerOp int

const (
	// CreateOp is the operation for creating a resource.
	CreateOp BaseControllerOp = iota

	// UpdateOp is the operation for updating a resource (upsert).
	UpdateOp

	// DeleteOp is the operation for deleting a resource (and finalizers).
	DeleteOp
)
|
|
|
|
// BaseController is our standard pattern for writing controllers
|
|
//
|
|
// Note: Non-provided methods are not called during reconcile
|
|
type BaseController[T client.Object] struct {
|
|
// Kube is the base client for interacting with the Kubernetes API
|
|
Kube client.Client
|
|
|
|
// Log is the logger for the controller
|
|
Log logr.Logger
|
|
|
|
// Recorder is the event recorder for the controller
|
|
Recorder events.EventRecorder
|
|
|
|
// Namespace is optional for controllers
|
|
Namespace *string
|
|
|
|
// DrainState is used to check if the operator is draining.
|
|
// If draining, non-delete reconciles are skipped to prevent new finalizers.
|
|
DrainState DrainState
|
|
|
|
StatusID func(obj T) string
|
|
Create func(ctx context.Context, obj T) error
|
|
Update func(ctx context.Context, obj T) error
|
|
Delete func(ctx context.Context, obj T) error
|
|
ErrResult func(op BaseControllerOp, obj T, err error) (ctrl.Result, error)
|
|
}
|
|
|
|
// reconcile is the primary function that a manager calls for this controller to reconcile an event for the give client.Object
|
|
func (self *BaseController[T]) Reconcile(ctx context.Context, req ctrl.Request, obj T) (ctrl.Result, error) {
|
|
// fill in the obj
|
|
if err := self.Kube.Get(ctx, req.NamespacedName, obj); err != nil {
|
|
return ctrl.Result{}, client.IgnoreNotFound(err)
|
|
}
|
|
|
|
// obj is filled in and can now be trusted
|
|
|
|
objFullName := util.ObjToHumanGvkName(obj)
|
|
objName := util.ObjToHumanName(obj)
|
|
|
|
log := self.Log.WithValues("resource", objFullName)
|
|
ctx = ctrl.LoggerInto(ctx, log)
|
|
|
|
log.V(1).Info("Reconciling Resource", "ID", self.StatusID(obj))
|
|
|
|
// Skip non-delete reconciles during drain to prevent adding new finalizers
|
|
if IsDraining(ctx, self.DrainState) && !IsDelete(obj) {
|
|
log.V(1).Info("Draining, skipping non-delete reconcile")
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
if IsUpsert(obj) {
|
|
if err := util.RegisterAndSyncFinalizer(ctx, self.Kube, obj); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
if self.StatusID != nil && self.StatusID(obj) == "" {
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Creating", "Create", fmt.Sprintf("Creating %s", objName))
|
|
if err := self.Create(ctx, obj); err != nil {
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeWarning, "CreateError", "Create", fmt.Sprintf("Failed to Create %s: %s", objName, err.Error()))
|
|
return self.handleErr(CreateOp, obj, err)
|
|
}
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Created", "Create", fmt.Sprintf("Created %s", objName))
|
|
} else {
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Updating", "Update", fmt.Sprintf("Updating %s", objName))
|
|
if err := self.Update(ctx, obj); err != nil {
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeWarning, "UpdateError", "Update", fmt.Sprintf("Failed to update %s: %s", objName, err.Error()))
|
|
return self.handleErr(UpdateOp, obj, err)
|
|
}
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Updated", "Update", fmt.Sprintf("Updated %s", objName))
|
|
}
|
|
} else if util.HasFinalizer(obj) {
|
|
if self.StatusID != nil && self.StatusID(obj) != "" {
|
|
sid := self.StatusID(obj)
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Deleting", "Delete", fmt.Sprintf("Deleting %s", objName))
|
|
if err := self.Delete(ctx, obj); err != nil {
|
|
if !ngrok.IsNotFound(err) {
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeWarning, "DeleteError", "Delete", fmt.Sprintf("Failed to delete %s: %s", objName, err.Error()))
|
|
return self.handleErr(DeleteOp, obj, err)
|
|
}
|
|
log.Info(fmt.Sprintf("%s not found, assuming it was already deleted", objFullName), "ID", sid)
|
|
}
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Deleted", "Delete", fmt.Sprintf("Deleted %s", objName))
|
|
}
|
|
|
|
if err := util.RemoveAndSyncFinalizer(ctx, self.Kube, obj); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
}
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// NewEnqueueRequestForMapFunc wraps a map function to be used as an event handler.
|
|
// It also takes care to make sure that the controllers logger is passed through to the map function, so
|
|
// that we can use our common pattern of getting the logger from the context.
|
|
func (self *BaseController[T]) NewEnqueueRequestForMapFunc(f func(ctx context.Context, obj client.Object) []reconcile.Request) handler.EventHandler {
|
|
wrappedFunc := func(ctx context.Context, obj client.Object) []reconcile.Request {
|
|
ctx = ctrl.LoggerInto(ctx, self.Log)
|
|
return f(ctx, obj)
|
|
}
|
|
return handler.EnqueueRequestsFromMapFunc(wrappedFunc)
|
|
}
|
|
|
|
// handleErr is a helper function to handle errors in the controller. If an ErrResult function is not provided,
|
|
// it will use the default CtrlResultForErr function.
|
|
func (self *BaseController[T]) handleErr(op BaseControllerOp, obj T, err error) (ctrl.Result, error) {
|
|
if self.ErrResult != nil {
|
|
return self.ErrResult(op, obj, err)
|
|
}
|
|
return CtrlResultForErr(err)
|
|
}
|
|
|
|
// ReconcileStatus reconciles the status of an object, retrying on conflict.
|
|
//
|
|
// Status update conflicts are common because the object's resourceVersion can
|
|
// change between the initial Get() and this call (e.g., from the finalizer Patch
|
|
// earlier in the reconcile, or from an external spec mutation). On conflict, this
|
|
// method re-fetches the latest resourceVersion and retries.
|
|
//
|
|
// This is safe for controllers where BaseController is the sole status writer for
|
|
// the resource (AgentEndpoint, CloudEndpoint, Domain, IPPolicy, etc.). For
|
|
// resources with multiple concurrent status writers (BoundEndpoint, Gateway), the
|
|
// callers manage their own retry/conflict logic and should not use this method.
|
|
func (self *BaseController[T]) ReconcileStatus(ctx context.Context, obj T, origErr error) error {
|
|
log := ctrl.LoggerFrom(ctx).WithValues("originalError", origErr)
|
|
|
|
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
err := self.Kube.Status().Update(ctx, obj)
|
|
if apierrors.IsConflict(err) {
|
|
// Re-fetch the latest resourceVersion for the next attempt.
|
|
// Status().Update() only writes the status subresource, so we only
|
|
// need the current resourceVersion — the spec/metadata are irrelevant.
|
|
latest := obj.DeepCopyObject().(T)
|
|
if getErr := self.Kube.Get(ctx, client.ObjectKeyFromObject(obj), latest); getErr != nil {
|
|
return getErr
|
|
}
|
|
obj.SetResourceVersion(latest.GetResourceVersion())
|
|
}
|
|
return err
|
|
})
|
|
|
|
if retryErr != nil {
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeWarning, "StatusError", "UpdateStatus", fmt.Sprintf("Failed to reconcile status: %s", retryErr.Error()))
|
|
log.V(1).Error(retryErr, "Failed to update status")
|
|
return StatusError{err: origErr, cause: retryErr}
|
|
}
|
|
|
|
self.Recorder.Eventf(obj, nil, v1.EventTypeNormal, "Status", "UpdateStatus", "Successfully reconciled status")
|
|
log.V(1).Info("Successfully updated status")
|
|
return origErr
|
|
}
|
|
|
|
// CtrlResultForErr is a helper function to convert an error into a ctrl.Result passing through ngrok error mappings
|
|
func CtrlResultForErr(err error) (ctrl.Result, error) {
|
|
var nerr *ngrok.Error
|
|
if errors.As(err, &nerr) {
|
|
switch {
|
|
case nerr.StatusCode >= 500:
|
|
return ctrl.Result{}, err
|
|
case nerr.StatusCode == http.StatusTooManyRequests:
|
|
return ctrl.Result{RequeueAfter: time.Minute}, nil
|
|
case nerr.StatusCode == http.StatusNotFound:
|
|
return ctrl.Result{}, err
|
|
case ngrok.IsErrorCode(nerr, ngrokapi.NgrokOpErrFailedToCreateCSR.Code):
|
|
return ctrl.Result{RequeueAfter: 30 * time.Second}, nerr
|
|
case ngrok.IsErrorCode(nerr, ngrokapi.NgrokOpErrFailedToCreateUpstreamService.Code, ngrokapi.NgrokOpErrFailedToCreateTargetService.Code):
|
|
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nerr
|
|
case ngrok.IsErrorCode(nerr, ngrokapi.NgrokOpErrEndpointDenied.Code):
|
|
return ctrl.Result{}, nil // do not retry, endpoint poller will take care of this
|
|
default:
|
|
// the rest are client errors, we don't retry by default
|
|
return ctrl.Result{}, nil
|
|
}
|
|
}
|
|
|
|
// if error was because of status update, requeue for 10 seconds
|
|
var serr StatusError
|
|
if errors.As(err, &serr) {
|
|
return ctrl.Result{RequeueAfter: 10 * time.Second}, serr
|
|
}
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
// StatusError wraps .Status().*() errors returned from k8s client.
|
|
// err is the original reconcile error (may be nil if reconcile succeeded but status update failed).
|
|
// cause is the status update error.
|
|
type StatusError struct {
|
|
err error
|
|
cause error
|
|
}
|
|
|
|
func (e StatusError) Error() string {
|
|
if e.err == nil {
|
|
return e.cause.Error()
|
|
}
|
|
return fmt.Sprintf("%s: %s", e.cause, e.err)
|
|
}
|
|
|
|
func (e StatusError) Unwrap() error {
|
|
return e.cause
|
|
}
|