Files
2020-10-20 15:54:04 +02:00

384 lines
12 KiB
Go

package controller
import (
"context"
"fmt"
"sync"
"time"
accessinformer "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/informers/externalversions"
accesslister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/listers/access/v1alpha2"
specsinformer "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/informers/externalversions"
specslister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/listers/specs/v1alpha3"
splitinformer "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/informers/externalversions"
splitlister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/listers/split/v1alpha3"
"github.com/sirupsen/logrus"
"github.com/traefik/mesh/v2/cmd"
"github.com/traefik/mesh/v2/pkg/annotations"
"github.com/traefik/mesh/v2/pkg/k8s"
"github.com/traefik/mesh/v2/pkg/portmapping"
"github.com/traefik/mesh/v2/pkg/provider"
"github.com/traefik/mesh/v2/pkg/topology"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
// configRefreshKey is the work queue key used to indicate that config has to be refreshed. Unlike any other
// key, it is not treated as a service key: the worker skips the shadow service sync and only rebuilds the
// topology and configuration.
configRefreshKey = "refresh"
// maxRetries is the number of times a work task will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times a
// work task is going to be re-queued: 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s.
maxRetries = 12
)
// SharedStore is used to share the controller state. The controller pushes its readiness, the latest topology and
// the latest dynamic configuration into it.
type SharedStore interface {
// SetConfiguration stores the dynamic configuration most recently built by the controller.
SetConfiguration(cfg *dynamic.Configuration)
// SetTopology stores the topology most recently built by the controller.
SetTopology(topo *topology.Topology)
// SetReadiness records whether the controller is ready. Run sets it to true once the informers are started
// and the port mappings are loaded.
SetReadiness(isReady bool)
}
// TopologyBuilder builds Topologies.
type TopologyBuilder interface {
// Build returns a topology computed with the given resource filter, or an error if it cannot be built.
// The controller calls it for every work item it processes.
Build(resourceFilter *k8s.ResourceFilter) (*topology.Topology, error)
}
// Config holds the configuration of the controller.
type Config struct {
// ACLEnabled turns on the ACL related informers (TrafficTargets and Pods) and is forwarded to the provider.
ACLEnabled bool
// DefaultMode is the default traffic type handed to the shadow service manager and to the provider.
DefaultMode string
// Namespace is the namespace handed to the shadow service manager.
Namespace string
// WatchNamespaces and IgnoreNamespaces are forwarded to the resource filter which decides which resources
// the controller reacts to.
WatchNamespaces []string
IgnoreNamespaces []string
// MinHTTPPort and MaxHTTPPort bound the HTTP port mapping table.
MinHTTPPort int32
MaxHTTPPort int32
// MinTCPPort and MaxTCPPort bound the TCP port mapping table.
MinTCPPort int32
MaxTCPPort int32
// MinUDPPort and MaxUDPPort bound the UDP port mapping table.
MinUDPPort int32
MaxUDPPort int32
}
// Controller holds the state of the mesh controller: its configuration, the work queue, the port mapping tables,
// the informer factories and the listers obtained from them.
type Controller struct {
// mu serializes Shutdown calls so that stopCh is closed at most once.
mu sync.Mutex
// stopCh is closed by Shutdown. Run blocks on it, and it is the stop signal given to the informers and
// to the worker loop.
stopCh chan struct{}
cfg Config
// workQueue receives the keys enqueued by the informer event handler and is drained by the worker.
workQueue workqueue.RateLimitingInterface
shadowServiceManager *ShadowServiceManager
// provider turns a topology into a dynamic configuration.
provider *provider.Provider
// resourceFilter decides which resources are ignored by the event handler and is passed to the topology builder.
resourceFilter *k8s.ResourceFilter
// Port mapping tables, shared between the shadow service manager and the provider.
httpStateTable *portmapping.MultiplexedPortMapping
tcpStateTable *portmapping.PortMapping
udpStateTable *portmapping.PortMapping
topologyBuilder TopologyBuilder
// store is where the readiness, topology and configuration are published.
store SharedStore
logger logrus.FieldLogger
clients k8s.Client
// Informer factories. accessFactory is only created when cfg.ACLEnabled is true.
kubernetesFactory informers.SharedInformerFactory
accessFactory accessinformer.SharedInformerFactory
specsFactory specsinformer.SharedInformerFactory
splitFactory splitinformer.SharedInformerFactory
// Listers obtained from the factories above. trafficTargetLister is only set when cfg.ACLEnabled is true.
podLister listers.PodLister
serviceLister listers.ServiceLister
endpointsLister listers.EndpointsLister
trafficTargetLister accesslister.TrafficTargetLister
httpRouteGroupLister specslister.HTTPRouteGroupLister
tcpRouteLister specslister.TCPRouteLister
trafficSplitLister splitlister.TrafficSplitLister
}
// NewMeshController assembles a mesh controller: resource filter, work queue, informer factories, listers,
// port mapping tables, shadow service manager, topology builder and provider. Nothing is started here; the
// informers are started by Run.
func NewMeshController(clients k8s.Client, cfg Config, store SharedStore, logger logrus.FieldLogger) *Controller {
	c := &Controller{
		logger:  logger,
		cfg:     cfg,
		clients: clients,
		store:   store,
		stopCh:  make(chan struct{}),
	}

	// Decide which resources are watched and which are ignored.
	c.resourceFilter = k8s.NewResourceFilter(
		k8s.WatchNamespaces(cfg.WatchNamespaces...),
		k8s.IgnoreNamespaces(cfg.IgnoreNamespaces...),
		k8s.IgnoreNamespaces(metav1.NamespaceSystem),
		k8s.IgnoreService(metav1.NamespaceDefault, "kubernetes"),
		k8s.IgnoreLabel(k8s.LabelPartOf, k8s.AppName),
	)

	// Every event on a watched resource ends up as a key in the work queue.
	c.workQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	enqueue := cache.FilteringResourceEventHandler{
		FilterFunc: c.isWatchedResource,
		Handler:    &enqueueWorkHandler{logger: c.logger, workQueue: c.workQueue},
	}

	// Informer factories for the resources which are not ACL related.
	c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.KubernetesClient(), k8s.ResyncPeriod)
	c.splitFactory = splitinformer.NewSharedInformerFactoryWithOptions(c.clients.SplitClient(), k8s.ResyncPeriod)
	c.specsFactory = specsinformer.NewSharedInformerFactoryWithOptions(c.clients.SpecsClient(), k8s.ResyncPeriod)

	core := c.kubernetesFactory.Core().V1()
	splits := c.splitFactory.Split().V1alpha3()
	specs := c.specsFactory.Specs().V1alpha3()

	c.podLister = core.Pods().Lister()
	c.endpointsLister = core.Endpoints().Lister()
	c.serviceLister = core.Services().Lister()
	c.trafficSplitLister = splits.TrafficSplits().Lister()
	c.httpRouteGroupLister = specs.HTTPRouteGroups().Lister()
	c.tcpRouteLister = specs.TCPRoutes().Lister()

	baseInformers := []cache.SharedIndexInformer{
		core.Services().Informer(),
		core.Endpoints().Informer(),
		splits.TrafficSplits().Informer(),
		specs.HTTPRouteGroups().Informer(),
		specs.TCPRoutes().Informer(),
	}
	for _, informer := range baseInformers {
		informer.AddEventHandler(enqueue)
	}

	// TrafficTargets and Pods only trigger work when ACLs are enabled.
	if c.cfg.ACLEnabled {
		c.accessFactory = accessinformer.NewSharedInformerFactoryWithOptions(c.clients.AccessClient(), k8s.ResyncPeriod)

		trafficTargets := c.accessFactory.Access().V1alpha2().TrafficTargets()
		c.trafficTargetLister = trafficTargets.Lister()

		trafficTargets.Informer().AddEventHandler(enqueue)
		core.Pods().Informer().AddEventHandler(enqueue)
	}

	// Port mapping tables are shared by the shadow service manager and the provider.
	c.httpStateTable = portmapping.NewMultiplexedPortMapping(c.cfg.MinHTTPPort, c.cfg.MaxHTTPPort)
	c.tcpStateTable = portmapping.NewPortMapping(c.cfg.MinTCPPort, c.cfg.MaxTCPPort)
	c.udpStateTable = portmapping.NewPortMapping(c.cfg.MinUDPPort, c.cfg.MaxUDPPort)

	c.shadowServiceManager = &ShadowServiceManager{
		namespace:          c.cfg.Namespace,
		serviceLister:      c.serviceLister,
		httpStateTable:     c.httpStateTable,
		tcpStateTable:      c.tcpStateTable,
		udpStateTable:      c.udpStateTable,
		defaultTrafficType: c.cfg.DefaultMode,
		kubeClient:         c.clients.KubernetesClient(),
		logger:             c.logger,
	}

	c.topologyBuilder = topology.NewBuilder(
		c.serviceLister,
		c.endpointsLister,
		c.podLister,
		c.trafficTargetLister,
		c.trafficSplitLister,
		c.httpRouteGroupLister,
		c.tcpRouteLister,
		c.logger,
	)

	c.provider = provider.New(
		c.httpStateTable,
		c.tcpStateTable,
		c.udpStateTable,
		annotations.BuildMiddlewares,
		provider.Config{
			ACL:                c.cfg.ACLEnabled,
			DefaultTrafficType: c.cfg.DefaultMode,
		},
		c.logger,
	)

	return c
}
// Run is the main controller loop. It starts the informers, loads the port mappings, flags the store as ready
// and starts a worker polling the work queue. It then blocks until Shutdown closes the stop channel.
// An error is returned if the informers cannot be started or if the port mappings cannot be loaded.
func (c *Controller) Run() error {
	// Handle a panic with logging and exiting.
	defer utilruntime.HandleCrash()

	var workers sync.WaitGroup

	// Whatever the exit path, shut the queue down and wait for the worker to drain.
	defer func() {
		c.logger.Info("Shutting down workers")
		c.workQueue.ShutDown()
		workers.Wait()
	}()

	c.logger.Debug("Initializing mesh controller")

	// Start the informers.
	err := c.startInformers(10 * time.Second)
	if err != nil {
		return fmt.Errorf("could not start informers: %w", err)
	}

	// Load port mappings.
	err = c.shadowServiceManager.LoadPortMapping()
	if err != nil {
		return fmt.Errorf("could not load port mapper states: %w", err)
	}

	// Enable API readiness endpoint, informers are started and default conf is available.
	c.store.SetReadiness(true)

	// Start to poll work from the queue.
	workers.Add(1)

	go wait.Until(func() {
		defer workers.Done()

		c.runWorker()
	}, time.Second, c.stopCh)

	<-c.stopCh

	return nil
}
// Shutdown shuts down the controller by closing its stop channel. It is safe to call it more than once:
// only the first call closes the channel.
func (c *Controller) Shutdown() {
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.stopCh:
		// Already closed. Don't close again.
		return
	default:
	}

	// Safe to close. We're the only closer, guarded by c.mu.
	close(c.stopCh)
}
// startInformers starts the controller informers and waits for their caches to sync. The wait is aborted when
// syncTimeout elapses or when the controller is stopped. ACL informers are only started when ACLs are enabled.
func (c *Controller) startInformers(syncTimeout time.Duration) error {
	stopCtx := cmd.ContextWithStopChan(context.Background(), c.stopCh)

	ctx, cancel := context.WithTimeout(stopCtx, syncTimeout)
	defer cancel()

	c.logger.Debug("Starting Informers")

	syncDone := ctx.Done()

	if err := c.startBaseInformers(syncDone); err != nil {
		return err
	}

	if !c.cfg.ACLEnabled {
		return nil
	}

	return c.startACLInformers(syncDone)
}
// startBaseInformers starts, one after the other, the Kubernetes, split and specs informer factories and waits
// for the caches of each of them to sync. The informers run until the controller stop channel is closed,
// whereas stopCh only bounds the wait for the cache sync.
func (c *Controller) startBaseInformers(stopCh <-chan struct{}) error {
	c.kubernetesFactory.Start(c.stopCh)

	for informerType, synced := range c.kubernetesFactory.WaitForCacheSync(stopCh) {
		if synced {
			continue
		}

		return fmt.Errorf("timed out waiting for controller caches to sync: %s", informerType)
	}

	c.splitFactory.Start(c.stopCh)

	for informerType, synced := range c.splitFactory.WaitForCacheSync(stopCh) {
		if synced {
			continue
		}

		return fmt.Errorf("timed out waiting for controller caches to sync: %s", informerType)
	}

	c.specsFactory.Start(c.stopCh)

	for informerType, synced := range c.specsFactory.WaitForCacheSync(stopCh) {
		if synced {
			continue
		}

		return fmt.Errorf("timed out waiting for controller caches to sync: %s", informerType)
	}

	return nil
}
// startACLInformers starts the access informer factory and waits for its caches to sync. The informers run
// until the controller stop channel is closed, whereas stopCh only bounds the wait for the cache sync.
func (c *Controller) startACLInformers(stopCh <-chan struct{}) error {
	c.accessFactory.Start(c.stopCh)

	for informerType, synced := range c.accessFactory.WaitForCacheSync(stopCh) {
		if synced {
			continue
		}

		return fmt.Errorf("timed out waiting for controller caches to sync: %s", informerType)
	}

	return nil
}
// isWatchedResource reports whether the given resource has to be handled, i.e. whether the resource filter does
// not ignore it. It is used as the filter function of the informer event handler.
func (c *Controller) isWatchedResource(obj interface{}) bool {
	ignored := c.resourceFilter.IsIgnored(obj)

	return !ignored
}
// runWorker is a long-running function which keeps processing work queue items, one at a time, until
// processNextWorkItem reports that the queue has been shut down.
func (c *Controller) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem reads a single work item off the work queue and attempts to process it. It returns false
// once the work queue has been shut down, and true otherwise so that the caller keeps polling.
//
// Any key other than configRefreshKey is a service key ("namespace/name") whose shadow service is synced first.
// In every case the topology is then rebuilt and the resulting topology and configuration are pushed to the
// store. Failures are handed over to handleErr, which re-queues the key until maxRetries is reached.
func (c *Controller) processNextWorkItem() bool {
	item, quit := c.workQueue.Get()
	if quit {
		return false
	}

	// The queue requires Done to be called for every item obtained with Get, whatever the outcome.
	defer c.workQueue.Done(item)

	// Only string keys are expected in the queue. An unchecked type assertion would panic and take the worker
	// down, so drop anything else: retrying a malformed item cannot succeed.
	key, ok := item.(string)
	if !ok {
		c.logger.Errorf("Unable to process work item: expected a string key but got %T", item)
		c.workQueue.Forget(item)

		return true
	}

	if key != configRefreshKey {
		if err := c.syncShadowService(key); err != nil {
			c.handleErr(item, fmt.Errorf("unable to sync shadow service: %w", err))
			return true
		}
	}

	// Build and store config.
	topo, err := c.topologyBuilder.Build(c.resourceFilter)
	if err != nil {
		c.handleErr(item, fmt.Errorf("unable to build topology: %w", err))
		return true
	}

	conf := c.provider.BuildConfig(topo)

	c.store.SetTopology(topo)
	c.store.SetConfiguration(conf)

	// The item has been processed successfully: reset its retry counter.
	c.workQueue.Forget(item)

	return true
}
// syncShadowService asks the shadow service manager to bring the shadow service of the service identified by key
// ("namespace/name") in sync with the service events received. The sync is given 5 seconds to complete.
// An error is returned if the key cannot be split or if the sync fails.
func (c *Controller) syncShadowService(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	return c.shadowServiceManager.SyncService(ctx, namespace, name)
}
// handleErr re-queues the given work key, rate limited, as long as it has been re-queued less than maxRetries
// times. Past that limit the error is logged and the key is dropped from the rate limiter tracking.
func (c *Controller) handleErr(key interface{}, err error) {
	if c.workQueue.NumRequeues(key) >= maxRetries {
		// Out of attempts: report the failure and give up on this key.
		c.logger.Errorf("Unable to complete work %q: %v", key, err)
		c.workQueue.Forget(key)

		return
	}

	c.workQueue.AddRateLimited(key)
}