From 6419d049a217c35e48d269e5b3ddc049b69569a3 Mon Sep 17 00:00:00 2001 From: Adam Shiervani Date: Mon, 18 May 2026 10:22:15 +0200 Subject: [PATCH] Fix webrtc session shutdown race (#1468) --- ui/e2e/remote-agent/ra-all.spec.ts | 108 +++++++++++++------ webrtc.go | 161 +++++++++++++++++++++-------- 2 files changed, 195 insertions(+), 74 deletions(-) diff --git a/ui/e2e/remote-agent/ra-all.spec.ts b/ui/e2e/remote-agent/ra-all.spec.ts index 8b9216d1..04f2532b 100644 --- a/ui/e2e/remote-agent/ra-all.spec.ts +++ b/ui/e2e/remote-agent/ra-all.spec.ts @@ -35,6 +35,8 @@ import { type KeyboardEvent as RAKeyboardEvent, } from "./remote-agent"; +const USB_WAKE_REMOTE_AGENT_TIMEOUT_MS = 120_000; + /** Run a command on the remote host (the machine whose display is captured by the KVM). */ function remoteHostExec(cmd: string, timeoutMs = 15000): string { const target = process.env.JETKVM_REMOTE_HOST; @@ -73,6 +75,22 @@ function getRemoteHostTtyACM(): string { return remoteHostExec('sh -lc "ls -1 /dev/ttyACM* 2>/dev/null | head -1 || true"', 5000).trim(); } +function enableJetKVMUsbWakeOnRemoteHost(): void { + remoteHostExec( + "for d in /sys/bus/usb/devices/*/; do " + + '[ -f "$d/power/wakeup" ] || continue; ' + + 'vendor=$(cat "$d/idVendor" 2>/dev/null || true); ' + + 'product=$(cat "$d/idProduct" 2>/dev/null || true); ' + + 'if grep -q JetKVM "$d/manufacturer" 2>/dev/null || ' + + 'grep -q JetKVM "$d/product" 2>/dev/null || ' + + '{ [ "$vendor" = "1d6b" ] && [ "$product" = "0104" ]; }; then ' + + 'echo enabled | sudo tee "$d/power/wakeup" > /dev/null; ' + + 'p=$(dirname $(readlink -f "$d"))/power/wakeup; ' + + '[ -f "$p" ] && echo enabled | sudo tee "$p" > /dev/null; ' + + "fi; done", + ); +} + async function waitForRemoteHostTtyACM( shouldExist: boolean, timeoutMs = 15000, @@ -100,6 +118,48 @@ async function waitForRemoteHostTtyACM( ); } +async function remoteAgentHealth(timeoutMs = 2000): Promise { + return Promise.race([ + agent!.health(), + new Promise(resolve => setTimeout(() => resolve(false), timeoutMs)), + ]); +} + +async function waitForRemoteAgentDown(timeoutMs = 30000): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!(await remoteAgentHealth(1000))) return; + await new Promise(r => setTimeout(r, 500)); + } + throw new Error(`Remote agent stayed reachable for ${timeoutMs}ms after S3 suspend request`); +} + +function getRemoteHostDefaultNetworkInfo(): { iface: string; isWifi: boolean; driver: string } { + const output = remoteHostExec( + "iface=$(ip route show default 2>/dev/null | sed -n 's/.* dev \\([^ ]*\\).*/\\1/p' | head -1); " + + 'driver=""; ' + + 'if [ -n "$iface" ]; then ' + + 'driver=$(basename "$(readlink -f "/sys/class/net/$iface/device/driver" 2>/dev/null)" 2>/dev/null || true); ' + + "fi; " + + 'if [ -n "$iface" ] && [ -d "/sys/class/net/$iface/wireless" ]; then kind=wifi; else kind=nonwifi; fi; ' + + 'printf "%s:%s:%s\\n" "$kind" "$iface" "$driver"', + 5000, + ).trim(); + const [kind = "", iface = "", driver = ""] = output.split(":"); + return { iface, isWifi: kind === "wifi", driver }; +} + +function skipUsbWakeIfRemoteHostUsesWifi(): void { + const network = getRemoteHostDefaultNetworkInfo(); + if (!network.isWifi) return; + + test.skip( + true, + `S3 USB wake verification needs a stable remote-agent route after resume; ` + + `${network.iface} is Wi-Fi (${network.driver || "unknown driver"})`, + ); +} + /** * Retry keyboard round-trip (send Space, verify host received it) until success * or timeout. Used after USB rebinds where the HID channel needs time to stabilize. @@ -2719,7 +2779,7 @@ test.describe("Remote Host Agent", () => { // ═══════════════════════════════════════════ test("usb-wake: S3 suspend and wake via HID keypress", async () => { - test.setTimeout(120_000); + test.setTimeout(180_000); // Requires system firmware >= 0.2.8 (f_hid wakeup_on_write kernel patch) const localVersion = (await callJsonRpc(sharedPage, "getLocalVersion")) as { @@ -2743,17 +2803,11 @@ test.describe("Remote Host Agent", () => { test.skip(true, `S3 deep sleep not available (mem_sleep: ${memSleep})`); return; } + skipUsbWakeIfRemoteHostUsesWifi(); // Enable USB wakeup on JetKVM's USB device (and parent hub) try { - remoteHostExec( - "for d in /sys/bus/usb/devices/*/; do " + - 'if grep -q JetKVM "$d/product" 2>/dev/null; then ' + - 'echo enabled | sudo tee "$d/power/wakeup" > /dev/null; ' + - 'p=$(dirname $(readlink -f "$d"))/power/wakeup; ' + - '[ -f "$p" ] && echo enabled | sudo tee "$p" > /dev/null; ' + - "fi; done", - ); + enableJetKVMUsbWakeOnRemoteHost(); } catch { /* best effort */ } @@ -2772,11 +2826,9 @@ test.describe("Remote Host Agent", () => { /* SSH may drop during suspend, that's expected */ } - // Give the host 10s to fully enter S3 - await new Promise(r => setTimeout(r, 10000)); - - // Sanity check: host should be unreachable - expect(await agent!.health(), "Host should be asleep after 10s").toBe(false); + // Wait only until the host is unreachable, then send wake immediately. + // Some hosts wake from other enabled sources before a fixed 10s sleep window. + await waitForRemoteAgentDown(); // Send wake signal via HID (spacebar press+release) await callJsonRpc(sharedPage, "keyboardReport", { @@ -2788,11 +2840,12 @@ test.describe("Remote Host Agent", () => { modifier: 0, }); - // Poll until host wakes up (remote agent responds again) - const wakeDeadline = Date.now() + 30000; + // Poll until remote-agent responds again. This verifies both wake and the + // post-resume input path used by the rest of the suite. + const wakeDeadline = Date.now() + USB_WAKE_REMOTE_AGENT_TIMEOUT_MS; let hostUp = false; while (Date.now() < wakeDeadline) { - if (await agent!.health()) { + if (await remoteAgentHealth()) { hostUp = true; break; } @@ -2811,7 +2864,7 @@ test.describe("Remote Host Agent", () => { } await new Promise(r => setTimeout(r, 2000)); } - expect(hostUp, "Host should wake from S3 after HID wake signal").toBe(true); + expect(hostUp, "Remote agent should be reachable after S3 HID wake").toBe(true); // Wait for video stream to recover after host resume await new Promise(r => setTimeout(r, 5000)); @@ -2828,7 +2881,7 @@ test.describe("Remote Host Agent", () => { }); test("usb-wake: S3 suspend and wake via mouse input", async () => { - test.setTimeout(120_000); + test.setTimeout(180_000); const localVersion = (await callJsonRpc(sharedPage, "getLocalVersion")) as { appVersion: string; @@ -2850,15 +2903,11 @@ test.describe("Remote Host Agent", () => { test.skip(true, `S3 deep sleep not available (mem_sleep: ${memSleep})`); return; } + skipUsbWakeIfRemoteHostUsesWifi(); // Enable USB wakeup try { - remoteHostExec( - "for d in /sys/bus/usb/devices/*/; do " + - 'if grep -q JetKVM "$d/product" 2>/dev/null; then ' + - 'echo enabled | sudo tee "$d/power/wakeup" > /dev/null; ' + - "fi; done", - ); + enableJetKVMUsbWakeOnRemoteHost(); } catch { /* best effort */ } @@ -2876,8 +2925,7 @@ test.describe("Remote Host Agent", () => { /* SSH may drop */ } - await new Promise(r => setTimeout(r, 10000)); - expect(await agent!.health(), "Host should be asleep after 10s").toBe(false); + await waitForRemoteAgentDown(); // Wake via relative mouse activity. This exercises the boot-style mouse HID // interface, which is typically a more reliable suspend wake source than @@ -2886,10 +2934,10 @@ test.describe("Remote Host Agent", () => { await callJsonRpc(sharedPage, "relMouseReport", { dx: 0, dy: 0, buttons: 1 }); await callJsonRpc(sharedPage, "relMouseReport", { dx: 0, dy: 0, buttons: 0 }); - const wakeDeadline = Date.now() + 30000; + const wakeDeadline = Date.now() + USB_WAKE_REMOTE_AGENT_TIMEOUT_MS; let hostUp = false; while (Date.now() < wakeDeadline) { - if (await agent!.health()) { + if (await remoteAgentHealth()) { hostUp = true; break; } @@ -2906,7 +2954,7 @@ test.describe("Remote Host Agent", () => { } await new Promise(r => setTimeout(r, 2000)); } - expect(hostUp, "Host should wake from S3 after mouse input").toBe(true); + expect(hostUp, "Remote agent should be reachable after S3 mouse wake").toBe(true); await new Promise(r => setTimeout(r, 5000)); await sharedPage.goto("/", { waitUntil: "networkidle" }); diff --git a/webrtc.go b/webrtc.go index d8ace07c..41f1ac87 100644 --- a/webrtc.go +++ b/webrtc.go @@ -36,10 +36,11 @@ type Session struct { lastKeepAliveArrivalTime time.Time // Track when last keep-alive packet arrived lastTimerResetTime time.Time // Track when auto-release timer was last reset keepAliveJitterLock sync.Mutex // Protect jitter compensation timing state - hidQueueLock sync.Mutex hidQueue []chan hidQueueMessage keysDownStateQueue chan usbgadget.KeysDownState + done chan struct{} + closeOnce sync.Once codecMimeType string } @@ -212,19 +213,48 @@ func (s *Session) ExchangeOffer(offerStr string) (string, error) { } func (s *Session) initQueues() { - s.hidQueueLock.Lock() - defer s.hidQueueLock.Unlock() - s.hidQueue = make([]chan hidQueueMessage, 0) for i := 0; i < 4; i++ { - q := make(chan hidQueueMessage, 256) - s.hidQueue = append(s.hidQueue, q) + s.hidQueue = append(s.hidQueue, make(chan hidQueueMessage, 256)) } } -func (s *Session) handleQueues(index int) { - for msg := range s.hidQueue[index] { - onHidMessage(msg, s) +func (s *Session) handleHidQueue(queue <-chan hidQueueMessage) { + for { + select { + case <-s.done: + return + default: + } + + select { + case <-s.done: + return + case msg := <-queue: + onHidMessage(msg, s) + } + } +} + +func (s *Session) enqueueHidMessage(queueIndex int, msg hidQueueMessage) bool { + if s == nil || s.isClosed() { + return false + } + + if queueIndex >= len(s.hidQueue) || queueIndex < 0 { + return false + } + + queue := s.hidQueue[queueIndex] + if queue == nil { + return false + } + + select { + case queue <- msg: + return true + case <-s.done: + return false } } @@ -232,18 +262,34 @@ const keysDownStateQueueSize = 64 func (s *Session) initKeysDownStateQueue() { // serialise outbound key state reports so unreliable links can't stall input handling - s.keysDownStateQueue = make(chan usbgadget.KeysDownState, keysDownStateQueueSize) - go s.handleKeysDownStateQueue() + queue := make(chan usbgadget.KeysDownState, keysDownStateQueueSize) + s.keysDownStateQueue = queue + go s.handleKeysDownStateQueue(queue) } -func (s *Session) handleKeysDownStateQueue() { - for state := range s.keysDownStateQueue { - s.reportHidRPCKeysDownState(state) +func (s *Session) handleKeysDownStateQueue(queue <-chan usbgadget.KeysDownState) { + for { + select { + case <-s.done: + return + default: + } + + select { + case <-s.done: + return + case state := <-queue: + s.reportHidRPCKeysDownState(state) + } } } func (s *Session) enqueueKeysDownState(state usbgadget.KeysDownState) { - if s == nil || s.keysDownStateQueue == nil { + if s == nil || s.isClosed() { + return + } + + if s.keysDownStateQueue == nil { return } @@ -254,6 +300,34 @@ func (s *Session) enqueueKeysDownState(state usbgadget.KeysDownState) { } } +func (s *Session) enqueueRPCMessage(msg webrtc.DataChannelMessage) bool { + if s == nil || s.rpcQueue == nil || s.isClosed() { + return false + } + + select { + case s.rpcQueue <- msg: + return true + case <-s.done: + return false + } +} + +func (s *Session) isClosed() bool { + select { + case <-s.done: + return true + default: + return false + } +} + +func (s *Session) close() { + s.closeOnce.Do(func() { + close(s.done) + }) +} + func getOnHidMessageHandler(session *Session, scopedLogger *zerolog.Logger, channel string) func(msg webrtc.DataChannelMessage) { return func(msg webrtc.DataChannelMessage) { l := scopedLogger.With(). @@ -284,13 +358,10 @@ func getOnHidMessageHandler(session *Session, scopedLogger *zerolog.Logger, chan queueIndex = 3 } - queue := session.hidQueue[queueIndex] - if queue != nil { - queue <- hidQueueMessage{ - DataChannelMessage: msg, - channel: channel, - } - } else { + if ok := session.enqueueHidMessage(queueIndex, hidQueueMessage{ + DataChannelMessage: msg, + channel: channel, + }); !ok { l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue is nil") return } @@ -354,20 +425,35 @@ func newSession(config SessionConfig) (*Session, error) { return nil, err } - session := &Session{peerConnection: peerConnection} - session.rpcQueue = make(chan webrtc.DataChannelMessage, 256) + session := &Session{ + peerConnection: peerConnection, + done: make(chan struct{}), + rpcQueue: make(chan webrtc.DataChannelMessage, 256), + } session.initQueues() session.initKeysDownStateQueue() + rpcQueue := session.rpcQueue go func() { - for msg := range session.rpcQueue { - // TODO: only use goroutine if the task is asynchronous - go onRPCMessage(msg, session) + for { + select { + case <-session.done: + return + default: + } + + select { + case <-session.done: + return + case msg := <-rpcQueue: + // TODO: only use goroutine if the task is asynchronous + go onRPCMessage(msg, session) + } } }() - for i := 0; i < len(session.hidQueue); i++ { - go session.handleQueues(i) + for _, queue := range session.hidQueue { + go session.handleHidQueue(queue) } peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { @@ -392,7 +478,7 @@ func newSession(config SessionConfig) (*Session, error) { session.RPCChannel = d d.OnMessage(func(msg webrtc.DataChannelMessage) { // Enqueue to ensure ordered processing - session.rpcQueue <- msg + session.enqueueRPCMessage(msg) }) // Wait for channel to be open before sending initial state d.OnOpen(func() { @@ -459,20 +545,7 @@ func newSession(config SessionConfig) (*Session, error) { _ = rpcKeyboardReport(0, keyboardClearStateKeys) currentSession = nil } - // Stop RPC processor - if session.rpcQueue != nil { - close(session.rpcQueue) - session.rpcQueue = nil - } - - // Stop HID RPC processor - for i := 0; i < len(session.hidQueue); i++ { - close(session.hidQueue[i]) - session.hidQueue[i] = nil - } - - close(session.keysDownStateQueue) - session.keysDownStateQueue = nil + session.close() if session.shouldUmountVirtualMedia { if err := rpcUnmountImage(); err != nil {