Added "poll group" interface.

This is used to poll many connections in a single function call.  Previously,
this was only possible if all of the connections were those accepted on the
same listen socket.  (ReceiveMessagesOnListenSocket).  But this left out at
least two important use cases with known users:

- If you create more than one listen socket (because there is more way to
  contact your service, e.g. once for P2P and another for direct IP, and
  another for relayed connections), then you could not poll all of the
  connections efficiently.
- In P2P use cases, we may initiate many connections to peers, and we want
  to poll all of them at once.

This change is relevant to: Issue #49, Issue #50, and issue #52.  (But I don't
this it really "fixes" any of them.)
This commit is contained in:
Fletcher Dunn
2019-12-05 15:51:33 -08:00
parent 8fcfe152eb
commit a9e5e86b28
10 changed files with 470 additions and 57 deletions
+19 -1
View File
@@ -243,6 +243,9 @@ public:
m_hListenSock = m_pInterface->CreateListenSocketIP( serverLocalAddr, 0, nullptr );
if ( m_hListenSock == k_HSteamListenSocket_Invalid )
FatalError( "Failed to listen on port %d", nPort );
m_hPollGroup = m_pInterface->CreatePollGroup();
if ( m_hPollGroup == k_HSteamNetPollGroup_Invalid )
FatalError( "Failed to listen on port %d", nPort );
Printf( "Server listening on port %d\n", nPort );
while ( !g_bQuit )
@@ -268,10 +271,17 @@ public:
m_pInterface->CloseConnection( it.first, 0, "Server Shutdown", true );
}
m_mapClients.clear();
m_pInterface->CloseListenSocket( m_hListenSock );
m_hListenSock = k_HSteamListenSocket_Invalid;
m_pInterface->DestroyPollGroup( m_hPollGroup );
m_hPollGroup = k_HSteamNetPollGroup_Invalid;
}
private:
HSteamListenSocket m_hListenSock;
HSteamNetPollGroup m_hPollGroup;
ISteamNetworkingSockets *m_pInterface;
struct Client_t
@@ -302,7 +312,7 @@ private:
while ( !g_bQuit )
{
ISteamNetworkingMessage *pIncomingMsg = nullptr;
int numMsgs = m_pInterface->ReceiveMessagesOnListenSocket( m_hListenSock, &pIncomingMsg, 1 );
int numMsgs = m_pInterface->ReceiveMessagesOnPollGroup( m_hPollGroup, &pIncomingMsg, 1 );
if ( numMsgs == 0 )
break;
if ( numMsgs < 0 )
@@ -468,6 +478,14 @@ private:
break;
}
// Assign the poll group
if ( !m_pInterface->SetConnectionPollGroup( pInfo->m_hConn, m_hPollGroup ) )
{
m_pInterface->CloseConnection( pInfo->m_hConn, 0, nullptr, false );
Printf( "Failed to set poll group?" );
break;
}
// Generate a random nick. A random temporary nick
// is really dumb and not how you would write a real chat server.
// You would want them to have some sort of signon message,
+58 -16
View File
@@ -244,7 +244,7 @@ public:
/// pOutMessageNumberOrResult is an optional array that will receive,
/// for each message, the message number that was assigned to the message
/// if sending was successful. If sending failed, then a negative EResult
/// valid is placed into the array. For example, the array will hold
/// value is placed into the array. For example, the array will hold
/// -k_EResultInvalidState if the connection was in an invalid state.
/// See ISteamNetworkingSockets::SendMessageToConnection for possible
/// failure codes.
@@ -273,7 +273,7 @@ public:
/// Reliable messages will be received in the order they were sent (and with the
/// same sizes --- see SendMessageToConnection for on this subtle difference from a stream socket).
///
/// Unreliable messages may be dropped, or delivered out of order withrespect to
/// Unreliable messages may be dropped, or delivered out of order with respect to
/// each other or with respect to reliable messages. The same unreliable message
/// may be received multiple times.
///
@@ -282,16 +282,6 @@ public:
/// a little while (put it into some queue, etc), and you may call Release() from any thread.
virtual int ReceiveMessagesOnConnection( HSteamNetConnection hConn, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) = 0;
/// Same as ReceiveMessagesOnConnection, but will return the next message available
/// on any connection that was accepted through the specified listen socket. Examine
/// SteamNetworkingMessage_t::m_conn to know which client connection.
///
/// Delivery order of messages among different clients is not defined. They may
/// be returned in an order different from what they were actually received. (Delivery
/// order of messages from the same client is well defined, and thus the order of the
/// messages is relevant!)
virtual int ReceiveMessagesOnListenSocket( HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) = 0;
/// Returns basic information about the high-level state of the connection.
virtual bool GetConnectionInfo( HSteamNetConnection hConn, SteamNetConnectionInfo_t *pInfo ) = 0;
@@ -378,13 +368,66 @@ public:
/// details, pass non-NULL to receive them.
virtual ESteamNetworkingAvailability GetAuthenticationStatus( SteamNetAuthenticationStatus_t *pDetails ) = 0;
//
// Poll groups. A poll group is a set of connections that can be polled efficiently.
// (In our API, to "poll" a connection means to retrieve all pending messages. We
// actually don't have an API to "poll" the connection *state*, like BSD sockets.)
//
/// Create a new poll group.
///
/// You should destroy the poll group when you are done using DestroyPollGroup
virtual HSteamNetPollGroup CreatePollGroup() = 0;
/// Destroy a poll group created with CreatePollGroup().
///
/// If there are any connections in the poll group, they are removed from the group,
/// and left in a state where they are not part of any poll group.
/// Returns false if passed an invalid poll group handle.
virtual bool DestroyPollGroup( HSteamNetPollGroup hPollGroup ) = 0;
/// Assign a connection to a poll group. Note that a connection may only belong to a
/// single poll group. Adding a connection to a poll group implicitly removes it from
/// any other poll group it is in.
///
/// You can pass k_HSteamNetPollGroup_Invalid to remove a connection from its current
/// poll group without adding it to a new poll group.
///
/// If there are received messages currently pending on the connection, an attempt
/// is made to add them to the queue of messages for the poll group in approximately
/// the order that would have applied if the connection was already part of the poll
/// group at the time that the messages were received.
///
/// Returns false if the connection handle is invalid, or if the poll group handle
/// is invalid (and not k_HSteamNetPollGroup_Invalid).
virtual bool SetConnectionPollGroup( HSteamNetConnection hConn, HSteamNetPollGroup hPollGroup ) = 0;
/// Same as ReceiveMessagesOnConnection, but will return the next messages available
/// on any connection in the poll group. Examine SteamNetworkingMessage_t::m_conn
/// to know which connection. (SteamNetworkingMessage_t::m_nConnUserData might also
/// be useful.)
///
/// Delivery order of messages among different connections will usually match the
/// order that the last packet was received which completed the message. But this
/// is not a strong guarantee, especially for packets received right as a connection
/// is being assigned to poll group.
///
/// Delivery order of messages on the same connection is well defined and the
/// same guarantees are present as mentioned in ReceiveMessagesOnConnection.
/// (But the messages are not grouped by connection, so they will not necessarily
/// appear consecutively in the list; they may be interleaved with messages for
/// other connections.)
virtual int ReceiveMessagesOnPollGroup( HSteamNetPollGroup hPollGroup, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) = 0;
#ifdef STEAMNETWORKINGSOCKETS_ENABLE_SDR
// Dedicated servers hosted in known data centers
// Relayed connections using custom signaling protocol
#endif // #ifndef STEAMNETWORKINGSOCKETS_ENABLE_SDR
/// Certificate provision by the application. (On Steam, Steam will handle all this automatically)
#ifndef STEAMNETWORKINGSOCKETS_STEAM
//
// Certificate provision by the application. On Steam, we normally handle all this automatically
// and you will not need to use these advanced functions.
//
/// Get blob that describes a certificate request. You can send this to your game coordinator.
/// Upon entry, *pcbBlob should contain the size of the buffer. On successful exit, it will
@@ -397,7 +440,6 @@ public:
/// Set the certificate. The certificate blob should be the output of
/// SteamDatagram_CreateCert.
virtual bool SetCertificate( const void *pCertificate, int cbCertificate, SteamNetworkingErrMsg &errMsg ) = 0;
#endif
// Invoke all callbacks queued for this interface.
// On Steam, callbacks are dispatched via the ordinary Steamworks callbacks mechanism.
@@ -409,7 +451,7 @@ public:
protected:
~ISteamNetworkingSockets(); // Silence some warnings
};
#define STEAMNETWORKINGSOCKETS_INTERFACE_VERSION "SteamNetworkingSockets006"
#define STEAMNETWORKINGSOCKETS_INTERFACE_VERSION "SteamNetworkingSockets008"
extern "C" {
-5
View File
@@ -40,11 +40,6 @@ public:
/// If cbAllocateBuffer=0, then no buffer is allocated. m_pData will be NULL,
/// m_cbSize will be zero, and m_pfnFreeData will be NULL. You will need to
/// set each of these.
///
/// You can use SteamNetworkingMessage_t::Release to free up the message
/// bookkeeping object and any associated buffer. See
/// ISteamNetworkingSockets::SendMessages for details on reference
/// counting and ownership.
virtual SteamNetworkingMessage_t *AllocateMessage( int cbAllocateBuffer ) = 0;
//
+9 -2
View File
@@ -67,10 +67,17 @@ struct SteamRelayNetworkStatus_t;
typedef uint32 HSteamNetConnection;
const HSteamNetConnection k_HSteamNetConnection_Invalid = 0;
/// Handle used to identify a "listen socket".
/// Handle used to identify a "listen socket". Unlike traditional
/// Berkeley sockets, a listen socket and a connection are two
/// different abstractions.
typedef uint32 HSteamListenSocket;
const HSteamListenSocket k_HSteamListenSocket_Invalid = 0;
/// Handle used to identify a poll group, used to query many
/// connections at once efficiently.
typedef uint32 HSteamNetPollGroup;
const HSteamNetPollGroup k_HSteamNetPollGroup_Invalid = 0;
/// Max length of diagnostic error message
const int k_cchMaxSteamNetworkingErrMsg = 1024;
@@ -86,7 +93,7 @@ typedef uint32 SteamNetworkingPOPID;
/// microseconds. This is guaranteed to increase over time during the lifetime
/// of a process, but not globally across runs. You don't need to worry about
/// the value wrapping around. Note that the underlying clock might not actually have
/// microsecond *resolution*.
/// microsecond resolution.
typedef int64 SteamNetworkingMicroseconds;
/// Describe the status of a particular network resource
@@ -173,6 +173,7 @@ void ConnectionConfig::Init( ConnectionConfig *pInherit )
CUtlHashMap<uint16, CSteamNetworkConnectionBase *, std::equal_to<uint16>, Identity<uint16> > g_mapConnections;
CUtlHashMap<int, CSteamNetworkListenSocketBase *, std::equal_to<int>, Identity<int> > g_mapListenSockets;
CUtlHashMap<int, CSteamNetworkPollGroup *, std::equal_to<int>, Identity<int> > g_mapPollGroups;
static bool BConnectionStateExistsToAPI( ESteamNetworkingConnectionState eState )
{
@@ -226,13 +227,35 @@ static CSteamNetworkConnectionBase *GetConnectionByHandleForAPI( HSteamNetConnec
static CSteamNetworkListenSocketBase *GetListenSocketByHandle( HSteamListenSocket sock )
{
if ( sock == 0 )
if ( sock == k_HSteamListenSocket_Invalid )
return nullptr;
AssertMsg( !(sock & 0x80000000), "A poll group handle was used where a listen socket handle was expected" );
int idx = sock & 0xffff;
if ( !g_mapListenSockets.IsValidIndex( idx ) )
return nullptr;
CSteamNetworkListenSocketBase *pResult = g_mapListenSockets[ idx ];
Assert( pResult && pResult->m_hListenSocketSelf == sock );
if ( pResult->m_hListenSocketSelf != sock )
{
// Slot was reused, but this handle is now invalid
return nullptr;
}
return pResult;
}
static CSteamNetworkPollGroup *GetPollGroupByHandle( HSteamNetPollGroup hPollGroup )
{
if ( hPollGroup == k_HSteamNetPollGroup_Invalid )
return nullptr;
AssertMsg( (hPollGroup & 0x80000000), "A listen socket handle was used where a poll group handle was expected" );
int idx = hPollGroup & 0xffff;
if ( !g_mapPollGroups.IsValidIndex( idx ) )
return nullptr;
CSteamNetworkPollGroup *pResult = g_mapPollGroups[ idx ];
if ( pResult->m_hPollGroupSelf != hPollGroup )
{
// Slot was reused, but this handle is now invalid
return nullptr;
}
return pResult;
}
@@ -304,6 +327,18 @@ void CSteamNetworkingSockets::KillConnections()
Assert( !g_mapListenSockets.IsValidIndex( idx ) );
}
}
// Destroy all of my poll groups
FOR_EACH_HASHMAP( g_mapPollGroups, idx )
{
CSteamNetworkPollGroup *pPollGroup = g_mapPollGroups[idx];
if ( pPollGroup->m_pSteamNetworkingSocketsInterface == this )
{
DbgVerify( DestroyPollGroup( pPollGroup->m_hPollGroupSelf ) );
Assert( !g_mapPollGroups.IsValidIndex( idx ) );
}
}
}
void CSteamNetworkingSockets::Destroy()
@@ -359,9 +394,6 @@ bool CSteamNetworkingSockets::GetIdentity( SteamNetworkingIdentity *pIdentity )
return !m_identity.IsInvalid();
}
/// Certificate provision by the application. (On Steam, Steam will handle all this automatically)
#ifndef STEAMNETWORKINGSOCKETS_STEAM
bool CSteamNetworkingSockets::GetCertificateRequest( int *pcbBlob, void *pBlob, SteamNetworkingErrMsg &errMsg )
{
SteamDatagramTransportLock scopeLock( "GetCertificateRequest" );
@@ -527,8 +559,6 @@ bool CSteamNetworkingSockets::SetCertificate( const void *pCertificate, int cbCe
return true;
}
#endif
#ifdef STEAMNETWORKINGSOCKETS_OPENSOURCE
ESteamNetworkingAvailability CSteamNetworkingSockets::InitAuthentication()
{
@@ -736,14 +766,67 @@ int CSteamNetworkingSockets::ReceiveMessagesOnConnection( HSteamNetConnection hC
return pConn->APIReceiveMessages( ppOutMessages, nMaxMessages );
}
int CSteamNetworkingSockets::ReceiveMessagesOnListenSocket( HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
HSteamNetPollGroup CSteamNetworkingSockets::CreatePollGroup()
{
SteamDatagramTransportLock scopeLock( "CreatePollGroup" );
CSteamNetworkPollGroup *pPollGroup = new CSteamNetworkPollGroup( this );
pPollGroup->AssignHandleAndAddToGlobalTable();
return pPollGroup->m_hPollGroupSelf;
}
bool CSteamNetworkingSockets::DestroyPollGroup( HSteamNetPollGroup hPollGroup )
{
SteamDatagramTransportLock scopeLock( "DestroyPollGroup" );
CSteamNetworkPollGroup *pPollGroup = GetPollGroupByHandle( hPollGroup );
if ( !pPollGroup )
return false;
delete pPollGroup;
return true;
}
bool CSteamNetworkingSockets::SetConnectionPollGroup( HSteamNetConnection hConn, HSteamNetPollGroup hPollGroup )
{
SteamDatagramTransportLock scopeLock( "SetConnectionPollGroup" );
CSteamNetworkConnectionBase *pConn = GetConnectionByHandleForAPI( hConn );
if ( !pConn )
return false;
// Special case for removing the poll group
if ( hPollGroup == k_HSteamNetPollGroup_Invalid )
{
pConn->RemoveFromPollGroup();
return true;
}
CSteamNetworkPollGroup *pPollGroup = GetPollGroupByHandle( hPollGroup );
if ( !pPollGroup )
return false;
pConn->SetPollGroup( pPollGroup );
return true;
}
int CSteamNetworkingSockets::ReceiveMessagesOnPollGroup( HSteamNetPollGroup hPollGroup, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
{
SteamDatagramTransportLock scopeLock( "ReceiveMessagesOnPollGroup" );
CSteamNetworkPollGroup *pPollGroup = GetPollGroupByHandle( hPollGroup );
if ( !pPollGroup )
return -1;
return pPollGroup->m_queueRecvMessages.RemoveMessages( ppOutMessages, nMaxMessages );
}
#ifdef STEAMNETWORKINGSOCKETS_STEAMCLIENT
int CSteamNetworkingSockets::ReceiveMessagesOnListenSocketLegacyPollGroup( HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
{
SteamDatagramTransportLock scopeLock( "ReceiveMessagesOnListenSocket" );
CSteamNetworkListenSocketBase *pSock = GetListenSocketByHandle( hSocket );
if ( !pSock )
return -1;
return pSock->APIReceiveMessages( ppOutMessages, nMaxMessages );
return pSock->m_legacyPollGroup.m_queueRecvMessages.RemoveMessages( ppOutMessages, nMaxMessages );
}
#endif
bool CSteamNetworkingSockets::GetConnectionInfo( HSteamNetConnection hConn, SteamNetConnectionInfo_t *pInfo )
{
@@ -88,7 +88,6 @@ public:
virtual void SendMessages( int nMessages, SteamNetworkingMessage_t *const *pMessages, int64 *pOutMessageNumberOrResult ) override;
virtual EResult FlushMessagesOnConnection( HSteamNetConnection hConn ) override;
virtual int ReceiveMessagesOnConnection( HSteamNetConnection hConn, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) override;
virtual int ReceiveMessagesOnListenSocket( HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) override;
virtual bool GetConnectionInfo( HSteamNetConnection hConn, SteamNetConnectionInfo_t *pInfo ) override;
virtual bool GetQuickConnectionStatus( HSteamNetConnection hConn, SteamNetworkingQuickConnectionStatus *pStats ) override;
virtual int GetDetailedConnectionStatus( HSteamNetConnection hConn, char *pszBuf, int cbBuf ) override;
@@ -96,9 +95,16 @@ public:
virtual bool CreateSocketPair( HSteamNetConnection *pOutConnection1, HSteamNetConnection *pOutConnection2, bool bUseNetworkLoopback, const SteamNetworkingIdentity *pIdentity1, const SteamNetworkingIdentity *pIdentity2 ) override;
virtual bool GetIdentity( SteamNetworkingIdentity *pIdentity ) override;
#ifndef STEAMNETWORKINGSOCKETS_STEAM
virtual HSteamNetPollGroup CreatePollGroup() override;
virtual bool DestroyPollGroup( HSteamNetPollGroup hPollGroup ) override;
virtual bool SetConnectionPollGroup( HSteamNetConnection hConn, HSteamNetPollGroup hPollGroup ) override;
virtual int ReceiveMessagesOnPollGroup( HSteamNetPollGroup hPollGroup, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) override;
virtual bool GetCertificateRequest( int *pcbBlob, void *pBlob, SteamNetworkingErrMsg &errMsg ) override;
virtual bool SetCertificate( const void *pCertificate, int cbCertificate, SteamNetworkingErrMsg &errMsg ) override;
#ifdef STEAMNETWORKINGSOCKETS_STEAMCLIENT
virtual int ReceiveMessagesOnListenSocketLegacyPollGroup( HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) override;
#endif
#ifdef STEAMNETWORKINGSOCKETS_STANDALONELIB
@@ -170,6 +170,46 @@ CSteamNetworkingMessage *CSteamNetworkingMessage::New( CSteamNetworkConnectionBa
return pMsg;
}
void CSteamNetworkingMessage::LinkBefore( CSteamNetworkingMessage *pSuccessor, Links CSteamNetworkingMessage::*pMbrLinks, SteamNetworkingMessageQueue *pQueue )
{
// Make sure we're not already in a queue
UnlinkFromQueue( pMbrLinks );
// No successor?
if ( !pSuccessor )
{
LinkToQueueTail( pMbrLinks, pQueue );
return;
}
// Otherwise, the queue cannot be empty, since it at least contains the successor
Assert( pQueue->m_pFirst );
Assert( pQueue->m_pLast );
Assert( (pSuccessor->*pMbrLinks).m_pQueue == pQueue );
CSteamNetworkingMessage *pPrev = (pSuccessor->*pMbrLinks).m_pPrev;
if ( pPrev )
{
Assert( pQueue->m_pFirst != pSuccessor );
Assert( (pPrev->*pMbrLinks).m_pNext == pSuccessor );
Assert( (pPrev->*pMbrLinks).m_pQueue == pQueue );
(pPrev->*pMbrLinks).m_pNext = this;
(this->*pMbrLinks).m_pPrev = pPrev;
}
else
{
Assert( pQueue->m_pFirst == pSuccessor );
pQueue->m_pFirst = this;
(this->*pMbrLinks).m_pPrev = nullptr; // Should already be null, but let's slam it again anyway
}
// Finish up
(this->*pMbrLinks).m_pQueue = pQueue;
(this->*pMbrLinks).m_pNext = pSuccessor;
(pSuccessor->*pMbrLinks).m_pPrev = this;
}
void CSteamNetworkingMessage::LinkToQueueTail( Links CSteamNetworkingMessage::*pMbrLinks, SteamNetworkingMessageQueue *pQueue )
{
// Locate previous link that should point to us.
@@ -275,6 +315,94 @@ int SteamNetworkingMessageQueue::RemoveMessages( SteamNetworkingMessage_t **ppOu
return nMessagesReturned;
}
/////////////////////////////////////////////////////////////////////////////
//
// CSteamNetworkPollGroup
//
/////////////////////////////////////////////////////////////////////////////
CSteamNetworkPollGroup::CSteamNetworkPollGroup( CSteamNetworkingSockets *pInterface )
: m_pSteamNetworkingSocketsInterface( pInterface )
, m_hPollGroupSelf( k_HSteamListenSocket_Invalid )
{
}
CSteamNetworkPollGroup::~CSteamNetworkPollGroup()
{
FOR_EACH_VEC_BACK( m_vecConnections, i )
{
CSteamNetworkConnectionBase *pConn = m_vecConnections[i];
Assert( pConn->m_pPollGroup == this );
pConn->RemoveFromPollGroup();
Assert( m_vecConnections.Count() == i );
}
// We should not have any messages now! but if we do, unlink them
Assert( m_queueRecvMessages.empty() );
// But if we do, unlink them but leave them in the main queue.
while ( !m_queueRecvMessages.empty() )
{
CSteamNetworkingMessage *pMsg = m_queueRecvMessages.m_pFirst;
// The poll group queue is the "secondary queue"
Assert( pMsg->m_linksSecondaryQueue.m_pQueue == &m_queueRecvMessages );
// They should be in some other queue (for the connection) as the main queue.
// That owns them and make sure they get deleted!
Assert( pMsg->m_links.m_pQueue != nullptr );
// OK, do the work
pMsg->UnlinkFromQueue( &CSteamNetworkingMessage::m_linksSecondaryQueue );
// Make sure it worked.
Assert( pMsg != m_queueRecvMessages.m_pFirst );
}
// Remove us from global table, if we're in it
if ( m_hPollGroupSelf != k_HSteamNetPollGroup_Invalid )
{
int idx = m_hPollGroupSelf & 0xffff;
if ( g_mapPollGroups.IsValidIndex( idx ) && g_mapPollGroups[ idx ] == this )
{
g_mapPollGroups[ idx ] = nullptr; // Just for grins
g_mapPollGroups.Remove( idx );
}
else
{
AssertMsg( false, "Poll group handle bookkeeping bug!" );
}
m_hPollGroupSelf = k_HSteamNetPollGroup_Invalid;
}
}
void CSteamNetworkPollGroup::AssignHandleAndAddToGlobalTable()
{
Assert( m_hPollGroupSelf == k_HSteamNetPollGroup_Invalid );
// We actually don't do map "lookups". We assume the number of listen sockets
// is going to be reasonably small.
static int s_nDummy;
++s_nDummy;
int idx = g_mapPollGroups.Insert( s_nDummy, this );
Assert( idx < 0x1000 );
// Use upper 15 bits as a connection sequence number, so that listen socket handles
// are not reused within a short time period.
// (The top bit is reserved, so that listen socket handles and poll group handles
// come from a different namespace, so that we can immediately detect using the wrong
// and make that bug more obvious.)
static uint32 s_nUpperBits = 0;
s_nUpperBits += 0x10000;
if ( s_nUpperBits & 0x10000000 )
s_nUpperBits = 0x10000;
// Set the handle
m_hPollGroupSelf = HSteamNetPollGroup( idx | s_nUpperBits | 0x80000000 );
}
/////////////////////////////////////////////////////////////////////////////
//
// CSteamNetworkListenSocketBase
@@ -283,14 +411,17 @@ int SteamNetworkingMessageQueue::RemoveMessages( SteamNetworkingMessage_t **ppOu
CSteamNetworkListenSocketBase::CSteamNetworkListenSocketBase( CSteamNetworkingSockets *pSteamNetworkingSocketsInterface )
: m_pSteamNetworkingSocketsInterface( pSteamNetworkingSocketsInterface )
, m_hListenSocketSelf( k_HSteamListenSocket_Invalid )
#ifdef STEAMNETWORKINGSOCKETS_STEAMCLIENT
, m_legacyPollGroup( pSteamNetworkingSocketsInterface )
#endif
{
m_hListenSocketSelf = k_HSteamListenSocket_Invalid;
m_connectionConfig.Init( &pSteamNetworkingSocketsInterface->m_connectionConfig );
}
CSteamNetworkListenSocketBase::~CSteamNetworkListenSocketBase()
{
AssertMsg( m_mapChildConnections.Count() == 0 && !m_queueRecvMessages.m_pFirst && !m_queueRecvMessages.m_pLast, "Destroy() not used properly" );
AssertMsg( m_mapChildConnections.Count() == 0, "Destroy() not used properly" );
// Remove us from global table, if we're in it
if ( m_hListenSocketSelf != k_HSteamListenSocket_Invalid )
@@ -299,7 +430,7 @@ CSteamNetworkListenSocketBase::~CSteamNetworkListenSocketBase()
if ( g_mapListenSockets.IsValidIndex( idx ) && g_mapListenSockets[ idx ] == this )
{
g_mapListenSockets[ idx ] = nullptr; // Just for grins
g_mapListenSockets.RemoveAt( idx );
g_mapListenSockets.Remove( idx );
}
else
{
@@ -323,11 +454,14 @@ bool CSteamNetworkListenSocketBase::BInitListenSocketCommon( int nOptions, const
int idx = g_mapListenSockets.Insert( s_nDummy, this );
Assert( idx < 0x1000 );
// Use upper 16 bits as a connection sequence number, so that listen socket handles
// Use upper 15 bits as a connection sequence number, so that listen socket handles
// are not reused within a short time period.
// (The top bit is reserved, so that listen socket handles and poll group handles
// come from a different namespace, so that we can immediately detect using the wrong
// and make that bug more obvious.)
static uint32 s_nUpperBits = 0;
s_nUpperBits += 0x10000;
if ( s_nUpperBits == 0 )
if ( s_nUpperBits & 0x10000000 )
s_nUpperBits = 0x10000;
// Add it to our table of listen sockets
@@ -381,11 +515,6 @@ bool CSteamNetworkListenSocketBase::APIGetAddress( SteamNetworkingIPAddr *pAddre
return false;
}
int CSteamNetworkListenSocketBase::APIReceiveMessages( SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
{
return m_queueRecvMessages.RemoveMessages( ppOutMessages, nMaxMessages );
}
void CSteamNetworkListenSocketBase::AddChildConnection( CSteamNetworkConnectionBase *pConn )
{
Assert( pConn->m_pParentListenSocket == nullptr );
@@ -402,6 +531,12 @@ void CSteamNetworkListenSocketBase::AddChildConnection( CSteamNetworkConnectionB
// Connection configuration will inherit from us
pConn->m_connectionConfig.Init( &m_connectionConfig );
// If we are possibly providing an old interface that did not have poll groups,
// add the connection to the default poll group
#ifdef STEAMNETWORKINGSOCKETS_STEAMCLIENT
pConn->SetPollGroup( &m_legacyPollGroup );
#endif
}
void CSteamNetworkListenSocketBase::AboutToDestroyChildConnection( CSteamNetworkConnectionBase *pConn )
@@ -452,6 +587,7 @@ CSteamNetworkConnectionBase::CSteamNetworkConnectionBase( CSteamNetworkingSocket
m_unConnectionIDLocal = 0;
m_unConnectionIDRemote = 0;
m_pParentListenSocket = nullptr;
m_pPollGroup = nullptr;
m_hSelfInParentListenSocketMap = -1;
m_pMessagesInterface = nullptr;
m_pMessagesSession = nullptr;
@@ -520,6 +656,9 @@ void CSteamNetworkConnectionBase::FreeResources()
// Discard any messages that weren't retrieved
m_queueRecvMessages.PurgeMessages();
// If we are in a poll group, remove us from the group
RemoveFromPollGroup();
// Detach from the listen socket that owns us, if any
if ( m_pParentListenSocket )
m_pParentListenSocket->AboutToDestroyChildConnection( this );
@@ -577,6 +716,95 @@ void CSteamNetworkConnectionBase::DestroyTransport()
}
}
void CSteamNetworkConnectionBase::RemoveFromPollGroup()
{
if ( !m_pPollGroup )
return;
// Scan all of our messages, and make sure they are not in the secondary queue
for ( CSteamNetworkingMessage *pMsg = m_queueRecvMessages.m_pFirst ; pMsg ; pMsg = pMsg->m_linksSecondaryQueue.m_pNext )
{
Assert( pMsg->m_links.m_pQueue == &m_queueRecvMessages );
// It *should* be in the secondary queue of the poll group
Assert( pMsg->m_linksSecondaryQueue.m_pQueue == &m_pPollGroup->m_queueRecvMessages );
// OK, do the work
pMsg->UnlinkFromQueue( &CSteamNetworkingMessage::m_linksSecondaryQueue );
}
// Remove us from the poll group's list. DbgVerify because we should be in the list!
DbgVerify( m_pPollGroup->m_vecConnections.FindAndFastRemove( this ) );
// We're not in a poll group anymore
m_pPollGroup = nullptr;
}
void CSteamNetworkConnectionBase::SetPollGroup( CSteamNetworkPollGroup *pPollGroup )
{
// Quick early-out for no change
if ( m_pPollGroup == pPollGroup )
return;
// Clearing it?
if ( !pPollGroup )
{
RemoveFromPollGroup();
return;
}
// Scan all messages that are already queued for this connection,
// and insert them into the poll groups queue in the (approximate)
// appropriate spot. Using local timestamps should be really close
// for ordering messages between different connections. Remember
// that the API very clearly does not provide strong guarantees
// regarding ordering of messages from different connections, and
// really anybody who is expecting or relying on such guarantees
// is probably doing something wrong.
CSteamNetworkingMessage *pInsertBefore = pPollGroup->m_queueRecvMessages.m_pFirst;
for ( CSteamNetworkingMessage *pMsg = m_queueRecvMessages.m_pFirst ; pMsg ; pMsg = pMsg->m_links.m_pNext )
{
Assert( pMsg->m_links.m_pQueue == &m_queueRecvMessages );
// Unlink it from existing poll group queue, if any
if ( pMsg->m_linksSecondaryQueue.m_pQueue )
{
Assert( m_pPollGroup && pMsg->m_linksSecondaryQueue.m_pQueue == &m_pPollGroup->m_queueRecvMessages );
pMsg->UnlinkFromQueue( &CSteamNetworkingMessage::m_linksSecondaryQueue );
}
else
{
Assert( !m_pPollGroup );
}
// Scan forward in the poll group message queue, until we find the insertion point
for (;;)
{
// End of queue?
if ( !pInsertBefore )
{
pMsg->LinkToQueueTail( &CSteamNetworkingMessage::m_linksSecondaryQueue, &pPollGroup->m_queueRecvMessages );
break;
}
Assert( pInsertBefore->m_linksSecondaryQueue.m_pQueue == &pPollGroup->m_queueRecvMessages );
if ( pInsertBefore->m_usecTimeReceived > pMsg->m_usecTimeReceived )
{
pMsg->LinkBefore( pInsertBefore, &CSteamNetworkingMessage::m_linksSecondaryQueue, &pPollGroup->m_queueRecvMessages );
break;
}
pInsertBefore = pInsertBefore->m_linksSecondaryQueue.m_pNext;
}
}
m_pPollGroup = pPollGroup;
Assert( !m_pPollGroup->m_vecConnections.HasElement( this ) );
m_pPollGroup->m_vecConnections.AddToTail( this );
}
bool CSteamNetworkConnectionBase::BInitConnection( SteamNetworkingMicroseconds usecNow, int nOptions, const SteamNetworkingConfigValue_t *pOptions, SteamDatagramErrMsg &errMsg )
{
// Make sure MTU values are initialized
@@ -1977,9 +2205,9 @@ void CSteamNetworkConnectionBase::ReceivedMessage( CSteamNetworkingMessage *pMsg
// Add to end of my queue.
pMsg->LinkToQueueTail( &CSteamNetworkingMessage::m_links, &m_queueRecvMessages );
// If we are an inbound, accepted connection, link into the listen socket's queue
if ( m_pParentListenSocket )
pMsg->LinkToQueueTail( &CSteamNetworkingMessage::m_linksSecondaryQueue, &m_pParentListenSocket->m_queueRecvMessages );
// Add to the poll group, if we are in one
if ( m_pPollGroup )
pMsg->LinkToQueueTail( &CSteamNetworkingMessage::m_linksSecondaryQueue, &m_pPollGroup->m_queueRecvMessages );
}
void CSteamNetworkConnectionBase::PostConnectionStateChangedCallback( ESteamNetworkingConnectionState eOldAPIState, ESteamNetworkingConnectionState eNewAPIState )
@@ -145,6 +145,33 @@ public:
virtual void ConnectionStateChanged( ESteamNetworkingConnectionState eOldState, ESteamNetworkingConnectionState eNewState ) = 0;
};
/////////////////////////////////////////////////////////////////////////////
//
// CSteamNetworkPollGroup
//
/////////////////////////////////////////////////////////////////////////////
class CSteamNetworkPollGroup
{
public:
CSteamNetworkPollGroup( CSteamNetworkingSockets *pInterface );
~CSteamNetworkPollGroup();
/// What interface is responsible for this listen socket?
CSteamNetworkingSockets *const m_pSteamNetworkingSocketsInterface;
/// Linked list of messages received through any connection on this listen socket
SteamNetworkingMessageQueue m_queueRecvMessages;
/// Index into the global list
HSteamNetPollGroup m_hPollGroupSelf;
/// List of connections that are in this poll group
CUtlVector<CSteamNetworkConnectionBase *> m_vecConnections;
void AssignHandleAndAddToGlobalTable();
};
/////////////////////////////////////////////////////////////////////////////
//
// CSteamNetworkListenSocketBase
@@ -165,15 +192,11 @@ public:
/// This gets called on an accepted connection before it gets destroyed
virtual void AboutToDestroyChildConnection( CSteamNetworkConnectionBase *pConn );
int APIReceiveMessages( SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages );
virtual bool APIGetAddress( SteamNetworkingIPAddr *pAddress );
/// Map of child connections
CUtlHashMap<RemoteConnectionKey_t, CSteamNetworkConnectionBase *, std::equal_to<RemoteConnectionKey_t>, RemoteConnectionKey_t::Hash > m_mapChildConnections;
/// Linked list of messages received through any connection on this listen socket
SteamNetworkingMessageQueue m_queueRecvMessages;
/// Index into the global list
HSteamListenSocket m_hListenSocketSelf;
@@ -183,6 +206,11 @@ public:
/// Configuration options that will apply to all connections accepted through this listen socket
ConnectionConfig m_connectionConfig;
/// For legacy interface.
#ifdef STEAMNETWORKINGSOCKETS_STEAMCLIENT
CSteamNetworkPollGroup m_legacyPollGroup;
#endif
protected:
CSteamNetworkListenSocketBase( CSteamNetworkingSockets *pSteamNetworkingSocketsInterface );
virtual ~CSteamNetworkListenSocketBase(); // hidden destructor, don't call directly. Use Destroy()
@@ -326,6 +354,15 @@ public:
/// The listen socket through which we were accepted, if any.
CSteamNetworkListenSocketBase *m_pParentListenSocket;
/// What poll group are we assigned to?
CSteamNetworkPollGroup *m_pPollGroup;
/// Assign poll group
void SetPollGroup( CSteamNetworkPollGroup *pPollGroup );
/// Remove us from the poll group we are in (if any)
void RemoveFromPollGroup();
/// Was this connection initiated locally (we are the "client") or remotely (we are the "server")?
/// In *most* use cases, "server" cnonections have a listen socket, but not always.
bool m_bConnectionInitiatedRemotely;
@@ -761,6 +798,7 @@ inline void SendPacketContext<TStatsMsg>::CalcMaxEncryptedPayloadSize( size_t cb
extern CUtlHashMap<uint16, CSteamNetworkConnectionBase *, std::equal_to<uint16>, Identity<uint16> > g_mapConnections;
extern CUtlHashMap<int, CSteamNetworkListenSocketBase *, std::equal_to<int>, Identity<int> > g_mapListenSockets;
extern CUtlHashMap<int, CSteamNetworkPollGroup *, std::equal_to<int>, Identity<int> > g_mapPollGroups;
extern bool BCheckGlobalSpamReplyRateLimit( SteamNetworkingMicroseconds usecNow );
extern CSteamNetworkConnectionBase *GetConnectionByHandle( HSteamNetConnection sock );
@@ -83,11 +83,6 @@ STEAMNETWORKINGSOCKETS_INTERFACE int SteamAPI_ISteamNetworkingSockets_ReceiveMes
return ((ISteamNetworkingSockets*)instancePtr)->ReceiveMessagesOnConnection( hConn, ppOutMessages, nMaxMessages );
}
STEAMNETWORKINGSOCKETS_INTERFACE int SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnListenSocket( intptr_t instancePtr, HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
{
return ((ISteamNetworkingSockets*)instancePtr)->ReceiveMessagesOnListenSocket( hSocket, ppOutMessages, nMaxMessages );
}
STEAMNETWORKINGSOCKETS_INTERFACE bool SteamAPI_ISteamNetworkingSockets_GetConnectionInfo( intptr_t instancePtr, HSteamNetConnection hConn, SteamNetConnectionInfo_t *pInfo )
{
return ((ISteamNetworkingSockets*)instancePtr)->GetConnectionInfo( hConn, pInfo );
@@ -78,6 +78,7 @@ public:
/// P2P channel, depending on message type)
Links m_linksSecondaryQueue;
void LinkBefore( CSteamNetworkingMessage *pSuccessor, Links CSteamNetworkingMessage::*pMbrLinks, SteamNetworkingMessageQueue *pQueue );
void LinkToQueueTail( Links CSteamNetworkingMessage::*pMbrLinks, SteamNetworkingMessageQueue *pQueue );
void UnlinkFromQueue( Links CSteamNetworkingMessage::*pMbrLinks );