mirror of
https://github.com/swift-server/async-http-client.git
synced 2026-05-03 07:32:29 +00:00
3c45dbde2d
### Motivation When creating a connection, we wrongfully assumed that `failedToCreateNewConnection` will always be called before `http*ConnectionClosed` in the `HTTPConnectionPoolStateMachine`. However this is far from correct. In NIO Futures are fulfilled before `ChannelHandler` callbacks. Ordering in futures should not be assumed in such a complex project. ### Change We change the `http*ConnectionClosed` methods to be noops, if the connection is in the starting state. We instead wait for the `failedToCreateNewConnection` to create backoff timers and friends. rdar://164674912 --------- Co-authored-by: George Barnett <gbarnett@apple.com>
838 lines
36 KiB
Swift
838 lines
36 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the AsyncHTTPClient open source project
|
|
//
|
|
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
import NIOCore
|
|
|
|
extension HTTPConnectionPool {
|
|
private struct HTTP2ConnectionState {
|
|
private enum State {
|
|
/// the pool is establishing a connection. Valid transitions are to: .backingOff, .active and .closed
|
|
case starting(maximumUses: Int?)
|
|
/// the connection is waiting to retry to establish a connection. Valid transitions are to .closed.
|
|
/// From .closed a new connection state must be created for a retry.
|
|
case backingOff
|
|
/// the connection is active and is able to run requests. Valid transitions are to: .draining and .closed
|
|
case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline, remainingUses: Int?)
|
|
/// the connection is active and is running requests. No new requests must be scheduled.
|
|
/// Valid transitions to: .draining and .closed
|
|
case draining(Connection, maxStreams: Int, usedStreams: Int)
|
|
/// the connection is closed
|
|
case closed
|
|
}
|
|
|
|
var isStartingOrBackingOff: Bool {
|
|
switch self.state {
|
|
case .starting, .backingOff:
|
|
return true
|
|
case .active, .draining, .closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
var canOrWillBeAbleToExecuteRequests: Bool {
|
|
switch self.state {
|
|
case .starting, .backingOff, .active:
|
|
return true
|
|
case .draining, .closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
var isStartingOrActive: Bool {
|
|
switch self.state {
|
|
case .starting, .active:
|
|
return true
|
|
case .draining, .backingOff, .closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
/// A connection is established and can potentially execute requests if not all streams are leased
|
|
var isActive: Bool {
|
|
switch self.state {
|
|
case .active:
|
|
return true
|
|
case .starting, .backingOff, .draining, .closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
/// A request can be scheduled on the connection
|
|
var isAvailable: Bool {
|
|
switch self.state {
|
|
case .active(_, let maxStreams, let usedStreams, _, let remainingUses):
|
|
if let remainingUses = remainingUses {
|
|
return usedStreams < maxStreams && remainingUses > 0
|
|
} else {
|
|
return usedStreams < maxStreams
|
|
}
|
|
case .starting, .backingOff, .draining, .closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
/// The connection is active, but there are no running requests on the connection.
|
|
/// Every idle connection is available, but not every available connection is idle.
|
|
var isIdle: Bool {
|
|
switch self.state {
|
|
case .active(_, _, let usedStreams, _, _):
|
|
return usedStreams == 0
|
|
case .starting, .backingOff, .draining, .closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
var isClosed: Bool {
|
|
switch self.state {
|
|
case .starting, .backingOff, .draining, .active:
|
|
return false
|
|
case .closed:
|
|
return true
|
|
}
|
|
}
|
|
|
|
private var state: State
|
|
let eventLoop: EventLoop
|
|
let connectionID: Connection.ID
|
|
|
|
/// should be called after the connection was successfully established
|
|
/// - Parameters:
|
|
/// - conn: HTTP2 connection
|
|
/// - maxStreams: max streams settings from the server
|
|
/// - Returns: number of available streams which can be leased
|
|
mutating func connected(_ conn: Connection, maxStreams: Int) -> Int {
|
|
switch self.state {
|
|
case .active, .draining, .backingOff, .closed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
|
|
case .starting(let maxUses):
|
|
self.state = .active(
|
|
conn,
|
|
maxStreams: maxStreams,
|
|
usedStreams: 0,
|
|
lastIdle: .now(),
|
|
remainingUses: maxUses
|
|
)
|
|
if let maxUses = maxUses {
|
|
return min(maxStreams, maxUses)
|
|
} else {
|
|
return maxStreams
|
|
}
|
|
}
|
|
}
|
|
|
|
/// should be called after receiving new http2 settings from the server
|
|
/// - Parameters:
|
|
/// - maxStreams: max streams settings from the server
|
|
/// - Returns: number of available streams which can be leased
|
|
mutating func newMaxConcurrentStreams(_ maxStreams: Int) -> Int {
|
|
switch self.state {
|
|
case .starting, .backingOff, .closed:
|
|
preconditionFailure("Invalid state for updating max concurrent streams: \(self.state)")
|
|
|
|
case .active(let conn, _, let usedStreams, let lastIdle, let remainingUses):
|
|
self.state = .active(
|
|
conn,
|
|
maxStreams: maxStreams,
|
|
usedStreams: usedStreams,
|
|
lastIdle: lastIdle,
|
|
remainingUses: remainingUses
|
|
)
|
|
let availableStreams = max(maxStreams - usedStreams, 0)
|
|
if let remainingUses = remainingUses {
|
|
return min(remainingUses, availableStreams)
|
|
} else {
|
|
return availableStreams
|
|
}
|
|
|
|
case .draining(let conn, _, let usedStreams):
|
|
self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams)
|
|
return 0
|
|
}
|
|
}
|
|
|
|
mutating func goAwayReceived() -> EventLoop {
|
|
switch self.state {
|
|
case .starting, .backingOff, .closed:
|
|
preconditionFailure("Invalid state for draining a connection: \(self.state)")
|
|
|
|
case .active(let conn, let maxStreams, let usedStreams, _, _):
|
|
self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams)
|
|
return conn.eventLoop
|
|
|
|
case .draining(let conn, _, _):
|
|
// we could potentially receive another go away while we drain all active streams and we just ignore it
|
|
return conn.eventLoop
|
|
}
|
|
}
|
|
|
|
/// The connection failed to start
|
|
mutating func failedToConnect() {
|
|
switch self.state {
|
|
case .starting:
|
|
self.state = .backingOff
|
|
case .backingOff, .active, .draining, .closed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
}
|
|
}
|
|
|
|
enum FailAction {
|
|
case removeConnection
|
|
case none
|
|
}
|
|
|
|
mutating func fail() -> FailAction {
|
|
switch self.state {
|
|
case .starting:
|
|
// If the connection fails while we are starting it, the fail call raced with
|
|
// `failedToConnect` (promises are succeeded or failed before channel handler
|
|
// callbacks). let's keep the state in `starting`, so that `failedToConnect` can
|
|
// create a backoff timer.
|
|
return .none
|
|
case .active, .backingOff, .draining:
|
|
self.state = .closed
|
|
return .removeConnection
|
|
case .closed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
}
|
|
}
|
|
|
|
mutating func lease(_ count: Int) -> Connection {
|
|
switch self.state {
|
|
case .starting, .backingOff, .draining, .closed:
|
|
preconditionFailure("Invalid state for leasing a stream: \(self.state)")
|
|
|
|
case .active(let conn, let maxStreams, var usedStreams, let lastIdle, let remainingUses):
|
|
usedStreams += count
|
|
precondition(usedStreams <= maxStreams, "tried to lease a connection which is not available")
|
|
precondition(
|
|
remainingUses.map { $0 >= count } ?? true,
|
|
"tried to lease streams from a connection which does not have enough remaining streams"
|
|
)
|
|
self.state = .active(
|
|
conn,
|
|
maxStreams: maxStreams,
|
|
usedStreams: usedStreams,
|
|
lastIdle: lastIdle,
|
|
remainingUses: remainingUses.map { $0 - count }
|
|
)
|
|
return conn
|
|
}
|
|
}
|
|
|
|
/// should be called after a request has finished and the stream can be used again for a new request
|
|
/// - Returns: number of available streams which can be leased
|
|
mutating func release() -> Int {
|
|
switch self.state {
|
|
case .starting, .backingOff, .closed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
|
|
case .active(let conn, let maxStreams, var usedStreams, var lastIdle, let remainingUses):
|
|
precondition(usedStreams > 0, "we cannot release more streams than we have leased")
|
|
usedStreams &-= 1
|
|
if usedStreams == 0 {
|
|
lastIdle = .now()
|
|
}
|
|
|
|
self.state = .active(
|
|
conn,
|
|
maxStreams: maxStreams,
|
|
usedStreams: usedStreams,
|
|
lastIdle: lastIdle,
|
|
remainingUses: remainingUses
|
|
)
|
|
let availableStreams = max(maxStreams &- usedStreams, 0)
|
|
if let remainingUses = remainingUses {
|
|
return min(availableStreams, remainingUses)
|
|
} else {
|
|
return availableStreams
|
|
}
|
|
|
|
case .draining(let conn, let maxStreams, var usedStreams):
|
|
precondition(usedStreams > 0, "we cannot release more streams than we have leased")
|
|
usedStreams &-= 1
|
|
self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams)
|
|
return 0
|
|
}
|
|
}
|
|
|
|
mutating func close() -> Connection {
|
|
switch self.state {
|
|
case .active(let conn, _, 0, _, _):
|
|
self.state = .closed
|
|
return conn
|
|
|
|
case .starting, .backingOff, .draining, .closed, .active:
|
|
preconditionFailure("Invalid state for closing a connection: \(self.state)")
|
|
}
|
|
}
|
|
|
|
enum CleanupAction {
|
|
case removeConnection
|
|
case keepConnection
|
|
}
|
|
|
|
/// Cleanup the current connection for shutdown.
|
|
///
|
|
/// This method is called, when the connections shall shutdown. Depending on the state
|
|
/// the connection is in, it adds itself to one of the arrays that are used to signal shutdown
|
|
/// intent to the underlying connections. Connections that are backing off can be easily
|
|
/// dropped (since, we only need to cancel the backoff timer), connections that are leased
|
|
/// need to be cancelled (notifying the `ChannelHandler` that we want to cancel the
|
|
/// running request), connections that are idle can be closed right away. Sadly we can't
|
|
/// cancel connection starts right now. For this reason we need to wait for them to succeed
|
|
/// or fail until we finalize the shutdown.
|
|
///
|
|
/// - Parameter context: A cleanup context to add the connection to based on its state.
|
|
/// - Returns: A cleanup action indicating if the connection can be removed from the
|
|
/// connection list.
|
|
func cleanup(_ context: inout CleanupContext) -> CleanupAction {
|
|
switch self.state {
|
|
case .starting:
|
|
return .keepConnection
|
|
|
|
case .backingOff:
|
|
context.connectBackoff.append(self.connectionID)
|
|
return .removeConnection
|
|
|
|
case .active(let connection, _, let usedStreams, _, _):
|
|
precondition(usedStreams >= 0)
|
|
if usedStreams == 0 {
|
|
context.close.append(connection)
|
|
return .removeConnection
|
|
} else {
|
|
context.cancel.append(connection)
|
|
return .keepConnection
|
|
}
|
|
|
|
case .draining(let connection, _, _):
|
|
context.cancel.append(connection)
|
|
return .keepConnection
|
|
|
|
case .closed:
|
|
preconditionFailure(
|
|
"Unexpected state for cleanup: Did not expect to have closed connections in the state machine."
|
|
)
|
|
}
|
|
}
|
|
|
|
func addStats(into stats: inout HTTP2Connections.Stats) {
|
|
switch self.state {
|
|
case .starting:
|
|
stats.startingConnections &+= 1
|
|
|
|
case .backingOff:
|
|
stats.backingOffConnections &+= 1
|
|
|
|
case .active(_, let maxStreams, let usedStreams, _, _):
|
|
stats.availableStreams += max(maxStreams - usedStreams, 0)
|
|
stats.leasedStreams += usedStreams
|
|
stats.availableConnections &+= 1
|
|
precondition(usedStreams >= 0)
|
|
if usedStreams == 0 {
|
|
stats.idleConnections &+= 1
|
|
}
|
|
case .draining(_, _, let usedStreams):
|
|
stats.drainingConnections &+= 1
|
|
stats.leasedStreams += usedStreams
|
|
precondition(usedStreams >= 0)
|
|
case .closed:
|
|
break
|
|
}
|
|
}
|
|
|
|
enum MigrateAction {
|
|
case removeConnection
|
|
case keepConnection
|
|
}
|
|
|
|
func migrateToHTTP1(
|
|
context: inout HTTP2Connections.HTTP2ToHTTP1MigrationContext
|
|
) -> MigrateAction {
|
|
switch self.state {
|
|
case .starting:
|
|
context.starting.append((self.connectionID, self.eventLoop))
|
|
return .removeConnection
|
|
|
|
case .active(let connection, _, let usedStreams, _, _):
|
|
precondition(usedStreams >= 0)
|
|
if usedStreams == 0 {
|
|
context.close.append(connection)
|
|
return .removeConnection
|
|
} else {
|
|
return .keepConnection
|
|
}
|
|
|
|
case .draining:
|
|
return .keepConnection
|
|
|
|
case .backingOff:
|
|
context.backingOff.append((self.connectionID, self.eventLoop))
|
|
return .removeConnection
|
|
|
|
case .closed:
|
|
preconditionFailure(
|
|
"Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)"
|
|
)
|
|
}
|
|
}
|
|
|
|
init(connectionID: Connection.ID, eventLoop: EventLoop, maximumUses: Int?) {
|
|
self.connectionID = connectionID
|
|
self.eventLoop = eventLoop
|
|
self.state = .starting(maximumUses: maximumUses)
|
|
}
|
|
}
|
|
|
|
struct HTTP2Connections {
|
|
/// A connectionID generator.
|
|
private let generator: Connection.ID.Generator
|
|
/// The connections states
|
|
private var connections: [HTTP2ConnectionState]
|
|
/// The number of times each connection can be used before it is closed and replaced.
|
|
private let maximumConnectionUses: Int?
|
|
|
|
var isEmpty: Bool {
|
|
self.connections.isEmpty
|
|
}
|
|
|
|
var stats: Stats {
|
|
self.connections.reduce(into: Stats()) { stats, connection in
|
|
connection.addStats(into: &stats)
|
|
}
|
|
}
|
|
|
|
init(generator: Connection.ID.Generator, maximumConnectionUses: Int?) {
|
|
self.generator = generator
|
|
self.connections = []
|
|
self.maximumConnectionUses = maximumConnectionUses
|
|
}
|
|
|
|
// MARK: Migration
|
|
|
|
/// We only handle starting and backing off connection here.
|
|
/// All already running connections must be handled by the enclosing state machine.
|
|
/// - Parameters:
|
|
/// - starting: starting HTTP connections from previous state machine
|
|
/// - backingOff: backing off HTTP connections from previous state machine
|
|
mutating func migrateFromHTTP1(
|
|
starting: [(Connection.ID, EventLoop)],
|
|
backingOff: [(Connection.ID, EventLoop)]
|
|
) {
|
|
for (connectionID, eventLoop) in starting {
|
|
let newConnection = HTTP2ConnectionState(
|
|
connectionID: connectionID,
|
|
eventLoop: eventLoop,
|
|
maximumUses: self.maximumConnectionUses
|
|
)
|
|
self.connections.append(newConnection)
|
|
}
|
|
|
|
for (connectionID, eventLoop) in backingOff {
|
|
var backingOffConnection = HTTP2ConnectionState(
|
|
connectionID: connectionID,
|
|
eventLoop: eventLoop,
|
|
maximumUses: self.maximumConnectionUses
|
|
)
|
|
// TODO: Maybe we want to add a static init for backing off connections to HTTP2ConnectionState
|
|
backingOffConnection.failedToConnect()
|
|
self.connections.append(backingOffConnection)
|
|
}
|
|
}
|
|
|
|
/// We will create new connections for `requiredEventLoopsOfPendingRequests`
|
|
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
|
|
/// - Parameters:
|
|
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
|
|
/// - Returns: new connections that need to be created
|
|
mutating func createConnectionsAfterMigrationIfNeeded(
|
|
requiredEventLoopsOfPendingRequests: [EventLoop]
|
|
) -> [(Connection.ID, EventLoop)] {
|
|
// create new connections for requests with a required event loop
|
|
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
|
|
self.connections.lazy
|
|
.filter {
|
|
$0.canOrWillBeAbleToExecuteRequests
|
|
}.map {
|
|
$0.eventLoop.id
|
|
}
|
|
)
|
|
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
|
|
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
|
|
else { return nil }
|
|
let connectionID = self.createNewConnection(on: eventLoop)
|
|
return (connectionID, eventLoop)
|
|
}
|
|
}
|
|
|
|
struct HTTP2ToHTTP1MigrationContext {
|
|
var backingOff: [(Connection.ID, EventLoop)] = []
|
|
var starting: [(Connection.ID, EventLoop)] = []
|
|
var close: [Connection] = []
|
|
}
|
|
|
|
mutating func migrateToHTTP1() -> HTTP2ToHTTP1MigrationContext {
|
|
var context = HTTP2ToHTTP1MigrationContext()
|
|
self.connections.removeAll { connection in
|
|
switch connection.migrateToHTTP1(context: &context) {
|
|
case .removeConnection:
|
|
return true
|
|
case .keepConnection:
|
|
return false
|
|
}
|
|
}
|
|
return context
|
|
}
|
|
|
|
// MARK: Connection creation
|
|
|
|
/// true if one ore more connections are active
|
|
var hasActiveConnections: Bool {
|
|
self.connections.contains { $0.isActive }
|
|
}
|
|
|
|
/// used in general purpose connection scenarios to check if at least one connection is starting, backing off or active
|
|
var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool {
|
|
self.connections.contains { $0.canOrWillBeAbleToExecuteRequests }
|
|
}
|
|
|
|
/// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one?
|
|
/// - Parameter eventLoop: connection `EventLoop` to search for
|
|
/// - Returns: true if at least one connection is starting, backing off or active for the given `eventLoop`
|
|
func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool {
|
|
self.connections.contains {
|
|
$0.eventLoop === eventLoop && $0.canOrWillBeAbleToExecuteRequests
|
|
}
|
|
}
|
|
|
|
func hasActiveConnection(for eventLoop: EventLoop) -> Bool {
|
|
self.connections.contains {
|
|
$0.eventLoop === eventLoop && $0.isActive
|
|
}
|
|
}
|
|
|
|
/// used after backoff is done to determine if we need to create a new connection
|
|
/// - Parameters:
|
|
/// - eventLoop: connection `EventLoop` to search for
|
|
/// - Returns: if we have a starting or active general purpose connection and if we have also one for the given `eventLoop`
|
|
func backingOffTimerDone(
|
|
for eventLoop: EventLoop
|
|
) -> RetryConnectionCreationContext {
|
|
var hasGeneralPurposeConnection: Bool = false
|
|
var hasConnectionOnSpecifiedEventLoop: Bool = false
|
|
for connection in self.connections {
|
|
guard connection.isStartingOrActive else { continue }
|
|
hasGeneralPurposeConnection = true
|
|
guard connection.eventLoop === eventLoop else { continue }
|
|
hasConnectionOnSpecifiedEventLoop = true
|
|
break
|
|
}
|
|
return RetryConnectionCreationContext(
|
|
hasGeneralPurposeConnection: hasGeneralPurposeConnection,
|
|
hasConnectionOnSpecifiedEventLoop: hasConnectionOnSpecifiedEventLoop
|
|
)
|
|
}
|
|
|
|
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
|
|
assert(
|
|
!self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop),
|
|
"we should not create more than one connection per event loop"
|
|
)
|
|
|
|
let connection = HTTP2ConnectionState(
|
|
connectionID: self.generator.next(),
|
|
eventLoop: eventLoop,
|
|
maximumUses: self.maximumConnectionUses
|
|
)
|
|
self.connections.append(connection)
|
|
return connection.connectionID
|
|
}
|
|
|
|
/// A new HTTP/2 connection was established.
|
|
///
|
|
/// This will put the connection into the idle state.
|
|
///
|
|
/// - Parameter connection: The new established connection.
|
|
/// - Returns: An index and an ``EstablishedConnectionContext`` to determine the next action for the now idle connection.
|
|
/// Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after
|
|
/// this.
|
|
mutating func newHTTP2ConnectionEstablished(
|
|
_ connection: Connection,
|
|
maxConcurrentStreams: Int
|
|
) -> (Int, EstablishedConnectionContext) {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else {
|
|
preconditionFailure("There is a new connection that we didn't request!")
|
|
}
|
|
precondition(
|
|
connection.eventLoop === self.connections[index].eventLoop,
|
|
"Expected the new connection to be on EL"
|
|
)
|
|
let availableStreams = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams)
|
|
let context = EstablishedConnectionContext(
|
|
availableStreams: availableStreams,
|
|
eventLoop: connection.eventLoop,
|
|
isIdle: self.connections[index].isIdle,
|
|
connectionID: connection.id
|
|
)
|
|
return (index, context)
|
|
}
|
|
|
|
/// Move the connection state to backingOff.
|
|
///
|
|
/// - Parameter connectionID: The connectionID of the failed connection attempt
|
|
/// - Returns: The eventLoop on which to schedule the backoff timer
|
|
/// - Precondition: connection needs to be in the `.starting` state
|
|
mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
|
|
preconditionFailure("We tried to create a new connection that we know nothing about?")
|
|
}
|
|
|
|
self.connections[index].failedToConnect()
|
|
return self.connections[index].eventLoop
|
|
}
|
|
|
|
// MARK: Connection lifecycle events
|
|
|
|
/// Sets the connection with the given `connectionId` to the draining state.
|
|
/// - Returns: the `EventLoop` to create a new connection on if applicable
|
|
/// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state
|
|
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext? {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
|
|
// When a connection close is initiated by the connection pool (e.g. because the
|
|
// connection was idle for too long), the connection will still report further
|
|
// events to the state machine even though we don't care about its state anymore.
|
|
//
|
|
// This is because the HTTP2Connection has a strong let reference to its delegate.
|
|
return nil
|
|
}
|
|
let eventLoop = self.connections[index].goAwayReceived()
|
|
return GoAwayContext(eventLoop: eventLoop)
|
|
}
|
|
|
|
/// Update the maximum number of concurrent streams for the given connection.
|
|
/// - Parameters:
|
|
/// - connectionID: The connectionID for which we received new settings
|
|
/// - newMaxStreams: new maximum concurrent streams
|
|
/// - Returns: index of the connection and new number of available streams in the `EstablishedConnectionContext`
|
|
/// - Precondition: Connections must be in the `.active` or `.draining` state.
|
|
mutating func newHTTP2MaxConcurrentStreamsReceived(
|
|
_ connectionID: Connection.ID,
|
|
newMaxStreams: Int
|
|
) -> (Int, EstablishedConnectionContext)? {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
|
|
// When a connection close is initiated by the connection pool (e.g. because the
|
|
// connection was idle for too long), the connection will still report its events to
|
|
// the state machine and hence to this `HTTP2Connections` struct. In those cases we
|
|
// must ignore the event.
|
|
return nil
|
|
}
|
|
let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
|
|
let context = EstablishedConnectionContext(
|
|
availableStreams: availableStreams,
|
|
eventLoop: self.connections[index].eventLoop,
|
|
isIdle: self.connections[index].isIdle,
|
|
connectionID: connectionID
|
|
)
|
|
return (index, context)
|
|
}
|
|
|
|
// MARK: Leasing and releasing
|
|
|
|
mutating func leaseStream(onPreferred eventLoop: EventLoop) -> (Connection, LeasedStreamContext)? {
|
|
guard let index = self.findAvailableConnection(onPreferred: eventLoop) else { return nil }
|
|
return self.leaseStreams(at: index, count: 1)
|
|
}
|
|
|
|
/// tries to find an available connection on the preferred `eventLoop`. If it can't find one with the given `eventLoop`, it returns the first available connection
|
|
private func findAvailableConnection(onPreferred eventLoop: EventLoop) -> Int? {
|
|
var availableConnectionIndex: Int?
|
|
for (offset, connection) in self.connections.enumerated() {
|
|
guard connection.isAvailable else { continue }
|
|
if connection.eventLoop === eventLoop {
|
|
return self.connections.index(self.connections.startIndex, offsetBy: offset)
|
|
} else if availableConnectionIndex == nil {
|
|
availableConnectionIndex = self.connections.index(self.connections.startIndex, offsetBy: offset)
|
|
}
|
|
}
|
|
return availableConnectionIndex
|
|
}
|
|
|
|
mutating func leaseStream(onRequired eventLoop: EventLoop) -> (Connection, LeasedStreamContext)? {
|
|
guard let index = self.findAvailableConnection(onRequired: eventLoop) else { return nil }
|
|
return self.leaseStreams(at: index, count: 1)
|
|
}
|
|
|
|
/// tries to find an available connection on the required `eventLoop`
|
|
private func findAvailableConnection(onRequired eventLoop: EventLoop) -> Int? {
|
|
self.connections.firstIndex(where: { $0.eventLoop === eventLoop && $0.isAvailable })
|
|
}
|
|
|
|
/// lease `count` streams after connections establishment
|
|
/// - Parameters:
|
|
/// - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)`
|
|
/// - count: number of streams you want to lease. You get the current available streams from the `EstablishedConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
|
|
/// - Returns: connection to execute `count` requests on
|
|
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *1* and not exceed the number of available streams.
|
|
mutating func leaseStreams(at index: Int, count: Int) -> (Connection, LeasedStreamContext) {
|
|
precondition(count >= 1, "stream lease count must be greater than or equal to 1")
|
|
let isIdle = self.connections[index].isIdle
|
|
let connection = self.connections[index].lease(count)
|
|
let context = LeasedStreamContext(wasIdle: isIdle)
|
|
return (connection, context)
|
|
}
|
|
|
|
mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, EstablishedConnectionContext) {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
|
|
preconditionFailure("We tried to release a connection we do not know anything about")
|
|
}
|
|
let availableStreams = self.connections[index].release()
|
|
let context = EstablishedConnectionContext(
|
|
availableStreams: availableStreams,
|
|
eventLoop: self.connections[index].eventLoop,
|
|
isIdle: self.connections[index].isIdle,
|
|
connectionID: connectionID
|
|
)
|
|
return (index, context)
|
|
}
|
|
|
|
// MARK: Connection close/removal
|
|
|
|
/// Closes the connection at the given index. This will also remove the connection right away.
|
|
/// - Parameter index: index of the connection which we get from `releaseStream(_:)`
|
|
/// - Returns: closed and removed connection
|
|
mutating func closeConnection(at index: Int) -> Connection {
|
|
let connection = self.connections[index].close()
|
|
self.removeConnection(at: index)
|
|
return connection
|
|
}
|
|
|
|
/// removes a closed connection.
|
|
/// - Parameter index: index of the connection which we get from `failConnection(_:)`
|
|
/// - Precondition: connection must be closed
|
|
mutating func removeConnection(at index: Int) {
|
|
precondition(self.connections[index].isClosed, "We tried to remove a connection which is not closed")
|
|
self.connections.remove(at: index)
|
|
}
|
|
|
|
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> Connection? {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
|
|
// because of a race this connection (connection close runs against trigger of timeout)
|
|
// was already removed from the state machine.
|
|
return nil
|
|
}
|
|
guard self.connections[index].isIdle else {
|
|
// connection is not idle anymore, we may have just leased it for a request
|
|
return nil
|
|
}
|
|
return self.closeConnection(at: index)
|
|
}
|
|
|
|
/// replaces a closed connection by creating a new starting connection.
|
|
/// - Parameter index: index of the connection which we got from `failConnection(_:)`
|
|
/// - Precondition: connection must be closed
|
|
mutating func createNewConnectionByReplacingClosedConnection(at index: Int) -> (Connection.ID, EventLoop) {
|
|
precondition(self.connections[index].isClosed)
|
|
let newConnection = HTTP2ConnectionState(
|
|
connectionID: self.generator.next(),
|
|
eventLoop: self.connections[index].eventLoop,
|
|
maximumUses: self.maximumConnectionUses
|
|
)
|
|
|
|
self.connections[index] = newConnection
|
|
return (newConnection.connectionID, newConnection.eventLoop)
|
|
}
|
|
|
|
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
|
|
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
|
|
// When a connection close is initiated by the connection pool (e.g. because the
|
|
// connection was idle for too long), the connection will still report its close to
|
|
// the state machine and then to this `HTTP2Connections` struct. In those cases we
|
|
// must ignore the event.
|
|
return nil
|
|
}
|
|
|
|
switch self.connections[index].fail() {
|
|
case .none:
|
|
return nil
|
|
|
|
case .removeConnection:
|
|
let eventLoop = self.connections[index].eventLoop
|
|
let context = FailedConnectionContext(eventLoop: eventLoop)
|
|
return (index, context)
|
|
}
|
|
}
|
|
|
|
mutating func shutdown() -> CleanupContext {
|
|
var cleanupContext = CleanupContext()
|
|
self.connections.removeAll(where: { connectionState in
|
|
switch connectionState.cleanup(&cleanupContext) {
|
|
case .removeConnection:
|
|
return true
|
|
case .keepConnection:
|
|
return false
|
|
}
|
|
})
|
|
return cleanupContext
|
|
}
|
|
|
|
// MARK: Result structs
|
|
|
|
struct RetryConnectionCreationContext {
|
|
/// true if at least one connection is starting or active regardless of the event loop.
|
|
let hasGeneralPurposeConnection: Bool
|
|
|
|
/// true if at least one connection is starting or active for the given `eventLoop`
|
|
let hasConnectionOnSpecifiedEventLoop: Bool
|
|
}
|
|
|
|
/// Information around a connection which is either in the .active or .draining state.
|
|
struct EstablishedConnectionContext {
|
|
/// number of streams which can be leased
|
|
var availableStreams: Int
|
|
/// The eventLoop the connection is running on.
|
|
var eventLoop: EventLoop
|
|
/// true if no stream is leased
|
|
var isIdle: Bool
|
|
/// id of the connection
|
|
var connectionID: Connection.ID
|
|
}
|
|
|
|
struct LeasedStreamContext {
|
|
/// true if the connection was idle before leasing the stream
|
|
var wasIdle: Bool
|
|
}
|
|
|
|
struct GoAwayContext {
|
|
/// The eventLoop the connection is running on.
|
|
var eventLoop: EventLoop
|
|
}
|
|
|
|
/// Information around the failed/closed connection.
|
|
struct FailedConnectionContext {
|
|
/// The eventLoop the connection ran on.
|
|
var eventLoop: EventLoop
|
|
}
|
|
|
|
struct Stats: Equatable {
|
|
var startingConnections: Int = 0
|
|
var backingOffConnections: Int = 0
|
|
var idleConnections: Int = 0
|
|
var availableConnections: Int = 0
|
|
var drainingConnections: Int = 0
|
|
var leasedStreams: Int = 0
|
|
var availableStreams: Int = 0
|
|
}
|
|
}
|
|
}
|