Files
alex bezek 3187082b0e RBAC overhaul (#804)
* docs: add RBAC overhaul design spec and requirements

Captures the motivation, constraints, and design decisions for the RBAC
overhaul before the implementation changes land.

* refactor(rbac): remove kubebuilder RBAC markers and disable controller-gen RBAC output

RBAC is now defined explicitly in Helm templates rather than generated
from code annotations. Removes all +kubebuilder:rbac markers from
controllers and drain.go, and drops the rbac output target from
controller-gen so it no longer clobbers the Helm-managed files.

* refactor(rbac): reorganize operator component RBAC into per-component Helm templates

Replaces the monolithic controller-rbac.yaml and per-component rbac.yaml
files with a consistent per-component directory structure (agent/,
api-manager/, bindings-forwarder/). Each component now owns its own
Role, RoleBinding, and optional namespace-scoped variants.

Key changes:
- agent: split rbac.yaml into role.yaml + rolebinding.yaml with
  optional namespaced variants for namespace-scoped installs
- api-manager: moved from templates/rbac/role.yaml into dedicated
  api-manager/ directory alongside its other templates; adds
  leader-election-role.yaml and namespaced role support
- bindings-forwarder: renamed rbac.yaml -> role.yaml for consistency
- Deleted controller-rbac.yaml (replaced by api-manager/role.yaml)
- Renamed controller-{cm,deployment,pdb,serviceaccount}.yaml into
  api-manager/ directory for cohesion
- Renamed service-account.yaml -> serviceaccount.yaml everywhere
- values.yaml/schema: adds crdAccessRoles and per-component RBAC flags

* feat(rbac): add CRD editor/viewer ClusterRoles for ngrok resources

Moves existing editor/viewer roles into a dedicated rbac/crd-access/
subdirectory with consistent naming, and adds new roles for
NgrokTrafficPolicy (previously missing).

These ClusterRoles are for users of the operator — granting cluster
members read or write access to ngrok CRDs — as opposed to the
operator's own service account permissions.

* test(rbac): update Helm unit tests and add chainsaw e2e RBAC verification

Updates all Helm unit tests and snapshots to match the reorganized
template structure (per-component directories, renamed files). Adds
new test suites for api-manager RBAC and crd-access roles.

Also adds a chainsaw e2e test that verifies the operator's service
accounts have exactly the permissions they need — no more, no less.

* chore: update generated artifacts after RBAC overhaul

Regenerates manifest-bundle.yaml and updates the Helm README to
reflect the new values added for per-component RBAC configuration.

* remove plan and chainsaw tests and make bindings not try to use watchNamespace

* break out k8soperator permissions and bindings permissions to separate role

* update requirements and gen manifest bundle

* make agent and api manager only query their release namespace when looking for the kubernetesoperator crd

* make bindings role always be created even if bindings is disabled
2026-05-06 18:42:34 +00:00

942 lines
31 KiB
Go

/*
MIT License
Copyright (c) 2024 ngrok, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package service
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"strconv"
"time"
"github.com/go-logr/logr"
"github.com/ngrok/ngrok-api-go/v7"
common "github.com/ngrok/ngrok-operator/api/common/v1alpha1"
ingressv1alpha1 "github.com/ngrok/ngrok-operator/api/ingress/v1alpha1"
ngrokv1alpha1 "github.com/ngrok/ngrok-operator/api/ngrok/v1alpha1"
"github.com/ngrok/ngrok-operator/internal/annotations"
"github.com/ngrok/ngrok-operator/internal/controller"
"github.com/ngrok/ngrok-operator/internal/controller/labels"
"github.com/ngrok/ngrok-operator/internal/errors"
"github.com/ngrok/ngrok-operator/internal/ir"
"github.com/ngrok/ngrok-operator/internal/ngrokapi"
"github.com/ngrok/ngrok-operator/internal/resolvers"
"github.com/ngrok/ngrok-operator/internal/trafficpolicy"
"github.com/ngrok/ngrok-operator/internal/util"
"github.com/ngrok/ngrok-operator/pkg/managerdriver"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/events"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// NgrokLoadBalancerClass is the spec.loadBalancerClass value that, together
// with type LoadBalancer, marks a Service as handled by this controller.
const NgrokLoadBalancerClass = "ngrok"

// Keys used by this controller; the first and last are registered as field
// indexes in SetupWithManager.
const (
	// OwnerReferencePath indexes AgentEndpoints/CloudEndpoints by the UID of
	// the Service that controls them.
	OwnerReferencePath = "metadata.ownerReferences.uid"
	// ModuleSetPath is the module-set annotation key.
	// NOTE(review): not referenced in this file; may be used elsewhere in the package.
	ModuleSetPath = "metadata.annotations.k8s.ngrok.com/module-set"
	// TrafficPolicyPath indexes Services by the name of the NgrokTrafficPolicy
	// their annotation references.
	TrafficPolicyPath = "metadata.annotations.k8s.ngrok.com/traffic-policy"
)
// coreGVStr is the apiVersion string of the core/v1 group-version. It is
// compared against owner references to recognise endpoints controlled by a
// Service.
var coreGVStr = corev1.SchemeGroupVersion.String()
// ServiceReconciler reconciles LoadBalancer Services that use the ngrok load
// balancer class into ngrok AgentEndpoint / CloudEndpoint resources owned by
// the Service.
type ServiceReconciler struct {
	client.Client

	// Log is the logger used by findServicesForTrafficPolicy (in this file,
	// Reconcile logs through the request-scoped logger from ctx instead).
	Log logr.Logger
	// NOTE(review): Scheme is not referenced in this file.
	Scheme *runtime.Scheme
	// Recorder emits Kubernetes events on the reconciled Service.
	Recorder events.EventRecorder
	// ControllerLabels are validated in SetupWithManager and applied as labels
	// on every endpoint object this reconciler builds.
	ControllerLabels labels.ControllerLabelValues
	// ClusterDomain is used to build in-cluster upstream URLs of the form
	// tcp://<svc>.<ns>.<ClusterDomain>:<port>. Defaults to
	// common.DefaultClusterDomain when empty.
	ClusterDomain string
	// IPPolicyResolver defaults to resolvers.NewDefaultIPPolicyResolver when nil.
	IPPolicyResolver resolvers.IPPolicyResolver
	// SecretResolver defaults to resolvers.NewDefaultSecretResovler when nil.
	SecretResolver resolvers.SecretResolver
	// TCPAddresses is required (SetupWithManager fails without it); it is used
	// to reserve TCP addresses and to check that a recorded one still exists.
	TCPAddresses ngrokapi.TCPAddressesClient
}
// ShouldHandleServicePredicate is the client.Object instantiation of
// TypedShouldHandleServicePredicate; SetupWithManager uses it to filter
// Service events.
type ShouldHandleServicePredicate = TypedShouldHandleServicePredicate[client.Object]

// TypedShouldHandleServicePredicate filters events down to Services this
// controller handles (see shouldHandleService). In this file only Create and
// Update are overridden; Delete and Generic fall through to the embedded
// predicate.TypedFuncs.
// NOTE(review): Create/Update take the untyped event.CreateEvent /
// event.UpdateEvent, so only the client.Object instantiation matches the
// predicate.TypedPredicate method set.
type TypedShouldHandleServicePredicate[object client.Object] struct {
	predicate.TypedFuncs[object]
}
// Create admits a create event only when the created object is a Service that
// shouldHandleService accepts. Non-Service objects are rejected.
func (p TypedShouldHandleServicePredicate[object]) Create(e event.CreateEvent) bool {
	if svc, isService := e.Object.(*corev1.Service); isService {
		return shouldHandleService(svc)
	}
	return false
}
// Update admits an update event when both the old and new objects are
// Services and at least one of them is handled by this controller. Checking
// the old object as well means a Service that stops being an ngrok
// LoadBalancer still reaches Reconcile, which then cleans up its endpoints.
func (p TypedShouldHandleServicePredicate[object]) Update(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}
	before, beforeIsService := e.ObjectOld.(*corev1.Service)
	after, afterIsService := e.ObjectNew.(*corev1.Service)
	if beforeIsService && afterIsService {
		return shouldHandleService(before) || shouldHandleService(after)
	}
	return false
}
// SetupWithManager validates and defaults the reconciler's dependencies,
// registers the field indexes Reconcile relies on, and wires the controller
// into mgr.
//
// The controller watches Services (filtered by ShouldHandleServicePredicate
// and resource-version changes). It also watches the AgentEndpoints and
// CloudEndpoints those Services own, and NgrokTrafficPolicies, which are
// mapped back to the Services referencing them.
//
// It returns an error if ControllerLabels is invalid, if TCPAddresses is nil,
// or if an index cannot be registered.
func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	if err := labels.ValidateControllerLabelValues(r.ControllerLabels); err != nil {
		return err
	}
	if r.ClusterDomain == "" {
		r.ClusterDomain = common.DefaultClusterDomain
	}
	if r.IPPolicyResolver == nil {
		r.IPPolicyResolver = resolvers.NewDefaultIPPolicyResolver(mgr.GetClient())
	}
	if r.SecretResolver == nil {
		r.SecretResolver = resolvers.NewDefaultSecretResovler(mgr.GetClient())
	}
	if r.TCPAddresses == nil {
		return errors.New("TCPAddresses client is required")
	}

	// ownerServiceUID is the OwnerReferencePath index function. It maps an
	// object to the UID of its controlling core/v1 Service, or to nothing when
	// it has no such controller.
	ownerServiceUID := func(obj client.Object) []string {
		owner := metav1.GetControllerOf(obj)
		if owner == nil || owner.APIVersion != coreGVStr || owner.Kind != "Service" {
			return nil
		}
		return []string{string(owner.UID)}
	}

	owns := []client.Object{
		&ngrokv1alpha1.AgentEndpoint{},
		&ngrokv1alpha1.CloudEndpoint{},
	}

	// Named bldr rather than "controller" so it does not shadow the imported
	// internal/controller package.
	bldr := ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Service{}, builder.WithPredicates(
			predicate.And(
				ShouldHandleServicePredicate{},
				predicate.ResourceVersionChangedPredicate{},
			),
		)).
		// Watch traffic policies for changes
		Watches(
			&ngrokv1alpha1.NgrokTrafficPolicy{},
			handler.EnqueueRequestsFromMapFunc(r.findServicesForTrafficPolicy),
		)

	// Own the endpoint types and index them by their owning Service's UID.
	for _, o := range owns {
		bldr = bldr.Owns(o, builder.WithPredicates(
			predicate.Or(
				predicate.GenerationChangedPredicate{},
				predicate.AnnotationChangedPredicate{},
				// Watch for status changes (e.g., when CloudEndpoint gets its domainRef updated)
				predicate.ResourceVersionChangedPredicate{},
			),
		))
		if err := mgr.GetFieldIndexer().IndexField(context.Background(), o, OwnerReferencePath, ownerServiceUID); err != nil {
			return fmt.Errorf("indexing %T by %s: %w", o, OwnerReferencePath, err)
		}
	}

	// Index Services by the NgrokTrafficPolicy their annotation references;
	// findServicesForTrafficPolicy queries this index.
	err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, TrafficPolicyPath, func(obj client.Object) []string {
		policy, err := annotations.ExtractNgrokTrafficPolicyFromAnnotations(obj)
		if err != nil {
			return nil
		}
		return []string{policy}
	})
	if err != nil {
		return fmt.Errorf("indexing Services by %s: %w", TrafficPolicyPath, err)
	}

	return bldr.Complete(r)
}
// Reconcile is called by the controller-runtime manager for every event that
// passes the watches registered in SetupWithManager. These are Service
// changes, changes to endpoints a Service owns, and changes to traffic
// policies a Service references.
//
// For a Service this controller handles, Reconcile does the following:
//   - registers the finalizer;
//   - builds the desired AgentEndpoint/CloudEndpoint objects and reconciles
//     the owned objects toward them;
//   - copies the resulting address into the Service's load balancer status.
//
// For a Service that is being deleted, or is no longer handled, it deletes
// the owned endpoints and removes the finalizer. A Service that no longer
// exists is not an error.
func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := ctrl.LoggerFrom(ctx).WithValues("service", req.NamespacedName)
	ctx = ctrl.LoggerInto(ctx, log)

	svc := &corev1.Service{}
	if err := r.Client.Get(ctx, req.NamespacedName, svc); err != nil {
		if client.IgnoreNotFound(err) == nil {
			// The Service is gone, e.g. the final event after its finalizer
			// was removed. Nothing to do, and not worth an error log.
			log.V(1).Info("service not found, nothing to reconcile")
			return ctrl.Result{}, nil
		}
		log.Error(err, "unable to fetch service")
		return ctrl.Result{}, err
	}

	subResourceReconcilers := serviceSubresourceReconcilers{
		newServiceCloudEndpointReconciler(),
		newServiceAgentEndpointReconciler(),
	}

	ownedResources, err := subResourceReconcilers.GetOwnedResources(ctx, r.Client, svc)
	if err != nil {
		log.Error(err, "Failed to get owned resources")
		return ctrl.Result{}, err
	}

	// If the service is being deleted, we need to clean up any resources that are owned by it
	if controller.IsDelete(svc) {
		if err := subResourceReconcilers.Reconcile(ctx, r.Client, nil); err != nil {
			log.Error(err, "Failed to cleanup owned resources")
			return ctrl.Result{}, err
		}
		// re-fetch owned resources after cleanup
		ownedResources, err = subResourceReconcilers.GetOwnedResources(ctx, r.Client, svc)
		if err != nil {
			log.Error(err, "Failed to get owned resources")
			return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
		}
		if len(ownedResources) > 0 {
			log.Info("Service still owns ngrok resources, waiting for deletion...")
			return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
		}
		log.Info("Removing and syncing finalizer")
		if err := util.RemoveAndSyncFinalizer(ctx, r.Client, svc); err != nil {
			log.Error(err, "Failed to remove finalizer")
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	if !shouldHandleService(svc) {
		// The Service may have been changed away from an ngrok LoadBalancer;
		// if so, delete the endpoints it still owns.
		if len(ownedResources) > 0 {
			log.Info("Service is not of type LoadBalancer, performing cleanup...")
			if err := subResourceReconcilers.Reconcile(ctx, r.Client, nil); err != nil {
				log.Error(err, "Failed to cleanup owned resources")
				return ctrl.Result{}, err
			}
		}
		// Once we clean up the Cloud/Agent Endpoints, we can remove the finalizer if it exists. We don't
		// care about registering a finalizer since we only care about load balancer services
		if err := util.RemoveAndSyncFinalizer(ctx, r.Client, svc); err != nil {
			log.Error(err, "Failed to remove finalizer")
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	// buildEndpoints reads Spec.Ports[0], so bail out before calling it.
	if len(svc.Spec.Ports) < 1 {
		r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "NoPorts", "Reconcile", "Unable to handle service with no ports")
		return ctrl.Result{}, nil
	}

	log.Info("Registering and syncing finalizers")
	if err := util.RegisterAndSyncFinalizer(ctx, r.Client, svc); err != nil {
		log.Error(err, "Failed to register finalizer")
		return ctrl.Result{}, err
	}

	// If the annotation is not valid, we still return a reasonable default mapping strategy. This error
	// is not fatal, so just log it and an event and continue
	mappingStrategy, err := managerdriver.MappingStrategyAnnotationToIR(svc)
	if err != nil {
		log.Error(err, "Failed to get mapping strategy annotation")
		// Pass the message as an argument, never as the format string, so a
		// '%' in it cannot garble the event.
		r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "FailedToGetMappingStrategy", "Reconcile", "%s", err.Error())
	}

	desired, err := r.buildEndpoints(ctx, svc, mappingStrategy)
	if err != nil {
		log.Error(err, "Failed to build desired endpoints")
		r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "FailedToBuildEndpoints", "Reconcile", "%s", err.Error())
		return ctrl.Result{}, err
	}

	if err := subResourceReconcilers.Reconcile(ctx, r.Client, desired); err != nil {
		log.Error(err, "Failed to reconcile owned resources")
		return ctrl.Result{}, err
	}

	// Refetch owned resources after reconciliation and update the service's status
	ownedResources, err = subResourceReconcilers.GetOwnedResources(ctx, r.Client, svc)
	if err != nil {
		log.Error(err, "Failed to get owned resources")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	}

	// Determine which object to use for updating the service status based on mapping strategy
	statusObject, err := r.getObjectForStatusUpdate(mappingStrategy, ownedResources)
	if err != nil {
		log.Error(err, "Failed to determine object for status update")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	}

	if err := subResourceReconcilers.UpdateServiceStatus(ctx, r.Client, svc, statusObject); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to update service status: %w", err)
	}

	r.Recorder.Eventf(svc, nil, corev1.EventTypeNormal, "Reconciled", "Reconcile", "Successfully reconciled service and its ngrok resources")
	return ctrl.Result{}, nil
}
// findServicesForTrafficPolicy maps an NgrokTrafficPolicy event to reconcile
// requests for every Service in the policy's namespace whose traffic-policy
// annotation names that policy. The lookup uses the TrafficPolicyPath field
// index. A list failure is logged and yields an empty request slice.
func (r *ServiceReconciler) findServicesForTrafficPolicy(ctx context.Context, policy client.Object) []reconcile.Request {
	log := r.Log
	ns, name := policy.GetNamespace(), policy.GetName()
	log.V(3).Info("Finding services for traffic policy", "namespace", ns, "name", name)

	var services corev1.ServiceList
	byPolicy := &client.ListOptions{
		Namespace:     ns,
		FieldSelector: fields.OneTermEqualSelector(TrafficPolicyPath, name),
	}
	if err := r.Client.List(ctx, &services, byPolicy); err != nil {
		log.Error(err, "Failed to list services for traffic policy")
		return []reconcile.Request{}
	}

	requests := make([]reconcile.Request, 0, len(services.Items))
	for i := range services.Items {
		key := types.NamespacedName{
			Namespace: services.Items[i].GetNamespace(),
			Name:      services.Items[i].GetName(),
		}
		requests = append(requests, reconcile.Request{NamespacedName: key})
		log.V(3).Info("Triggering reconciliation for service", "namespace", key.Namespace, "name", key.Name)
	}
	return requests
}
// clearComputedURLAnnotation removes the computed-URL annotation from svc and
// persists the change with an Update, which also refreshes svc in place.
// When the annotation is already absent, no API call is made. This mirrors
// setComputedURLAnnotation, which skips unchanged writes.
func (r *ServiceReconciler) clearComputedURLAnnotation(ctx context.Context, svc *corev1.Service) error {
	a := svc.GetAnnotations()
	if _, present := a[annotations.ComputedURLAnnotation]; !present {
		return nil
	}
	delete(a, annotations.ComputedURLAnnotation)
	svc.SetAnnotations(a)
	return r.Client.Update(ctx, svc)
}
// setComputedURLAnnotation records computedURL in svc's computed-URL
// annotation and persists it with an Update, which refreshes svc in place.
// No API call is made when the annotation already holds computedURL.
func (r *ServiceReconciler) setComputedURLAnnotation(ctx context.Context, svc *corev1.Service, computedURL string) error {
	current := svc.GetAnnotations()
	// Reading a nil map is safe, so this comparison precedes allocation.
	if current[annotations.ComputedURLAnnotation] == computedURL {
		return nil
	}
	if current == nil {
		current = map[string]string{}
	}
	current[annotations.ComputedURLAnnotation] = computedURL
	svc.SetAnnotations(current)
	return r.Client.Update(ctx, svc)
}
// tcpAddressIsReserved reports whether hostport matches the Addr of any entry
// returned by r.TCPAddresses.List. It stops at the first match. Otherwise it
// walks the whole listing and returns the iterator's error, if any. Its cost
// therefore grows with the number of listed addresses.
func (r *ServiceReconciler) tcpAddressIsReserved(ctx context.Context, hostport string) (bool, error) {
	addrs := r.TCPAddresses.List(&ngrok.Paging{})
	found := false
	for !found && addrs.Next(ctx) {
		found = addrs.Item().Addr == hostport
	}
	if found {
		return true, nil
	}
	return false, addrs.Err()
}
// buildEndpoints returns the desired ngrok endpoint objects for a LoadBalancer
// Service, according to mappingStrategy:
//   - EndpointsCollapsed: one AgentEndpoint on the public URL that carries the
//     traffic policy and forwards to the Service's first port.
//   - EndpointsVerbose: a CloudEndpoint on the public URL carrying the traffic
//     policy plus a forward-internal rule, and an internal AgentEndpoint that
//     forwards to the Service's first port.
//
// The public URL comes from the Service's url annotation. When that annotation
// is absent, the endpoint is TCP and uses a reserved TCP address recorded in
// the computed-URL annotation. A new address is reserved when none is
// recorded, when the recorded one is no longer reserved, or when it is not a
// tcp:// URL. For non-TCP listeners, the computed-URL annotation is set to the
// listener URL. Annotation writes Update svc in place.
//
// Callers must ensure svc has at least one port.
func (r *ServiceReconciler) buildEndpoints(ctx context.Context, svc *corev1.Service, mappingStrategy ir.IRMappingStrategy) ([]client.Object, error) {
	log := ctrl.LoggerFrom(ctx)
	port := svc.Spec.Ports[0].Port
	objects := make([]client.Object, 0)

	// Get whether endpoint pooling should be enabled/disabled from annotations
	useEndpointPooling, err := annotations.ExtractUseEndpointPooling(svc)
	if err != nil {
		log.Error(err, "failed to check endpoints-enabled annotation for service",
			"service", fmt.Sprintf("%s.%s", svc.Name, svc.Namespace),
		)
		return objects, err
	}

	useBindings, err := annotations.ExtractUseBindings(svc)
	if err != nil {
		log.Error(err, "failed to get bindings annotation for service")
		return objects, err
	}

	// The final traffic policy that will be applied to the listener endpoint.
	// An explicit policy referenced by the Service is merged in first, before
	// any forward-internal action is added.
	// TODO: We still need to handle legacy traffic policy conversion
	tp := trafficpolicy.NewTrafficPolicy()
	policy, err := getNgrokTrafficPolicyForService(ctx, r.Client, svc)
	if err != nil {
		log.Error(err, "Failed to get traffic policy")
		return objects, err
	}
	if policy != nil {
		explicitTP, err := trafficpolicy.NewTrafficPolicyFromJSON(policy.Spec.Policy)
		if err != nil {
			return objects, err
		}
		tp.Merge(explicitTP)
	}
	rawPolicy, err := json.Marshal(tp)
	if err != nil {
		return objects, err
	}

	listenerEndpointURL, err := r.getListenerURL(svc)
	if err != nil {
		r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "FailedToGetListenerURL", "Reconcile", "%s", err.Error())
		return objects, err
	}

	// reserveTCPAddr reserves a fresh TCP address for svc and records it in
	// the computed-URL annotation, overwriting any previous value. It returns
	// the tcp:// URL.
	reserveTCPAddr := func() (string, error) {
		addr, err := r.TCPAddresses.Create(ctx, &ngrok.ReservedAddrCreate{
			Description: fmt.Sprintf("Reserved for %s/%s", svc.Namespace, svc.Name),
			Metadata:    fmt.Sprintf(`{"namespace":"%s","name":"%s"}`, svc.Namespace, svc.Name),
		})
		if err != nil {
			r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "FailedToReserveTCPAddr", "Reconcile", "%s", err.Error())
			return "", err
		}
		reservedURL := fmt.Sprintf("tcp://%s", addr.Addr)
		if err := r.setComputedURLAnnotation(ctx, svc, reservedURL); err != nil {
			return "", err
		}
		return reservedURL, nil
	}

	var computedEndpointURL string
	if listenerEndpointURL == "tcp://" {
		// The user has set no 'url' annotation and so wants a TCP endpoint.
		// The reserved address is tracked in the computed-URL annotation; see
		// the comment in the annotations package for why this is temporary.
		computedEndpointURL, err = annotations.ExtractComputedURL(svc)
		switch {
		case err != nil && !errors.IsMissingAnnotations(err):
			return objects, err
		case err != nil:
			// Nothing recorded yet: reserve an address.
			if computedEndpointURL, err = reserveTCPAddr(); err != nil {
				return objects, err
			}
		default:
			parsedURL, parseErr := url.Parse(computedEndpointURL)
			if parseErr != nil {
				r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "FailedToParseComputedURL", "Reconcile", "%s", parseErr.Error())
				// If we can't parse the URL, we need to clear the computed URL annotation
				if err := r.clearComputedURLAnnotation(ctx, svc); err != nil {
					return objects, err
				}
				return objects, parseErr
			}

			// A recorded URL is stale if it is not TCP (e.g. left over from a
			// removed tls url annotation) or if its address is no longer
			// reserved.
			stale := parsedURL.Scheme != "tcp"
			if !stale {
				reserved, err := r.tcpAddressIsReserved(ctx, parsedURL.Host)
				if err != nil {
					return objects, err
				}
				if !reserved {
					r.Recorder.Eventf(svc, nil, corev1.EventTypeWarning, "TCPAddrNotReserved", "Reconcile", "The computed TCP address is not reserved, recomputing")
					stale = true
				}
			}
			if stale {
				// Reserve a replacement now so this reconcile never builds
				// endpoints from the stale URL. Writing the new annotation
				// overwrites the stale one.
				if computedEndpointURL, err = reserveTCPAddr(); err != nil {
					return objects, err
				}
			}
		}
	} else {
		// For non-TCP endpoints (e.g., TLS), set the computed URL to the listener URL
		// so that updateStatus can use it as the single source of truth
		if err := r.setComputedURLAnnotation(ctx, svc, listenerEndpointURL); err != nil {
			return objects, err
		}
		computedEndpointURL = listenerEndpointURL
	}

	// In-cluster address of the Service's first port.
	upstreamURL := fmt.Sprintf("tcp://%s.%s.%s:%d", svc.Name, svc.Namespace, r.ClusterDomain, port)

	switch mappingStrategy {
	// For the default/collapse strategy, make a single AgentEndpoint
	case ir.IRMappingStrategy_EndpointsCollapsed:
		agentEndpoint := &ngrokv1alpha1.AgentEndpoint{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: svc.Name + "-",
				Namespace:    svc.Namespace,
				OwnerReferences: []metav1.OwnerReference{
					*metav1.NewControllerRef(svc, corev1.SchemeGroupVersion.WithKind("Service")),
				},
				Labels: r.ControllerLabels.Labels(),
			},
			Spec: ngrokv1alpha1.AgentEndpointSpec{
				URL:      computedEndpointURL,
				Bindings: useBindings,
				Upstream: ngrokv1alpha1.EndpointUpstream{
					URL: upstreamURL,
				},
				TrafficPolicy: &ngrokv1alpha1.TrafficPolicyCfg{
					Inline: rawPolicy,
				},
			},
		}
		objects = append(objects, agentEndpoint)

	// For the verbose strategy, make a CloudEndpoint that routes to an AgentEndpoint
	case ir.IRMappingStrategy_EndpointsVerbose:
		internalURL := fmt.Sprintf("tcp://%s.%s.%s.internal:%d", svc.UID, svc.Name, svc.Namespace, port)
		tp.AddRuleOnTCPConnect(trafficpolicy.Rule{
			Actions: []trafficpolicy.Action{
				trafficpolicy.NewForwardInternalAction(internalURL),
			},
		})
		// We've added a new rule to the traffic policy, so we need to re-marshal it
		rawPolicy, err = json.Marshal(tp)
		if err != nil {
			return objects, err
		}

		cloudEndpoint := &ngrokv1alpha1.CloudEndpoint{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: svc.Name + "-",
				Namespace:    svc.Namespace,
				OwnerReferences: []metav1.OwnerReference{
					*metav1.NewControllerRef(svc, corev1.SchemeGroupVersion.WithKind("Service")),
				},
				Labels: r.ControllerLabels.Labels(),
			},
			Spec: ngrokv1alpha1.CloudEndpointSpec{
				URL:            computedEndpointURL,
				Bindings:       useBindings,
				PoolingEnabled: useEndpointPooling,
				TrafficPolicy: &ngrokv1alpha1.NgrokTrafficPolicySpec{
					Policy: rawPolicy,
				},
			},
		}
		objects = append(objects, cloudEndpoint)

		agentEndpoint := &ngrokv1alpha1.AgentEndpoint{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: svc.Name + "-internal-",
				Namespace:    svc.Namespace,
				OwnerReferences: []metav1.OwnerReference{
					*metav1.NewControllerRef(svc, corev1.SchemeGroupVersion.WithKind("Service")),
				},
				Labels: r.ControllerLabels.Labels(),
			},
			Spec: ngrokv1alpha1.AgentEndpointSpec{
				URL: internalURL,
				Upstream: ngrokv1alpha1.EndpointUpstream{
					URL: upstreamURL,
				},
			},
		}
		objects = append(objects, agentEndpoint)
	}

	return objects, nil
}
// getListenerURL returns the public URL requested by svc's url annotation.
// When the annotation is missing it returns "tcp://", which buildEndpoints
// treats as a request for a reserved TCP address. Any other annotation error
// is returned.
func (r *ServiceReconciler) getListenerURL(svc *corev1.Service) (string, error) {
	listenerURL, err := annotations.ExtractURL(svc)
	switch {
	case err == nil:
		return listenerURL, nil
	case errors.IsMissingAnnotations(err):
		// No URL annotation, assume TCP as the default
		return "tcp://", nil
	default:
		return "", err
	}
}
// getObjectForStatusUpdate picks the owned object whose address should be
// published in the Service status. For the collapsed strategy this is the
// single owned AgentEndpoint. For the verbose strategy it is the CloudEndpoint
// among exactly two owned objects. Any other count, a missing CloudEndpoint,
// or an unknown strategy is an error.
func (r *ServiceReconciler) getObjectForStatusUpdate(mappingStrategy ir.IRMappingStrategy, ownedResources []client.Object) (client.Object, error) {
	count := len(ownedResources)

	if mappingStrategy == ir.IRMappingStrategy_EndpointsCollapsed {
		if count != 1 {
			return nil, fmt.Errorf("expected 1 owned resource, got %d", count)
		}
		return ownedResources[0], nil
	}

	if mappingStrategy == ir.IRMappingStrategy_EndpointsVerbose {
		if count != 2 {
			return nil, fmt.Errorf("expected 2 owned resources, got %d", count)
		}
		for i := range ownedResources {
			if _, isCloud := ownedResources[i].(*ngrokv1alpha1.CloudEndpoint); isCloud {
				return ownedResources[i], nil
			}
		}
		return nil, errors.New("could not find CloudEndpoint among owned resources")
	}

	return nil, fmt.Errorf("unknown mapping strategy: %s", mappingStrategy)
}
// shouldHandleService reports whether svc is a LoadBalancer Service whose
// spec.loadBalancerClass is NgrokLoadBalancerClass. A nil class never matches.
func shouldHandleService(svc *corev1.Service) bool {
	if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
		return false
	}
	return ptr.Deref(svc.Spec.LoadBalancerClass, "") == NgrokLoadBalancerClass
}
// serviceSubresourceReconciler manages one kind of object owned by a Service.
type serviceSubresourceReconciler interface {
	// GetOwnedResources lists the objects of this kind owned by svc.
	GetOwnedResources(ctx context.Context, c client.Client, svc *corev1.Service) ([]client.Object, error)
	// Reconcile drives the owned objects toward those in desired that are of
	// this kind; objects of other kinds are ignored.
	Reconcile(ctx context.Context, c client.Client, desired []client.Object) error
	// UpdateServiceStatus updates svc's status from source when source is of
	// this kind.
	UpdateServiceStatus(ctx context.Context, c client.Client, svc *corev1.Service, source client.Object) error
}

// serviceSubresourceReconcilers fans each operation out to every member.
type serviceSubresourceReconcilers []serviceSubresourceReconciler
// GetOwnedResources concatenates, in member order, the owned objects reported
// by every member. On error it returns the objects collected so far together
// with the error.
func (r serviceSubresourceReconcilers) GetOwnedResources(ctx context.Context, c client.Client, svc *corev1.Service) ([]client.Object, error) {
	all := []client.Object{}
	for i := range r {
		found, err := r[i].GetOwnedResources(ctx, c, svc)
		if err != nil {
			return all, err
		}
		all = append(all, found...)
	}
	return all, nil
}
// Reconcile runs every member's Reconcile concurrently with the same objects.
// The first error cancels the shared context and is returned once all members
// finish.
func (r serviceSubresourceReconcilers) Reconcile(ctx context.Context, c client.Client, objects []client.Object) error {
	group, groupCtx := errgroup.WithContext(ctx)
	for i := range r {
		sub := r[i]
		group.Go(func() error {
			return sub.Reconcile(groupCtx, c, objects)
		})
	}
	return group.Wait()
}
// UpdateServiceStatus offers o to every member concurrently. Each member acts
// only when o is of the kind it manages. The first error cancels the shared
// context and is returned once all members finish.
func (r serviceSubresourceReconcilers) UpdateServiceStatus(ctx context.Context, c client.Client, svc *corev1.Service, o client.Object) error {
	group, groupCtx := errgroup.WithContext(ctx)
	for i := range r {
		sub := r[i]
		group.Go(func() error {
			return sub.UpdateServiceStatus(groupCtx, c, svc, o)
		})
	}
	return group.Wait()
}
// baseSubresourceReconciler is a generic serviceSubresourceReconciler for one
// owned object type T, with PT = *T. The per-type behaviour is supplied
// through the function fields below.
type baseSubresourceReconciler[T any, PT interface {
	*T
	client.Object
}] struct {
	// owned caches the objects found by the most recent GetOwnedResources
	// call; Reconcile acts on this cached set.
	owned []PT
	// listOwned lists objects of type T matching the given list options.
	listOwned func(context.Context, client.Client, ...client.ListOption) ([]T, error)
	// matches is called as matches(desired, existing) and reports whether the
	// existing object already has the desired state.
	matches func(T, T) bool
	// mergeExisting is called as mergeExisting(desired, existing) and copies
	// the desired state onto existing before it is updated.
	mergeExisting func(T, PT)
	// updateStatus writes the Service's status from an owned object of this type.
	updateStatus func(context.Context, client.Client, *corev1.Service, PT) error
}
// GetOwnedResources lists the objects of type T in svc's namespace whose
// controlling owner is svc, using the OwnerReferencePath field index. The
// result is cached in r.owned for a subsequent Reconcile. On error, r.owned is
// left unchanged and nil is returned.
func (r *baseSubresourceReconciler[T, PT]) GetOwnedResources(ctx context.Context, c client.Client, svc *corev1.Service) ([]client.Object, error) {
	opts := []client.ListOption{
		client.InNamespace(svc.Namespace),
		client.MatchingFields{OwnerReferencePath: string(svc.UID)},
	}
	owned, err := r.listOwned(ctx, c, opts...)
	if err != nil {
		return nil, err
	}

	ptrs := make([]PT, len(owned))
	objects := make([]client.Object, len(owned))
	for i := range owned {
		// Point into the listed slice instead of at a per-iteration copy. This
		// avoids copying each (large) CRD value, and every pointer is distinct
		// regardless of loop-variable semantics.
		var p PT = &owned[i]
		ptrs[i] = p
		objects[i] = p
	}
	r.owned = ptrs
	return objects, nil
}
// Reconcile drives the owned objects of type T (as cached by the last
// GetOwnedResources call) toward the objects of type PT in objects. Objects of
// other types are ignored.
//
// The cases are:
//   - no desired object: every owned object is deleted.
//   - one desired and exactly one owned: the owned object is re-fetched and,
//     unless matches reports it is up to date, merged and updated.
//   - one desired and zero or several owned: any owned objects are deleted and
//     the desired one is created.
//
// More than one desired object is an error. Deleting an object that is
// already gone is not treated as an error.
func (r *baseSubresourceReconciler[T, PT]) Reconcile(ctx context.Context, c client.Client, objects []client.Object) error {
	log := ctrl.LoggerFrom(ctx).WithValues("subresource", fmt.Sprintf("%T", *new(T)))

	// Filter out objects that are not of the desired type for this reconciler
	desired := make([]PT, 0)
	for _, o := range objects {
		if v, ok := o.(PT); ok {
			desired = append(desired, v)
		} else {
			log.V(9).Info("skipping object", "name", o.GetName(), "kind", o.GetObjectKind().GroupVersionKind().Kind)
		}
	}
	log.V(9).Info("Filtered objects", "desired", desired, "owned", r.owned)

	// deleteOwned deletes every cached owned object. A NotFound means the
	// object is already gone (e.g. the cache lagged behind an earlier delete),
	// which is the outcome we want, so it is not an error.
	deleteOwned := func() error {
		for _, e := range r.owned {
			if err := c.Delete(ctx, e); client.IgnoreNotFound(err) != nil {
				return err
			}
		}
		return nil
	}

	// No desired resources, delete all owned resources if any
	if len(desired) == 0 {
		if len(r.owned) > 0 {
			log.V(1).Info("Deleting owned resources")
			return deleteOwned()
		}
		return nil
	}

	// We only support one desired resource of a particular type for now
	// If there are cases where we need to create multiple cloud or agent endpoints, we will need to change this handling
	if len(desired) > 1 {
		return errors.New("multiple desired resources not supported")
	}
	d := desired[0]

	// We have a single desired resource and an existing resource, make them match
	if len(r.owned) == 1 {
		e := r.owned[0]
		// Fetch the existing resource as it may have been updated
		if err := c.Get(ctx, client.ObjectKeyFromObject(e), e); err != nil {
			return err
		}
		if r.matches(*d, *e) {
			log.V(5).Info(fmt.Sprintf("%T matches desired state, no update needed", e))
			return nil
		}
		// Only log "Updating" once we know an update will actually happen.
		log.Info(fmt.Sprintf("Updating %T", e), "desired", d, "existing", e)
		r.mergeExisting(*d, e)
		if err := c.Update(ctx, e); err != nil {
			log.Error(err, fmt.Sprintf("Failed to update %T", e))
			return err
		}
		return nil
	}

	// If by this point we have more than one owned resource, something is wrong.
	// Delete the owned resources.
	if len(r.owned) > 1 {
		log.Info(fmt.Sprintf("Found multiple %T resources owned by the service, deleting before creating", d))
		if err := deleteOwned(); err != nil {
			return err
		}
	}

	log.Info(fmt.Sprintf("Creating %T", d))
	return c.Create(ctx, d)
}
// UpdateServiceStatus delegates to r.updateStatus when o is of this
// reconciler's type. Otherwise it does nothing, since o is meant for another
// member of the serviceSubresourceReconcilers set.
func (r *baseSubresourceReconciler[T, PT]) UpdateServiceStatus(ctx context.Context, c client.Client, svc *corev1.Service, o client.Object) error {
	if typed, isOurs := o.(PT); isOurs {
		return r.updateStatus(ctx, c, svc, typed)
	}
	return nil
}
// newServiceCloudEndpointReconciler returns the subresource reconciler for
// CloudEndpoints owned by a Service. An existing CloudEndpoint is up to date
// when its spec and labels deep-equal the desired ones; otherwise both are
// overwritten from the desired object.
func newServiceCloudEndpointReconciler() serviceSubresourceReconciler {
	rec := &baseSubresourceReconciler[ngrokv1alpha1.CloudEndpoint, *ngrokv1alpha1.CloudEndpoint]{}

	rec.listOwned = func(ctx context.Context, c client.Client, opts ...client.ListOption) ([]ngrokv1alpha1.CloudEndpoint, error) {
		var list ngrokv1alpha1.CloudEndpointList
		if err := c.List(ctx, &list, opts...); err != nil {
			return nil, err
		}
		return list.Items, nil
	}

	rec.matches = func(desired, existing ngrokv1alpha1.CloudEndpoint) bool {
		sameSpec := reflect.DeepEqual(existing.Spec, desired.Spec)
		return sameSpec && reflect.DeepEqual(existing.Labels, desired.Labels)
	}

	rec.mergeExisting = func(desired ngrokv1alpha1.CloudEndpoint, existing *ngrokv1alpha1.CloudEndpoint) {
		existing.Spec = desired.Spec
		existing.Labels = desired.Labels
	}

	rec.updateStatus = func(ctx context.Context, c client.Client, svc *corev1.Service, endpoint *ngrokv1alpha1.CloudEndpoint) error {
		return updateStatus(ctx, c, svc, endpoint)
	}

	return rec
}
// newServiceAgentEndpointReconciler returns the subresource reconciler for
// AgentEndpoints owned by a Service. An existing AgentEndpoint is up to date
// when its spec and labels deep-equal the desired ones; otherwise both are
// overwritten from the desired object.
func newServiceAgentEndpointReconciler() serviceSubresourceReconciler {
	rec := &baseSubresourceReconciler[ngrokv1alpha1.AgentEndpoint, *ngrokv1alpha1.AgentEndpoint]{}

	rec.listOwned = func(ctx context.Context, c client.Client, opts ...client.ListOption) ([]ngrokv1alpha1.AgentEndpoint, error) {
		var list ngrokv1alpha1.AgentEndpointList
		if err := c.List(ctx, &list, opts...); err != nil {
			return nil, err
		}
		return list.Items, nil
	}

	rec.matches = func(desired, existing ngrokv1alpha1.AgentEndpoint) bool {
		sameSpec := reflect.DeepEqual(existing.Spec, desired.Spec)
		return sameSpec && reflect.DeepEqual(existing.Labels, desired.Labels)
	}

	rec.mergeExisting = func(desired ngrokv1alpha1.AgentEndpoint, existing *ngrokv1alpha1.AgentEndpoint) {
		existing.Spec = desired.Spec
		existing.Labels = desired.Labels
	}

	rec.updateStatus = func(ctx context.Context, c client.Client, svc *corev1.Service, endpoint *ngrokv1alpha1.AgentEndpoint) error {
		return updateStatus(ctx, c, svc, endpoint)
	}

	return rec
}
// getNgrokTrafficPolicyForService fetches the NgrokTrafficPolicy named by
// svc's traffic-policy annotation from svc's namespace.
//
// It returns (nil, nil) when svc has no such annotation. On any other failure
// it returns a nil policy and the error; a Get error is wrapped with the
// policy's key and stays inspectable via errors.Is/As.
func getNgrokTrafficPolicyForService(ctx context.Context, c client.Client, svc *corev1.Service) (*ngrokv1alpha1.NgrokTrafficPolicy, error) {
	policyName, err := annotations.ExtractNgrokTrafficPolicyFromAnnotations(svc)
	if err != nil {
		if errors.IsMissingAnnotations(err) {
			return nil, nil
		}
		return nil, err
	}

	key := client.ObjectKey{Namespace: svc.Namespace, Name: policyName}
	policy := &ngrokv1alpha1.NgrokTrafficPolicy{}
	if err := c.Get(ctx, key, policy); err != nil {
		// Don't hand back a half-populated object alongside the error.
		return nil, fmt.Errorf("getting NgrokTrafficPolicy %s: %w", key, err)
	}
	return policy, nil
}
// updateStatus publishes the endpoint's address in svc's load balancer status.
// The address is taken from svc's computed-URL annotation:
//   - The hostname comes from the URL. For tls:// URLs it is replaced by the
//     referenced Domain's CNAME target when one is set.
//   - The port comes from the URL, defaulting to 443.
//
// The ingress status is cleared in these cases:
//   - the computed URL is absent;
//   - a tls:// endpoint has no domainRef yet;
//   - the referenced Domain does not exist.
//
// Other errors, including failures to read the Domain, are returned so the
// caller retries instead of silently clearing the status. The Status Update
// is skipped when the status already matches.
func updateStatus(ctx context.Context, c client.Client, svc *corev1.Service, endpoint ngrokv1alpha1.EndpointWithDomain) error {
	// clearIngressStatus empties the ingress list, skipping the write when it
	// is already empty.
	clearIngressStatus := func(svc *corev1.Service) error {
		if len(svc.Status.LoadBalancer.Ingress) == 0 {
			return nil
		}
		svc.Status.LoadBalancer.Ingress = nil
		return c.Status().Update(ctx, svc)
	}

	hostname := ""
	port := int32(443)

	// Check if the computed URL is set, if so, let's parse and use it
	computedURL, err := annotations.ExtractComputedURL(svc)
	switch {
	case err == nil:
		targetURL, err := url.Parse(computedURL)
		if err != nil {
			return fmt.Errorf("parsing computed URL %q: %w", computedURL, err)
		}
		hostname = targetURL.Hostname()
		if p := targetURL.Port(); p != "" {
			x, err := strconv.ParseInt(p, 10, 32)
			if err != nil {
				return fmt.Errorf("parsing port %q of computed URL %q: %w", p, computedURL, err)
			}
			port = int32(x)
		}

		// For TLS endpoints, we need to wait for the domainRef to be set on the endpoint
		// so we can look up the CNAME target from the Domain CRD. This applies to both
		// ngrok domains (*.ngrok.app) and custom domains - all TLS endpoints get a domainRef.
		// TCP endpoints don't have domains, so they can use the hostname directly.
		if targetURL.Scheme == "tls" {
			dr := endpoint.GetDomainRef()
			if dr == nil {
				// domainRef not yet set by the CloudEndpoint/AgentEndpoint controller.
				// Clear the status and wait for it to be populated.
				return clearIngressStatus(svc)
			}
			domainKey := dr.ToClientObjectKey(svc.Namespace)
			domain := &ingressv1alpha1.Domain{}
			if err := c.Get(ctx, domainKey, domain); err != nil {
				if client.IgnoreNotFound(err) != nil {
					// Transient or permission failures: surface them so the
					// reconcile is retried instead of flapping the status.
					return fmt.Errorf("getting Domain %v: %w", domainKey, err)
				}
				// The Domain doesn't exist yet, so there is no CNAME target.
				// Clear the status until it is available.
				return clearIngressStatus(svc)
			}
			// Use CNAME target if available (for custom domains), otherwise use the domain itself
			if domain.Status.CNAMETarget != nil && *domain.Status.CNAMETarget != "" {
				hostname = *domain.Status.CNAMETarget
			}
		}
	case !errors.IsMissingAnnotations(err): // Some other error
		return err
	default: // computedURL not present, clear status until endpoint is ready
		return clearIngressStatus(svc)
	}

	newIngressStatus := []corev1.LoadBalancerIngress{
		{
			Hostname: hostname,
			Ports: []corev1.PortStatus{
				{
					Port:     port,
					Protocol: corev1.ProtocolTCP,
				},
			},
		},
	}

	// If the status is already set correctly, do nothing
	if reflect.DeepEqual(svc.Status.LoadBalancer.Ingress, newIngressStatus) {
		return nil
	}

	// Update the service status
	svc.Status.LoadBalancer.Ingress = newIngressStatus
	return c.Status().Update(ctx, svc)
}