Fix webrtc session shutdown race (#1468)

This commit is contained in:
Adam Shiervani
2026-05-18 10:22:15 +02:00
committed by GitHub
parent 51e7a95f19
commit 6419d049a2
2 changed files with 195 additions and 74 deletions
+78 -30
View File
@@ -35,6 +35,8 @@ import {
type KeyboardEvent as RAKeyboardEvent,
} from "./remote-agent";
/**
 * Upper bound, in milliseconds, on how long the usb-wake tests keep polling the
 * remote agent after sending the HID wake input before reporting that the host
 * did not come back.
 */
const USB_WAKE_REMOTE_AGENT_TIMEOUT_MS = 120_000;
/** Run a command on the remote host (the machine whose display is captured by the KVM). */
function remoteHostExec(cmd: string, timeoutMs = 15000): string {
const target = process.env.JETKVM_REMOTE_HOST;
@@ -73,6 +75,22 @@ function getRemoteHostTtyACM(): string {
return remoteHostExec('sh -lc "ls -1 /dev/ttyACM* 2>/dev/null | head -1 || true"', 5000).trim();
}
/**
 * Enable USB remote wakeup for the JetKVM gadget on the remote host.
 *
 * The shell loop visits every entry under /sys/bus/usb/devices that exposes a
 * `power/wakeup` file. An entry counts as the JetKVM when its `manufacturer`
 * or `product` attribute contains "JetKVM", or when its idVendor/idProduct pair
 * is 1d6b:0104. For each match, "enabled" is written (through `sudo tee`) to the
 * device's own `power/wakeup` and, when that file exists, to the one belonging
 * to the parent directory of the device's resolved sysfs path.
 *
 * Errors raised by `remoteHostExec` are not handled here and reach the caller.
 */
function enableJetKVMUsbWakeOnRemoteHost(): void {
  // Each entry is one fragment of a single-line shell script; fragments are
  // separated by exactly one space when joined.
  const script = [
    "for d in /sys/bus/usb/devices/*/; do",
    '[ -f "$d/power/wakeup" ] || continue;',
    'vendor=$(cat "$d/idVendor" 2>/dev/null || true);',
    'product=$(cat "$d/idProduct" 2>/dev/null || true);',
    'if grep -q JetKVM "$d/manufacturer" 2>/dev/null ||',
    'grep -q JetKVM "$d/product" 2>/dev/null ||',
    '{ [ "$vendor" = "1d6b" ] && [ "$product" = "0104" ]; }; then',
    'echo enabled | sudo tee "$d/power/wakeup" > /dev/null;',
    'p=$(dirname $(readlink -f "$d"))/power/wakeup;',
    '[ -f "$p" ] && echo enabled | sudo tee "$p" > /dev/null;',
    "fi; done",
  ].join(" ");

  remoteHostExec(script);
}
async function waitForRemoteHostTtyACM(
shouldExist: boolean,
timeoutMs = 15000,
@@ -100,6 +118,48 @@ async function waitForRemoteHostTtyACM(
);
}
/**
 * Ask the remote agent whether it is healthy, giving up after `timeoutMs`.
 *
 * @param timeoutMs - How long to wait for `agent.health()` before treating the
 *   agent as unreachable.
 * @returns Whatever `agent.health()` resolves to, or `false` when the timeout
 *   elapses first.
 *
 * The timeout timer is cleared as soon as the race settles, so repeated probes
 * in a polling loop do not leave pending timers behind.
 * NOTE(review): a rejection from `agent.health()` still propagates to the
 * caller, exactly as before — confirm whether it should count as "down".
 */
async function remoteAgentHealth(timeoutMs = 2000): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    return await Promise.race([
      agent!.health(),
      new Promise<boolean>(resolve => {
        timer = setTimeout(() => resolve(false), timeoutMs);
      }),
    ]);
  } finally {
    // Runs on success, timeout and rejection alike.
    clearTimeout(timer);
  }
}
/**
 * Block until the remote agent stops answering health probes.
 *
 * Probes with a 1 s per-probe timeout and pauses 500 ms between attempts.
 *
 * @param timeoutMs - Total time budget for the agent to become unreachable.
 * @throws Error when the agent is still reachable once the budget is used up.
 */
