mirror of
https://github.com/jackjackbits/bitchat.git
synced 2026-05-05 20:22:31 +00:00
feat: Implement Request Sync Manager (V2 Sync) (#965)
* feat: Implement Request Sync Manager (V2 Sync) - Add RequestSyncManager to track and attribute sync requests - Update BitchatPacket and BinaryProtocol to support IS_RSR flag (0x10) - Update RequestSyncPacket with new TLV fields (sinceTimestamp, fragmentIdFilter) - Update GossipSyncManager to use unicast sync requests and mark responses as RSR - Update BLEService to enforce timestamp validation for normal packets and exempt valid RSRs - Add documentation for the new sync manager mechanism * fix: Resolve compilation errors in V2 Sync implementation - Remove duplicate restartGossipManager in BLEService - Add missing TransportConfig constants for sync - Add 'sync' log category to BitLogger - Add missing BitLogger import in GossipSyncManager * fix: Update tests for V2 Sync changes - Add requestSyncManager parameter to GossipSyncManager init in tests - Implement getConnectedPeers stub in RecordingDelegate - Remove unused variable warning in SubscriptionRateLimitTests --------- Co-authored-by: a1denvalu3 <> Co-authored-by: jack <212554440+jackjackbits@users.noreply.github.com>
This commit is contained in:
@@ -22,8 +22,9 @@ struct BitchatPacket: Codable {
|
|||||||
var signature: Data?
|
var signature: Data?
|
||||||
var ttl: UInt8
|
var ttl: UInt8
|
||||||
var route: [Data]?
|
var route: [Data]?
|
||||||
|
var isRSR: Bool
|
||||||
|
|
||||||
init(type: UInt8, senderID: Data, recipientID: Data?, timestamp: UInt64, payload: Data, signature: Data?, ttl: UInt8, version: UInt8 = 1, route: [Data]? = nil) {
|
init(type: UInt8, senderID: Data, recipientID: Data?, timestamp: UInt64, payload: Data, signature: Data?, ttl: UInt8, version: UInt8 = 1, route: [Data]? = nil, isRSR: Bool = false) {
|
||||||
self.version = version
|
self.version = version
|
||||||
self.type = type
|
self.type = type
|
||||||
self.senderID = senderID
|
self.senderID = senderID
|
||||||
@@ -33,10 +34,11 @@ struct BitchatPacket: Codable {
|
|||||||
self.signature = signature
|
self.signature = signature
|
||||||
self.ttl = ttl
|
self.ttl = ttl
|
||||||
self.route = route
|
self.route = route
|
||||||
|
self.isRSR = isRSR
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convenience initializer for new binary format
|
// Convenience initializer for new binary format
|
||||||
init(type: UInt8, ttl: UInt8, senderID: PeerID, payload: Data) {
|
init(type: UInt8, ttl: UInt8, senderID: PeerID, payload: Data, isRSR: Bool = false) {
|
||||||
self.version = 1
|
self.version = 1
|
||||||
self.type = type
|
self.type = type
|
||||||
// Convert hex string peer ID to binary data (8 bytes)
|
// Convert hex string peer ID to binary data (8 bytes)
|
||||||
@@ -56,6 +58,7 @@ struct BitchatPacket: Codable {
|
|||||||
self.signature = nil
|
self.signature = nil
|
||||||
self.ttl = ttl
|
self.ttl = ttl
|
||||||
self.route = nil
|
self.route = nil
|
||||||
|
self.isRSR = isRSR
|
||||||
}
|
}
|
||||||
|
|
||||||
var data: Data? {
|
var data: Data? {
|
||||||
@@ -85,7 +88,8 @@ struct BitchatPacket: Codable {
|
|||||||
signature: nil, // Remove signature for signing
|
signature: nil, // Remove signature for signing
|
||||||
ttl: 0, // Use fixed TTL=0 for signing to ensure relay compatibility
|
ttl: 0, // Use fixed TTL=0 for signing to ensure relay compatibility
|
||||||
version: version,
|
version: version,
|
||||||
route: route
|
route: route,
|
||||||
|
isRSR: false // RSR flag is mutable and not part of the signature
|
||||||
)
|
)
|
||||||
return BinaryProtocol.encode(unsignedPacket)
|
return BinaryProtocol.encode(unsignedPacket)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,12 +9,16 @@ struct RequestSyncPacket {
|
|||||||
let m: UInt32
|
let m: UInt32
|
||||||
let data: Data
|
let data: Data
|
||||||
let types: SyncTypeFlags?
|
let types: SyncTypeFlags?
|
||||||
|
let sinceTimestamp: UInt64?
|
||||||
|
let fragmentIdFilter: String?
|
||||||
|
|
||||||
init(p: Int, m: UInt32, data: Data, types: SyncTypeFlags? = nil) {
|
init(p: Int, m: UInt32, data: Data, types: SyncTypeFlags? = nil, sinceTimestamp: UInt64? = nil, fragmentIdFilter: String? = nil) {
|
||||||
self.p = p
|
self.p = p
|
||||||
self.m = m
|
self.m = m
|
||||||
self.data = data
|
self.data = data
|
||||||
self.types = types
|
self.types = types
|
||||||
|
self.sinceTimestamp = sinceTimestamp
|
||||||
|
self.fragmentIdFilter = fragmentIdFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
func encode() -> Data {
|
func encode() -> Data {
|
||||||
@@ -36,15 +40,24 @@ struct RequestSyncPacket {
|
|||||||
if let typesData = types?.toData() {
|
if let typesData = types?.toData() {
|
||||||
putTLV(0x04, typesData)
|
putTLV(0x04, typesData)
|
||||||
}
|
}
|
||||||
|
if let ts = sinceTimestamp {
|
||||||
|
var tsBE = ts.bigEndian
|
||||||
|
putTLV(0x05, withUnsafeBytes(of: &tsBE) { Data($0) })
|
||||||
|
}
|
||||||
|
if let fid = fragmentIdFilter, let fidData = fid.data(using: .utf8) {
|
||||||
|
putTLV(0x06, fidData)
|
||||||
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
static func decode(from data: Data, maxAcceptBytes: Int = 1024) -> RequestSyncPacket? {
|
static func decode(from data: Data, maxAcceptBytes: Int = 1024) -> RequestSyncPacket? {
|
||||||
var off = 0
|
var off = 0
|
||||||
var p: Int? = nil
|
var p: Int? = nil
|
||||||
var m: UInt32? = nil
|
var m: UInt32? = nil
|
||||||
var payload: Data? = nil
|
var payload: Data? = nil
|
||||||
var types: SyncTypeFlags? = nil
|
var types: SyncTypeFlags? = nil
|
||||||
|
var sinceTimestamp: UInt64? = nil
|
||||||
|
var fragmentIdFilter: String? = nil
|
||||||
|
|
||||||
while off + 3 <= data.count {
|
while off + 3 <= data.count {
|
||||||
let t = Int(data[off]); off += 1
|
let t = Int(data[off]); off += 1
|
||||||
@@ -68,12 +81,22 @@ struct RequestSyncPacket {
|
|||||||
if let decoded = SyncTypeFlags.decode(v) {
|
if let decoded = SyncTypeFlags.decode(v) {
|
||||||
types = decoded
|
types = decoded
|
||||||
}
|
}
|
||||||
|
case 0x05:
|
||||||
|
if v.count == 8 {
|
||||||
|
var ts: UInt64 = 0
|
||||||
|
for b in v { ts = (ts << 8) | UInt64(b) }
|
||||||
|
sinceTimestamp = ts
|
||||||
|
}
|
||||||
|
case 0x06:
|
||||||
|
if let fid = String(data: v, encoding: .utf8) {
|
||||||
|
fragmentIdFilter = fid
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
break // forward compatible; ignore unknown TLVs
|
break // forward compatible; ignore unknown TLVs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
guard let pp = p, let mm = m, let dd = payload, pp >= 1, mm > 0 else { return nil }
|
guard let pp = p, let mm = m, let dd = payload, pp >= 1, mm > 0 else { return nil }
|
||||||
return RequestSyncPacket(p: pp, m: mm, data: dd, types: types)
|
return RequestSyncPacket(p: pp, m: mm, data: dd, types: types, sinceTimestamp: sinceTimestamp, fragmentIdFilter: fragmentIdFilter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -138,6 +138,7 @@ struct BinaryProtocol {
|
|||||||
static let hasSignature: UInt8 = 0x02
|
static let hasSignature: UInt8 = 0x02
|
||||||
static let isCompressed: UInt8 = 0x04
|
static let isCompressed: UInt8 = 0x04
|
||||||
static let hasRoute: UInt8 = 0x08
|
static let hasRoute: UInt8 = 0x08
|
||||||
|
static let isRSR: UInt8 = 0x10
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode BitchatPacket to binary format
|
// Encode BitchatPacket to binary format
|
||||||
@@ -204,8 +205,9 @@ struct BinaryProtocol {
|
|||||||
if isCompressed { flags |= Flags.isCompressed }
|
if isCompressed { flags |= Flags.isCompressed }
|
||||||
// HAS_ROUTE is only valid for v2+ packets
|
// HAS_ROUTE is only valid for v2+ packets
|
||||||
if hasRoute && version >= 2 { flags |= Flags.hasRoute }
|
if hasRoute && version >= 2 { flags |= Flags.hasRoute }
|
||||||
|
if packet.isRSR { flags |= Flags.isRSR }
|
||||||
data.append(flags)
|
data.append(flags)
|
||||||
|
|
||||||
if version == 2 {
|
if version == 2 {
|
||||||
let length = UInt32(payloadDataSize)
|
let length = UInt32(payloadDataSize)
|
||||||
for shift in stride(from: 24, through: 0, by: -8) {
|
for shift in stride(from: 24, through: 0, by: -8) {
|
||||||
@@ -329,7 +331,8 @@ struct BinaryProtocol {
|
|||||||
let isCompressed = (flags & Flags.isCompressed) != 0
|
let isCompressed = (flags & Flags.isCompressed) != 0
|
||||||
// HAS_ROUTE is only valid for v2+ packets; ignore the flag for v1
|
// HAS_ROUTE is only valid for v2+ packets; ignore the flag for v1
|
||||||
let hasRoute = (version >= 2) && (flags & Flags.hasRoute) != 0
|
let hasRoute = (version >= 2) && (flags & Flags.hasRoute) != 0
|
||||||
|
let isRSR = (flags & Flags.isRSR) != 0
|
||||||
|
|
||||||
let payloadLength: Int
|
let payloadLength: Int
|
||||||
if version == 2 {
|
if version == 2 {
|
||||||
guard let len = read32() else { return nil }
|
guard let len = read32() else { return nil }
|
||||||
@@ -410,7 +413,8 @@ struct BinaryProtocol {
|
|||||||
signature: signature,
|
signature: signature,
|
||||||
ttl: ttl,
|
ttl: ttl,
|
||||||
version: version,
|
version: version,
|
||||||
route: route
|
route: route,
|
||||||
|
isRSR: isRSR
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -195,6 +195,7 @@ final class BLEService: NSObject {
|
|||||||
|
|
||||||
// MARK: - Gossip Sync
|
// MARK: - Gossip Sync
|
||||||
private var gossipSyncManager: GossipSyncManager?
|
private var gossipSyncManager: GossipSyncManager?
|
||||||
|
private let requestSyncManager = RequestSyncManager()
|
||||||
|
|
||||||
// MARK: - Maintenance Timer
|
// MARK: - Maintenance Timer
|
||||||
|
|
||||||
@@ -328,6 +329,31 @@ final class BLEService: NSObject {
|
|||||||
// Initialize gossip sync manager
|
// Initialize gossip sync manager
|
||||||
restartGossipManager()
|
restartGossipManager()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func restartGossipManager() {
|
||||||
|
// Stop existing
|
||||||
|
gossipSyncManager?.stop()
|
||||||
|
|
||||||
|
let config = GossipSyncManager.Config(
|
||||||
|
seenCapacity: TransportConfig.syncSeenCapacity,
|
||||||
|
gcsMaxBytes: TransportConfig.syncGCSMaxBytes,
|
||||||
|
gcsTargetFpr: TransportConfig.syncGCSTargetFpr,
|
||||||
|
maxMessageAgeSeconds: TransportConfig.syncMaxMessageAgeSeconds,
|
||||||
|
maintenanceIntervalSeconds: TransportConfig.syncMaintenanceIntervalSeconds,
|
||||||
|
stalePeerCleanupIntervalSeconds: TransportConfig.syncStalePeerCleanupIntervalSeconds,
|
||||||
|
stalePeerTimeoutSeconds: TransportConfig.syncStalePeerTimeoutSeconds,
|
||||||
|
fragmentCapacity: TransportConfig.syncFragmentCapacity,
|
||||||
|
fileTransferCapacity: TransportConfig.syncFileTransferCapacity,
|
||||||
|
fragmentSyncIntervalSeconds: TransportConfig.syncFragmentIntervalSeconds,
|
||||||
|
fileTransferSyncIntervalSeconds: TransportConfig.syncFileTransferIntervalSeconds,
|
||||||
|
messageSyncIntervalSeconds: TransportConfig.syncMessageIntervalSeconds
|
||||||
|
)
|
||||||
|
|
||||||
|
let manager = GossipSyncManager(myPeerID: myPeerID, config: config, requestSyncManager: requestSyncManager)
|
||||||
|
manager.delegate = self
|
||||||
|
manager.start()
|
||||||
|
gossipSyncManager = manager
|
||||||
|
}
|
||||||
|
|
||||||
// No advertising policy to set; we never include Local Name in adverts.
|
// No advertising policy to set; we never include Local Name in adverts.
|
||||||
|
|
||||||
@@ -794,6 +820,43 @@ final class BLEService: NSObject {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func validatePacket(_ packet: BitchatPacket, from peerID: PeerID) -> Bool {
|
||||||
|
let currentTime = UInt64(Date().timeIntervalSince1970 * 1000)
|
||||||
|
let messageType = MessageType(rawValue: packet.type)
|
||||||
|
|
||||||
|
// 1. Timestamp Validation (Skipped for valid RSR packets)
|
||||||
|
let isRSR = packet.isRSR
|
||||||
|
// Treat TTL=0 as legacy RSR if not REQUEST_SYNC
|
||||||
|
// (Legacy clients send responses with TTL=0 but no flag)
|
||||||
|
let isLegacyRSR = packet.ttl == 0 && messageType != .requestSync
|
||||||
|
var skipTimestampCheck = false
|
||||||
|
|
||||||
|
if isRSR || isLegacyRSR {
|
||||||
|
// We check both explicit RSR flag AND legacy TTL=0 packets
|
||||||
|
if requestSyncManager.isValidResponse(from: peerID, isRSR: true) {
|
||||||
|
SecureLogger.debug("Valid RSR packet (legacy=\(isLegacyRSR)) from \(peerID.id.prefix(8))… - skipping timestamp check", category: .security)
|
||||||
|
skipTimestampCheck = true
|
||||||
|
} else {
|
||||||
|
SecureLogger.warning("Invalid or unsolicited RSR packet from \(peerID.id.prefix(8))… - rejecting", category: .security)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !skipTimestampCheck {
|
||||||
|
// Enforce timestamp check for normal packets (2 minutes skew)
|
||||||
|
let maxSkew: UInt64 = 120_000 // 2 minutes in ms
|
||||||
|
let packetTime = packet.timestamp
|
||||||
|
let skew = (packetTime > currentTime) ? (packetTime - currentTime) : (currentTime - packetTime)
|
||||||
|
|
||||||
|
if skew > maxSkew {
|
||||||
|
SecureLogger.warning("Packet timestamp skewed by \(skew)ms (max \(maxSkew)ms) from \(peerID.id.prefix(8))…", category: .security)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// MARK: - Packet Broadcasting
|
// MARK: - Packet Broadcasting
|
||||||
|
|
||||||
private func broadcastPacket(_ packet: BitchatPacket, transferId: String? = nil) {
|
private func broadcastPacket(_ packet: BitchatPacket, transferId: String? = nil) {
|
||||||
@@ -1547,6 +1610,12 @@ extension BLEService: GossipSyncManager.Delegate {
|
|||||||
func signPacketForBroadcast(_ packet: BitchatPacket) -> BitchatPacket {
|
func signPacketForBroadcast(_ packet: BitchatPacket) -> BitchatPacket {
|
||||||
return noiseService.signPacket(packet) ?? packet
|
return noiseService.signPacket(packet) ?? packet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getConnectedPeers() -> [PeerID] {
|
||||||
|
return collectionsQueue.sync {
|
||||||
|
peers.values.compactMap { $0.isConnected ? $0.peerID : nil }
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - CBCentralManagerDelegate
|
// MARK: - CBCentralManagerDelegate
|
||||||
@@ -2130,6 +2199,11 @@ extension BLEService: CBPeripheralDelegate {
|
|||||||
SecureLogger.error("❌ Failed to decode assembled notification frame (len=\(frame.count), prefix=\(prefix))", category: .session)
|
SecureLogger.error("❌ Failed to decode assembled notification frame (len=\(frame.count), prefix=\(prefix))", category: .session)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Validate packet (Timestamp/RSR) before processing
|
||||||
|
let senderID = PeerID(hexData: packet.senderID)
|
||||||
|
if !validatePacket(packet, from: senderID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
processNotificationPacket(packet, from: peripheral, peripheralUUID: peripheralUUID)
|
processNotificationPacket(packet, from: peripheral, peripheralUUID: peripheralUUID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2545,6 +2619,12 @@ extension BLEService: CBPeripheralManagerDelegate {
|
|||||||
// Clear buffer on success
|
// Clear buffer on success
|
||||||
pendingWriteBuffers.removeValue(forKey: centralUUID)
|
pendingWriteBuffers.removeValue(forKey: centralUUID)
|
||||||
let senderID = PeerID(hexData: packet.senderID)
|
let senderID = PeerID(hexData: packet.senderID)
|
||||||
|
|
||||||
|
// Validate packet (Timestamp/RSR)
|
||||||
|
if !validatePacket(packet, from: senderID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if packet.type != MessageType.announce.rawValue {
|
if packet.type != MessageType.announce.rawValue {
|
||||||
SecureLogger.debug("📦 Decoded (combined) packet type: \(packet.type) from sender: \(senderID)", category: .session)
|
SecureLogger.debug("📦 Decoded (combined) packet type: \(packet.type) from sender: \(senderID)", category: .session)
|
||||||
}
|
}
|
||||||
@@ -2793,13 +2873,7 @@ extension BLEService {
|
|||||||
meshTopology.reset()
|
meshTopology.reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
private func restartGossipManager() {
|
|
||||||
gossipSyncManager?.stop()
|
|
||||||
let sync = GossipSyncManager(myPeerID: myPeerID)
|
|
||||||
sync.delegate = self
|
|
||||||
sync.start()
|
|
||||||
gossipSyncManager = sync
|
|
||||||
}
|
|
||||||
|
|
||||||
private func sendNoisePayload(_ typedPayload: Data, to peerID: PeerID) {
|
private func sendNoisePayload(_ typedPayload: Data, to peerID: PeerID) {
|
||||||
guard noiseService.hasSession(with: peerID) else {
|
guard noiseService.hasSession(with: peerID) else {
|
||||||
@@ -3377,7 +3451,8 @@ extension BLEService {
|
|||||||
signature: nil,
|
signature: nil,
|
||||||
ttl: packet.ttl,
|
ttl: packet.ttl,
|
||||||
version: fragmentVersion,
|
version: fragmentVersion,
|
||||||
route: packet.route
|
route: packet.route,
|
||||||
|
isRSR: packet.isRSR
|
||||||
)
|
)
|
||||||
|
|
||||||
let workItem = DispatchWorkItem { [weak self] in
|
let workItem = DispatchWorkItem { [weak self] in
|
||||||
@@ -3568,9 +3643,16 @@ extension BLEService {
|
|||||||
|
|
||||||
// Decode the original packet bytes we reassembled, so flags/compression are preserved
|
// Decode the original packet bytes we reassembled, so flags/compression are preserved
|
||||||
if var originalPacket = BinaryProtocol.decode(reassembled) {
|
if var originalPacket = BinaryProtocol.decode(reassembled) {
|
||||||
SecureLogger.debug("✅ Reassembled packet id=\(String(format: "%016llx", fragU64)) type=\(originalPacket.type) bytes=\(reassembled.count)", category: .session)
|
|
||||||
originalPacket.ttl = 0
|
// Reassembled packet validation
|
||||||
handleReceivedPacket(originalPacket, from: peerID)
|
let innerSender = PeerID(hexData: originalPacket.senderID)
|
||||||
|
if !validatePacket(originalPacket, from: innerSender) {
|
||||||
|
// Cleanup below
|
||||||
|
} else {
|
||||||
|
SecureLogger.debug("✅ Reassembled packet id=\(String(format: "%016llx", fragU64)) type=\(originalPacket.type) bytes=\(reassembled.count)", category: .session)
|
||||||
|
originalPacket.ttl = 0
|
||||||
|
handleReceivedPacket(originalPacket, from: peerID)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SecureLogger.error("❌ Failed to decode reassembled packet (type=\(originalType), total=\(total))", category: .session)
|
SecureLogger.error("❌ Failed to decode reassembled packet (type=\(originalType), total=\(total))", category: .session)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -212,4 +212,18 @@ enum TransportConfig {
|
|||||||
static let uiShareExtensionDismissDelaySeconds: TimeInterval = 2.0
|
static let uiShareExtensionDismissDelaySeconds: TimeInterval = 2.0
|
||||||
static let uiShareAcceptWindowSeconds: TimeInterval = 30.0
|
static let uiShareAcceptWindowSeconds: TimeInterval = 30.0
|
||||||
static let uiMigrationCutoffSeconds: TimeInterval = 24 * 60 * 60
|
static let uiMigrationCutoffSeconds: TimeInterval = 24 * 60 * 60
|
||||||
|
|
||||||
|
// Gossip Sync Configuration
|
||||||
|
static let syncSeenCapacity: Int = 1000
|
||||||
|
static let syncGCSMaxBytes: Int = 400
|
||||||
|
static let syncGCSTargetFpr: Double = 0.01
|
||||||
|
static let syncMaxMessageAgeSeconds: TimeInterval = 900
|
||||||
|
static let syncMaintenanceIntervalSeconds: TimeInterval = 30.0
|
||||||
|
static let syncStalePeerCleanupIntervalSeconds: TimeInterval = 60.0
|
||||||
|
static let syncStalePeerTimeoutSeconds: TimeInterval = 60.0
|
||||||
|
static let syncFragmentCapacity: Int = 600
|
||||||
|
static let syncFileTransferCapacity: Int = 200
|
||||||
|
static let syncFragmentIntervalSeconds: TimeInterval = 30.0
|
||||||
|
static let syncFileTransferIntervalSeconds: TimeInterval = 60.0
|
||||||
|
static let syncMessageIntervalSeconds: TimeInterval = 15.0
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import Foundation
|
import Foundation
|
||||||
|
import BitLogger
|
||||||
|
|
||||||
// Gossip-based sync manager using on-demand GCS filters
|
// Gossip-based sync manager using on-demand GCS filters
|
||||||
final class GossipSyncManager {
|
final class GossipSyncManager {
|
||||||
@@ -6,6 +7,7 @@ final class GossipSyncManager {
|
|||||||
func sendPacket(_ packet: BitchatPacket)
|
func sendPacket(_ packet: BitchatPacket)
|
||||||
func sendPacket(to peerID: PeerID, packet: BitchatPacket)
|
func sendPacket(to peerID: PeerID, packet: BitchatPacket)
|
||||||
func signPacketForBroadcast(_ packet: BitchatPacket) -> BitchatPacket
|
func signPacketForBroadcast(_ packet: BitchatPacket) -> BitchatPacket
|
||||||
|
func getConnectedPeers() -> [PeerID]
|
||||||
}
|
}
|
||||||
|
|
||||||
private struct PacketStore {
|
private struct PacketStore {
|
||||||
@@ -74,6 +76,7 @@ final class GossipSyncManager {
|
|||||||
|
|
||||||
private let myPeerID: PeerID
|
private let myPeerID: PeerID
|
||||||
private let config: Config
|
private let config: Config
|
||||||
|
private let requestSyncManager: RequestSyncManager
|
||||||
weak var delegate: Delegate?
|
weak var delegate: Delegate?
|
||||||
|
|
||||||
// Storage: broadcast packets by type, and latest announce per sender
|
// Storage: broadcast packets by type, and latest announce per sender
|
||||||
@@ -88,9 +91,10 @@ final class GossipSyncManager {
|
|||||||
private var lastStalePeerCleanup: Date = .distantPast
|
private var lastStalePeerCleanup: Date = .distantPast
|
||||||
private var syncSchedules: [SyncSchedule] = []
|
private var syncSchedules: [SyncSchedule] = []
|
||||||
|
|
||||||
init(myPeerID: PeerID, config: Config = Config()) {
|
init(myPeerID: PeerID, config: Config = Config(), requestSyncManager: RequestSyncManager) {
|
||||||
self.myPeerID = myPeerID
|
self.myPeerID = myPeerID
|
||||||
self.config = config
|
self.config = config
|
||||||
|
self.requestSyncManager = requestSyncManager
|
||||||
var schedules: [SyncSchedule] = []
|
var schedules: [SyncSchedule] = []
|
||||||
if config.seenCapacity > 0 && config.messageSyncIntervalSeconds > 0 {
|
if config.seenCapacity > 0 && config.messageSyncIntervalSeconds > 0 {
|
||||||
schedules.append(SyncSchedule(types: .publicMessages, interval: config.messageSyncIntervalSeconds, lastSent: .distantPast))
|
schedules.append(SyncSchedule(types: .publicMessages, interval: config.messageSyncIntervalSeconds, lastSent: .distantPast))
|
||||||
@@ -202,6 +206,19 @@ final class GossipSyncManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func sendPeriodicSync(for types: SyncTypeFlags) {
|
||||||
|
// Unicast sync to connected peers to allow RSR attribution
|
||||||
|
if let connectedPeers = delegate?.getConnectedPeers(), !connectedPeers.isEmpty {
|
||||||
|
SecureLogger.debug("Sending periodic sync to \(connectedPeers.count) connected peers", category: .sync)
|
||||||
|
for peerID in connectedPeers {
|
||||||
|
sendRequestSync(to: peerID, types: types)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Fallback to broadcast (discovery phase)
|
||||||
|
sendRequestSync(for: types)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private func sendRequestSync(for types: SyncTypeFlags) {
|
private func sendRequestSync(for types: SyncTypeFlags) {
|
||||||
let payload = buildGcsPayload(for: types)
|
let payload = buildGcsPayload(for: types)
|
||||||
let pkt = BitchatPacket(
|
let pkt = BitchatPacket(
|
||||||
@@ -218,6 +235,9 @@ final class GossipSyncManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private func sendRequestSync(to peerID: PeerID, types: SyncTypeFlags) {
|
private func sendRequestSync(to peerID: PeerID, types: SyncTypeFlags) {
|
||||||
|
// Register the request for RSR validation
|
||||||
|
requestSyncManager.registerRequest(to: peerID)
|
||||||
|
|
||||||
let payload = buildGcsPayload(for: types)
|
let payload = buildGcsPayload(for: types)
|
||||||
var recipient = Data()
|
var recipient = Data()
|
||||||
var temp = peerID.id
|
var temp = peerID.id
|
||||||
@@ -262,6 +282,7 @@ final class GossipSyncManager {
|
|||||||
if !mightContain(idBytes) {
|
if !mightContain(idBytes) {
|
||||||
var toSend = pkt
|
var toSend = pkt
|
||||||
toSend.ttl = 0
|
toSend.ttl = 0
|
||||||
|
toSend.isRSR = true // Mark as solicited response
|
||||||
delegate?.sendPacket(to: peerID, packet: toSend)
|
delegate?.sendPacket(to: peerID, packet: toSend)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -274,6 +295,7 @@ final class GossipSyncManager {
|
|||||||
if !mightContain(idBytes) {
|
if !mightContain(idBytes) {
|
||||||
var toSend = pkt
|
var toSend = pkt
|
||||||
toSend.ttl = 0
|
toSend.ttl = 0
|
||||||
|
toSend.isRSR = true // Mark as solicited response
|
||||||
delegate?.sendPacket(to: peerID, packet: toSend)
|
delegate?.sendPacket(to: peerID, packet: toSend)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -286,6 +308,7 @@ final class GossipSyncManager {
|
|||||||
if !mightContain(idBytes) {
|
if !mightContain(idBytes) {
|
||||||
var toSend = pkt
|
var toSend = pkt
|
||||||
toSend.ttl = 0
|
toSend.ttl = 0
|
||||||
|
toSend.isRSR = true // Mark as solicited response
|
||||||
delegate?.sendPacket(to: peerID, packet: toSend)
|
delegate?.sendPacket(to: peerID, packet: toSend)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -298,6 +321,7 @@ final class GossipSyncManager {
|
|||||||
if !mightContain(idBytes) {
|
if !mightContain(idBytes) {
|
||||||
var toSend = pkt
|
var toSend = pkt
|
||||||
toSend.ttl = 0
|
toSend.ttl = 0
|
||||||
|
toSend.isRSR = true // Mark as solicited response
|
||||||
delegate?.sendPacket(to: peerID, packet: toSend)
|
delegate?.sendPacket(to: peerID, packet: toSend)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -366,11 +390,13 @@ final class GossipSyncManager {
|
|||||||
private func performPeriodicMaintenance(now: Date = Date()) {
|
private func performPeriodicMaintenance(now: Date = Date()) {
|
||||||
cleanupExpiredMessages()
|
cleanupExpiredMessages()
|
||||||
cleanupStaleAnnouncementsIfNeeded(now: now)
|
cleanupStaleAnnouncementsIfNeeded(now: now)
|
||||||
|
requestSyncManager.cleanup() // Cleanup expired sync requests
|
||||||
|
|
||||||
for index in syncSchedules.indices {
|
for index in syncSchedules.indices {
|
||||||
guard syncSchedules[index].interval > 0 else { continue }
|
guard syncSchedules[index].interval > 0 else { continue }
|
||||||
if syncSchedules[index].lastSent == .distantPast || now.timeIntervalSince(syncSchedules[index].lastSent) >= syncSchedules[index].interval {
|
if syncSchedules[index].lastSent == .distantPast || now.timeIntervalSince(syncSchedules[index].lastSent) >= syncSchedules[index].interval {
|
||||||
syncSchedules[index].lastSent = now
|
syncSchedules[index].lastSent = now
|
||||||
sendRequestSync(for: syncSchedules[index].types)
|
sendPeriodicSync(for: syncSchedules[index].types)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,74 @@
|
|||||||
|
//
|
||||||
|
// RequestSyncManager.swift
|
||||||
|
// bitchat
|
||||||
|
//
|
||||||
|
// This is free and unencumbered software released into the public domain.
|
||||||
|
// For more information, see <https://unlicense.org>
|
||||||
|
//
|
||||||
|
|
||||||
|
import Foundation
|
||||||
|
import BitLogger
|
||||||
|
|
||||||
|
/// Manages outgoing sync requests and validates incoming responses.
|
||||||
|
///
|
||||||
|
/// Allows attributing RSR (Request-Sync Response) packets to specific peers
|
||||||
|
/// that we have actively requested sync from.
|
||||||
|
final class RequestSyncManager {
|
||||||
|
|
||||||
|
private let queue = DispatchQueue(label: "request.sync.manager", attributes: .concurrent)
|
||||||
|
private var pendingRequests: [PeerID: TimeInterval] = [:]
|
||||||
|
|
||||||
|
// Allow responses for 30s after request
|
||||||
|
private let responseWindow: TimeInterval = 30.0
|
||||||
|
|
||||||
|
/// Register that we are sending a sync request to a peer.
|
||||||
|
/// - Parameter peerID: The peer we are requesting sync from
|
||||||
|
func registerRequest(to peerID: PeerID) {
|
||||||
|
let now = Date().timeIntervalSince1970
|
||||||
|
queue.async(flags: .barrier) {
|
||||||
|
SecureLogger.debug("Registering sync request to \(peerID.id.prefix(8))…", category: .sync)
|
||||||
|
self.pendingRequests[peerID] = now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if a packet from a peer is a valid response to a sync request.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - peerID: The sender of the packet
|
||||||
|
/// - isRSR: Whether the packet is marked as a Request-Sync Response
|
||||||
|
/// - Returns: true if we have a pending request for this peer and the window is open
|
||||||
|
func isValidResponse(from peerID: PeerID, isRSR: Bool) -> Bool {
|
||||||
|
guard isRSR else { return false }
|
||||||
|
|
||||||
|
return queue.sync {
|
||||||
|
guard let requestTime = pendingRequests[peerID] else {
|
||||||
|
SecureLogger.warning("Received unsolicited RSR packet from \(peerID.id.prefix(8))…", category: .security)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Date().timeIntervalSince1970
|
||||||
|
if now - requestTime > responseWindow {
|
||||||
|
SecureLogger.warning("Received RSR packet from \(peerID.id.prefix(8))… outside of response window", category: .security)
|
||||||
|
// We don't remove here because we might receive multiple packets for one request
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Periodic cleanup of expired requests
|
||||||
|
func cleanup() {
|
||||||
|
let now = Date().timeIntervalSince1970
|
||||||
|
queue.async(flags: .barrier) {
|
||||||
|
let originalCount = self.pendingRequests.count
|
||||||
|
self.pendingRequests = self.pendingRequests.filter { _, timestamp in
|
||||||
|
now - timestamp <= self.responseWindow
|
||||||
|
}
|
||||||
|
let removed = originalCount - self.pendingRequests.count
|
||||||
|
if removed > 0 {
|
||||||
|
SecureLogger.debug("Cleaned up \(removed) expired sync requests", category: .sync)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,7 +7,8 @@ struct GossipSyncManagerTests {
|
|||||||
private let myPeerID = PeerID(str: "0102030405060708")
|
private let myPeerID = PeerID(str: "0102030405060708")
|
||||||
|
|
||||||
@Test func concurrentPacketIntakeAndSyncRequest() async throws {
|
@Test func concurrentPacketIntakeAndSyncRequest() async throws {
|
||||||
let manager = GossipSyncManager(myPeerID: myPeerID)
|
let requestSyncManager = RequestSyncManager()
|
||||||
|
let manager = GossipSyncManager(myPeerID: myPeerID, requestSyncManager: requestSyncManager)
|
||||||
let delegate = RecordingDelegate()
|
let delegate = RecordingDelegate()
|
||||||
manager.delegate = delegate
|
manager.delegate = delegate
|
||||||
|
|
||||||
@@ -48,7 +49,8 @@ struct GossipSyncManagerTests {
|
|||||||
config.stalePeerCleanupIntervalSeconds = 0
|
config.stalePeerCleanupIntervalSeconds = 0
|
||||||
config.stalePeerTimeoutSeconds = 5
|
config.stalePeerTimeoutSeconds = 5
|
||||||
|
|
||||||
let manager = GossipSyncManager(myPeerID: myPeerID, config: config)
|
let requestSyncManager = RequestSyncManager()
|
||||||
|
let manager = GossipSyncManager(myPeerID: myPeerID, config: config, requestSyncManager: requestSyncManager)
|
||||||
let peerHex = "0011223344556677"
|
let peerHex = "0011223344556677"
|
||||||
let senderData = try #require(Data(hexString: peerHex))
|
let senderData = try #require(Data(hexString: peerHex))
|
||||||
let initialTimestampMs = UInt64(Date().timeIntervalSince1970 * 1000)
|
let initialTimestampMs = UInt64(Date().timeIntervalSince1970 * 1000)
|
||||||
@@ -93,7 +95,8 @@ struct GossipSyncManagerTests {
|
|||||||
config.stalePeerTimeoutSeconds = 5
|
config.stalePeerTimeoutSeconds = 5
|
||||||
config.maxMessageAgeSeconds = 100
|
config.maxMessageAgeSeconds = 100
|
||||||
|
|
||||||
let manager = GossipSyncManager(myPeerID: myPeerID, config: config)
|
let requestSyncManager = RequestSyncManager()
|
||||||
|
let manager = GossipSyncManager(myPeerID: myPeerID, config: config, requestSyncManager: requestSyncManager)
|
||||||
let peerHex = "8899aabbccddeeff"
|
let peerHex = "8899aabbccddeeff"
|
||||||
let senderData = try #require(Data(hexString: peerHex))
|
let senderData = try #require(Data(hexString: peerHex))
|
||||||
let staleTimestampMs = UInt64(Date().addingTimeInterval(-(config.stalePeerTimeoutSeconds + 1)).timeIntervalSince1970 * 1000)
|
let staleTimestampMs = UInt64(Date().addingTimeInterval(-(config.stalePeerTimeoutSeconds + 1)).timeIntervalSince1970 * 1000)
|
||||||
@@ -137,7 +140,8 @@ struct GossipSyncManagerTests {
|
|||||||
config.fileTransferSyncIntervalSeconds = 1
|
config.fileTransferSyncIntervalSeconds = 1
|
||||||
config.maintenanceIntervalSeconds = 0
|
config.maintenanceIntervalSeconds = 0
|
||||||
|
|
||||||
let manager = GossipSyncManager(myPeerID: myPeerID, config: config)
|
let requestSyncManager = RequestSyncManager()
|
||||||
|
let manager = GossipSyncManager(myPeerID: myPeerID, config: config, requestSyncManager: requestSyncManager)
|
||||||
let delegate = RecordingDelegate()
|
let delegate = RecordingDelegate()
|
||||||
manager.delegate = delegate
|
manager.delegate = delegate
|
||||||
|
|
||||||
@@ -207,7 +211,8 @@ struct GossipSyncManagerTests {
|
|||||||
config.fragmentSyncIntervalSeconds = 0
|
config.fragmentSyncIntervalSeconds = 0
|
||||||
config.fileTransferSyncIntervalSeconds = 0
|
config.fileTransferSyncIntervalSeconds = 0
|
||||||
|
|
||||||
let manager = GossipSyncManager(myPeerID: myPeerID, config: config)
|
let requestSyncManager = RequestSyncManager()
|
||||||
|
let manager = GossipSyncManager(myPeerID: myPeerID, config: config, requestSyncManager: requestSyncManager)
|
||||||
let delegate = RecordingDelegate()
|
let delegate = RecordingDelegate()
|
||||||
manager.delegate = delegate
|
manager.delegate = delegate
|
||||||
|
|
||||||
@@ -269,4 +274,8 @@ private final class RecordingDelegate: GossipSyncManager.Delegate {
|
|||||||
func signPacketForBroadcast(_ packet: BitchatPacket) -> BitchatPacket {
|
func signPacketForBroadcast(_ packet: BitchatPacket) -> BitchatPacket {
|
||||||
packet
|
packet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getConnectedPeers() -> [PeerID] {
|
||||||
|
return []
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,7 +75,6 @@ struct SubscriptionRateLimitTests {
|
|||||||
@Test("Max attempts threshold prevents complete enumeration")
|
@Test("Max attempts threshold prevents complete enumeration")
|
||||||
func maxAttemptsThresholdPreventsEnumeration() {
|
func maxAttemptsThresholdPreventsEnumeration() {
|
||||||
let maxAttempts = TransportConfig.bleSubscriptionRateLimitMaxAttempts
|
let maxAttempts = TransportConfig.bleSubscriptionRateLimitMaxAttempts
|
||||||
let minInterval = TransportConfig.bleSubscriptionRateLimitMinSeconds
|
|
||||||
|
|
||||||
// After max attempts within window, announces are suppressed entirely
|
// After max attempts within window, announces are suppressed entirely
|
||||||
// This means an attacker gets at most maxAttempts announces per window
|
// This means an attacker gets at most maxAttempts announces per window
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
# Request Sync Manager & V2 Packet Updates
|
||||||
|
|
||||||
|
This document details the implementation of the Request Sync Manager and updates to the V2 packet structure to improve synchronization security and attribution on iOS, mirroring the Android implementation.
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
The goal of these changes is to make the request sync functionality "less blind". Previously, sync requests were broadcast, and responses were accepted without strict attribution or timestamp validation (to allow syncing old messages). This opened up potential spoofing vectors and prevented us from enforcing timestamp checks on normal traffic.
|
||||||
|
|
||||||
|
The new implementation introduces a **RequestSyncManager** to track outgoing sync requests and attributes incoming responses (RSR - Request-Sync Response) to specific peers. This allows us to:
|
||||||
|
1. **Enforce Timestamp Validation**: Normal packets now require timestamps to be within 2 minutes of the local clock.
|
||||||
|
2. **Exempt Solicited Sync Responses**: Packets marked as RSR are exempt from timestamp validation *only if* they correspond to a valid, pending sync request sent to that specific peer.
|
||||||
|
3. **Prevent Unsolicited Sync Floods**: Unsolicited RSR packets are rejected.
|
||||||
|
|
||||||
|
## Protocol Changes
|
||||||
|
|
||||||
|
### Binary Protocol Updates
|
||||||
|
* **New Flag**: `IS_RSR` (0x10) added to the packet header flags.
|
||||||
|
* **BitchatPacket**: Updated to include `isRSR: Bool` field.
|
||||||
|
* **Encoding/Decoding**: Updated `BinaryProtocol` to handle the new flag.
|
||||||
|
|
||||||
|
### Request Sync Payload
|
||||||
|
The `REQUEST_SYNC` packet payload (TLV encoded) has been updated to include:
|
||||||
|
* **Future Filters**:
|
||||||
|
* `sinceTimestamp` (Type 0x05): To request packets since a certain time (UInt64 big-endian).
|
||||||
|
* `fragmentIdFilter` (Type 0x06): To request specific fragments (UTF-8 string).
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
### RequestSyncManager
|
||||||
|
A new component (`Sync/RequestSyncManager.swift`) responsible for:
|
||||||
|
* **Tracking**: Stores `peerID -> timestamp` mappings for pending sync requests.
|
||||||
|
* **Validation**: `isValidResponse(from: PeerID, isRSR: Bool)` checks if an incoming RSR packet matches a pending request within the 30-second window.
|
||||||
|
* **Cleanup**: Periodically removes expired requests.
|
||||||
|
|
||||||
|
### GossipSyncManager Updates
|
||||||
|
* **Unicast Sync**: Instead of blind broadcasting, the periodic sync task now iterates over connected peers and sends unicast `REQUEST_SYNC` packets.
|
||||||
|
* **Registration**: Before sending, requests are registered with `RequestSyncManager`.
|
||||||
|
* **Response Marking**: When responding to a `REQUEST_SYNC`, generated packets (Announce/Message) are explicitly marked with `isRSR = true` (and `ttl = 0`).
|
||||||
|
|
||||||
|
### BLEService (Security Manager) Updates
|
||||||
|
* **Timestamp Enforcement**: Checks `abs(now - packetTimestamp) < 2 minutes` for standard packets.
|
||||||
|
* **Conditional Exemption**: If `packet.isRSR` is true (or packet is a legacy TTL=0 response), it queries `RequestSyncManager`.
|
||||||
|
* **Valid**: If solicited, timestamp check is skipped (allowing historical data sync).
|
||||||
|
* **Invalid**: If unsolicited or timed out, the packet is rejected.
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
These changes are integrated into `BLEService` and `GossipSyncManager`. No external API changes are required for clients, but all peers must be updated to support the new `IS_RSR` flag and protocol logic to participate in the secure sync process.
|
||||||
@@ -19,4 +19,5 @@ public extension OSLog {
|
|||||||
static let session = OSLog(subsystem: subsystem, category: "session")
|
static let session = OSLog(subsystem: subsystem, category: "session")
|
||||||
static let security = OSLog(subsystem: subsystem, category: "security")
|
static let security = OSLog(subsystem: subsystem, category: "security")
|
||||||
static let handshake = OSLog(subsystem: subsystem, category: "handshake")
|
static let handshake = OSLog(subsystem: subsystem, category: "handshake")
|
||||||
|
static let sync = OSLog(subsystem: subsystem, category: "sync")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user