Remove empty PoolNode entries from ConnectionPool and InUsePool

ConnectionPool's internal HashMap never removed PoolNode entries after all
their connections were drained. Each empty PoolNode retains ~1 KB (mainly
from the pre-allocated ArrayQueue), so workloads connecting to many unique
upstreams over time would see unbounded memory growth.

Root cause: there was exactly one code path inserting keys into the pool
HashMap (get_pool_node → pool.insert) and zero code paths removing them.

The fix adds inline cleanup at each point where a node can become empty:

- get(): after taking the last connection (important because idle_poll/
  idle_timeout exit via watch_use and never call pop_closed)
- pop_evicted(): after LRU eviction removes a connection
- pop_closed(): inherits cleanup from the above paths

Cleanup uses a double-check pattern: callers first check is_empty() (a
cheap atomic load on the hot queue that short-circuits in the common case),
then call try_remove_empty_node() which re-verifies under the write lock
to avoid removing a node that was concurrently repopulated.

The same bug existed in InUsePool (pingora-core h2 connector), which is
also fixed here with the same pattern applied to get() and release().

Hot-path cost: one additional atomic load (~1 ns) when connections remain
in the node. The write lock is only acquired on the cold path when a node
actually empties, matching the cleanup strategy used by hyper-util and
Go's net/http.
This commit is contained in:
zaidoon
2026-02-17 02:19:23 -05:00
committed by Edward Wang
parent b2bba812aa
commit 0f1248114f
3 changed files with 315 additions and 8 deletions
+1 -1
View File
@@ -1 +1 @@
1ec646eede6044342d0511e1154d0a6952327330
914233a5fbd65a92534e0b0ea3108c33925bf947
+45 -7
View File
@@ -175,6 +175,21 @@ impl InUsePool {
}
}
/// Attempt to remove an empty [`PoolNode`] entry from the pools `HashMap`.
///
/// Same rationale as [`ConnectionPool::try_remove_empty_node`]: prevents
/// unbounded growth when many unique reuse hashes are seen over time.
/// The write lock + re-check ensures we never remove a node that was
/// concurrently repopulated.
fn try_remove_empty_node(&self, reuse_hash: u64) {
let mut pools = self.pools.write();
if let Some(pool) = pools.get(&reuse_hash) {
if pool.is_empty() {
pools.remove(&reuse_hash);
}
}
}
pub fn insert(&self, reuse_hash: u64, conn: ConnectionRef) {
{
let pools = self.pools.read();
@@ -194,19 +209,42 @@ impl InUsePool {
// the caller should return the conn ref to this pool if there are still
// capacity left for more streams
pub fn get(&self, reuse_hash: u64) -> Option<ConnectionRef> {
let pools = self.pools.read();
pools.get(&reuse_hash)?.get_any().map(|v| v.1)
let (result, maybe_empty) = {
let pools = self.pools.read();
match pools.get(&reuse_hash) {
Some(pool) => match pool.get_any() {
Some((_, conn)) => (Some(conn), pool.is_empty()),
None => (None, true),
},
None => (None, false),
}
}; // read lock released here
if maybe_empty {
self.try_remove_empty_node(reuse_hash);
}
result
}
// release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist)
// the caller should update the ref and then decide where to put it (in use pool or idle)
pub fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option<ConnectionRef> {
let pools = self.pools.read();
if let Some(pool) = pools.get(&reuse_hash) {
pool.remove(id)
} else {
None
let (result, maybe_empty) = {
let pools = self.pools.read();
if let Some(pool) = pools.get(&reuse_hash) {
let removed = pool.remove(id);
(removed, pool.is_empty())
} else {
(None, false)
}
}; // read lock released here
if maybe_empty {
self.try_remove_empty_node(reuse_hash);
}
result
}
}
+269
View File
@@ -124,6 +124,25 @@ impl<T> PoolNode<T> {
}
}
/// Returns `true` if the pool node contains no connections in either the hot queue
/// or the overflow hash map.
///
/// # Concurrency note
///
/// This check is not atomic across the two internal stores (`hot_queue` and
/// `connections`). Between checking one and the other, a concurrent `insert` or
/// `get_any` could change the state. This is acceptable because callers use
/// `is_empty` only as a hint to attempt cleanup, and always re-verify under
/// an exclusive (write) lock before actually removing the node from the parent
/// pool HashMap. A false-negative simply defers cleanup to the next opportunity;
/// a false-positive is largely mitigated by the re-check (see
/// [`ConnectionPool::try_remove_empty_node`] for residual race-window analysis).
pub fn is_empty(&self) -> bool {
// Check the lock-free queue first (cheap atomic load) to avoid acquiring
// the mutex in the common case where connections are present.
self.hot_queue.is_empty() && self.connections.lock().is_empty()
}
// This function acquires 2 locks and iterates over the entire hot queue.
// But it should be fine because remove() rarely happens on a busy PoolNode.
/// Remove the item associated with the id from the pool. The item is returned
@@ -201,6 +220,38 @@ impl<S> ConnectionPool<S> {
}
}
/// Attempt to remove an empty [`PoolNode`] entry from the pool `HashMap`.
///
/// This prevents unbounded growth of the pool map when many unique group keys
/// are seen over the lifetime of the pool (e.g. connecting to many distinct
/// upstreams). Without this cleanup, each unique `GroupKey` leaves an
/// empty `PoolNode` behind even after all its connections are gone.
///
/// The method acquires the pool write lock and re-checks emptiness to avoid
/// removing a node that was concurrently repopulated between the caller's
/// initial `is_empty()` hint and this write-lock acquisition.
///
/// # Race window
///
/// There is a narrow window where another thread could have called
/// [`get_pool_node`] (obtaining a clone of the `Arc<PoolNode>`) just before
/// we remove the entry. If that thread then inserts a connection into the
/// now-orphaned node, the connection is dropped when the last `Arc` reference
/// goes away. This is benign: the `oneshot::Sender` inside the dropped
/// `PoolConnection` is also dropped, which resolves the corresponding
/// `watch_use` receiver in `idle_poll`/`idle_timeout`, causing a clean exit.
/// The next request to the same upstream simply creates a fresh connection.
/// This trade-off matches the existing concurrency model of the pool and is
/// consistent with how hyper-util and Go's `net/http` handle this case.
fn try_remove_empty_node(&self, key: GroupKey) {
let mut pool = self.pool.write();
if let Some(node) = pool.get(&key) {
if node.is_empty() {
pool.remove(&key);
}
}
}
// only remove from the pool because lru already removed it
fn pop_evicted(&self, meta: &ConnectionMeta) {
let pool_node = {
@@ -216,6 +267,14 @@ impl<S> ConnectionPool<S> {
pool_node.remove(meta.id);
debug!("evict fd: {} from key {}", meta.id, meta.key);
// Clean up the PoolNode entry if it is now empty, to prevent unbounded
// growth of the pool HashMap.
// The is_empty() check avoids acquiring the write lock in the common case
// where other connections still exist under this key.
if pool_node.is_empty() {
self.try_remove_empty_node(meta.key);
}
}
pub fn pop_closed(&self, meta: &ConnectionMeta) {
@@ -236,8 +295,19 @@ impl<S> ConnectionPool<S> {
if let Some((id, connection)) = pool_node.get_any() {
self.lru.pop(&id); // the notified is not needed
// Clean up the now-empty node. This path is important because when a
// connection is retrieved (not evicted), the idle_poll/idle_timeout
// tasks exit via the watch_use channel and never call pop_closed(),
// so pop_evicted's cleanup would never run for this key.
if pool_node.is_empty() {
self.try_remove_empty_node(*key);
}
Some(connection.release())
} else {
// The node exists but has no connections. Clean it up.
self.try_remove_empty_node(*key);
None
}
}
@@ -531,4 +601,203 @@ mod tests {
let _ = cp.get(&meta1.key).unwrap(); // mock_io3 should be selected
assert!(cp.get(&meta1.key).is_none()) // mock_io1 should already be removed by idle_poll
}
#[test]
fn test_pool_node_is_empty() {
let node: PoolNode<String> = PoolNode::new();
assert!(node.is_empty(), "newly created node should be empty");
node.insert(1, "v1".to_string());
assert!(!node.is_empty(), "node with one item should not be empty");
// get_any removes the item
let item = node.get_any();
assert!(item.is_some());
assert!(node.is_empty(), "node should be empty after get_any");
// insert then remove by id
node.insert(2, "v2".to_string());
assert!(!node.is_empty());
let removed = node.remove(2);
assert!(removed.is_some());
assert!(node.is_empty(), "node should be empty after remove");
}
#[test]
fn test_pool_node_is_empty_overflow_to_connections() {
// Fill the hot queue (capacity = HOT_QUEUE_SIZE = 16), then overflow
// into the connections HashMap, and verify is_empty drains both.
let node: PoolNode<String> = PoolNode::new();
for i in 0..(HOT_QUEUE_SIZE as i32 + 4) {
node.insert(i, format!("v{i}"));
}
assert!(!node.is_empty());
// Drain all items via get_any
while node.get_any().is_some() {}
assert!(node.is_empty(), "node should be empty after draining all");
}
#[tokio::test]
async fn test_empty_node_removed_after_pop_closed() {
// Reproducer from GitHub issue #748: a single connection is added and
// then closed. The PoolNode entry in the pool HashMap must be removed.
let meta = ConnectionMeta::new(101, 1);
let cp: ConnectionPool<String> = ConnectionPool::new(2);
cp.put(&meta, "v1".to_string());
assert_eq!(cp.pool.read().len(), 1, "pool should have 1 node");
cp.pop_closed(&meta);
assert_eq!(
cp.pool.read().len(),
0,
"empty PoolNode should be removed after pop_closed"
);
}
#[tokio::test]
async fn test_empty_node_removed_after_get() {
// When the last connection is retrieved via get(), the PoolNode should
// be cleaned up. This path is distinct from pop_closed because the
// idle_poll/idle_timeout tasks exit via the watch_use channel and never
// call pop_closed.
let meta = ConnectionMeta::new(101, 1);
let cp: ConnectionPool<String> = ConnectionPool::new(2);
cp.put(&meta, "v1".to_string());
assert_eq!(cp.pool.read().len(), 1);
let conn = cp.get(&meta.key);
assert!(conn.is_some());
assert_eq!(
cp.pool.read().len(),
0,
"empty PoolNode should be removed after get() takes the last connection"
);
}
#[tokio::test]
async fn test_empty_node_removed_when_get_finds_empty_node() {
// If a node exists but has no connections (e.g. they were all evicted
// by the LRU), get() should clean up the empty node.
let meta1 = ConnectionMeta::new(101, 1);
let meta2 = ConnectionMeta::new(101, 2);
let cp: ConnectionPool<String> = ConnectionPool::new(4);
cp.put(&meta1, "v1".to_string());
cp.put(&meta2, "v2".to_string());
// Remove both connections via pop_closed, but the first pop_closed
// won't remove the node since meta2 is still there.
cp.pop_closed(&meta1);
assert_eq!(cp.pool.read().len(), 1, "node should still exist");
cp.pop_closed(&meta2);
assert_eq!(
cp.pool.read().len(),
0,
"node should be removed after last connection is popped"
);
}
#[tokio::test]
async fn test_node_not_removed_when_connections_remain() {
// Removing one connection from a node that has others must NOT remove
// the node itself.
let meta1 = ConnectionMeta::new(101, 1);
let meta2 = ConnectionMeta::new(101, 2);
let cp: ConnectionPool<String> = ConnectionPool::new(4);
cp.put(&meta1, "v1".to_string());
cp.put(&meta2, "v2".to_string());
cp.pop_closed(&meta1);
assert!(
cp.pool.read().contains_key(&101),
"node should still exist because meta2's connection is still in it"
);
assert_eq!(cp.pool.read().len(), 1);
// The remaining connection should still be retrievable
let conn = cp.get(&meta1.key);
assert!(conn.is_some());
}
#[tokio::test]
async fn test_empty_node_cleanup_only_affects_target_key() {
// Cleaning up an empty node for one key must not affect other keys.
let meta_a = ConnectionMeta::new(101, 1);
let meta_b = ConnectionMeta::new(202, 2);
let cp: ConnectionPool<String> = ConnectionPool::new(4);
cp.put(&meta_a, "a".to_string());
cp.put(&meta_b, "b".to_string());
assert_eq!(cp.pool.read().len(), 2);
// Remove all connections for key 101
cp.pop_closed(&meta_a);
assert_eq!(
cp.pool.read().len(),
1,
"only key 101's empty node should be removed"
);
assert!(!cp.pool.read().contains_key(&101), "key 101 should be gone");
assert!(cp.pool.read().contains_key(&202), "key 202 should remain");
// key 202's connection should still be retrievable
let conn = cp.get(&meta_b.key);
assert_eq!(conn, Some("b".to_string()));
}
#[tokio::test]
async fn test_empty_node_cleaned_after_lru_eviction() {
// When LRU eviction removes the last connection for a key, the empty
// node should be cleaned up by pop_evicted (called from put()).
let meta1 = ConnectionMeta::new(101, 1);
let meta2 = ConnectionMeta::new(202, 2);
let cp: ConnectionPool<String> = ConnectionPool::new(1);
cp.put(&meta1, "v1".to_string());
assert_eq!(cp.pool.read().len(), 1);
// This put evicts meta1 (LRU size = 1), making key 101's node empty.
cp.put(&meta2, "v2".to_string());
assert!(
!cp.pool.read().contains_key(&101),
"key 101's empty node should be removed after its only connection was evicted"
);
assert!(cp.pool.read().contains_key(&202));
}
#[tokio::test]
async fn test_node_reusable_after_cleanup() {
// After an empty node is cleaned up, inserting a new connection for the
// same key should work correctly (a new PoolNode is created).
let meta1 = ConnectionMeta::new(101, 1);
let cp: ConnectionPool<String> = ConnectionPool::new(4);
cp.put(&meta1, "first".to_string());
cp.pop_closed(&meta1);
assert_eq!(cp.pool.read().len(), 0, "node should be cleaned up");
// Re-insert for the same key
let meta2 = ConnectionMeta::new(101, 2);
cp.put(&meta2, "second".to_string());
assert_eq!(cp.pool.read().len(), 1);
let conn = cp.get(&meta2.key);
assert_eq!(conn, Some("second".to_string()));
assert_eq!(
cp.pool.read().len(),
0,
"node should be cleaned up again after get"
);
}
}