Files
Patrick Hofmann e7e1a289df MQTT Support / Home Assistant Integration (#1195)
* MQTT support

* feat(mqtt): redesign MQTT settings UI with improved UX

Restructure the MQTT settings page for clarity and usability:

UI Structure:
- Organize settings into logical sections (Auth, Home Assistant, Advanced)
- Use progressive disclosure for port (Auto/Custom) and base topic (Default/Custom)
- Move connection status badge into page header
- Conditionally show HDD debounce only when ATX extension is active
- Add inline validation for required broker field

Connection & Error Handling:
- Add test-then-save flow: Save & Reconnect validates connectivity before persisting
- Add standalone Test Connection button for dry-run validation
- Add testMqttConnection RPC with 5s timeout (no retry, no side effects)
- Surface friendly i18n-ready error messages for common failures (auth, timeout, TLS, DNS)
- Track last connection error on MQTTManager for status reporting

Copy:
- Rewrite all descriptions for clarity and brevity
- Use benefit-oriented, active-voice microcopy throughout

---------

Co-authored-by: Adam Shiervani <adam.shiervani@gmail.com>
2026-03-24 13:49:07 +01:00

451 lines
12 KiB
Go

package kvm
import (
"crypto/tls"
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/gwatts/rootcerts"
"github.com/jetkvm/kvm/internal/logging"
"github.com/jetkvm/kvm/internal/sync"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var mqttLogger = logging.GetSubsystemLogger("mqtt")
// publishTimeout is the maximum time to wait for a single MQTT publish to complete.
const publishTimeout = 5 * time.Second
type MQTTConfig struct {
Enabled bool `json:"enabled"`
Broker string `json:"broker"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
BaseTopic string `json:"base_topic"`
UseTLS bool `json:"use_tls"`
TLSInsecure bool `json:"tls_insecure"`
EnableHADiscovery bool `json:"enable_ha_discovery"`
EnableActions bool `json:"enable_actions"`
DebounceMs int `json:"debounce_ms"`
}
var mqttManager *MQTTManager
type MQTTManager struct {
client mqtt.Client
deviceID string
baseTopic string
connected atomic.Bool
lastError atomic.Value // stores string; cleared on successful connect
updateRequested atomic.Bool // set when an update is triggered via MQTT, cleared when OTA finishes
debounceMs int
done chan struct{} // closed on Close() to stop background goroutines
// Debounce state for ATX HDD LED OFF transitions.
// When the HDD LED turns off, publishing is delayed by debounceMs.
// If it turns back on within that window, the OFF is suppressed,
// keeping the published state as ON during rapid flickering.
atxDebounceMu sync.Mutex
atxDebounceTimer *time.Timer
atxLastPublished *ATXState
// Cached virtual media options to avoid redundant discovery republishes.
lastVMOptions []string
// Cached update state to avoid calling getUpdateStatus on every tick.
lastUpdateCheck time.Time
lastUpdatePayload *mqttUpdateState
}
type mqttStatusPayload struct {
Online bool `json:"online"`
}
// topic returns a fully qualified topic string using baseTopic.
func (m *MQTTManager) topic(parts ...string) string {
return m.baseTopic + "/" + strings.Join(parts, "/")
}
// validateBaseTopic checks that the base topic does not contain MQTT wildcards or invalid characters.
func validateBaseTopic(topic string) error {
if strings.ContainsAny(topic, "+#") {
return fmt.Errorf("base topic must not contain MQTT wildcards (+ or #)")
}
if strings.Contains(topic, " ") {
return fmt.Errorf("base topic must not contain spaces")
}
if topic == "" {
return fmt.Errorf("base topic must not be empty")
}
return nil
}
func NewMQTTManager(cfg *MQTTConfig, deviceID string) (*MQTTManager, error) {
if cfg == nil || !cfg.Enabled {
return nil, fmt.Errorf("MQTT is not enabled")
}
baseTopic := cfg.BaseTopic
if baseTopic == "" {
baseTopic = "jetkvm"
}
if err := validateBaseTopic(baseTopic); err != nil {
return nil, err
}
// Ensure baseTopic includes deviceID
if !strings.Contains(baseTopic, deviceID) {
baseTopic = baseTopic + "/" + deviceID
}
m := &MQTTManager{
deviceID: deviceID,
baseTopic: baseTopic,
debounceMs: cfg.DebounceMs,
done: make(chan struct{}),
}
scheme := "tcp"
port := cfg.Port
if port == 0 {
port = 1883
}
if cfg.UseTLS {
scheme = "ssl"
}
brokerURL := fmt.Sprintf("%s://%s:%d", scheme, cfg.Broker, port)
opts := mqtt.NewClientOptions()
opts.AddBroker(brokerURL)
opts.SetClientID(fmt.Sprintf("jetkvm-%s", deviceID))
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetCleanSession(false)
opts.SetConnectRetryInterval(10 * time.Second)
opts.SetConnectTimeout(10 * time.Second)
if cfg.UseTLS {
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.TLSInsecure, //nolint:gosec
}
if !cfg.TLSInsecure {
tlsConfig.RootCAs = rootcerts.ServerCertPool()
}
opts.SetTLSConfig(tlsConfig)
}
// Will message: offline status
willPayload, _ := json.Marshal(mqttStatusPayload{Online: false})
opts.SetWill(m.topic("status"), string(willPayload), 1, true)
opts.OnConnect = m.onConnect
opts.OnConnectionLost = m.onConnectionLost
m.client = mqtt.NewClient(opts)
// Connect in the background — with ConnectRetry(true) this will keep
// retrying automatically without blocking the caller (startup).
token := m.client.Connect()
go func() {
token.Wait()
if token.Error() != nil {
m.setLastError(token.Error())
mqttLogger.Warn().Err(token.Error()).Str("broker", brokerURL).Msg("initial MQTT connection attempt failed, will retry")
}
}()
return m, nil
}
func (m *MQTTManager) setLastError(err error) {
if err != nil {
m.lastError.Store(err.Error())
}
}
func (m *MQTTManager) clearLastError() {
m.lastError.Store("")
}
// LastError returns the last connection error, or empty string if none.
func (m *MQTTManager) LastError() string {
v := m.lastError.Load()
if v == nil {
return ""
}
return v.(string)
}
func (m *MQTTManager) onConnect(client mqtt.Client) {
mqttLogger.Info().Str("deviceID", m.deviceID).Msg("connected to MQTT broker")
m.connected.Store(true)
m.clearLastError()
// Publish online status
m.publish(m.topic("status"), mqttStatusPayload{Online: true}, true)
// Publish Home Assistant discovery configs if enabled
if config.MqttConfig != nil && config.MqttConfig.EnableHADiscovery {
m.publishHADiscovery()
}
// Subscribe to command topics
m.subscribeCommands()
// Immediately publish all current states so Home Assistant knows
// the current state of all switches and sensors right away.
if config.ActiveExtension == "atx-power" {
m.publishATXState(ATXState{
Power: ledPWRState.Load(),
HDD: ledHDDState.Load(),
})
}
if config.ActiveExtension == "dc-power" {
m.publishDCState(getDCState())
}
m.publishExtendedStates()
}
func (m *MQTTManager) onConnectionLost(client mqtt.Client, err error) {
mqttLogger.Warn().Err(err).Msg("MQTT connection lost")
m.connected.Store(false)
m.setLastError(err)
}
// IsConnected returns the current connection state.
func (m *MQTTManager) IsConnected() bool {
return m.connected.Load() && m.client.IsConnected()
}
// Close disconnects from the MQTT broker gracefully and stops background goroutines.
func (m *MQTTManager) Close() {
// Signal all background goroutines to stop.
close(m.done)
m.atxDebounceMu.Lock()
m.cancelATXDebounceTimerLocked()
m.atxDebounceMu.Unlock()
if m.client != nil && m.client.IsConnected() {
m.publish(m.topic("status"), mqttStatusPayload{Online: false}, true)
m.client.Disconnect(500)
}
m.connected.Store(false)
}
// publish marshals the payload to JSON and publishes to the topic.
func (m *MQTTManager) publish(topic string, payload interface{}, retained bool) {
data, err := json.Marshal(payload)
if err != nil {
mqttLogger.Error().Err(err).Str("topic", topic).Msg("failed to marshal MQTT payload")
return
}
token := m.client.Publish(topic, 1, retained, data)
if !token.WaitTimeout(publishTimeout) {
mqttLogger.Warn().Str("topic", topic).Msg("MQTT publish timed out")
return
}
if token.Error() != nil {
mqttLogger.Error().Err(token.Error()).Str("topic", topic).Msg("failed to publish MQTT message")
}
}
// publishString publishes a raw string payload.
func (m *MQTTManager) publishString(topic string, payload string, retained bool) {
token := m.client.Publish(topic, 1, retained, payload)
if !token.WaitTimeout(publishTimeout) {
mqttLogger.Warn().Str("topic", topic).Msg("MQTT publish timed out")
return
}
if token.Error() != nil {
mqttLogger.Error().Err(token.Error()).Str("topic", topic).Msg("failed to publish MQTT message")
}
}
// actionsAllowed checks if MQTT actions are enabled in the config.
func (m *MQTTManager) actionsAllowed() bool {
return config.MqttConfig != nil && config.MqttConfig.EnableActions
}
// --- JSON-RPC Handlers ---
type MQTTStatusResponse struct {
Connected bool `json:"connected"`
Error string `json:"error,omitempty"`
}
const mqttPasswordMask = "********"
func rpcGetMqttSettings() (*MQTTConfig, error) {
cfg := config.MqttConfig
if cfg == nil {
return &MQTTConfig{}, nil
}
// Return a copy with the password masked to avoid leaking credentials.
masked := *cfg
if masked.Password != "" {
masked.Password = mqttPasswordMask
}
return &masked, nil
}
func rpcSetMqttSettings(settings MQTTConfig) error {
if settings.Enabled && settings.Broker == "" {
return fmt.Errorf("broker address is required when MQTT is enabled")
}
if settings.Port < 1 || settings.Port > 65535 {
return fmt.Errorf("port must be between 1 and 65535")
}
if settings.BaseTopic == "" {
settings.BaseTopic = "jetkvm"
}
if err := validateBaseTopic(settings.BaseTopic); err != nil {
return err
}
oldConfig := config.MqttConfig
// If the password is the mask placeholder, preserve the existing password.
if settings.Password == mqttPasswordMask && oldConfig != nil {
settings.Password = oldConfig.Password
}
// Cleanup before applying new settings
if mqttManager != nil && mqttManager.IsConnected() {
wasEnabled := oldConfig != nil && oldConfig.Enabled
wasHADiscovery := oldConfig != nil && oldConfig.EnableHADiscovery
// If MQTT is being disabled, clean up all topics and discovery entries
if wasEnabled && !settings.Enabled {
mqttManager.cleanupAllTopics()
} else if wasHADiscovery && !settings.EnableHADiscovery {
// If only HA Discovery is being disabled, remove discovery entries
mqttManager.removeAllDiscovery()
}
}
if settings.DebounceMs < 0 {
settings.DebounceMs = 0
}
cfg := settings
config.MqttConfig = &cfg
if err := SaveConfig(); err != nil {
return fmt.Errorf("failed to save config: %w", err)
}
// Reconnect MQTT
restartMQTT()
return nil
}
func rpcGetMqttStatus() (MQTTStatusResponse, error) {
connected := false
lastError := ""
if mqttManager != nil {
connected = mqttManager.IsConnected()
lastError = mqttManager.LastError()
}
return MQTTStatusResponse{Connected: connected, Error: lastError}, nil
}
// testMqttConnectionTimeout is the maximum time to wait for a test connection attempt.
const testMqttConnectionTimeout = 5 * time.Second
type MQTTTestResult struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
func rpcTestMqttConnection(settings MQTTConfig) (MQTTTestResult, error) {
if settings.Broker == "" {
return MQTTTestResult{Error: "broker address is required"}, nil
}
// If the password is the mask placeholder, use the existing password.
if settings.Password == mqttPasswordMask && config.MqttConfig != nil {
settings.Password = config.MqttConfig.Password
}
scheme := "tcp"
port := settings.Port
if port == 0 {
port = 1883
}
if settings.UseTLS {
scheme = "ssl"
}
brokerURL := fmt.Sprintf("%s://%s:%d", scheme, settings.Broker, port)
opts := mqtt.NewClientOptions()
opts.AddBroker(brokerURL)
opts.SetClientID(fmt.Sprintf("jetkvm-%s-test", GetDeviceID()))
opts.SetUsername(settings.Username)
opts.SetPassword(settings.Password)
opts.SetAutoReconnect(false)
opts.SetConnectRetry(false)
opts.SetConnectTimeout(testMqttConnectionTimeout)
if settings.UseTLS {
tlsConfig := &tls.Config{
InsecureSkipVerify: settings.TLSInsecure, //nolint:gosec
}
if !settings.TLSInsecure {
tlsConfig.RootCAs = rootcerts.ServerCertPool()
}
opts.SetTLSConfig(tlsConfig)
}
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()
if err := token.Error(); err != nil {
return MQTTTestResult{Error: err.Error()}, nil
}
client.Disconnect(250)
return MQTTTestResult{Success: true}, nil
}
// restartMQTT stops the existing MQTT connection and starts a new one if enabled.
func restartMQTT() {
if mqttManager != nil {
mqttManager.Close()
mqttManager = nil
}
startMQTT()
}
func startMQTT() {
if config.MqttConfig == nil || !config.MqttConfig.Enabled {
mqttLogger.Info().Msg("MQTT is disabled")
return
}
var err error
mqttManager, err = NewMQTTManager(config.MqttConfig, GetDeviceID())
if err != nil {
mqttLogger.Warn().Err(err).Msg("failed to start MQTT")
return
}
mqttManager.startPeriodicStatusUpdates(15 * time.Second)
mqttLogger.Info().Msg("MQTT started")
}
// initMQTT initializes MQTT if enabled in config. Called from main.go.
func initMQTT() {
startMQTT()
}