Files
Fabian Fett 3c45dbde2d Fix Connection Creation Crash (#873)
### Motivation

When creating a connection, we wrongfully assumed that
`failedToCreateNewConnection` will always be called before
`http*ConnectionClosed` in the `HTTPConnectionPoolStateMachine`. However
this is far from correct. In NIO Futures are fulfilled before
`ChannelHandler` callbacks. Ordering in futures should not be assumed in
such a complex project.

### Change

We change the `http*ConnectionClosed` methods to be noops, if the
connection is in the starting state. We instead wait for the
`failedToCreateNewConnection` to create backoff timers and friends.

rdar://164674912

---------

Co-authored-by: George Barnett <gbarnett@apple.com>
2025-12-01 09:31:32 +01:00

838 lines
36 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
extension HTTPConnectionPool {
private struct HTTP2ConnectionState {
private enum State {
/// the pool is establishing a connection. Valid transitions are to: .backingOff, .active and .closed
case starting(maximumUses: Int?)
/// the connection is waiting to retry to establish a connection. Valid transitions are to .closed.
/// From .closed a new connection state must be created for a retry.
case backingOff
/// the connection is active and is able to run requests. Valid transitions are to: .draining and .closed
case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline, remainingUses: Int?)
/// the connection is active and is running requests. No new requests must be scheduled.
/// Valid transitions to: .draining and .closed
case draining(Connection, maxStreams: Int, usedStreams: Int)
/// the connection is closed
case closed
}
var isStartingOrBackingOff: Bool {
switch self.state {
case .starting, .backingOff:
return true
case .active, .draining, .closed:
return false
}
}
var canOrWillBeAbleToExecuteRequests: Bool {
switch self.state {
case .starting, .backingOff, .active:
return true
case .draining, .closed:
return false
}
}
var isStartingOrActive: Bool {
switch self.state {
case .starting, .active:
return true
case .draining, .backingOff, .closed:
return false
}
}
/// A connection is established and can potentially execute requests if not all streams are leased
var isActive: Bool {
switch self.state {
case .active:
return true
case .starting, .backingOff, .draining, .closed:
return false
}
}
/// A request can be scheduled on the connection
var isAvailable: Bool {
switch self.state {
case .active(_, let maxStreams, let usedStreams, _, let remainingUses):
if let remainingUses = remainingUses {
return usedStreams < maxStreams && remainingUses > 0
} else {
return usedStreams < maxStreams
}
case .starting, .backingOff, .draining, .closed:
return false
}
}
/// The connection is active, but there are no running requests on the connection.
/// Every idle connection is available, but not every available connection is idle.
var isIdle: Bool {
switch self.state {
case .active(_, _, let usedStreams, _, _):
return usedStreams == 0
case .starting, .backingOff, .draining, .closed:
return false
}
}
var isClosed: Bool {
switch self.state {
case .starting, .backingOff, .draining, .active:
return false
case .closed:
return true
}
}
private var state: State
let eventLoop: EventLoop
let connectionID: Connection.ID
/// should be called after the connection was successfully established
/// - Parameters:
/// - conn: HTTP2 connection
/// - maxStreams: max streams settings from the server
/// - Returns: number of available streams which can be leased
mutating func connected(_ conn: Connection, maxStreams: Int) -> Int {
switch self.state {
case .active, .draining, .backingOff, .closed:
preconditionFailure("Invalid state: \(self.state)")
case .starting(let maxUses):
self.state = .active(
conn,
maxStreams: maxStreams,
usedStreams: 0,
lastIdle: .now(),
remainingUses: maxUses
)
if let maxUses = maxUses {
return min(maxStreams, maxUses)
} else {
return maxStreams
}
}
}
/// should be called after receiving new http2 settings from the server
/// - Parameters:
/// - maxStreams: max streams settings from the server
/// - Returns: number of available streams which can be leased
mutating func newMaxConcurrentStreams(_ maxStreams: Int) -> Int {
switch self.state {
case .starting, .backingOff, .closed:
preconditionFailure("Invalid state for updating max concurrent streams: \(self.state)")
case .active(let conn, _, let usedStreams, let lastIdle, let remainingUses):
self.state = .active(
conn,
maxStreams: maxStreams,
usedStreams: usedStreams,
lastIdle: lastIdle,
remainingUses: remainingUses
)
let availableStreams = max(maxStreams - usedStreams, 0)
if let remainingUses = remainingUses {
return min(remainingUses, availableStreams)
} else {
return availableStreams
}
case .draining(let conn, _, let usedStreams):
self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams)
return 0
}
}
mutating func goAwayReceived() -> EventLoop {
switch self.state {
case .starting, .backingOff, .closed:
preconditionFailure("Invalid state for draining a connection: \(self.state)")
case .active(let conn, let maxStreams, let usedStreams, _, _):
self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams)
return conn.eventLoop
case .draining(let conn, _, _):
// we could potentially receive another go away while we drain all active streams and we just ignore it
return conn.eventLoop
}
}
/// The connection failed to start
mutating func failedToConnect() {
switch self.state {
case .starting:
self.state = .backingOff
case .backingOff, .active, .draining, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
enum FailAction {
case removeConnection
case none
}
mutating func fail() -> FailAction {
switch self.state {
case .starting:
// If the connection fails while we are starting it, the fail call raced with
// `failedToConnect` (promises are succeeded or failed before channel handler
// callbacks). let's keep the state in `starting`, so that `failedToConnect` can
// create a backoff timer.
return .none
case .active, .backingOff, .draining:
self.state = .closed
return .removeConnection
case .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
mutating func lease(_ count: Int) -> Connection {
switch self.state {
case .starting, .backingOff, .draining, .closed:
preconditionFailure("Invalid state for leasing a stream: \(self.state)")
case .active(let conn, let maxStreams, var usedStreams, let lastIdle, let remainingUses):
usedStreams += count
precondition(usedStreams <= maxStreams, "tried to lease a connection which is not available")
precondition(
remainingUses.map { $0 >= count } ?? true,
"tried to lease streams from a connection which does not have enough remaining streams"
)
self.state = .active(
conn,
maxStreams: maxStreams,
usedStreams: usedStreams,
lastIdle: lastIdle,
remainingUses: remainingUses.map { $0 - count }
)
return conn
}
}
/// should be called after a request has finished and the stream can be used again for a new request
/// - Returns: number of available streams which can be leased
mutating func release() -> Int {
switch self.state {
case .starting, .backingOff, .closed:
preconditionFailure("Invalid state: \(self.state)")
case .active(let conn, let maxStreams, var usedStreams, var lastIdle, let remainingUses):
precondition(usedStreams > 0, "we cannot release more streams than we have leased")
usedStreams &-= 1
if usedStreams == 0 {
lastIdle = .now()
}
self.state = .active(
conn,
maxStreams: maxStreams,
usedStreams: usedStreams,
lastIdle: lastIdle,
remainingUses: remainingUses
)
let availableStreams = max(maxStreams &- usedStreams, 0)
if let remainingUses = remainingUses {
return min(availableStreams, remainingUses)
} else {
return availableStreams
}
case .draining(let conn, let maxStreams, var usedStreams):
precondition(usedStreams > 0, "we cannot release more streams than we have leased")
usedStreams &-= 1
self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams)
return 0
}
}
mutating func close() -> Connection {
switch self.state {
case .active(let conn, _, 0, _, _):
self.state = .closed
return conn
case .starting, .backingOff, .draining, .closed, .active:
preconditionFailure("Invalid state for closing a connection: \(self.state)")
}
}
enum CleanupAction {
case removeConnection
case keepConnection
}
/// Cleanup the current connection for shutdown.
///
/// This method is called, when the connections shall shutdown. Depending on the state
/// the connection is in, it adds itself to one of the arrays that are used to signal shutdown
/// intent to the underlying connections. Connections that are backing off can be easily
/// dropped (since, we only need to cancel the backoff timer), connections that are leased
/// need to be cancelled (notifying the `ChannelHandler` that we want to cancel the
/// running request), connections that are idle can be closed right away. Sadly we can't
/// cancel connection starts right now. For this reason we need to wait for them to succeed
/// or fail until we finalize the shutdown.
///
/// - Parameter context: A cleanup context to add the connection to based on its state.
/// - Returns: A cleanup action indicating if the connection can be removed from the
/// connection list.
func cleanup(_ context: inout CleanupContext) -> CleanupAction {
switch self.state {
case .starting:
return .keepConnection
case .backingOff:
context.connectBackoff.append(self.connectionID)
return .removeConnection
case .active(let connection, _, let usedStreams, _, _):
precondition(usedStreams >= 0)
if usedStreams == 0 {
context.close.append(connection)
return .removeConnection
} else {
context.cancel.append(connection)
return .keepConnection
}
case .draining(let connection, _, _):
context.cancel.append(connection)
return .keepConnection
case .closed:
preconditionFailure(
"Unexpected state for cleanup: Did not expect to have closed connections in the state machine."
)
}
}
func addStats(into stats: inout HTTP2Connections.Stats) {
switch self.state {
case .starting:
stats.startingConnections &+= 1
case .backingOff:
stats.backingOffConnections &+= 1
case .active(_, let maxStreams, let usedStreams, _, _):
stats.availableStreams += max(maxStreams - usedStreams, 0)
stats.leasedStreams += usedStreams
stats.availableConnections &+= 1
precondition(usedStreams >= 0)
if usedStreams == 0 {
stats.idleConnections &+= 1
}
case .draining(_, _, let usedStreams):
stats.drainingConnections &+= 1
stats.leasedStreams += usedStreams
precondition(usedStreams >= 0)
case .closed:
break
}
}
enum MigrateAction {
case removeConnection
case keepConnection
}
func migrateToHTTP1(
context: inout HTTP2Connections.HTTP2ToHTTP1MigrationContext
) -> MigrateAction {
switch self.state {
case .starting:
context.starting.append((self.connectionID, self.eventLoop))
return .removeConnection
case .active(let connection, _, let usedStreams, _, _):
precondition(usedStreams >= 0)
if usedStreams == 0 {
context.close.append(connection)
return .removeConnection
} else {
return .keepConnection
}
case .draining:
return .keepConnection
case .backingOff:
context.backingOff.append((self.connectionID, self.eventLoop))
return .removeConnection
case .closed:
preconditionFailure(
"Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)"
)
}
}
init(connectionID: Connection.ID, eventLoop: EventLoop, maximumUses: Int?) {
self.connectionID = connectionID
self.eventLoop = eventLoop
self.state = .starting(maximumUses: maximumUses)
}
}
struct HTTP2Connections {
/// A connectionID generator.
private let generator: Connection.ID.Generator
/// The connections states
private var connections: [HTTP2ConnectionState]
/// The number of times each connection can be used before it is closed and replaced.
private let maximumConnectionUses: Int?
var isEmpty: Bool {
self.connections.isEmpty
}
var stats: Stats {
self.connections.reduce(into: Stats()) { stats, connection in
connection.addStats(into: &stats)
}
}
init(generator: Connection.ID.Generator, maximumConnectionUses: Int?) {
self.generator = generator
self.connections = []
self.maximumConnectionUses = maximumConnectionUses
}
// MARK: Migration
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
mutating func migrateFromHTTP1(
starting: [(Connection.ID, EventLoop)],
backingOff: [(Connection.ID, EventLoop)]
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP2ConnectionState(
connectionID: connectionID,
eventLoop: eventLoop,
maximumUses: self.maximumConnectionUses
)
self.connections.append(newConnection)
}
for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP2ConnectionState(
connectionID: connectionID,
eventLoop: eventLoop,
maximumUses: self.maximumConnectionUses
)
// TODO: Maybe we want to add a static init for backing off connections to HTTP2ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
}
}
/// We will create new connections for `requiredEventLoopsOfPendingRequests`
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
/// - Parameters:
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [EventLoop]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
self.connections.lazy
.filter {
$0.canOrWillBeAbleToExecuteRequests
}.map {
$0.eventLoop.id
}
)
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
else { return nil }
let connectionID = self.createNewConnection(on: eventLoop)
return (connectionID, eventLoop)
}
}
struct HTTP2ToHTTP1MigrationContext {
var backingOff: [(Connection.ID, EventLoop)] = []
var starting: [(Connection.ID, EventLoop)] = []
var close: [Connection] = []
}
mutating func migrateToHTTP1() -> HTTP2ToHTTP1MigrationContext {
var context = HTTP2ToHTTP1MigrationContext()
self.connections.removeAll { connection in
switch connection.migrateToHTTP1(context: &context) {
case .removeConnection:
return true
case .keepConnection:
return false
}
}
return context
}
// MARK: Connection creation
/// true if one ore more connections are active
var hasActiveConnections: Bool {
self.connections.contains { $0.isActive }
}
/// used in general purpose connection scenarios to check if at least one connection is starting, backing off or active
var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool {
self.connections.contains { $0.canOrWillBeAbleToExecuteRequests }
}
/// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one?
/// - Parameter eventLoop: connection `EventLoop` to search for
/// - Returns: true if at least one connection is starting, backing off or active for the given `eventLoop`
func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool {
self.connections.contains {
$0.eventLoop === eventLoop && $0.canOrWillBeAbleToExecuteRequests
}
}
func hasActiveConnection(for eventLoop: EventLoop) -> Bool {
self.connections.contains {
$0.eventLoop === eventLoop && $0.isActive
}
}
/// used after backoff is done to determine if we need to create a new connection
/// - Parameters:
/// - eventLoop: connection `EventLoop` to search for
/// - Returns: if we have a starting or active general purpose connection and if we have also one for the given `eventLoop`
func backingOffTimerDone(
for eventLoop: EventLoop
) -> RetryConnectionCreationContext {
var hasGeneralPurposeConnection: Bool = false
var hasConnectionOnSpecifiedEventLoop: Bool = false
for connection in self.connections {
guard connection.isStartingOrActive else { continue }
hasGeneralPurposeConnection = true
guard connection.eventLoop === eventLoop else { continue }
hasConnectionOnSpecifiedEventLoop = true
break
}
return RetryConnectionCreationContext(
hasGeneralPurposeConnection: hasGeneralPurposeConnection,
hasConnectionOnSpecifiedEventLoop: hasConnectionOnSpecifiedEventLoop
)
}
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
assert(
!self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop),
"we should not create more than one connection per event loop"
)
let connection = HTTP2ConnectionState(
connectionID: self.generator.next(),
eventLoop: eventLoop,
maximumUses: self.maximumConnectionUses
)
self.connections.append(connection)
return connection.connectionID
}
/// A new HTTP/2 connection was established.
///
/// This will put the connection into the idle state.
///
/// - Parameter connection: The new established connection.
/// - Returns: An index and an ``EstablishedConnectionContext`` to determine the next action for the now idle connection.
/// Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after
/// this.
mutating func newHTTP2ConnectionEstablished(
_ connection: Connection,
maxConcurrentStreams: Int
) -> (Int, EstablishedConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else {
preconditionFailure("There is a new connection that we didn't request!")
}
precondition(
connection.eventLoop === self.connections[index].eventLoop,
"Expected the new connection to be on EL"
)
let availableStreams = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams)
let context = EstablishedConnectionContext(
availableStreams: availableStreams,
eventLoop: connection.eventLoop,
isIdle: self.connections[index].isIdle,
connectionID: connection.id
)
return (index, context)
}
/// Move the connection state to backingOff.
///
/// - Parameter connectionID: The connectionID of the failed connection attempt
/// - Returns: The eventLoop on which to schedule the backoff timer
/// - Precondition: connection needs to be in the `.starting` state
mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to create a new connection that we know nothing about?")
}
self.connections[index].failedToConnect()
return self.connections[index].eventLoop
}
// MARK: Connection lifecycle events
/// Sets the connection with the given `connectionId` to the draining state.
/// - Returns: the `EventLoop` to create a new connection on if applicable
/// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report further
// events to the state machine even though we don't care about its state anymore.
//
// This is because the HTTP2Connection has a strong let reference to its delegate.
return nil
}
let eventLoop = self.connections[index].goAwayReceived()
return GoAwayContext(eventLoop: eventLoop)
}
/// Update the maximum number of concurrent streams for the given connection.
/// - Parameters:
/// - connectionID: The connectionID for which we received new settings
/// - newMaxStreams: new maximum concurrent streams
/// - Returns: index of the connection and new number of available streams in the `EstablishedConnectionContext`
/// - Precondition: Connections must be in the `.active` or `.draining` state.
mutating func newHTTP2MaxConcurrentStreamsReceived(
_ connectionID: Connection.ID,
newMaxStreams: Int
) -> (Int, EstablishedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report its events to
// the state machine and hence to this `HTTP2Connections` struct. In those cases we
// must ignore the event.
return nil
}
let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
let context = EstablishedConnectionContext(
availableStreams: availableStreams,
eventLoop: self.connections[index].eventLoop,
isIdle: self.connections[index].isIdle,
connectionID: connectionID
)
return (index, context)
}
// MARK: Leasing and releasing
mutating func leaseStream(onPreferred eventLoop: EventLoop) -> (Connection, LeasedStreamContext)? {
guard let index = self.findAvailableConnection(onPreferred: eventLoop) else { return nil }
return self.leaseStreams(at: index, count: 1)
}
/// tries to find an available connection on the preferred `eventLoop`. If it can't find one with the given `eventLoop`, it returns the first available connection
private func findAvailableConnection(onPreferred eventLoop: EventLoop) -> Int? {
var availableConnectionIndex: Int?
for (offset, connection) in self.connections.enumerated() {
guard connection.isAvailable else { continue }
if connection.eventLoop === eventLoop {
return self.connections.index(self.connections.startIndex, offsetBy: offset)
} else if availableConnectionIndex == nil {
availableConnectionIndex = self.connections.index(self.connections.startIndex, offsetBy: offset)
}
}
return availableConnectionIndex
}
mutating func leaseStream(onRequired eventLoop: EventLoop) -> (Connection, LeasedStreamContext)? {
guard let index = self.findAvailableConnection(onRequired: eventLoop) else { return nil }
return self.leaseStreams(at: index, count: 1)
}
/// tries to find an available connection on the required `eventLoop`
private func findAvailableConnection(onRequired eventLoop: EventLoop) -> Int? {
self.connections.firstIndex(where: { $0.eventLoop === eventLoop && $0.isAvailable })
}
/// lease `count` streams after connections establishment
/// - Parameters:
/// - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)`
/// - count: number of streams you want to lease. You get the current available streams from the `EstablishedConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
/// - Returns: connection to execute `count` requests on
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *1* and not exceed the number of available streams.
mutating func leaseStreams(at index: Int, count: Int) -> (Connection, LeasedStreamContext) {
precondition(count >= 1, "stream lease count must be greater than or equal to 1")
let isIdle = self.connections[index].isIdle
let connection = self.connections[index].lease(count)
let context = LeasedStreamContext(wasIdle: isIdle)
return (connection, context)
}
mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, EstablishedConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to release a connection we do not know anything about")
}
let availableStreams = self.connections[index].release()
let context = EstablishedConnectionContext(
availableStreams: availableStreams,
eventLoop: self.connections[index].eventLoop,
isIdle: self.connections[index].isIdle,
connectionID: connectionID
)
return (index, context)
}
// MARK: Connection close/removal
/// Closes the connection at the given index. This will also remove the connection right away.
/// - Parameter index: index of the connection which we get from `releaseStream(_:)`
/// - Returns: closed and removed connection
mutating func closeConnection(at index: Int) -> Connection {
let connection = self.connections[index].close()
self.removeConnection(at: index)
return connection
}
/// removes a closed connection.
/// - Parameter index: index of the connection which we get from `failConnection(_:)`
/// - Precondition: connection must be closed
mutating func removeConnection(at index: Int) {
precondition(self.connections[index].isClosed, "We tried to remove a connection which is not closed")
self.connections.remove(at: index)
}
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> Connection? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
// because of a race this connection (connection close runs against trigger of timeout)
// was already removed from the state machine.
return nil
}
guard self.connections[index].isIdle else {
// connection is not idle anymore, we may have just leased it for a request
return nil
}
return self.closeConnection(at: index)
}
/// replaces a closed connection by creating a new starting connection.
/// - Parameter index: index of the connection which we got from `failConnection(_:)`
/// - Precondition: connection must be closed
mutating func createNewConnectionByReplacingClosedConnection(at index: Int) -> (Connection.ID, EventLoop) {
precondition(self.connections[index].isClosed)
let newConnection = HTTP2ConnectionState(
connectionID: self.generator.next(),
eventLoop: self.connections[index].eventLoop,
maximumUses: self.maximumConnectionUses
)
self.connections[index] = newConnection
return (newConnection.connectionID, newConnection.eventLoop)
}
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report its close to
// the state machine and then to this `HTTP2Connections` struct. In those cases we
// must ignore the event.
return nil
}
switch self.connections[index].fail() {
case .none:
return nil
case .removeConnection:
let eventLoop = self.connections[index].eventLoop
let context = FailedConnectionContext(eventLoop: eventLoop)
return (index, context)
}
}
mutating func shutdown() -> CleanupContext {
var cleanupContext = CleanupContext()
self.connections.removeAll(where: { connectionState in
switch connectionState.cleanup(&cleanupContext) {
case .removeConnection:
return true
case .keepConnection:
return false
}
})
return cleanupContext
}
// MARK: Result structs
struct RetryConnectionCreationContext {
/// true if at least one connection is starting or active regardless of the event loop.
let hasGeneralPurposeConnection: Bool
/// true if at least one connection is starting or active for the given `eventLoop`
let hasConnectionOnSpecifiedEventLoop: Bool
}
/// Information around a connection which is either in the .active or .draining state.
struct EstablishedConnectionContext {
/// number of streams which can be leased
var availableStreams: Int
/// The eventLoop the connection is running on.
var eventLoop: EventLoop
/// true if no stream is leased
var isIdle: Bool
/// id of the connection
var connectionID: Connection.ID
}
struct LeasedStreamContext {
/// true if the connection was idle before leasing the stream
var wasIdle: Bool
}
struct GoAwayContext {
/// The eventLoop the connection is running on.
var eventLoop: EventLoop
}
/// Information around the failed/closed connection.
struct FailedConnectionContext {
/// The eventLoop the connection ran on.
var eventLoop: EventLoop
}
struct Stats: Equatable {
var startingConnections: Int = 0
var backingOffConnections: Int = 0
var idleConnections: Int = 0
var availableConnections: Int = 0
var drainingConnections: Int = 0
var leasedStreams: Int = 0
var availableStreams: Int = 0
}
}
}