From f96117f95ac991e5e0d0a4791275c12ff1785faa Mon Sep 17 00:00:00 2001 From: Fletcher Dunn Date: Fri, 17 Apr 2026 22:34:02 -0700 Subject: [PATCH] Port P2P trivial_signaling_server to Python Writing it in go was a fine learning experience for me, but it was just one more dependency/complexity and not very useful. --- README_P2P.md | 2 +- examples/CMakeLists.txt | 35 ++--- examples/trivial_signaling_server.go | 163 ------------------- examples/trivial_signaling_server.py | 226 +++++++++++++++++++++++++++ tests/test_p2p.py | 16 +- 5 files changed, 246 insertions(+), 196 deletions(-) delete mode 100644 examples/trivial_signaling_server.go create mode 100644 examples/trivial_signaling_server.py diff --git a/README_P2P.md b/README_P2P.md index 97df01a..7ec028d 100644 --- a/README_P2P.md +++ b/README_P2P.md @@ -136,7 +136,7 @@ Take a look at these files for more information: * [steamnetworkingcustomsignaling.h](include/steam/steamnetworkingcustomsignaling.h) contains the interfaces you'll need to implement for your signaling service. * An example of a really trivial signaling protocol: - * [trivial_signaling_server.go](examples/trivial_signaling_server.go) server + * [trivial_signaling_server.py](examples/trivial_signaling_server.py) server * [trivial_signaling_client.cpp](examples/trivial_signaling_client.cpp) client * A test case that puts everything together. It starts up an example trivial signaling protocol server and two peers, and has them connect to each other diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 45dc5b2..e41a780 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,30 +1,17 @@ # -# Trivial signaling server, written in go +# Copy trivial signaling server python script into the runtime dir # if( ENABLE_ICE AND ( BUILD_EXAMPLES OR BUILD_TESTS ) ) - find_program( GO go ) - if ( NOT GO ) - message(WARNING "Could not find 'go' binary, will not build signaling server example program") - else() - set(SIGNAL_SERVER_TARGET trivial_signaling_server) - - if ( WIN32 ) - set(SIGNAL_SERVER_OUTPUT ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server.exe) - else() - set(SIGNAL_SERVER_OUTPUT ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server) - endif() - set(SIGNAL_SERVER_SRCS - trivial_signaling_server.go - ) - add_custom_command( - OUTPUT ${SIGNAL_SERVER_OUTPUT} - DEPENDS ${SIGNAL_SERVER_SRCS} - WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} - COMMENT "Building GO Trivial signaling server" - COMMAND ${GO} build -o "${SIGNAL_SERVER_OUTPUT}" ${CMAKE_GO_FLAGS} ${SIGNAL_SERVER_SRCS} - ) - add_custom_target(${SIGNAL_SERVER_TARGET} ALL DEPENDS ${SIGNAL_SERVER_OUTPUT}) - endif() + set(SIGNAL_SERVER_TARGET trivial_signaling_server) + set(SIGNAL_SERVER_OUTPUT ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server.py) + set(SIGNAL_SERVER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trivial_signaling_server.py) + add_custom_command( + OUTPUT ${SIGNAL_SERVER_OUTPUT} + DEPENDS ${SIGNAL_SERVER_SRC} + COMMENT "Publishing ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server.py" + COMMAND ${CMAKE_COMMAND} -E copy ${SIGNAL_SERVER_SRC} ${SIGNAL_SERVER_OUTPUT} + ) + add_custom_target(${SIGNAL_SERVER_TARGET} ALL DEPENDS ${SIGNAL_SERVER_OUTPUT}) endif() # diff --git a/examples/trivial_signaling_server.go b/examples/trivial_signaling_server.go deleted file mode 100644 index 63801c4..0000000 --- a/examples/trivial_signaling_server.go +++ /dev/null @@ -1,163 +0,0 @@ -// Really simple P2P signaling server. -// -// When establishing peer-to-peer connections, the peers -// need some sort of pre-arranged side channels that they -// can use to exchange messages. This channel is assumed -// to be relatively low bandwidth and high latency. This -// service is often called "signaling". -// -// This server has the following really simple protocol: -// It listens on a particular TCP port. Clients connect -// raw TCP. The protocol is text-based and line oriented, -// so it is easy to test using telnet. When a client -// connects, it should send its identity on the first line. -// Afterwards, clients can send a message to a peer by -// sending a line formatted as follows: -// -// DESTINATION_IDENTITY PAYLOAD -// -// Identites may not contain spaces, and the payload -// should be plain ASCII text. (Hex or base64 encode it). -// -// If there is a client with that destination identity, -// then the server will forward the message on. Otherwise -// it is discarded. -// -// Forwarded messages have basically the same format and -// are the only type of message the server ever sends to the -// client. The only difference is that the identity is the -// identity of the sender. -// -// This is just an example code to illustrate what a -// signaling service is. A real production server would -// probably need to be able to scale across multiple -// processes, and provide authentication and rate -// limiting. -// -// Note that SteamNetworkingSockets use of signaling -// service does NOT assume guaranteed delivery. - -package main - -import ( - "bufio" - "flag" - "fmt" - "log" - "net" - "strings" -) - -const DEFAULT_LISTEN_PORT = 10000 - -// Current list of client connections -var g_mapClientConnections = make(map[string]net.Conn) - -// Goroutine to service a client connection -func ServiceConnection(conn net.Conn) { - - // Save off address - addr := conn.RemoteAddr().String() - - // Attach a Reader object to the connection, so we can read from it easily - in := bufio.NewReader(conn) - - // In our trivial protocol, the first line contains the client identity - // on a line by itself - intro, err := in.ReadString('\n') - if err != nil { - log.Printf("[%s] Aborting connection before we ever received client identity", addr) - conn.Close() - } - identity := strings.TrimSpace(intro) - - // Amnnnnnd that's it. No authentication. - - // Locate existing connection, if any. - existingConn := g_mapClientConnections[identity] - - // Add us to map or replace existing entry - g_mapClientConnections[identity] = conn - - // Now handle existing entry - if existingConn != nil { - log.Printf("[%s@%s] Closing connection to make room for new connection from '%s'", identity, existingConn.RemoteAddr().String(), addr) - existingConn.Close() - } - log.Printf("[%s@%s] Added connection", identity, addr) - - // Keep reading until connection is closed - for { - line, err := in.ReadString('\n') - if err != nil { - conn.Close() - - // Are we stil in the map? - if g_mapClientConnections[identity] == conn { - log.Printf("[%s@%s] Connecton closed. %s", identity, addr, err) - delete(g_mapClientConnections, identity) - } else { - // Assume it's because we got replaced by another connection. - // The other connection already logged, so don't do anything here - } - break - } - - // Our protocol is just [destination peer identity] [payload] - // And everything is in text. - dest_and_msg := strings.SplitN(line, " ", 2) - if len(dest_and_msg) != 2 { - log.Printf("[%s@%s] Ignoring weird input '%s' (maybe truncated?)", identity, addr, line) - continue - } - dest_identity := strings.TrimSpace(dest_and_msg[0]) - payload := dest_and_msg[1] - - // Locate the destination peer's connection. - dest_conn := g_mapClientConnections[dest_identity] - if dest_conn == nil { - log.Printf("[%s@%s] Ignoring, destination peer '%s' not found", identity, addr, dest_identity) - continue - } - - // Format new message, putting the sender's identity in front. - msg := identity + " " + payload - - // Send to the peer - dest_conn.Write([]byte(msg)) - - // Lawg - log.Printf("[%s@%s] -> %s (%d chars)", identity, addr, dest_identity, len(payload)) - } - -} - -// Main entry point -func main() { - - // Parse command line flags - port := flag.Int("port", DEFAULT_LISTEN_PORT, "Port to listen on") - flag.Parse() - listen_addr := fmt.Sprintf("0.0.0.0:%d", *port) - - // Start listening - listener, err := net.Listen("tcp", listen_addr) - if err != nil { - log.Panic(err) - } - log.Printf("Listening at %s", listen_addr) - - // Main loop - for { - - // Wait for the next incoming connection - conn, err := listener.Accept() - if err != nil { - log.Panic(err) - } - - // Start goroutine to service it - go ServiceConnection(conn) - } - -} diff --git a/examples/trivial_signaling_server.py b/examples/trivial_signaling_server.py new file mode 100644 index 0000000..733aab8 --- /dev/null +++ b/examples/trivial_signaling_server.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 + +# Really simple P2P signaling server. +# +# When establishing peer-to-peer connections, the peers +# need some sort of pre-arranged side channels that they +# can use to exchange messages. This channel is assumed +# to be relatively low bandwidth and high latency. This +# service is often called "signaling". +# +# This server has the following really simple protocol: +# It listens on a particular TCP port. Clients connect +# raw TCP. The protocol is text-based and line oriented, +# so it is easy to test using telnet. When a client +# connects, it should send its identity on the first line. +# Afterwards, clients can send a message to a peer by +# sending a line formatted as follows: +# +# DESTINATION_IDENTITY PAYLOAD +# +# Identites may not contain spaces, and the payload +# should be plain ASCII text. (Hex or base64 encode it). +# +# If there is a client with that destination identity, +# then the server will forward the message on. Otherwise +# it is discarded. +# +# Forwarded messages have basically the same format and +# are the only type of message the server ever sends to the +# client. The only difference is that the identity is the +# identity of the sender. +# +# This is just an example code to illustrate what a +# signaling service is. A real production server would +# probably need to be able to scale across multiple +# processes, and provide authentication and rate +# limiting. +# +# Note that SteamNetworkingSockets use of signaling +# service does NOT assume guaranteed delivery. + +import argparse +import errno +import io +import socket +import socketserver +import sys +import threading + +DEFAULT_LISTEN_PORT = 10000 + + +class SignalingState(object): + def __init__(self): + self.lock = threading.Lock() + self.clients = {} + + +class SignalingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + daemon_threads = True + + def __init__(self, server_address, request_handler_class, state, address_family=socket.AF_INET, dual_stack=False): + self.address_family = address_family + self.dual_stack = dual_stack + socketserver.TCPServer.__init__(self, server_address, request_handler_class, bind_and_activate=False) + + # For AF_INET6 sockets, ask the OS to also accept IPv4-mapped addresses. + if self.dual_stack and self.address_family == socket.AF_INET6 and hasattr(socket, "IPV6_V6ONLY"): + try: + self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + except OSError: + pass + + self.server_bind() + self.server_activate() + self.state = state + + +class SignalingRequestHandler(socketserver.StreamRequestHandler): + def setup(self): + socketserver.StreamRequestHandler.setup(self) + try: + self.request.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # Use more aggressive probing where the platform exposes these knobs. + if hasattr(socket, "TCP_KEEPIDLE"): + self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) + if hasattr(socket, "TCP_KEEPINTVL"): + self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10) + if hasattr(socket, "TCP_KEEPCNT"): + self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3) + except OSError: + # Keepalive tuning is best-effort for this example server. + pass + + def handle(self): + peer_addr = self.client_address[0] + ":" + str(self.client_address[1]) + identity = None + disconnect_reason = None + + try: + intro = self.rfile.readline() + if not intro: + print("[%s] Aborting connection before receiving client identity" % peer_addr) + return + + identity = intro.strip().decode("utf-8", "ignore") + if not identity: + print("[%s] Aborting connection due to empty client identity" % peer_addr) + return + + replaced_conn = None + with self.server.state.lock: + replaced_conn = self.server.state.clients.get(identity) + self.server.state.clients[identity] = self.request + + if replaced_conn is not None and replaced_conn is not self.request: + try: + print("[%s@%s] Closing previous connection" % (identity, peer_addr)) + replaced_conn.close() + except Exception: + pass + + print("[%s@%s] Added connection" % (identity, peer_addr)) + + while True: + try: + line = self.rfile.readline() + except ConnectionResetError: + disconnect_reason = "peer reset connection" + break + except OSError as ex: + if ex.errno == errno.ETIMEDOUT: + disconnect_reason = "connection timed out (keepalive failure)" + else: + disconnect_reason = "socket error %s" % ex + break + if not line: + disconnect_reason = "peer closed connection" + break + + try: + line_text = line.decode("utf-8", "ignore") + except Exception: + continue + + parts = line_text.split(" ", 1) + if len(parts) != 2: + print("[%s@%s] Ignoring weird input '%s'" % (identity, peer_addr, line_text.rstrip())) + continue + + dest_identity = parts[0].strip() + payload = parts[1] + if not dest_identity: + continue + + with self.server.state.lock: + dest_conn = self.server.state.clients.get(dest_identity) + + if dest_conn is None: + print("[%s@%s] Ignoring, destination peer '%s' not found" % (identity, peer_addr, dest_identity)) + continue + + out_msg = (identity + " " + payload).encode("utf-8") + try: + dest_conn.sendall(out_msg) + print("[%s@%s] -> %s (%d chars)" % (identity, peer_addr, dest_identity, len(payload))) + except Exception as ex: + print("[%s@%s] Failed forwarding to %s: %s" % (identity, peer_addr, dest_identity, ex)) + finally: + if identity is not None: + with self.server.state.lock: + if self.server.state.clients.get(identity) is self.request: + del self.server.state.clients[identity] + if disconnect_reason is None: + disconnect_reason = "connection ended" + print("[%s@%s] Connection closed: %s" % (identity, peer_addr, disconnect_reason)) + + +def parse_args(): + parser = argparse.ArgumentParser(description="Trivial P2P signaling server") + parser.add_argument("--port", type=int, default=DEFAULT_LISTEN_PORT, help="Port to listen on") + return parser.parse_args() + + +def main(): + # Enable line buffering for stdout so output appears immediately in CI environments + if hasattr(sys.stdout, 'reconfigure'): + sys.stdout.reconfigure(line_buffering=True) + else: + # Fallback for older Python versions + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True) + + args = parse_args() + listen_addr = ("::", args.port) + state = SignalingState() + + try: + server = SignalingTCPServer( + listen_addr, + SignalingRequestHandler, + state, + address_family=socket.AF_INET6, + dual_stack=True, + ) + except OSError: + # Fall back to IPv4-only if dual-stack IPv6 listener is not available. + listen_addr = ("0.0.0.0", args.port) + server = SignalingTCPServer( + listen_addr, + SignalingRequestHandler, + state, + address_family=socket.AF_INET, + dual_stack=False, + ) + + print("Listening at %s:%d" % (listen_addr[0], listen_addr[1])) + try: + server.serve_forever() + finally: + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/tests/test_p2p.py b/tests/test_p2p.py index 694120a..70336cd 100755 --- a/tests/test_p2p.py +++ b/tests/test_p2p.py @@ -133,13 +133,14 @@ def SymmetricTest(): # # Start the signaling server -trivial_signaling_server = './trivial_signaling_server' -if os.name == 'nt' and not os.path.exists( 'trivial_signaling_server.exe' ): - trivial_signaling_server = '../examples/trivial_signaling_server.exe' - if not os.path.exists( trivial_signaling_server ): - print( "Can't find trivial_signaling_server.exe" ) - sys.exit(1) -signaling = StartProcessInThread( "signaling", [ trivial_signaling_server ] ) +trivial_signaling_server = './trivial_signaling_server.py' +if not os.path.exists( trivial_signaling_server ): + trivial_signaling_server = '../examples/trivial_signaling_server.py' +if not os.path.exists( trivial_signaling_server ): + print( "Can't find trivial_signaling_server.py" ) + sys.exit(1) + +signaling = StartProcessInThread( "signaling", [ sys.executable, trivial_signaling_server ] ) # Run the tests for test in [ ClientServerTest, SymmetricTest ]: @@ -162,4 +163,3 @@ if really_failed: sys.exit(1) print( "TEST SUCCEEDED" ) -