From 30a43b019569625d2d9972c4fe89e06dbdc09e64 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 4 Aug 2023 16:06:16 +0200 Subject: [PATCH] Close connection pool even if some connections are leased/in creation (#86) Currently closing a pool, that has leased connections or currently creates connections fails. Such a close attempt brings the pool in an unrecoverable closing state and may lead to crashes. This patch changes the pool shutdown behavior to allow closing the pool even if the pool is not a predefined state. --- .../ConnectionPool/ConnectionPool.swift | 157 +++++++++++------- .../ConnectionPool/RedisConnectionPool.swift | 1 - .../RediStackTests/ConnectionPoolTests.swift | 89 ++++++++++ 3 files changed, 187 insertions(+), 60 deletions(-) diff --git a/Sources/RediStack/ConnectionPool/ConnectionPool.swift b/Sources/RediStack/ConnectionPool/ConnectionPool.swift index f2a794e..af4b32f 100644 --- a/Sources/RediStack/ConnectionPool/ConnectionPool.swift +++ b/Sources/RediStack/ConnectionPool/ConnectionPool.swift @@ -143,7 +143,8 @@ internal final class ConnectionPool { } /// Deactivates this connection pool. Once this is called, no further connections can be obtained - /// from the pool. Leased connections are not deactivated and can continue to be used. + /// from the pool. Leased connections are not deactivated and can continue to be used. All waiters + /// are failed with a pool is closed error. func close(promise: EventLoopPromise? = nil, logger: Logger) { if self.loop.inEventLoop { self.closePool(promise: promise, logger: logger) @@ -228,7 +229,7 @@ extension ConnectionPool { switch self.state { case .closing: // We don't want this anymore, drop it. - _ = connection.close() + self.closeConnectionForShutdown(connection) case .closed: // This is programmer error, we shouldn't have entered this state. logger.critical("new connection created on a closed pool", metadata: [ @@ -250,10 +251,21 @@ extension ConnectionPool { RedisLogging.MetadataKeys.error: "\(error)" ]) - guard case .active = self.state else { - // No point continuing connection creation if we're not active. - logger.trace("not creating new connections due to inactivity") + switch self.state { + case .active: + break // continue further down + + case .closing(let remaining, let promise): + if remaining == 1 { + self.state = .closed + promise?.succeed() + } else { + self.state = .closing(remaining: remaining - 1, promise) + } return + + case .closed: + preconditionFailure("Invalid state: \(self.state)") } // Ok, we're still active. Before we do anything, we want to check whether anyone is still waiting @@ -315,39 +327,42 @@ extension ConnectionPool { private func closePool(promise: EventLoopPromise?, logger: Logger) { self.loop.preconditionInEventLoop() - // Pool closure must be monotonic. - guard case .active = self.state else { - logger.info("received duplicate request to close connection pool") - promise?.succeed(()) + switch self.state { + case .active: + self.state = .closing(remaining: self.activeConnectionCount, promise) + + case .closing(let count, let existingPromise): + if let existingPromise = existingPromise { + existingPromise.futureResult.cascade(to: promise) + } else { + self.state = .closing(remaining: count, promise) + } + return + + case .closed: + promise?.succeed() return } - self.state = .closing - - // To close the pool we need to drop all active connections. - let connections = self.availableConnections - self.availableConnections = [] - let closeFutures = connections.map { $0.close() } - // We also cancel all pending leases. while let pendingLease = self.connectionWaiters.popFirst() { pendingLease.fail(RedisConnectionPoolError.poolClosed) } - guard self.activeConnectionCount == 0 else { - logger.debug("not closing pool, waiting for all connections to be returned", metadata: [ - RedisLogging.MetadataKeys.poolConnectionCount: "\(self.activeConnectionCount)" - ]) - promise?.fail(RedisConnectionPoolError.poolHasActiveConnections) + if self.activeConnectionCount == 0 { + // That was all the connections, so this is now closed. + logger.trace("pool is now closed") + self.state = .closed + promise?.succeed() return } - // That was all the connections, so this is now closed. - logger.trace("pool is now closed") - self.state = .closed - EventLoopFuture - .andAllSucceed(closeFutures, on: self.loop) - .cascade(to: promise) + // To close the pool we need to drop all active connections. + let connections = self.availableConnections + self.availableConnections = [] + for connection in connections { + self.closeConnectionForShutdown(connection) + } } /// This is the on-thread implementation for leasing connections out to users. Here we work out how to get a new @@ -401,13 +416,24 @@ extension ConnectionPool { private func _returnLeasedConnection(_ connection: RedisConnection, logger: Logger) { self.loop.assertInEventLoop() self.leasedConnectionCount -= 1 - self._returnConnection(connection, logger: logger) + + switch self.state { + case .active: + self._returnConnection(connection, logger: logger) + + case .closing: + return self.closeConnectionForShutdown(connection) + + case .closed: + preconditionFailure("Invalid state: \(self.state)") + } } /// This is the on-thread implementation for returning connections to the pool. Here we work out what to do with a newly-acquired /// connection. private func _returnConnection(_ connection: RedisConnection, logger: Logger) { self.loop.assertInEventLoop() + precondition(self.state.isActive) guard connection.isConnected else { // This connection isn't active anymore. We'll dump it and potentially kick off a reconnection. @@ -415,37 +441,41 @@ extension ConnectionPool { return } - switch self.state { - case .active: - // If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room - // in the pool, we'll put it there. Otherwise, we'll close it. - if let waiter = self.connectionWaiters.popFirst() { - self.leaseConnection(connection, to: waiter) - } else if self.canAddConnectionToPool { - self.availableConnections.append(connection) - } else if let evictable = self.availableConnections.popFirst() { - // We have at least one pooled connection. The returned is more recently active, so kick out the pooled - // connection in favour of this one and close the recently evicted one. - self.availableConnections.append(connection) - _ = evictable.close() - } else { - // We don't need it, close it. - _ = connection.close() - } - case .closed: - // In general we shouldn't see leased connections return in .closed, as we should only be able to - // transition to .closed when all the leases are back. We tolerate this in production builds by just closing the - // connection, but in debug builds we assert to be sure. - logger.warning("connection returned to closed pool", metadata: [ - RedisLogging.MetadataKeys.connectionID: "\(connection.id)" - ]) - assertionFailure("Returned connection to closed pool") - fallthrough - case .closing: - // We don't need this connection, close it. + // If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room + // in the pool, we'll put it there. Otherwise, we'll close it. + if let waiter = self.connectionWaiters.popFirst() { + self.leaseConnection(connection, to: waiter) + } else if self.canAddConnectionToPool { + self.availableConnections.append(connection) + } else if let evictable = self.availableConnections.popFirst() { + // We have at least one pooled connection. The returned is more recently active, so kick out the pooled + // connection in favour of this one and close the recently evicted one. + self.availableConnections.append(connection) + _ = evictable.close() + } else { + // We don't need it, close it. _ = connection.close() - guard self.leasedConnectionCount == 0 else { return } - self.state = .closed + } + } + + private func closeConnectionForShutdown(_ connection: RedisConnection) { + connection.close().whenComplete { _ in + self.loop.preconditionInEventLoop() + + switch self.state { + case .closing(let remaining, let promise): + if remaining == 1 { + self.state = .closed + promise?.succeed() + } else { + self.state = .closing(remaining: remaining - 1, promise) + } + + case .closed, .active: + // The state must not change if we are closing a connection, while we are + // closing the pool. + preconditionFailure("Invalid state: \(self.state)") + } } } } @@ -457,10 +487,19 @@ extension ConnectionPool { /// The user has requested the connection pool to close, but there are still active connections leased to users /// and in the pool. - case closing + case closing(remaining: Int, EventLoopPromise?) /// The connection pool is closed: no connections are outstanding case closed + + var isActive: Bool { + switch self { + case .active: + return true + case .closing, .closed: + return false + } + } } } diff --git a/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift b/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift index 5c22d7d..c6be8b8 100644 --- a/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift +++ b/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift @@ -104,7 +104,6 @@ extension RedisConnectionPool { /// Closes all connections in the pool and deactivates the pool from creating new connections. /// /// This method is safe to call multiple times. - /// - Important: If the pool has connections in active use, the close process will not complete. /// - Parameters: /// - promise: A notification promise to resolve once the close process has completed. /// - logger: An optional logger to use for any log statements generated while closing the pool. diff --git a/Tests/RediStackTests/ConnectionPoolTests.swift b/Tests/RediStackTests/ConnectionPoolTests.swift index bba0b62..3f224d3 100644 --- a/Tests/RediStackTests/ConnectionPoolTests.swift +++ b/Tests/RediStackTests/ConnectionPoolTests.swift @@ -402,6 +402,95 @@ final class ConnectionPoolTests: XCTestCase { connectionPromise?.fail(ConnectionPoolTestError.connectionFailedForSomeReason) } + func testShutdownPoolThatHasConnectionThatFailsInCreation() { + var connectionPromise: EventLoopPromise? = nil + let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in + XCTAssertTrue(loop === self.server.loop) + connectionPromise = self.server.loop.makePromise() + return connectionPromise!.futureResult + } + pool.activate() + XCTAssertNoThrow(try self.server.runWhileActive()) + XCTAssertEqual(self.server.channels.count, 0) + XCTAssertNotNil(connectionPromise) + + var failedLastConnection = false + let closePromise = self.server.loop.makePromise(of: Void.self) + + pool.close(promise: closePromise) + closePromise.futureResult.whenComplete { _ in + XCTAssertTrue(failedLastConnection) + } + + failedLastConnection = true + connectionPromise?.fail(ConnectionPoolTestError.connectionFailedForSomeReason) + XCTAssertNoThrow(try closePromise.futureResult.wait()) + } + + func testShutdownPoolThatHasConnectionThatSucceedsInCreation() { + var connectionPromise: EventLoopPromise? = nil + let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in + XCTAssertTrue(loop === self.server.loop) + connectionPromise = self.server.loop.makePromise() + return connectionPromise!.futureResult + } + pool.activate() + XCTAssertNoThrow(try self.server.runWhileActive()) + XCTAssertEqual(self.server.channels.count, 0) + XCTAssertNotNil(connectionPromise) + + var lastConnectionCreated = false + let closePromise = self.server.loop.makePromise(of: Void.self) + + pool.close(promise: closePromise) + closePromise.futureResult.whenComplete { _ in + XCTAssertTrue(lastConnectionCreated) + } + + let singleConnection = self.createAConnection() + lastConnectionCreated = true + connectionPromise?.succeed(singleConnection) + self.server.loop.run() + XCTAssertNoThrow(try singleConnection.channel.closeFuture.wait(), "New connection is closed right away.") + XCTAssertNoThrow(try closePromise.futureResult.wait()) + } + + func testShutdownPoolThatHasLeasedConnection() { + var connectionPromise: EventLoopPromise? = nil + let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in + XCTAssertTrue(loop === self.server.loop) + connectionPromise = self.server.loop.makePromise() + return connectionPromise!.futureResult + } + pool.activate() + XCTAssertNoThrow(try self.server.runWhileActive()) + XCTAssertEqual(self.server.channels.count, 0) + XCTAssertNotNil(connectionPromise) + + var leasedConnectionReturned = false + let closePromise = self.server.loop.makePromise(of: Void.self) + + let singleConnection = self.createAConnection() + connectionPromise?.succeed(singleConnection) + + var leasedConnection: RedisConnection? + XCTAssertNoThrow(leasedConnection = try pool.leaseConnection(deadline: .distantFuture).wait()) + XCTAssert(singleConnection === leasedConnection) + + pool.close(promise: closePromise) + closePromise.futureResult.whenComplete { _ in + XCTAssertTrue(leasedConnectionReturned) + } + + leasedConnectionReturned = true + pool.returnConnection(singleConnection) + + self.server.loop.run() + XCTAssertNoThrow(try singleConnection.channel.closeFuture.wait(), "New connection is closed right away.") + XCTAssertNoThrow(try closePromise.futureResult.wait()) + } + + func testNonLeakyBucketWillKeepConnectingIfThereIsSpaceAndWaiters() throws { var connectionPromise: EventLoopPromise? = nil let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 0, leaky: false) { loop in