Files
2022-01-04 10:26:06 +01:00

290 lines
9.6 KiB
Go

package controller
import (
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"github.com/traefik/mesh/pkg/annotations"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
)
// PortMapper is capable of storing and retrieving a port mapping for a given service.
type PortMapper interface {
Find(namespace, name string, port int32) (int32, bool)
Add(namespace, name string, port int32) (int32, error)
Remove(namespace, name string, port int32) (int32, error)
}
// ShadowServiceManager manages shadow services.
type ShadowServiceManager struct {
logger logrus.FieldLogger
serviceLister listers.ServiceLister
namespace string
tcpStateTable PortMapper
udpStateTable PortMapper
defaultTrafficType string
minHTTPPort int32
maxHTTPPort int32
kubeClient kubernetes.Interface
}
// NewShadowServiceManager returns new shadow service manager.
func NewShadowServiceManager(logger logrus.FieldLogger, serviceLister listers.ServiceLister, namespace string, tcpStateTable, udpStateTable PortMapper, defaultTrafficType string, minHTTPPort, maxHTTPPort int32, kubeClient kubernetes.Interface) *ShadowServiceManager {
return &ShadowServiceManager{
logger: logger,
serviceLister: serviceLister,
namespace: namespace,
tcpStateTable: tcpStateTable,
udpStateTable: udpStateTable,
defaultTrafficType: defaultTrafficType,
minHTTPPort: minHTTPPort,
maxHTTPPort: maxHTTPPort,
kubeClient: kubeClient,
}
}
// CreateOrUpdate creates or updates the shadow service corresponding to the given service.
func (s *ShadowServiceManager) CreateOrUpdate(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) {
shadowSvcName := s.getShadowServiceName(svc.Namespace, svc.Name)
shadowSvc, err := s.serviceLister.Services(s.namespace).Get(shadowSvcName)
if err != nil && !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("unable to get shadow service %q: %w", shadowSvcName, err)
}
// Removes the current mappings for the ports that are not present in the new service version.
// Current shadow service ports are equal to the ports mapped for the previous service version.
// This step is required to free up some ports before allocation.
s.removeUnusedPortMappings(shadowSvc, svc)
ports, err := s.getShadowServicePorts(svc)
if err != nil {
return nil, fmt.Errorf("unable to get shadow service ports for service %s/%s: %w", svc.Namespace, svc.Name, err)
}
newShadowSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: shadowSvcName,
Namespace: s.namespace,
Labels: map[string]string{
"app": "maesh",
"type": "shadow",
},
},
Spec: corev1.ServiceSpec{
Ports: ports,
Selector: map[string]string{
"component": "maesh-mesh",
},
},
}
if shadowSvc == nil {
return s.kubeClient.CoreV1().Services(s.namespace).Create(ctx, newShadowSvc, metav1.CreateOptions{})
}
// Ensure that we are not leaking some port mappings if the traffic type of the new service version has been updated.
// If the traffic has been updated, some ports may be missing if they are not suitable, and some target port values may not match.
s.cleanupPortMappings(svc.Namespace, svc.Name, shadowSvc, newShadowSvc)
shadowSvc = shadowSvc.DeepCopy()
shadowSvc.Spec.Ports = newShadowSvc.Spec.Ports
return s.kubeClient.CoreV1().Services(s.namespace).Update(ctx, shadowSvc, metav1.UpdateOptions{})
}
// Delete deletes the shadow service associated with the given service.
func (s *ShadowServiceManager) Delete(ctx context.Context, namespace, name string) error {
shadowSvcName := s.getShadowServiceName(namespace, name)
shadowSvc, err := s.serviceLister.Services(s.namespace).Get(shadowSvcName)
if kerrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// Removes all port mappings for the deleted service.
// Current shadow service ports are equal to the deleted service ports.
for _, svcPort := range shadowSvc.Spec.Ports {
s.removeServicePortMapping(namespace, name, svcPort)
}
return s.kubeClient.CoreV1().Services(s.namespace).Delete(ctx, shadowSvcName, metav1.DeleteOptions{})
}
func (s *ShadowServiceManager) cleanupPortMappings(namespace, name string, oldShadowSvc, newShadowSvc *corev1.Service) {
for _, oldPort := range oldShadowSvc.Spec.Ports {
if !needsCleanup(newShadowSvc.Spec.Ports, oldPort) {
continue
}
s.removeServicePortMapping(namespace, name, oldPort)
}
}
func (s *ShadowServiceManager) removeUnusedPortMappings(shadowSvc, svc *corev1.Service) {
if svc == nil || shadowSvc == nil {
return
}
for _, shadowSvcPort := range shadowSvc.Spec.Ports {
if containsPort(svc.Spec.Ports, shadowSvcPort) {
continue
}
s.removeServicePortMapping(svc.Namespace, svc.Name, shadowSvcPort)
}
}
func (s *ShadowServiceManager) removeServicePortMapping(namespace, name string, svcPort corev1.ServicePort) {
// Nothing to do here as there is no port table for HTTP ports.
if svcPort.TargetPort.IntVal <= s.maxHTTPPort {
return
}
switch svcPort.Protocol {
case corev1.ProtocolTCP:
if _, err := s.tcpStateTable.Remove(namespace, name, svcPort.Port); err != nil {
s.logger.Warnf("Unable to remove TCP port mapping for %s/%s on port %d", namespace, name, svcPort.Port)
}
case corev1.ProtocolUDP:
if _, err := s.udpStateTable.Remove(namespace, name, svcPort.Port); err != nil {
s.logger.Warnf("Unable to remove UDP port mapping for %s/%s on port %d", namespace, name, svcPort.Port)
}
}
}
// getShadowServiceName returns the shadow service shadowSvcName corresponding to the given service shadowSvcName and namespace.
func (s *ShadowServiceManager) getShadowServiceName(namespace, name string) string {
return fmt.Sprintf("%s-%s-6d61657368-%s", s.namespace, name, namespace)
}
func (s *ShadowServiceManager) getShadowServicePorts(svc *corev1.Service) ([]corev1.ServicePort, error) {
var ports []corev1.ServicePort
trafficType, err := annotations.GetTrafficType(s.defaultTrafficType, svc.Annotations)
if err != nil {
return nil, fmt.Errorf("unable to get service traffic-type: %w", err)
}
for i, sp := range svc.Spec.Ports {
if !isPortSuitable(trafficType, sp) {
s.logger.Warnf("Unsupported port type %q on %q service %s/%s, skipping port %q", sp.Protocol, trafficType, svc.Namespace, svc.Name, sp.Name)
continue
}
targetPort, err := s.getTargetPort(trafficType, i, svc.Name, svc.Namespace, sp.Port)
if err != nil {
s.logger.Errorf("Unable to find available %s port: %v, skipping port %s on service %s/%s", sp.Name, err, sp.Name, svc.Namespace, svc.Name)
continue
}
ports = append(ports, corev1.ServicePort{
Name: sp.Name,
Port: sp.Port,
Protocol: sp.Protocol,
TargetPort: intstr.FromInt(int(targetPort)),
})
}
return ports, nil
}
func (s *ShadowServiceManager) getTargetPort(trafficType string, portID int, name, namespace string, port int32) (int32, error) {
switch trafficType {
case annotations.ServiceTypeHTTP:
return s.getHTTPPort(portID)
case annotations.ServiceTypeTCP:
mappedPort, err := s.getMappedPort(s.tcpStateTable, name, namespace, port)
if err != nil {
return 0, fmt.Errorf("unable to map TCP service port: %w", err)
}
return mappedPort, nil
case annotations.ServiceTypeUDP:
mappedPort, err := s.getMappedPort(s.udpStateTable, name, namespace, port)
if err != nil {
return 0, fmt.Errorf("unable to map UDP service port: %w", err)
}
return mappedPort, nil
default:
return 0, errors.New("unknown service mode")
}
}
// getHTTPPort returns the HTTP port associated with the given portID.
func (s *ShadowServiceManager) getHTTPPort(portID int) (int32, error) {
if s.minHTTPPort+int32(portID) > s.maxHTTPPort {
return 0, errors.New("unable to find an available HTTP port")
}
return s.minHTTPPort + int32(portID), nil
}
// getMappedPort returns the port associated with the given service information in the given port mapper.
func (s *ShadowServiceManager) getMappedPort(stateTable PortMapper, name, namespace string, port int32) (int32, error) {
if mappedPort, ok := stateTable.Find(namespace, name, port); ok {
return mappedPort, nil
}
s.logger.Debugf("No match found for %s/%s %d - Add a new port", namespace, name, port)
mappedPort, err := stateTable.Add(namespace, name, port)
if err != nil {
return 0, fmt.Errorf("unable to add service port to the state table: %w", err)
}
s.logger.Debugf("Service %s/%s %d has been assigned port %d", namespace, name, port, mappedPort)
return mappedPort, nil
}
func isPortSuitable(trafficType string, sp corev1.ServicePort) bool {
if trafficType == annotations.ServiceTypeUDP {
return sp.Protocol == corev1.ProtocolUDP
}
if trafficType == annotations.ServiceTypeTCP || trafficType == annotations.ServiceTypeHTTP {
return sp.Protocol == corev1.ProtocolTCP
}
return false
}
// containsPort returns true if a service port with the same port and protocol value exist in the given port list, false otherwise.
func containsPort(ports []corev1.ServicePort, port corev1.ServicePort) bool {
for _, onePort := range ports {
if onePort.Port == port.Port && onePort.Protocol == port.Protocol {
return true
}
}
return false
}
// needsCleanup returns true if the given shadow service port have to be cleaned up, false otherwise.
func needsCleanup(ports []corev1.ServicePort, port corev1.ServicePort) bool {
for _, onePort := range ports {
if onePort.Port == port.Port && onePort.Protocol == port.Protocol && onePort.TargetPort == port.TargetPort {
return false
}
}
return true
}