async function waitForRemoteAgentDown(timeoutMs = 30000): Promise<void> {
  const startedAt = Date.now();

  while (Date.now() - startedAt < timeoutMs) {
    const reachable = await remoteAgentHealth(1000);
    if (!reachable) {
      return;
    }
    await new Promise(resolve => setTimeout(resolve, 500));
  }

  throw new Error(`Remote agent stayed reachable for ${timeoutMs}ms after S3 suspend request`);
}
/**
 * Describe the network interface that carries the remote host's default route.
 *
 * Runs a shell snippet on the remote host that prints `kind:iface:driver`:
 * - `iface` is taken from `ip route show default` (first match),
 * - `driver` is the basename of the interface's resolved `device/driver` link,
 * - `kind` is `wifi` when `/sys/class/net/<iface>/wireless` exists, else `nonwifi`.
 *
 * @returns The interface name, whether it is Wi-Fi, and the driver name. Fields
 *   missing from the output come back as empty strings (`isWifi` as `false`).
 */
function getRemoteHostDefaultNetworkInfo(): { iface: string; isWifi: boolean; driver: string } {
  // Fragments of a single-line shell script, separated by one space when joined.
  const probeScript = [
    "iface=$(ip route show default 2>/dev/null | sed -n 's/.* dev \\([^ ]*\\).*/\\1/p' | head -1);",
    'driver="";',
    'if [ -n "$iface" ]; then',
    'driver=$(basename "$(readlink -f "/sys/class/net/$iface/device/driver" 2>/dev/null)" 2>/dev/null || true);',
    "fi;",
    'if [ -n "$iface" ] && [ -d "/sys/class/net/$iface/wireless" ]; then kind=wifi; else kind=nonwifi; fi;',
    'printf "%s:%s:%s\\n" "$kind" "$iface" "$driver"',
  ].join(" ");

  // Output layout: kind:iface:driver
  const fields = remoteHostExec(probeScript, 5000).trim().split(":");

  return {
    iface: fields[1] ?? "",
    isWifi: fields[0] === "wifi",
    driver: fields[2] ?? "",
  };
}
/**
 * Skip the current test when the remote host's default route goes over Wi-Fi.
 *
 * The skip reason names the interface and its driver (or "unknown driver" when
 * none was detected). Does nothing when the default route is not Wi-Fi.
 */
