mirror of
https://github.com/traefik/mesh.git
synced 2026-05-02 18:32:32 +00:00
384 lines
12 KiB
Go
384 lines
12 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
accessinformer "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/informers/externalversions"
|
|
accesslister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/listers/access/v1alpha2"
|
|
specsinformer "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/informers/externalversions"
|
|
specslister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/listers/specs/v1alpha3"
|
|
splitinformer "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/informers/externalversions"
|
|
splitlister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/listers/split/v1alpha3"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/traefik/mesh/v2/cmd"
|
|
"github.com/traefik/mesh/v2/pkg/annotations"
|
|
"github.com/traefik/mesh/v2/pkg/k8s"
|
|
"github.com/traefik/mesh/v2/pkg/portmapping"
|
|
"github.com/traefik/mesh/v2/pkg/provider"
|
|
"github.com/traefik/mesh/v2/pkg/topology"
|
|
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/informers"
|
|
listers "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
)
|
|
|
|
const (
	// maxRetries is how many times a failing work item is re-queued before it is dropped from the queue.
	// The controller rate limiter in use backs off exponentially (5ms*2^(n-1) for the n-th retry), so the successive
	// re-queue delays are: 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s.
	maxRetries = 12

	// configRefreshKey is the work queue key used to indicate that the configuration has to be refreshed without
	// syncing any shadow service.
	configRefreshKey = "refresh"
)
|
|
|
|
// SharedStore is used to share the controller state.
|
|
type SharedStore interface {
|
|
SetConfiguration(cfg *dynamic.Configuration)
|
|
SetTopology(topo *topology.Topology)
|
|
SetReadiness(isReady bool)
|
|
}
|
|
|
|
// TopologyBuilder builds Topologies.
|
|
type TopologyBuilder interface {
|
|
Build(resourceFilter *k8s.ResourceFilter) (*topology.Topology, error)
|
|
}
|
|
|
|
// Config holds the configuration of the controller.
type Config struct {
	// ACLEnabled turns on the TrafficTarget informer and the ACL mode of the provider.
	ACLEnabled bool
	// DefaultMode is the default traffic type handed to the shadow service manager and to the provider.
	DefaultMode string
	// Namespace is the namespace handed to the shadow service manager.
	Namespace string

	// WatchNamespaces and IgnoreNamespaces are passed to the resource filter to select the watched namespaces.
	WatchNamespaces  []string
	IgnoreNamespaces []string

	// Port ranges used to build the HTTP, TCP and UDP port mappings.
	MinHTTPPort, MaxHTTPPort int32
	MinTCPPort, MaxTCPPort   int32
	MinUDPPort, MaxUDPPort   int32
}
|
|
|
|
// Controller hold controller configuration.
|
|
type Controller struct {
|
|
mu sync.Mutex
|
|
stopCh chan struct{}
|
|
|
|
cfg Config
|
|
workQueue workqueue.RateLimitingInterface
|
|
shadowServiceManager *ShadowServiceManager
|
|
provider *provider.Provider
|
|
resourceFilter *k8s.ResourceFilter
|
|
httpStateTable *portmapping.MultiplexedPortMapping
|
|
tcpStateTable *portmapping.PortMapping
|
|
udpStateTable *portmapping.PortMapping
|
|
topologyBuilder TopologyBuilder
|
|
store SharedStore
|
|
logger logrus.FieldLogger
|
|
|
|
clients k8s.Client
|
|
kubernetesFactory informers.SharedInformerFactory
|
|
accessFactory accessinformer.SharedInformerFactory
|
|
specsFactory specsinformer.SharedInformerFactory
|
|
splitFactory splitinformer.SharedInformerFactory
|
|
podLister listers.PodLister
|
|
serviceLister listers.ServiceLister
|
|
endpointsLister listers.EndpointsLister
|
|
trafficTargetLister accesslister.TrafficTargetLister
|
|
httpRouteGroupLister specslister.HTTPRouteGroupLister
|
|
tcpRouteLister specslister.TCPRouteLister
|
|
trafficSplitLister splitlister.TrafficSplitLister
|
|
}
|
|
|
|
// NewMeshController builds the informers and other required components of the mesh controller, and returns an
|
|
// initialized mesh controller object.
|
|
func NewMeshController(clients k8s.Client, cfg Config, store SharedStore, logger logrus.FieldLogger) *Controller {
|
|
c := &Controller{
|
|
logger: logger,
|
|
cfg: cfg,
|
|
clients: clients,
|
|
store: store,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
// Initialize the ignored and watched resources.
|
|
c.resourceFilter = k8s.NewResourceFilter(
|
|
k8s.WatchNamespaces(cfg.WatchNamespaces...),
|
|
k8s.IgnoreNamespaces(cfg.IgnoreNamespaces...),
|
|
k8s.IgnoreNamespaces(metav1.NamespaceSystem),
|
|
k8s.IgnoreService(metav1.NamespaceDefault, "kubernetes"),
|
|
k8s.IgnoreLabel(k8s.LabelPartOf, k8s.AppName),
|
|
)
|
|
|
|
// Create the work queue and the enqueue handler.
|
|
c.workQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
|
handler := cache.FilteringResourceEventHandler{
|
|
FilterFunc: c.isWatchedResource,
|
|
Handler: &enqueueWorkHandler{logger: c.logger, workQueue: c.workQueue},
|
|
}
|
|
|
|
// Create SharedInformers, listers and register the event handler to informers that are not ACL related.
|
|
c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.KubernetesClient(), k8s.ResyncPeriod)
|
|
c.splitFactory = splitinformer.NewSharedInformerFactoryWithOptions(c.clients.SplitClient(), k8s.ResyncPeriod)
|
|
c.specsFactory = specsinformer.NewSharedInformerFactoryWithOptions(c.clients.SpecsClient(), k8s.ResyncPeriod)
|
|
|
|
c.podLister = c.kubernetesFactory.Core().V1().Pods().Lister()
|
|
c.endpointsLister = c.kubernetesFactory.Core().V1().Endpoints().Lister()
|
|
c.serviceLister = c.kubernetesFactory.Core().V1().Services().Lister()
|
|
c.trafficSplitLister = c.splitFactory.Split().V1alpha3().TrafficSplits().Lister()
|
|
c.httpRouteGroupLister = c.specsFactory.Specs().V1alpha3().HTTPRouteGroups().Lister()
|
|
c.tcpRouteLister = c.specsFactory.Specs().V1alpha3().TCPRoutes().Lister()
|
|
|
|
c.kubernetesFactory.Core().V1().Services().Informer().AddEventHandler(handler)
|
|
c.kubernetesFactory.Core().V1().Endpoints().Informer().AddEventHandler(handler)
|
|
c.splitFactory.Split().V1alpha3().TrafficSplits().Informer().AddEventHandler(handler)
|
|
c.specsFactory.Specs().V1alpha3().HTTPRouteGroups().Informer().AddEventHandler(handler)
|
|
c.specsFactory.Specs().V1alpha3().TCPRoutes().Informer().AddEventHandler(handler)
|
|
|
|
// Create SharedInformers, listers and register the event handler for ACL related resources.
|
|
if c.cfg.ACLEnabled {
|
|
c.accessFactory = accessinformer.NewSharedInformerFactoryWithOptions(c.clients.AccessClient(), k8s.ResyncPeriod)
|
|
|
|
c.trafficTargetLister = c.accessFactory.Access().V1alpha2().TrafficTargets().Lister()
|
|
|
|
c.accessFactory.Access().V1alpha2().TrafficTargets().Informer().AddEventHandler(handler)
|
|
c.kubernetesFactory.Core().V1().Pods().Informer().AddEventHandler(handler)
|
|
}
|
|
|
|
c.httpStateTable = portmapping.NewMultiplexedPortMapping(c.cfg.MinHTTPPort, c.cfg.MaxHTTPPort)
|
|
c.tcpStateTable = portmapping.NewPortMapping(c.cfg.MinTCPPort, c.cfg.MaxTCPPort)
|
|
c.udpStateTable = portmapping.NewPortMapping(c.cfg.MinUDPPort, c.cfg.MaxUDPPort)
|
|
|
|
c.shadowServiceManager = &ShadowServiceManager{
|
|
namespace: c.cfg.Namespace,
|
|
serviceLister: c.serviceLister,
|
|
httpStateTable: c.httpStateTable,
|
|
tcpStateTable: c.tcpStateTable,
|
|
udpStateTable: c.udpStateTable,
|
|
defaultTrafficType: c.cfg.DefaultMode,
|
|
kubeClient: c.clients.KubernetesClient(),
|
|
logger: c.logger,
|
|
}
|
|
|
|
c.topologyBuilder = topology.NewBuilder(
|
|
c.serviceLister,
|
|
c.endpointsLister,
|
|
c.podLister,
|
|
c.trafficTargetLister,
|
|
c.trafficSplitLister,
|
|
c.httpRouteGroupLister,
|
|
c.tcpRouteLister,
|
|
c.logger,
|
|
)
|
|
|
|
providerCfg := provider.Config{
|
|
ACL: c.cfg.ACLEnabled,
|
|
DefaultTrafficType: c.cfg.DefaultMode,
|
|
}
|
|
|
|
c.provider = provider.New(
|
|
c.httpStateTable,
|
|
c.tcpStateTable,
|
|
c.udpStateTable,
|
|
annotations.BuildMiddlewares,
|
|
providerCfg,
|
|
c.logger,
|
|
)
|
|
|
|
return c
|
|
}
|
|
|
|
// Run is the main controller loop.
|
|
func (c *Controller) Run() error {
|
|
// Handle a panic with logging and exiting.
|
|
defer utilruntime.HandleCrash()
|
|
|
|
waitGroup := sync.WaitGroup{}
|
|
|
|
defer func() {
|
|
c.logger.Info("Shutting down workers")
|
|
c.workQueue.ShutDown()
|
|
|
|
waitGroup.Wait()
|
|
}()
|
|
|
|
c.logger.Debug("Initializing mesh controller")
|
|
|
|
// Start the informers.
|
|
if err := c.startInformers(10 * time.Second); err != nil {
|
|
return fmt.Errorf("could not start informers: %w", err)
|
|
}
|
|
|
|
// Load port mappings.
|
|
if err := c.shadowServiceManager.LoadPortMapping(); err != nil {
|
|
return fmt.Errorf("could not load port mapper states: %w", err)
|
|
}
|
|
|
|
// Enable API readiness endpoint, informers are started and default conf is available.
|
|
c.store.SetReadiness(true)
|
|
|
|
// Start to poll work from the queue.
|
|
waitGroup.Add(1)
|
|
|
|
runWorker := func() {
|
|
defer waitGroup.Done()
|
|
c.runWorker()
|
|
}
|
|
|
|
go wait.Until(runWorker, time.Second, c.stopCh)
|
|
|
|
<-c.stopCh
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown shut downs the controller.
|
|
func (c *Controller) Shutdown() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
select {
|
|
case <-c.stopCh:
|
|
// Already closed. Don't close again.
|
|
default:
|
|
// Safe to close. We're the only closer, guarded by c.mu.
|
|
close(c.stopCh)
|
|
}
|
|
}
|
|
|
|
// startInformers starts the controller informers.
|
|
func (c *Controller) startInformers(syncTimeout time.Duration) error {
|
|
ctx, cancel := context.WithTimeout(cmd.ContextWithStopChan(context.Background(), c.stopCh), syncTimeout)
|
|
defer cancel()
|
|
|
|
c.logger.Debug("Starting Informers")
|
|
|
|
if err := c.startBaseInformers(ctx.Done()); err != nil {
|
|
return err
|
|
}
|
|
|
|
if c.cfg.ACLEnabled {
|
|
if err := c.startACLInformers(ctx.Done()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Controller) startBaseInformers(stopCh <-chan struct{}) error {
|
|
c.kubernetesFactory.Start(c.stopCh)
|
|
|
|
for t, ok := range c.kubernetesFactory.WaitForCacheSync(stopCh) {
|
|
if !ok {
|
|
return fmt.Errorf("timed out waiting for controller caches to sync: %s", t)
|
|
}
|
|
}
|
|
|
|
c.splitFactory.Start(c.stopCh)
|
|
|
|
for t, ok := range c.splitFactory.WaitForCacheSync(stopCh) {
|
|
if !ok {
|
|
return fmt.Errorf("timed out waiting for controller caches to sync: %s", t)
|
|
}
|
|
}
|
|
|
|
c.specsFactory.Start(c.stopCh)
|
|
|
|
for t, ok := range c.specsFactory.WaitForCacheSync(stopCh) {
|
|
if !ok {
|
|
return fmt.Errorf("timed out waiting for controller caches to sync: %s", t)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Controller) startACLInformers(stopCh <-chan struct{}) error {
|
|
c.accessFactory.Start(c.stopCh)
|
|
|
|
for t, ok := range c.accessFactory.WaitForCacheSync(stopCh) {
|
|
if !ok {
|
|
return fmt.Errorf("timed out waiting for controller caches to sync: %s", t)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// isWatchedResource returns true if the given resource is not ignored, false otherwise.
|
|
func (c *Controller) isWatchedResource(obj interface{}) bool {
|
|
return !c.resourceFilter.IsIgnored(obj)
|
|
}
|
|
|
|
// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and
|
|
// process a message on the work queue.
|
|
func (c *Controller) runWorker() {
|
|
for c.processNextWorkItem() {
|
|
}
|
|
}
|
|
|
|
// processNextWorkItem will read a single work item off the work queue and attempt to process it.
|
|
func (c *Controller) processNextWorkItem() bool {
|
|
key, quit := c.workQueue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
|
|
defer c.workQueue.Done(key)
|
|
|
|
if key != configRefreshKey {
|
|
if err := c.syncShadowService(key.(string)); err != nil {
|
|
c.handleErr(key, fmt.Errorf("unable to sync shadow service: %w", err))
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Build and store config.
|
|
topo, err := c.topologyBuilder.Build(c.resourceFilter)
|
|
if err != nil {
|
|
c.handleErr(key, fmt.Errorf("unable to build topology: %w", err))
|
|
return true
|
|
}
|
|
|
|
conf := c.provider.BuildConfig(topo)
|
|
|
|
c.store.SetTopology(topo)
|
|
c.store.SetConfiguration(conf)
|
|
|
|
c.workQueue.Forget(key)
|
|
|
|
return true
|
|
}
|
|
|
|
// syncShadowService calls the shadow service manager to keep the shadow service state in sync with the service events received.
|
|
func (c *Controller) syncShadowService(key string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.shadowServiceManager.SyncService(ctx, namespace, name)
|
|
}
|
|
|
|
// handleErr re-queues the given work key only if the maximum number of attempts is not exceeded.
|
|
func (c *Controller) handleErr(key interface{}, err error) {
|
|
if c.workQueue.NumRequeues(key) < maxRetries {
|
|
c.workQueue.AddRateLimited(key)
|
|
return
|
|
}
|
|
|
|
c.logger.Errorf("Unable to complete work %q: %v", key, err)
|
|
c.workQueue.Forget(key)
|
|
}
|