This commit is contained in:
inconshreveable
2026-05-02 04:38:44 +00:00
parent 37aceab209
commit af7ec566f7
7 changed files with 149 additions and 91 deletions
+1
View File
@@ -23,6 +23,7 @@ RUN --mount=type=cache,target=/go \
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
COPY certs /etc/ssl/certs/ngrok
COPY certs/root.crt.pem /etc/ssl/certs/ngrok-root.crt.pem
WORKDIR /
COPY --from=builder /workspace/bin/ngrok-operator ./
USER 65532:65532
-23
View File
@@ -1,23 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDwjCCAqqgAwIBAgIUZqF2AkB17pISojTndgc2U5BDt7wwDQYJKoZIhvcNAQEL
BQAwbzEQMA4GA1UEAwwHUm9vdCBDQTENMAsGA1UECwwEcHJvZDESMBAGA1UECgwJ
bmdyb2sgSW5jMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIDApDYWxp
Zm9ybmlhMQswCQYDVQQGEwJVUzAeFw0yMjA4MzExNDU5NDhaFw0zMjA4MjgxNDU5
NDhaMF8xCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRIwEAYDVQQK
DAluZ3JvayBJbmMxDTALBgNVBAsMBHByb2QxGDAWBgNVBAMMD0ludGVybWVkaWF0
ZSBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAK+t8q9Ost9BxCWX
fyGG0mVQOpIiyrzzZWyqT6CZpMY2fpOadLuZeBP7ti2Iw4FgCpfLntL0RldvMMNY
4qq61dVrCwhL/v2ldsaHUdzjtFj1i+ZNGUtV4E9korHxm2YdsD91w6WIjF/J0lvo
X2koLwFlGc/CkhT8z2VWebY8a6mYNyz5S7yPTQh2/mQ14lx/QPJgZSFEE/EEkMDC
bs4BoMuqKMhCpqEP8m4+CxPQ5/V6POSqUIxT4A7eWWj2MRpnmirmVbXOc24Aznqk
bdQUP4qagiR/i7qPsRx+f4mFfDninPsXp/djjByo0xzdh+i1HFyOR/7nyNDKlJ+e
rymRgnUCAwEAAaNmMGQwHQYDVR0OBBYEFJ47nRzHaOT+vY44N3TCMYtGlBjIMB8G
A1UdIwQYMBaAFNxeUxPIM8G7cX0DhFc81pLD4W+HMBIGA1UdEwEB/wQIMAYBAf8C
AQAwDgYDVR0PAQH/BAQDAgGGMA0GCSqGSIb3DQEBCwUAA4IBAQBRmnMoOtQbYL7P
Co1B5Chslb86HP2WI1jGRXhbfwAF2ySDFnX2ZbRPVtoQ+IuqXWxyXAeicYjXR6kz
xX8hLWfD14kWUIz6ZgT3uZrDSIzmQ+tz8ztbT6mTI1ECWdjLV/i58f6vKzgLD8Vp
3VdVns8NA9ee6a65QNjZEnwBVeccysoWkOwM/KzuazhSGcGu44y/S4ny9pAg7Pol
2kV4NicDKD6tSAdXmPmjFalYUfnMmyhurZIPrS2dgYgpOrGVMwronTOZ3BUf4DL4
zkkmcLXss1KztQnLd23nuNiIscwMcGM58a3O5zUp7aorfrm7cdRgkFmcYVNO/6uG
Q5iJ+Ppk
-----END CERTIFICATE-----
+31 -8
View File
@@ -18,6 +18,7 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
@@ -101,6 +102,7 @@ type apiManagerOpts struct {
ingressControllerName string
ingressWatchNamespace string
ngrokMetadata string
computeMetadata string
description string
managerName string
zapOpts *zap.Options
@@ -152,6 +154,7 @@ func apiCmd() *cobra.Command {
c.Flags().StringVar(&opts.probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
c.Flags().StringVar(&opts.electionID, "election-id", "ngrok-operator-leader", "The name of the configmap that is used for holding the leader lock")
c.Flags().StringVar(&opts.ngrokMetadata, "ngrokMetadata", "", "A comma separated list of key=value pairs such as 'key1=value1,key2=value2' to be added to ngrok api resources as labels")
c.Flags().StringVar(&opts.computeMetadata, "compute-metadata", "", "JSON compute metadata block to include in the KubernetesOperator API resource metadata")
c.Flags().StringVar(&opts.description, "description", "Created by the ngrok-operator", "Description for this installation")
c.Flags().StringVar(&opts.region, "region", "", "The region to use for ngrok tunnels")
c.Flags().StringVar(&opts.serverAddr, "server-addr", "", "The address of the ngrok server to use for tunnels")
@@ -333,7 +336,7 @@ func runNormalMode(ctx context.Context, opts apiManagerOpts, k8sClient client.Cl
return err
}
ngrokClientset, err := loadNgrokClientset(ctx, opts)
ngrokClientset, ngrokClientConfig, err := loadNgrokClientset(ctx, opts)
if err != nil {
return fmt.Errorf("Unable to load ngrokClientSet: %w", err)
}
@@ -419,12 +422,14 @@ func runNormalMode(ctx context.Context, opts apiManagerOpts, k8sClient client.Cl
os.Exit(1)
}
// App-replica poller: polls ngrok Vaults API and reconciles Deployments
// App-replica poller: polls ngrok Compute Replicas API and reconciles Deployments
if err := mgr.Add(&computecontroller.AppReplicaPoller{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("AppReplicaPoller"),
Namespace: opts.namespace,
NgrokClientset: ngrokClientset,
K8sOpName: opts.releaseName,
K8sOpNamespace: opts.namespace,
NgrokBaseClient: ngrok.NewBaseClient(ngrokClientConfig),
PollingInterval: 3 * time.Second,
}); err != nil {
return fmt.Errorf("unable to add AppReplicaPoller: %w", err)
@@ -486,12 +491,12 @@ func loadManager(k8sConfig *rest.Config, opts apiManagerOpts) (manager.Manager,
return mgr, nil
}
// loadNgrokClientset loads the ngrok API clientset from the environment and managerOpts
func loadNgrokClientset(ctx context.Context, opts apiManagerOpts) (ngrokapi.Clientset, error) {
// loadNgrokClientset loads the ngrok API clientset and base client config from the environment and managerOpts
func loadNgrokClientset(ctx context.Context, opts apiManagerOpts) (ngrokapi.Clientset, *ngrok.ClientConfig, error) {
var ok bool
opts.ngrokAPIKey, ok = os.LookupEnv("NGROK_API_KEY")
if !ok {
return nil, errors.New("NGROK_API_KEY environment variable should be set, but was not")
return nil, nil, errors.New("NGROK_API_KEY environment variable should be set, but was not")
}
clientConfigOpts := []ngrok.ClientConfigOption{
@@ -515,11 +520,11 @@ func loadNgrokClientset(ctx context.Context, opts apiManagerOpts) (ngrokapi.Clie
cIter := cApiKeys.List(&ngrok.Paging{Limit: new("1")})
cIter.Next(ctx)
if cIter.Err() != nil {
return nil, fmt.Errorf("Unable to verify API Key: %w", cIter.Err())
return nil, nil, fmt.Errorf("Unable to verify API Key: %w", cIter.Err())
}
ngrokClientset := ngrokapi.NewClientSet(ngrokClientConfig)
return ngrokClientset, nil
return ngrokClientset, ngrokClientConfig, nil
}
// getK8sResourceDriver returns a new Driver instance that is seeded with the current state of the cluster.
@@ -814,6 +819,7 @@ func createKubernetesOperator(ctx context.Context, client client.Client, opts ap
_, err := controllerutil.CreateOrUpdate(ctx, client, k8sOperator, func() error {
k8sOperator.Spec = ngrokv1alpha1.KubernetesOperatorSpec{
Description: opts.description,
Metadata: buildKubernetesOperatorMetadata(opts.computeMetadata),
Deployment: &ngrokv1alpha1.KubernetesOperatorDeployment{
Name: opts.releaseName,
Namespace: opts.namespace,
@@ -851,3 +857,20 @@ func createKubernetesOperator(ctx context.Context, client client.Client, opts ap
})
return err
}
// buildKubernetesOperatorMetadata constructs the metadata JSON string for the
// KubernetesOperator API resource, merging the default owned-by field with an
// optional compute metadata block.
func buildKubernetesOperatorMetadata(computeMetadataJSON string) string {
m := map[string]any{
"owned-by": "ngrok-operator",
}
if computeMetadataJSON != "" {
var compute any
if err := json.Unmarshal([]byte(computeMetadataJSON), &compute); err == nil {
m["compute"] = compute
}
}
b, _ := json.Marshal(m)
return string(b)
}
@@ -123,6 +123,9 @@ spec:
{{- end }}
{{- $metadataArgs | join "," }}
{{- end }}
{{- if .Values.compute }}
- --compute-metadata={{ .Values.compute | toJson | quote }}
{{- end }}
- --ingress-controller-name={{ .Values.controllerName | default .Values.ingress.controllerName }}
{{- if (.Values.watchNamespace | default .Values.ingress.watchNamespace) }}
- --ingress-watch-namespace={{ .Values.watchNamespace | default .Values.ingress.watchNamespace }}
+8
View File
@@ -424,6 +424,14 @@ bindings:
## @param bindings.forwarder.topologySpreadConstraints Topology Spread Constraints for the bindings forwarder pod(s)
topologySpreadConstraints: []
##
## @section Compute configuration
##
## @param compute Free-form compute metadata block passed to the KubernetesOperator API resource.
## Used for compute pool scheduling. Pass via --set-json to preserve snake_case keys.
## Example: --set-json 'compute={"pool_join_key":"cp_xxx","scheduler_props":{"region":"us-west-2"}}'
compute: {}
## @section Custom Resource Definitions installation
##
+106 -49
View File
@@ -9,9 +9,8 @@ import (
"time"
"github.com/go-logr/logr"
"github.com/ngrok/ngrok-api-go/v7"
ngrok "github.com/ngrok/ngrok-api-go/v7"
ngrokv1alpha1 "github.com/ngrok/ngrok-operator/api/ngrok/v1alpha1"
"github.com/ngrok/ngrok-operator/internal/ngrokapi"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -21,11 +20,8 @@ import (
)
const (
// LabelNgrokID is the label used to associate managed resources with vault objects.
// LabelNgrokID is the label used to associate managed resources with compute replica objects.
LabelNgrokID = "ngrok-id"
// runnerID is the runner ID this operator instance matches against.
runnerID = "default"
)
// replicaEndpoint represents a single endpoint with its URL and optional traffic policy.
@@ -34,13 +30,26 @@ type replicaEndpoint struct {
TrafficPolicy string `json:"traffic_policy"`
}
// vaultAppReplica is the parsed metadata shape we look for in vault objects.
type vaultAppReplica struct {
// computeReplica represents a replica object from the ngrok Compute Replicas API.
type computeReplica struct {
ID string `json:"id"`
Image string `json:"image"`
Endpoints []replicaEndpoint `json:"endpoints"`
CreatedAt string `json:"created_at"`
AppID string `json:"app_id"`
EnvironmentID string `json:"environment_id"`
DeploymentID string `json:"deployment_id"`
RunnerID string `json:"runner_id"`
State string `json:"state"`
ContainerImage string `json:"container_image"`
Endpoints []replicaEndpoint `json:"endpoints"`
EnvironmentVars map[string]string `json:"environment_vars"`
URI string `json:"uri"`
}
// computeReplicaList represents a paginated list response from the Compute Replicas API.
type computeReplicaList struct {
ComputeReplicas []computeReplica `json:"compute_replicas"`
URI string `json:"uri"`
NextPageURI *string `json:"next_page_uri"`
}
// urlPort extracts the port from a URL string. If no port is specified, returns 443.
@@ -57,19 +66,23 @@ func urlPort(rawURL string) (int, error) {
return 443, nil
}
func k8sName(id string) string {
return strings.ReplaceAll(id, "_", "-")
}
func deploymentName(id string) string {
return "app-replica-" + id
return "app-replica-" + k8sName(id)
}
func agentEndpointName(id string, port int) string {
return fmt.Sprintf("app-replica-%s-%d", id, port)
return fmt.Sprintf("app-replica-%s-%d", k8sName(id), port)
}
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=ngrok.k8s.ngrok.com,resources=agentendpoints,verbs=get;list;watch;create;update;delete
// AppReplicaPoller polls the ngrok Vaults API for app-replica objects
// AppReplicaPoller polls the ngrok Compute Replicas API for app-replica objects
// and reconciles them as Deployments, Services, and AgentEndpoints in the cluster.
type AppReplicaPoller struct {
client.Client
@@ -78,20 +91,33 @@ type AppReplicaPoller struct {
// Namespace is where resources are managed.
Namespace string
// NgrokClientset is the ngrok API clientset.
NgrokClientset ngrokapi.Clientset
// K8sOpName is the name of the KubernetesOperator CR to look up for the runner ID.
K8sOpName string
// K8sOpNamespace is the namespace of the KubernetesOperator CR.
K8sOpNamespace string
// NgrokBaseClient is the ngrok API base client.
NgrokBaseClient *ngrok.BaseClient
// PollingInterval is how often to poll the ngrok API.
PollingInterval time.Duration
stopCh chan struct{}
runnerID string
stopCh chan struct{}
}
// Start implements manager.Runnable.
func (r *AppReplicaPoller) Start(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx).WithName("AppReplicaPoller")
log.Info("Starting app-replica polling routine")
r.runnerID = r.waitForRunnerID(ctx, log)
if r.runnerID == "" {
log.Info("Context canceled before runner ID was available, not starting")
return nil
}
log.Info("Starting app-replica polling routine", "runner_id", r.runnerID)
r.stopCh = make(chan struct{})
defer close(r.stopCh)
@@ -114,7 +140,7 @@ func (r *AppReplicaPoller) pollLoop(ctx context.Context, log logr.Logger) {
for {
select {
case <-ticker.C:
log.V(9).Info("Polling vaults for app-replica objects")
log.V(9).Info("Polling compute replicas API")
if err := r.reconcile(ctx, log); err != nil {
log.Error(err, "reconcile failed")
}
@@ -124,15 +150,15 @@ func (r *AppReplicaPoller) pollLoop(ctx context.Context, log logr.Logger) {
}
}
// reconcile fetches desired state from the Vaults API, fetches current
// reconcile fetches desired state from the Replicas API, fetches current
// managed resources, then creates or deletes to converge.
func (r *AppReplicaPoller) reconcile(ctx context.Context, log logr.Logger) error {
desired, err := r.fetchDesiredReplicas(ctx, log)
if err != nil {
return fmt.Errorf("fetching vault app-replicas: %w", err)
return fmt.Errorf("fetching compute replicas: %w", err)
}
desiredByID := make(map[string]vaultAppReplica, len(desired))
desiredByID := make(map[string]computeReplica, len(desired))
for _, d := range desired {
desiredByID[d.ID] = d
}
@@ -156,7 +182,7 @@ func (r *AppReplicaPoller) reconcile(ctx context.Context, log logr.Logger) error
if _, exists := existingByID[id]; exists {
continue
}
log.Info("Creating resources for app-replica", "id", id, "image", replica.Image)
log.Info("Creating resources for app-replica", "id", id, "image", replica.ContainerImage)
if err := r.createResources(ctx, log, replica); err != nil {
log.Error(err, "failed to create resources", "id", id)
}
@@ -175,43 +201,74 @@ func (r *AppReplicaPoller) reconcile(ctx context.Context, log logr.Logger) error
return nil
}
// fetchDesiredReplicas lists all vaults and parses their metadata for app-replica entries.
func (r *AppReplicaPoller) fetchDesiredReplicas(ctx context.Context, log logr.Logger) ([]vaultAppReplica, error) {
var replicas []vaultAppReplica
// waitForRunnerID polls the KubernetesOperator CR until it has a registered ID to use as the runner ID.
func (r *AppReplicaPoller) waitForRunnerID(ctx context.Context, log logr.Logger) string {
log.Info("Waiting for KubernetesOperator to be registered")
iter := r.NgrokClientset.Vaults().List(&ngrok.Paging{})
for iter.Next(ctx) {
vault := iter.Item()
if vault == nil || vault.Metadata == "" {
continue
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
var parsed vaultAppReplica
if err := json.Unmarshal([]byte(vault.Metadata), &parsed); err != nil {
log.V(5).Info("Skipping vault with non-JSON metadata", "vault_id", vault.ID)
continue
}
for {
select {
case <-ticker.C:
ticker.Stop()
ticker.Reset(30 * time.Second)
if parsed.RunnerID != runnerID {
continue
}
if parsed.Image == "" || parsed.ID == "" || len(parsed.Endpoints) == 0 {
log.V(3).Info("Skipping vault with missing required fields", "vault_id", vault.ID)
continue
}
var ko ngrokv1alpha1.KubernetesOperator
if err := r.Get(ctx, client.ObjectKey{Name: r.K8sOpName, Namespace: r.K8sOpNamespace}, &ko); err != nil {
log.Error(err, "Failed to get KubernetesOperator", "name", r.K8sOpName)
continue
}
parsed.ID = strings.ToLower(parsed.ID)
replicas = append(replicas, parsed)
if ko.Status.ID == "" {
log.V(1).Info("KubernetesOperator not yet registered, waiting...")
continue
}
log.Info("KubernetesOperator registered, using as runner ID", "id", ko.Status.ID)
return ko.Status.ID
case <-ctx.Done():
return ""
}
}
}
// fetchDesiredReplicas lists running compute replicas assigned to this runner via the Compute Replicas API.
func (r *AppReplicaPoller) fetchDesiredReplicas(ctx context.Context, log logr.Logger) ([]computeReplica, error) {
var replicas []computeReplica
filter := fmt.Sprintf(`obj.runner_id == "%s" && obj.state == "running"`, r.runnerID)
nextPage := &url.URL{
Path: "/compute/replicas",
RawQuery: url.Values{"filter": {filter}}.Encode(),
}
if err := iter.Err(); err != nil {
return nil, err
for nextPage != nil {
var resp computeReplicaList
if err := r.NgrokBaseClient.Do(ctx, "GET", nextPage, nil, &resp); err != nil {
return nil, fmt.Errorf("listing compute replicas: %w", err)
}
for _, replica := range resp.ComputeReplicas {
if replica.ContainerImage == "" || replica.ID == "" || len(replica.Endpoints) == 0 {
log.V(3).Info("Skipping replica with missing required fields", "replica_id", replica.ID)
continue
}
replica.ID = strings.ToLower(replica.ID)
replicas = append(replicas, replica)
}
if resp.NextPageURI != nil {
nextPage, _ = url.Parse(*resp.NextPageURI)
} else {
nextPage = nil
}
}
return replicas, nil
}
func (r *AppReplicaPoller) createResources(ctx context.Context, log logr.Logger, replica vaultAppReplica) error {
func (r *AppReplicaPoller) createResources(ctx context.Context, log logr.Logger, replica computeReplica) error {
name := deploymentName(replica.ID)
labels := map[string]string{LabelNgrokID: replica.ID}
@@ -273,7 +330,7 @@ func (r *AppReplicaPoller) createResources(ctx context.Context, log logr.Logger,
Containers: []corev1.Container{
{
Name: "app",
Image: replica.Image,
Image: replica.ContainerImage,
Ports: containerPorts,
Env: envVars,
},
-11
View File
@@ -10,7 +10,6 @@ import (
"github.com/ngrok/ngrok-api-go/v7/kubernetes_operators"
"github.com/ngrok/ngrok-api-go/v7/reserved_addrs"
"github.com/ngrok/ngrok-api-go/v7/reserved_domains"
"github.com/ngrok/ngrok-api-go/v7/vaults"
)
type Clientset interface {
@@ -20,7 +19,6 @@ type Clientset interface {
IPPolicyRules() IPPolicyRulesClient
KubernetesOperators() KubernetesOperatorsClient
TCPAddresses() TCPAddressesClient
Vaults() VaultsClient
}
type DefaultClientset struct {
@@ -30,7 +28,6 @@ type DefaultClientset struct {
ipPolicyRulesClient *ip_policy_rules.Client
kubernetesOperatorsClient *kubernetes_operators.Client
tcpAddrsClient *reserved_addrs.Client
vaultsClient *vaults.Client
}
// NewClientSet creates a new ClientSet from an ngrok client config.
@@ -42,7 +39,6 @@ func NewClientSet(config *ngrok.ClientConfig) *DefaultClientset {
ipPolicyRulesClient: ip_policy_rules.NewClient(config),
kubernetesOperatorsClient: kubernetes_operators.NewClient(config),
tcpAddrsClient: reserved_addrs.NewClient(config),
vaultsClient: vaults.NewClient(config),
}
}
@@ -135,10 +131,3 @@ func (c *DefaultClientset) TCPAddresses() TCPAddressesClient {
return c.tcpAddrsClient
}
type VaultsClient interface {
Lister[*ngrok.Vault]
}
func (c *DefaultClientset) Vaults() VaultsClient {
return c.vaultsClient
}