function skipUsbWakeIfRemoteHostUsesWifi(): void {
  const { iface, isWifi, driver } = getRemoteHostDefaultNetworkInfo();

  if (isWifi) {
    const driverLabel = driver || "unknown driver";
    test.skip(
      true,
      "S3 USB wake verification needs a stable remote-agent route after resume; " +
        `${iface} is Wi-Fi (${driverLabel})`,
    );
  }
}
/**
* Retry keyboard round-trip (send Space, verify host received it) until success
* or timeout. Used after USB rebinds where the HID channel needs time to stabilize.
@@ -2719,7 +2779,7 @@ test.describe("Remote Host Agent", () => {
// ═══════════════════════════════════════════
test("usb-wake: S3 suspend and wake via HID keypress", async () => {
test.setTimeout(120_000);
test.setTimeout(180_000);
// Requires system firmware >= 0.2.8 (f_hid wakeup_on_write kernel patch)
const localVersion = (await callJsonRpc(sharedPage, "getLocalVersion")) as {
@@ -2743,17 +2803,11 @@ test.describe("Remote Host Agent", () => {
test.skip(true, `S3 deep sleep not available (mem_sleep: ${memSleep})`);
return;
}
skipUsbWakeIfRemoteHostUsesWifi();
// Enable USB wakeup on JetKVM's USB device (and parent hub)
try {
remoteHostExec(
"for d in /sys/bus/usb/devices/*/; do " +
'if grep -q JetKVM "$d/product" 2>/dev/null; then ' +
'echo enabled | sudo tee "$d/power/wakeup" > /dev/null; ' +
'p=$(dirname $(readlink -f "$d"))/power/wakeup; ' +
'[ -f "$p" ] && echo enabled | sudo tee "$p" > /dev/null; ' +
"fi; done",
);
enableJetKVMUsbWakeOnRemoteHost();
} catch {
/* best effort */
}
@@ -2772,11 +2826,9 @@ test.describe("Remote Host Agent", () => {
/* SSH may drop during suspend, that's expected */
}
// Give the host 10s to fully enter S3
await new Promise(r => setTimeout(r, 10000));
// Sanity check: host should be unreachable
expect(await agent!.health(), "Host should be asleep after 10s").toBe(false);
// Wait only until the host is unreachable, then send wake immediately.
// Some hosts wake from other enabled sources before a fixed 10s sleep window.
await waitForRemoteAgentDown();
// Send wake signal via HID (spacebar press+release)
await callJsonRpc(sharedPage, "keyboardReport", {
@@ -2788,11 +2840,12 @@ test.describe("Remote Host Agent", () => {
modifier: 0,
});
// Poll until host wakes up (remote agent responds again)
const wakeDeadline = Date.now() + 30000;
// Poll until remote-agent responds again. This verifies both wake and the
// post-resume input path used by the rest of the suite.
const wakeDeadline = Date.now() + USB_WAKE_REMOTE_AGENT_TIMEOUT_MS;
let hostUp = false;
while (Date.now() < wakeDeadline) {
if (await agent!.health()) {
if (await remoteAgentHealth()) {
hostUp = true;
break;
}
@@ -2811,7 +2864,7 @@ test.describe("Remote Host Agent", () => {
}
await new Promise(r => setTimeout(r, 2000));
}
expect(hostUp, "Host should wake from S3 after HID wake signal").toBe(true);
expect(hostUp, "Remote agent should be reachable after S3 HID wake").toBe(true);
// Wait for video stream to recover after host resume
await new Promise(r => setTimeout(r, 5000));
@@ -2828,7 +2881,7 @@ test.describe("Remote Host Agent", () => {
});
test("usb-wake: S3 suspend and wake via mouse input", async () => {
test.setTimeout(120_000);
test.setTimeout(180_000);
const localVersion = (await callJsonRpc(sharedPage, "getLocalVersion")) as {
appVersion: string;
@@ -2850,15 +2903,11 @@ test.describe("Remote Host Agent", () => {
test.skip(true, `S3 deep sleep not available (mem_sleep: ${memSleep})`);
return;
}
skipUsbWakeIfRemoteHostUsesWifi();
// Enable USB wakeup
try {
remoteHostExec(
"for d in /sys/bus/usb/devices/*/; do " +
'if grep -q JetKVM "$d/product" 2>/dev/null; then ' +
'echo enabled | sudo tee "$d/power/wakeup" > /dev/null; ' +
"fi; done",
);
enableJetKVMUsbWakeOnRemoteHost();
} catch {
/* best effort */
}
@@ -2876,8 +2925,7 @@ test.describe("Remote Host Agent", () => {
/* SSH may drop */
}
await new Promise(r => setTimeout(r, 10000));
expect(await agent!.health(), "Host should be asleep after 10s").toBe(false);
await waitForRemoteAgentDown();
// Wake via relative mouse activity. This exercises the boot-style mouse HID
// interface, which is typically a more reliable suspend wake source than
@@ -2886,10 +2934,10 @@ test.describe("Remote Host Agent", () => {
await callJsonRpc(sharedPage, "relMouseReport", { dx: 0, dy: 0, buttons: 1 });
await callJsonRpc(sharedPage, "relMouseReport", { dx: 0, dy: 0, buttons: 0 });
const wakeDeadline = Date.now() + 30000;
const wakeDeadline = Date.now() + USB_WAKE_REMOTE_AGENT_TIMEOUT_MS;
let hostUp = false;
while (Date.now() < wakeDeadline) {
if (await agent!.health()) {
if (await remoteAgentHealth()) {
hostUp = true;
break;
}
@@ -2906,7 +2954,7 @@ test.describe("Remote Host Agent", () => {
}
await new Promise(r => setTimeout(r, 2000));
}
expect(hostUp, "Host should wake from S3 after mouse input").toBe(true);
expect(hostUp, "Remote agent should be reachable after S3 mouse wake").toBe(true);
await new Promise(r => setTimeout(r, 5000));
await sharedPage.goto("/", { waitUntil: "networkidle" });
+117 -44
View File
@@ -36,10 +36,11 @@ type Session struct {
lastKeepAliveArrivalTime time.Time // Track when last keep-alive packet arrived
lastTimerResetTime time.Time // Track when auto-release timer was last reset
keepAliveJitterLock sync.Mutex // Protect jitter compensation timing state
hidQueueLock sync.Mutex
hidQueue []chan hidQueueMessage
keysDownStateQueue chan usbgadget.KeysDownState
done chan struct{}
closeOnce sync.Once
codecMimeType string
}
@@ -212,19 +213,48 @@ func (s *Session) ExchangeOffer(offerStr string) (string, error) {
}
func (s *Session) initQueues() {
s.hidQueueLock.Lock()
defer s.hidQueueLock.Unlock()
s.hidQueue = make([]chan hidQueueMessage, 0)
for i := 0; i < 4; i++ {
q := make(chan hidQueueMessage, 256)
s.hidQueue = append(s.hidQueue, q)
s.hidQueue = append(s.hidQueue, make(chan hidQueueMessage, 256))
}
}
func (s *Session) handleQueues(index int) {
for msg := range s.hidQueue[index] {
onHidMessage(msg, s)
// handleHidQueue drains queue, handing every message to onHidMessage, until the
// session's done channel is closed.
//
// The closed-state check at the top of each iteration gives shutdown priority:
// a select with several ready cases picks one at random, so without it a
// buffered message could still be processed after done has been closed.
func (s *Session) handleHidQueue(queue <-chan hidQueueMessage) {
	for !s.isClosed() {
		select {
		case msg := <-queue:
			onHidMessage(msg, s)
		case <-s.done:
			return
		}
	}
}
// enqueueHidMessage places msg on the HID queue selected by queueIndex.
//
// It returns true once the message has been handed to the queue. It returns
// false when the session is nil or already closed, when queueIndex is out of
// range, when the selected queue is nil, or when the session is closed while
// the call is waiting for room in a full queue.
func (s *Session) enqueueHidMessage(queueIndex int, msg hidQueueMessage) bool {
	if s == nil {
		return false
	}
	if s.isClosed() {
		return false
	}
	if queueIndex < 0 || queueIndex >= len(s.hidQueue) {
		return false
	}

	target := s.hidQueue[queueIndex]
	if target == nil {
		return false
	}

	// Block until there is room, but never past session shutdown.
	select {
	case <-s.done:
		return false
	case target <- msg:
		return true
	}
}
@@ -232,18 +262,34 @@ const keysDownStateQueueSize = 64
func (s *Session) initKeysDownStateQueue() {
// serialise outbound key state reports so unreliable links can't stall input handling
s.keysDownStateQueue = make(chan usbgadget.KeysDownState, keysDownStateQueueSize)
go s.handleKeysDownStateQueue()
queue := make(chan usbgadget.KeysDownState, keysDownStateQueueSize)
s.keysDownStateQueue = queue
go s.handleKeysDownStateQueue(queue)
}
func (s *Session) handleKeysDownStateQueue() {
for state := range s.keysDownStateQueue {
s.reportHidRPCKeysDownState(state)
// handleKeysDownStateQueue forwards every state received on queue to
// reportHidRPCKeysDownState until the session's done channel is closed.
//
// The closed-state check at the top of each iteration gives shutdown priority:
// a select with several ready cases picks one at random, so without it a
// buffered state could still be reported after done has been closed.
func (s *Session) handleKeysDownStateQueue(queue <-chan usbgadget.KeysDownState) {
	for !s.isClosed() {
		select {
		case state := <-queue:
			s.reportHidRPCKeysDownState(state)
		case <-s.done:
			return
		}
	}
}
func (s *Session) enqueueKeysDownState(state usbgadget.KeysDownState) {
if s == nil || s.keysDownStateQueue == nil {
if s == nil || s.isClosed() {
return
}
if s.keysDownStateQueue == nil {
return
}
@@ -254,6 +300,34 @@ func (s *Session) enqueueKeysDownState(state usbgadget.KeysDownState) {
}
}
// enqueueRPCMessage places msg on the session's RPC queue.
//
// It returns true once the message has been handed to the queue. It returns
// false when the session is nil, has no RPC queue, is already closed, or is
// closed while the call is waiting for room in a full queue.
func (s *Session) enqueueRPCMessage(msg webrtc.DataChannelMessage) bool {
	if s == nil {
		return false
	}
	if s.rpcQueue == nil {
		return false
	}
	if s.isClosed() {
		return false
	}

	// Block until there is room, but never past session shutdown.
	select {
	case <-s.done:
		return false
	case s.rpcQueue <- msg:
		return true
	}
}
// isClosed reports, without blocking, whether the session's done channel has
// been closed.
func (s *Session) isClosed() bool {
	closed := false
	select {
	case <-s.done:
		closed = true
	default:
		// done is still open; fall through with closed == false.
	}
	return closed
}
// close marks the session as shut down by closing its done channel.
//
// The close runs through closeOnce, so calling close more than once does not
// close the channel a second time.
func (s *Session) close() {
	signalDone := func() {
		close(s.done)
	}
	s.closeOnce.Do(signalDone)
}
func getOnHidMessageHandler(session *Session, scopedLogger *zerolog.Logger, channel string) func(msg webrtc.DataChannelMessage) {
return func(msg webrtc.DataChannelMessage) {
l := scopedLogger.With().
@@ -284,13 +358,10 @@ func getOnHidMessageHandler(session *Session, scopedLogger *zerolog.Logger, chan
queueIndex = 3
}
queue := session.hidQueue[queueIndex]
if queue != nil {
queue <- hidQueueMessage{
DataChannelMessage: msg,
channel: channel,
}
} else {
if ok := session.enqueueHidMessage(queueIndex, hidQueueMessage{
DataChannelMessage: msg,
channel: channel,
}); !ok {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue is nil")
return
}
@@ -354,20 +425,35 @@ func newSession(config SessionConfig) (*Session, error) {
return nil, err
}
session := &Session{peerConnection: peerConnection}
session.rpcQueue = make(chan webrtc.DataChannelMessage, 256)
session := &Session{
peerConnection: peerConnection,
done: make(chan struct{}),
rpcQueue: make(chan webrtc.DataChannelMessage, 256),
}
session.initQueues()
session.initKeysDownStateQueue()
rpcQueue := session.rpcQueue
go func() {
for msg := range session.rpcQueue {
// TODO: only use goroutine if the task is asynchronous
go onRPCMessage(msg, session)
for {
select {
case <-session.done:
return
default:
}
select {
case <-session.done:
return
case msg := <-rpcQueue:
// TODO: only use goroutine if the task is asynchronous
go onRPCMessage(msg, session)
}
}
}()
for i := 0; i < len(session.hidQueue); i++ {
go session.handleQueues(i)
for _, queue := range session.hidQueue {
go session.handleHidQueue(queue)
}
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
@@ -392,7 +478,7 @@ func newSession(config SessionConfig) (*Session, error) {
session.RPCChannel = d
d.OnMessage(func(msg webrtc.DataChannelMessage) {
// Enqueue to ensure ordered processing
session.rpcQueue <- msg
session.enqueueRPCMessage(msg)
})
// Wait for channel to be open before sending initial state
d.OnOpen(func() {
@@ -459,20 +545,7 @@ func newSession(config SessionConfig) (*Session, error) {
_ = rpcKeyboardReport(0, keyboardClearStateKeys)
currentSession = nil
}
// Stop RPC processor
if session.rpcQueue != nil {
close(session.rpcQueue)
session.rpcQueue = nil
}
// Stop HID RPC processor
for i := 0; i < len(session.hidQueue); i++ {
close(session.hidQueue[i])
session.hidQueue[i] = nil
}
close(session.keysDownStateQueue)
session.keysDownStateQueue = nil
session.close()
if session.shouldUmountVirtualMedia {
if err := rpcUnmountImage(); err != nil {