diff --git a/Sources/RediStack/ConnectionPool/ConnectionPool.swift b/Sources/RediStack/ConnectionPool/ConnectionPool.swift index f2a794e..af4b32f 100644 --- a/Sources/RediStack/ConnectionPool/ConnectionPool.swift +++ b/Sources/RediStack/ConnectionPool/ConnectionPool.swift @@ -143,7 +143,8 @@ internal final class ConnectionPool { } /// Deactivates this connection pool. Once this is called, no further connections can be obtained - /// from the pool. Leased connections are not deactivated and can continue to be used. + /// from the pool. Leased connections are not deactivated and can continue to be used. All waiters + /// are failed with a pool is closed error. func close(promise: EventLoopPromise? = nil, logger: Logger) { if self.loop.inEventLoop { self.closePool(promise: promise, logger: logger) @@ -228,7 +229,7 @@ extension ConnectionPool { switch self.state { case .closing: // We don't want this anymore, drop it. - _ = connection.close() + self.closeConnectionForShutdown(connection) case .closed: // This is programmer error, we shouldn't have entered this state. logger.critical("new connection created on a closed pool", metadata: [ @@ -250,10 +251,21 @@ extension ConnectionPool { RedisLogging.MetadataKeys.error: "\(error)" ]) - guard case .active = self.state else { - // No point continuing connection creation if we're not active. - logger.trace("not creating new connections due to inactivity") + switch self.state { + case .active: + break // continue further down + + case .closing(let remaining, let promise): + if remaining == 1 { + self.state = .closed + promise?.succeed() + } else { + self.state = .closing(remaining: remaining - 1, promise) + } return + + case .closed: + preconditionFailure("Invalid state: \(self.state)") } // Ok, we're still active. Before we do anything, we want to check whether anyone is still waiting @@ -315,39 +327,42 @@ extension ConnectionPool { private func closePool(promise: EventLoopPromise?, logger: Logger) { self.loop.preconditionInEventLoop() - // Pool closure must be monotonic. - guard case .active = self.state else { - logger.info("received duplicate request to close connection pool") - promise?.succeed(()) + switch self.state { + case .active: + self.state = .closing(remaining: self.activeConnectionCount, promise) + + case .closing(let count, let existingPromise): + if let existingPromise = existingPromise { + existingPromise.futureResult.cascade(to: promise) + } else { + self.state = .closing(remaining: count, promise) + } + return + + case .closed: + promise?.succeed() return } - self.state = .closing - - // To close the pool we need to drop all active connections. - let connections = self.availableConnections - self.availableConnections = [] - let closeFutures = connections.map { $0.close() } - // We also cancel all pending leases. while let pendingLease = self.connectionWaiters.popFirst() { pendingLease.fail(RedisConnectionPoolError.poolClosed) } - guard self.activeConnectionCount == 0 else { - logger.debug("not closing pool, waiting for all connections to be returned", metadata: [ - RedisLogging.MetadataKeys.poolConnectionCount: "\(self.activeConnectionCount)" - ]) - promise?.fail(RedisConnectionPoolError.poolHasActiveConnections) + if self.activeConnectionCount == 0 { + // That was all the connections, so this is now closed. + logger.trace("pool is now closed") + self.state = .closed + promise?.succeed() return } - // That was all the connections, so this is now closed. - logger.trace("pool is now closed") - self.state = .closed - EventLoopFuture - .andAllSucceed(closeFutures, on: self.loop) - .cascade(to: promise) + // To close the pool we need to drop all active connections. + let connections = self.availableConnections + self.availableConnections = [] + for connection in connections { + self.closeConnectionForShutdown(connection) + } } /// This is the on-thread implementation for leasing connections out to users. Here we work out how to get a new @@ -401,13 +416,24 @@ extension ConnectionPool { private func _returnLeasedConnection(_ connection: RedisConnection, logger: Logger) { self.loop.assertInEventLoop() self.leasedConnectionCount -= 1 - self._returnConnection(connection, logger: logger) + + switch self.state { + case .active: + self._returnConnection(connection, logger: logger) + + case .closing: + return self.closeConnectionForShutdown(connection) + + case .closed: + preconditionFailure("Invalid state: \(self.state)") + } } /// This is the on-thread implementation for returning connections to the pool. Here we work out what to do with a newly-acquired /// connection. private func _returnConnection(_ connection: RedisConnection, logger: Logger) { self.loop.assertInEventLoop() + precondition(self.state.isActive) guard connection.isConnected else { // This connection isn't active anymore. We'll dump it and potentially kick off a reconnection. @@ -415,37 +441,41 @@ extension ConnectionPool { return } - switch self.state { - case .active: - // If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room - // in the pool, we'll put it there. Otherwise, we'll close it. - if let waiter = self.connectionWaiters.popFirst() { - self.leaseConnection(connection, to: waiter) - } else if self.canAddConnectionToPool { - self.availableConnections.append(connection) - } else if let evictable = self.availableConnections.popFirst() { - // We have at least one pooled connection. The returned is more recently active, so kick out the pooled - // connection in favour of this one and close the recently evicted one. - self.availableConnections.append(connection) - _ = evictable.close() - } else { - // We don't need it, close it. - _ = connection.close() - } - case .closed: - // In general we shouldn't see leased connections return in .closed, as we should only be able to - // transition to .closed when all the leases are back. We tolerate this in production builds by just closing the - // connection, but in debug builds we assert to be sure. - logger.warning("connection returned to closed pool", metadata: [ - RedisLogging.MetadataKeys.connectionID: "\(connection.id)" - ]) - assertionFailure("Returned connection to closed pool") - fallthrough - case .closing: - // We don't need this connection, close it. + // If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room + // in the pool, we'll put it there. Otherwise, we'll close it. + if let waiter = self.connectionWaiters.popFirst() { + self.leaseConnection(connection, to: waiter) + } else if self.canAddConnectionToPool { + self.availableConnections.append(connection) + } else if let evictable = self.availableConnections.popFirst() { + // We have at least one pooled connection. The returned is more recently active, so kick out the pooled + // connection in favour of this one and close the recently evicted one. + self.availableConnections.append(connection) + _ = evictable.close() + } else { + // We don't need it, close it. _ = connection.close() - guard self.leasedConnectionCount == 0 else { return } - self.state = .closed + } + } + + private func closeConnectionForShutdown(_ connection: RedisConnection) { + connection.close().whenComplete { _ in + self.loop.preconditionInEventLoop() + + switch self.state { + case .closing(let remaining, let promise): + if remaining == 1 { + self.state = .closed + promise?.succeed() + } else { + self.state = .closing(remaining: remaining - 1, promise) + } + + case .closed, .active: + // The state must not change if we are closing a connection, while we are + // closing the pool. + preconditionFailure("Invalid state: \(self.state)") + } } } } @@ -457,10 +487,19 @@ extension ConnectionPool { /// The user has requested the connection pool to close, but there are still active connections leased to users /// and in the pool. - case closing + case closing(remaining: Int, EventLoopPromise?) /// The connection pool is closed: no connections are outstanding case closed + + var isActive: Bool { + switch self { + case .active: + return true + case .closing, .closed: + return false + } + } } } diff --git a/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift b/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift index 5c22d7d..c6be8b8 100644 --- a/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift +++ b/Sources/RediStack/ConnectionPool/RedisConnectionPool.swift @@ -104,7 +104,6 @@ extension RedisConnectionPool { /// Closes all connections in the pool and deactivates the pool from creating new connections. /// /// This method is safe to call multiple times. - /// - Important: If the pool has connections in active use, the close process will not complete. /// - Parameters: /// - promise: A notification promise to resolve once the close process has completed. /// - logger: An optional logger to use for any log statements generated while closing the pool. diff --git a/Tests/RediStackTests/ConnectionPoolTests.swift b/Tests/RediStackTests/ConnectionPoolTests.swift index bba0b62..3f224d3 100644 --- a/Tests/RediStackTests/ConnectionPoolTests.swift +++ b/Tests/RediStackTests/ConnectionPoolTests.swift @@ -402,6 +402,95 @@ final class ConnectionPoolTests: XCTestCase { connectionPromise?.fail(ConnectionPoolTestError.connectionFailedForSomeReason) } + func testShutdownPoolThatHasConnectionThatFailsInCreation() { + var connectionPromise: EventLoopPromise? = nil + let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in + XCTAssertTrue(loop === self.server.loop) + connectionPromise = self.server.loop.makePromise() + return connectionPromise!.futureResult + } + pool.activate() + XCTAssertNoThrow(try self.server.runWhileActive()) + XCTAssertEqual(self.server.channels.count, 0) + XCTAssertNotNil(connectionPromise) + + var failedLastConnection = false + let closePromise = self.server.loop.makePromise(of: Void.self) + + pool.close(promise: closePromise) + closePromise.futureResult.whenComplete { _ in + XCTAssertTrue(failedLastConnection) + } + + failedLastConnection = true + connectionPromise?.fail(ConnectionPoolTestError.connectionFailedForSomeReason) + XCTAssertNoThrow(try closePromise.futureResult.wait()) + } + + func testShutdownPoolThatHasConnectionThatSucceedsInCreation() { + var connectionPromise: EventLoopPromise? = nil + let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in + XCTAssertTrue(loop === self.server.loop) + connectionPromise = self.server.loop.makePromise() + return connectionPromise!.futureResult + } + pool.activate() + XCTAssertNoThrow(try self.server.runWhileActive()) + XCTAssertEqual(self.server.channels.count, 0) + XCTAssertNotNil(connectionPromise) + + var lastConnectionCreated = false + let closePromise = self.server.loop.makePromise(of: Void.self) + + pool.close(promise: closePromise) + closePromise.futureResult.whenComplete { _ in + XCTAssertTrue(lastConnectionCreated) + } + + let singleConnection = self.createAConnection() + lastConnectionCreated = true + connectionPromise?.succeed(singleConnection) + self.server.loop.run() + XCTAssertNoThrow(try singleConnection.channel.closeFuture.wait(), "New connection is closed right away.") + XCTAssertNoThrow(try closePromise.futureResult.wait()) + } + + func testShutdownPoolThatHasLeasedConnection() { + var connectionPromise: EventLoopPromise? = nil + let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in + XCTAssertTrue(loop === self.server.loop) + connectionPromise = self.server.loop.makePromise() + return connectionPromise!.futureResult + } + pool.activate() + XCTAssertNoThrow(try self.server.runWhileActive()) + XCTAssertEqual(self.server.channels.count, 0) + XCTAssertNotNil(connectionPromise) + + var leasedConnectionReturned = false + let closePromise = self.server.loop.makePromise(of: Void.self) + + let singleConnection = self.createAConnection() + connectionPromise?.succeed(singleConnection) + + var leasedConnection: RedisConnection? + XCTAssertNoThrow(leasedConnection = try pool.leaseConnection(deadline: .distantFuture).wait()) + XCTAssert(singleConnection === leasedConnection) + + pool.close(promise: closePromise) + closePromise.futureResult.whenComplete { _ in + XCTAssertTrue(leasedConnectionReturned) + } + + leasedConnectionReturned = true + pool.returnConnection(singleConnection) + + self.server.loop.run() + XCTAssertNoThrow(try singleConnection.channel.closeFuture.wait(), "New connection is closed right away.") + XCTAssertNoThrow(try closePromise.futureResult.wait()) + } + + func testNonLeakyBucketWillKeepConnectingIfThereIsSpaceAndWaiters() throws { var connectionPromise: EventLoopPromise? = nil let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 0, leaky: false) { loop in