Fix USB/IP control status updates

This commit is contained in:
世界
2026-04-24 07:56:16 +08:00
parent d819cf33f3
commit 42339bdff1
3 changed files with 107 additions and 15 deletions
+78
View File
@@ -1249,6 +1249,84 @@ func TestServerDispatchConnHandlesControlPingAndChanged(t *testing.T) {
require.Equal(t, uint64(1), delta.Sequence)
}
func TestServerReconcileBroadcastsStatusOnlyDeviceDelta(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
device := newTestDevice("1-1", 0x1d6b, 0x0002, "serial-1", SpeedHigh)
store := newTestDeviceStore(device)
store.setStatus("1-1", usbipStatusUsed)
serverOps := newTestUSBIPOps(t)
serverOps.listUSBDevices = store.listUSBDevices
serverOps.readUsbipStatus = store.readUsbipStatus
serverOps.readSysfsDevice = store.readSysfsDevice
server := &ServerService{
ctx: ctx,
cancel: cancel,
logger: newTestLogger(),
matches: []option.USBIPDeviceMatch{{BusID: "1-1"}},
exports: map[string]serverExport{"1-1": {busid: "1-1"}},
controlSubs: make(map[uint64]*serverControlConn),
ops: serverOps,
}
server.refreshControlState()
serverAddr, closeServer := startDispatchServer(t, server)
defer closeServer()
conn, err := net.Dial("tcp", serverAddr.String())
require.NoError(t, err)
defer conn.Close()
setConnDeadline(t, conn)
require.NoError(t, WriteControlPreface(conn))
require.NoError(t, WriteControlHello(conn))
ack, err := ReadControlFrame(conn)
require.NoError(t, err)
require.Equal(t, controlFrameAck, ack.Type)
snapshotMessage, err := readControlMessage(conn)
require.NoError(t, err)
require.Equal(t, controlFrameDeviceSnapshot, snapshotMessage.Frame.Type)
var snapshot controlDeviceSnapshot
require.NoError(t, unmarshalControlPayload(snapshotMessage.Payload, &snapshot))
require.Len(t, snapshot.Devices, 1)
require.Equal(t, "1-1", snapshot.Devices[0].BusID)
require.Equal(t, deviceStateBusy, snapshot.Devices[0].State)
require.Equal(t, usbipStatusUsed, snapshot.Devices[0].StatusCode)
store.setStatus("1-1", usbipStatusAvailable)
require.NoError(t, server.reconcileAndBroadcast(true))
changed, err := readControlMessage(conn)
require.NoError(t, err)
require.Equal(t, controlFrameDeviceDelta, changed.Frame.Type)
require.Equal(t, uint64(1), changed.Frame.Sequence)
var delta controlDeviceDelta
require.NoError(t, unmarshalControlPayload(changed.Payload, &delta))
require.Equal(t, uint64(1), delta.Sequence)
require.Empty(t, delta.Added)
require.Empty(t, delta.Removed)
require.Len(t, delta.Updated, 1)
require.Equal(t, "1-1", delta.Updated[0].BusID)
require.Equal(t, deviceStateAvailable, delta.Updated[0].State)
require.Equal(t, usbipStatusAvailable, delta.Updated[0].StatusCode)
sequence := server.currentControlSequence()
require.NoError(t, server.reconcileAndBroadcast(true))
require.Equal(t, sequence, server.currentControlSequence())
require.NoError(t, conn.SetReadDeadline(time.Now().Add(100*time.Millisecond)))
_, err = readControlMessage(conn)
require.Error(t, err)
var netErr net.Error
require.ErrorAs(t, err, &netErr)
require.True(t, netErr.Timeout())
}
func TestServerControlLeaseEnablesImportExt(t *testing.T) {
t.Parallel()
+28 -14
View File
@@ -269,14 +269,15 @@ func (s *ServerService) reconcileAndBroadcast(notify bool) error {
s.reconcileMu.Lock()
defer s.reconcileMu.Unlock()
changed, err := s.reconcileExports()
if err != nil {
if _, err := s.reconcileExports(); err != nil {
return err
}
if notify && changed {
s.broadcastChanged()
nextState := deviceInfoV2Map(s.buildDeviceStateV2())
if notify {
s.broadcastControlState(nextState, false)
} else {
s.refreshControlState()
s.setControlState(nextState)
}
return nil
}
@@ -658,13 +659,20 @@ func (s *ServerService) closeControlSubscribers() {
}
func (s *ServerService) broadcastChanged() {
devices := s.buildDeviceStateV2()
nextState := deviceInfoV2Map(devices)
s.broadcastControlState(deviceInfoV2Map(s.buildDeviceStateV2()), true)
}
func (s *ServerService) broadcastControlState(nextState map[string]DeviceInfoV2, force bool) bool {
s.controlMu.Lock()
s.controlSeq++
nextSequence := s.controlSeq + 1
delta := buildControlDeviceDelta(nextSequence, s.controlState, nextState)
if !force && controlDeviceDeltaEmpty(delta) {
s.controlState = nextState
s.controlMu.Unlock()
return false
}
s.controlSeq = nextSequence
sequence := s.controlSeq
delta := buildControlDeviceDelta(sequence, s.controlState, nextState)
s.controlState = nextState
subs := make([]*serverControlConn, 0, len(s.controlSubs))
for _, sub := range s.controlSubs {
@@ -688,6 +696,11 @@ func (s *ServerService) broadcastChanged() {
}
s.enqueueControlFrame(sub, frame)
}
return true
}
func controlDeviceDeltaEmpty(delta controlDeviceDelta) bool {
return len(delta.Added) == 0 && len(delta.Updated) == 0 && len(delta.Removed) == 0
}
func (s *ServerService) enqueueControlFrame(sub *serverControlConn, frame controlFrame) {
@@ -705,9 +718,7 @@ func (s *ServerService) enqueueControlPayload(sub *serverControlConn, frame cont
func (s *ServerService) enqueueControlSnapshot(sub *serverControlConn, sequence uint64) {
devices := s.buildDeviceStateV2()
s.controlMu.Lock()
s.controlState = deviceInfoV2Map(devices)
s.controlMu.Unlock()
s.setControlState(deviceInfoV2Map(devices))
s.enqueueControlPayload(sub, controlFrame{
Type: controlFrameDeviceSnapshot,
Version: controlProtocolVersion,
@@ -729,9 +740,12 @@ func (s *ServerService) enqueueControlMessage(sub *serverControlConn, message co
}
func (s *ServerService) refreshControlState() {
devices := s.buildDeviceStateV2()
s.setControlState(deviceInfoV2Map(s.buildDeviceStateV2()))
}
func (s *ServerService) setControlState(nextState map[string]DeviceInfoV2) {
s.controlMu.Lock()
s.controlState = deviceInfoV2Map(devices)
s.controlState = nextState
s.controlMu.Unlock()
}
+1 -1
View File
@@ -543,7 +543,7 @@ box_usbhost_controller_t *box_usbhost_controller_create(uintptr_t ref, uint8_t p
| IOUSBHostCIMessageControlValid
| (port << IOUSBHostCIPortCapabilitiesMessageControlPortNumberPhase),
.data0 = (125 << IOUSBHostCIPortCapabilitiesMessageData0MaxPowerPhase),
.data1 = speed,
.data1 = 0,
};
[capabilities appendBytes:&port_capabilities length:sizeof(port_capabilities)];
}