Port P2P trivial_signaling_server to Python

Writing it in go was a fine learning experience for me, but it was
just one more dependency/complexity and not very useful.
This commit is contained in:
Fletcher Dunn
2026-04-17 22:34:02 -07:00
parent fb2e965666
commit f96117f95a
5 changed files with 246 additions and 196 deletions
+1 -1
View File
@@ -136,7 +136,7 @@ Take a look at these files for more information:
* [steamnetworkingcustomsignaling.h](include/steam/steamnetworkingcustomsignaling.h) * [steamnetworkingcustomsignaling.h](include/steam/steamnetworkingcustomsignaling.h)
contains the interfaces you'll need to implement for your signaling service. contains the interfaces you'll need to implement for your signaling service.
* An example of a really trivial signaling protocol: * An example of a really trivial signaling protocol:
* [trivial_signaling_server.go](examples/trivial_signaling_server.go) server * [trivial_signaling_server.py](examples/trivial_signaling_server.py) server
* [trivial_signaling_client.cpp](examples/trivial_signaling_client.cpp) client * [trivial_signaling_client.cpp](examples/trivial_signaling_client.cpp) client
* A test case that puts everything together. It starts up an example trivial * A test case that puts everything together. It starts up an example trivial
signaling protocol server and two peers, and has them connect to each other signaling protocol server and two peers, and has them connect to each other
+11 -24
View File
@@ -1,30 +1,17 @@
# #
# Trivial signaling server, written in go # Copy trivial signaling server python script into the runtime dir
# #
if( ENABLE_ICE AND ( BUILD_EXAMPLES OR BUILD_TESTS ) ) if( ENABLE_ICE AND ( BUILD_EXAMPLES OR BUILD_TESTS ) )
find_program( GO go ) set(SIGNAL_SERVER_TARGET trivial_signaling_server)
if ( NOT GO ) set(SIGNAL_SERVER_OUTPUT ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server.py)
message(WARNING "Could not find 'go' binary, will not build signaling server example program") set(SIGNAL_SERVER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trivial_signaling_server.py)
else() add_custom_command(
set(SIGNAL_SERVER_TARGET trivial_signaling_server) OUTPUT ${SIGNAL_SERVER_OUTPUT}
DEPENDS ${SIGNAL_SERVER_SRC}
if ( WIN32 ) COMMENT "Publishing ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server.py"
set(SIGNAL_SERVER_OUTPUT ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server.exe) COMMAND ${CMAKE_COMMAND} -E copy ${SIGNAL_SERVER_SRC} ${SIGNAL_SERVER_OUTPUT}
else() )
set(SIGNAL_SERVER_OUTPUT ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/trivial_signaling_server) add_custom_target(${SIGNAL_SERVER_TARGET} ALL DEPENDS ${SIGNAL_SERVER_OUTPUT})
endif()
set(SIGNAL_SERVER_SRCS
trivial_signaling_server.go
)
add_custom_command(
OUTPUT ${SIGNAL_SERVER_OUTPUT}
DEPENDS ${SIGNAL_SERVER_SRCS}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMENT "Building GO Trivial signaling server"
COMMAND ${GO} build -o "${SIGNAL_SERVER_OUTPUT}" ${CMAKE_GO_FLAGS} ${SIGNAL_SERVER_SRCS}
)
add_custom_target(${SIGNAL_SERVER_TARGET} ALL DEPENDS ${SIGNAL_SERVER_OUTPUT})
endif()
endif() endif()
# #
-163
View File
@@ -1,163 +0,0 @@
// Really simple P2P signaling server.
//
// When establishing peer-to-peer connections, the peers
// need some sort of pre-arranged side channels that they
// can use to exchange messages. This channel is assumed
// to be relatively low bandwidth and high latency. This
// service is often called "signaling".
//
// This server has the following really simple protocol:
// It listens on a particular TCP port. Clients connect
// raw TCP. The protocol is text-based and line oriented,
// so it is easy to test using telnet. When a client
// connects, it should send its identity on the first line.
// Afterwards, clients can send a message to a peer by
// sending a line formatted as follows:
//
// DESTINATION_IDENTITY PAYLOAD
//
// Identites may not contain spaces, and the payload
// should be plain ASCII text. (Hex or base64 encode it).
//
// If there is a client with that destination identity,
// then the server will forward the message on. Otherwise
// it is discarded.
//
// Forwarded messages have basically the same format and
// are the only type of message the server ever sends to the
// client. The only difference is that the identity is the
// identity of the sender.
//
// This is just an example code to illustrate what a
// signaling service is. A real production server would
// probably need to be able to scale across multiple
// processes, and provide authentication and rate
// limiting.
//
// Note that SteamNetworkingSockets use of signaling
// service does NOT assume guaranteed delivery.
package main
import (
"bufio"
"flag"
"fmt"
"log"
"net"
"strings"
)
const DEFAULT_LISTEN_PORT = 10000
// Current list of client connections
var g_mapClientConnections = make(map[string]net.Conn)
// Goroutine to service a client connection
func ServiceConnection(conn net.Conn) {
// Save off address
addr := conn.RemoteAddr().String()
// Attach a Reader object to the connection, so we can read from it easily
in := bufio.NewReader(conn)
// In our trivial protocol, the first line contains the client identity
// on a line by itself
intro, err := in.ReadString('\n')
if err != nil {
log.Printf("[%s] Aborting connection before we ever received client identity", addr)
conn.Close()
}
identity := strings.TrimSpace(intro)
// Amnnnnnd that's it. No authentication.
// Locate existing connection, if any.
existingConn := g_mapClientConnections[identity]
// Add us to map or replace existing entry
g_mapClientConnections[identity] = conn
// Now handle existing entry
if existingConn != nil {
log.Printf("[%s@%s] Closing connection to make room for new connection from '%s'", identity, existingConn.RemoteAddr().String(), addr)
existingConn.Close()
}
log.Printf("[%s@%s] Added connection", identity, addr)
// Keep reading until connection is closed
for {
line, err := in.ReadString('\n')
if err != nil {
conn.Close()
// Are we stil in the map?
if g_mapClientConnections[identity] == conn {
log.Printf("[%s@%s] Connecton closed. %s", identity, addr, err)
delete(g_mapClientConnections, identity)
} else {
// Assume it's because we got replaced by another connection.
// The other connection already logged, so don't do anything here
}
break
}
// Our protocol is just [destination peer identity] [payload]
// And everything is in text.
dest_and_msg := strings.SplitN(line, " ", 2)
if len(dest_and_msg) != 2 {
log.Printf("[%s@%s] Ignoring weird input '%s' (maybe truncated?)", identity, addr, line)
continue
}
dest_identity := strings.TrimSpace(dest_and_msg[0])
payload := dest_and_msg[1]
// Locate the destination peer's connection.
dest_conn := g_mapClientConnections[dest_identity]
if dest_conn == nil {
log.Printf("[%s@%s] Ignoring, destination peer '%s' not found", identity, addr, dest_identity)
continue
}
// Format new message, putting the sender's identity in front.
msg := identity + " " + payload
// Send to the peer
dest_conn.Write([]byte(msg))
// Lawg
log.Printf("[%s@%s] -> %s (%d chars)", identity, addr, dest_identity, len(payload))
}
}
// Main entry point
func main() {
// Parse command line flags
port := flag.Int("port", DEFAULT_LISTEN_PORT, "Port to listen on")
flag.Parse()
listen_addr := fmt.Sprintf("0.0.0.0:%d", *port)
// Start listening
listener, err := net.Listen("tcp", listen_addr)
if err != nil {
log.Panic(err)
}
log.Printf("Listening at %s", listen_addr)
// Main loop
for {
// Wait for the next incoming connection
conn, err := listener.Accept()
if err != nil {
log.Panic(err)
}
// Start goroutine to service it
go ServiceConnection(conn)
}
}
+226
View File
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
# Really simple P2P signaling server.
#
# When establishing peer-to-peer connections, the peers
# need some sort of pre-arranged side channels that they
# can use to exchange messages. This channel is assumed
# to be relatively low bandwidth and high latency. This
# service is often called "signaling".
#
# This server has the following really simple protocol:
# It listens on a particular TCP port. Clients connect
# raw TCP. The protocol is text-based and line oriented,
# so it is easy to test using telnet. When a client
# connects, it should send its identity on the first line.
# Afterwards, clients can send a message to a peer by
# sending a line formatted as follows:
#
# DESTINATION_IDENTITY PAYLOAD
#
# Identites may not contain spaces, and the payload
# should be plain ASCII text. (Hex or base64 encode it).
#
# If there is a client with that destination identity,
# then the server will forward the message on. Otherwise
# it is discarded.
#
# Forwarded messages have basically the same format and
# are the only type of message the server ever sends to the
# client. The only difference is that the identity is the
# identity of the sender.
#
# This is just an example code to illustrate what a
# signaling service is. A real production server would
# probably need to be able to scale across multiple
# processes, and provide authentication and rate
# limiting.
#
# Note that SteamNetworkingSockets use of signaling
# service does NOT assume guaranteed delivery.
import argparse
import errno
import io
import socket
import socketserver
import sys
import threading
DEFAULT_LISTEN_PORT = 10000
class SignalingState(object):
def __init__(self):
self.lock = threading.Lock()
self.clients = {}
class SignalingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
daemon_threads = True
def __init__(self, server_address, request_handler_class, state, address_family=socket.AF_INET, dual_stack=False):
self.address_family = address_family
self.dual_stack = dual_stack
socketserver.TCPServer.__init__(self, server_address, request_handler_class, bind_and_activate=False)
# For AF_INET6 sockets, ask the OS to also accept IPv4-mapped addresses.
if self.dual_stack and self.address_family == socket.AF_INET6 and hasattr(socket, "IPV6_V6ONLY"):
try:
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
except OSError:
pass
self.server_bind()
self.server_activate()
self.state = state
class SignalingRequestHandler(socketserver.StreamRequestHandler):
def setup(self):
socketserver.StreamRequestHandler.setup(self)
try:
self.request.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Use more aggressive probing where the platform exposes these knobs.
if hasattr(socket, "TCP_KEEPIDLE"):
self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
if hasattr(socket, "TCP_KEEPINTVL"):
self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
if hasattr(socket, "TCP_KEEPCNT"):
self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
except OSError:
# Keepalive tuning is best-effort for this example server.
pass
def handle(self):
peer_addr = self.client_address[0] + ":" + str(self.client_address[1])
identity = None
disconnect_reason = None
try:
intro = self.rfile.readline()
if not intro:
print("[%s] Aborting connection before receiving client identity" % peer_addr)
return
identity = intro.strip().decode("utf-8", "ignore")
if not identity:
print("[%s] Aborting connection due to empty client identity" % peer_addr)
return
replaced_conn = None
with self.server.state.lock:
replaced_conn = self.server.state.clients.get(identity)
self.server.state.clients[identity] = self.request
if replaced_conn is not None and replaced_conn is not self.request:
try:
print("[%s@%s] Closing previous connection" % (identity, peer_addr))
replaced_conn.close()
except Exception:
pass
print("[%s@%s] Added connection" % (identity, peer_addr))
while True:
try:
line = self.rfile.readline()
except ConnectionResetError:
disconnect_reason = "peer reset connection"
break
except OSError as ex:
if ex.errno == errno.ETIMEDOUT:
disconnect_reason = "connection timed out (keepalive failure)"
else:
disconnect_reason = "socket error %s" % ex
break
if not line:
disconnect_reason = "peer closed connection"
break
try:
line_text = line.decode("utf-8", "ignore")
except Exception:
continue
parts = line_text.split(" ", 1)
if len(parts) != 2:
print("[%s@%s] Ignoring weird input '%s'" % (identity, peer_addr, line_text.rstrip()))
continue
dest_identity = parts[0].strip()
payload = parts[1]
if not dest_identity:
continue
with self.server.state.lock:
dest_conn = self.server.state.clients.get(dest_identity)
if dest_conn is None:
print("[%s@%s] Ignoring, destination peer '%s' not found" % (identity, peer_addr, dest_identity))
continue
out_msg = (identity + " " + payload).encode("utf-8")
try:
dest_conn.sendall(out_msg)
print("[%s@%s] -> %s (%d chars)" % (identity, peer_addr, dest_identity, len(payload)))
except Exception as ex:
print("[%s@%s] Failed forwarding to %s: %s" % (identity, peer_addr, dest_identity, ex))
finally:
if identity is not None:
with self.server.state.lock:
if self.server.state.clients.get(identity) is self.request:
del self.server.state.clients[identity]
if disconnect_reason is None:
disconnect_reason = "connection ended"
print("[%s@%s] Connection closed: %s" % (identity, peer_addr, disconnect_reason))
def parse_args():
parser = argparse.ArgumentParser(description="Trivial P2P signaling server")
parser.add_argument("--port", type=int, default=DEFAULT_LISTEN_PORT, help="Port to listen on")
return parser.parse_args()
def main():
# Enable line buffering for stdout so output appears immediately in CI environments
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(line_buffering=True)
else:
# Fallback for older Python versions
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
args = parse_args()
listen_addr = ("::", args.port)
state = SignalingState()
try:
server = SignalingTCPServer(
listen_addr,
SignalingRequestHandler,
state,
address_family=socket.AF_INET6,
dual_stack=True,
)
except OSError:
# Fall back to IPv4-only if dual-stack IPv6 listener is not available.
listen_addr = ("0.0.0.0", args.port)
server = SignalingTCPServer(
listen_addr,
SignalingRequestHandler,
state,
address_family=socket.AF_INET,
dual_stack=False,
)
print("Listening at %s:%d" % (listen_addr[0], listen_addr[1]))
try:
server.serve_forever()
finally:
server.server_close()
if __name__ == "__main__":
main()
+8 -8
View File
@@ -133,13 +133,14 @@ def SymmetricTest():
# #
# Start the signaling server # Start the signaling server
trivial_signaling_server = './trivial_signaling_server' trivial_signaling_server = './trivial_signaling_server.py'
if os.name == 'nt' and not os.path.exists( 'trivial_signaling_server.exe' ): if not os.path.exists( trivial_signaling_server ):
trivial_signaling_server = '../examples/trivial_signaling_server.exe' trivial_signaling_server = '../examples/trivial_signaling_server.py'
if not os.path.exists( trivial_signaling_server ): if not os.path.exists( trivial_signaling_server ):
print( "Can't find trivial_signaling_server.exe" ) print( "Can't find trivial_signaling_server.py" )
sys.exit(1) sys.exit(1)
signaling = StartProcessInThread( "signaling", [ trivial_signaling_server ] )
signaling = StartProcessInThread( "signaling", [ sys.executable, trivial_signaling_server ] )
# Run the tests # Run the tests
for test in [ ClientServerTest, SymmetricTest ]: for test in [ ClientServerTest, SymmetricTest ]:
@@ -162,4 +163,3 @@ if really_failed:
sys.exit(1) sys.exit(1)
print( "TEST SUCCEEDED" ) print( "TEST SUCCEEDED" )