mirror of
https://github.com/swift-server/RediStack.git
synced 2026-05-03 07:32:28 +00:00
Revisit user Logging configuration for connections and clients
Motivation: Logging is more dynamic in real world usage than the current static heavy API allows. Users generally want to be capable of updating connection logger metadata to attach dynamic properties such as an HTTP request ID for log tracing. Modifications: - Move all logs to `RedisConnection` - Add `id: UUID` property to `RedisConnection` - Add `logging` property and `setLogging(to:)` method requirements to `RedisClient` - Add chainable `logging(to:)` method extension to `RedisClient` - Add additional `trace` log statements to `RedisConnection` - Change when `RedisConnection.init` logging and metric calls are made - Change some `debug` log statements to `trace in `RedisConnection` Result: Users should have infinitely more flexibility in how RedisConnection, and RedisClient implementations in general, behave in regards to logging.
This commit is contained in:
@@ -12,8 +12,6 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import struct Foundation.UUID
|
||||
import Logging
|
||||
import NIO
|
||||
|
||||
/// The `NIO.ChannelOutboundHandler.OutboundIn` type for `RedisCommandHandler`.
|
||||
@@ -39,24 +37,18 @@ public struct RedisCommand {
|
||||
public final class RedisCommandHandler {
|
||||
/// FIFO queue of promises waiting to receive a response value from a sent command.
|
||||
private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>>
|
||||
private var logger: Logger
|
||||
|
||||
deinit {
|
||||
guard self.commandResponseQueue.count > 0 else { return }
|
||||
self.logger[metadataKey: "Queue Size"] = "\(self.commandResponseQueue.count)"
|
||||
self.logger.warning("Command handler deinit when queue is not empty")
|
||||
if !self.commandResponseQueue.isEmpty {
|
||||
assertionFailure("Command handler deinit when queue is not empty! Queue size: \(self.commandResponseQueue.count)")
|
||||
}
|
||||
}
|
||||
|
||||
/// - Parameters:
|
||||
/// - initialQueueCapacity: The initial queue size to start with. The default is `3`. `RedisCommandHandler` stores all
|
||||
/// - Parameter initialQueueCapacity: The initial queue size to start with. The default is `3`. `RedisCommandHandler` stores all
|
||||
/// `RedisCommand.responsePromise` objects into a buffer, and unless you intend to execute several concurrent commands against Redis,
|
||||
/// and don't want the buffer to resize, you shouldn't need to set this parameter.
|
||||
/// - logger: The `Logging.Logger` instance to use.
|
||||
/// The logger will have a `Foundation.UUID` value attached as metadata to uniquely identify this instance.
|
||||
public init(initialQueueCapacity: Int = 3, logger: Logger = Logger(label: "RediStack.CommandHandler")) {
|
||||
public init(initialQueueCapacity: Int = 3) {
|
||||
self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity)
|
||||
self.logger = logger
|
||||
self.logger[metadataKey: "CommandHandler"] = "\(UUID())"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,8 +70,6 @@ extension RedisCommandHandler: ChannelInboundHandler {
|
||||
|
||||
self.commandResponseQueue.removeAll()
|
||||
queue.forEach { $0.fail(error) }
|
||||
|
||||
self.logger.critical("Error in channel pipeline.", metadata: ["error": "\(error.localizedDescription)"])
|
||||
|
||||
context.close(promise: nil)
|
||||
}
|
||||
@@ -92,10 +82,7 @@ extension RedisCommandHandler: ChannelInboundHandler {
|
||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let value = self.unwrapInboundIn(data)
|
||||
|
||||
guard let leadPromise = self.commandResponseQueue.popFirst() else {
|
||||
self.logger.critical("Read triggered with no promise waiting in the queue!")
|
||||
return
|
||||
}
|
||||
guard let leadPromise = self.commandResponseQueue.popFirst() else { return }
|
||||
|
||||
switch value {
|
||||
case .error(let e):
|
||||
|
||||
@@ -12,37 +12,34 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import struct Logging.Logger
|
||||
import NIO
|
||||
|
||||
/// Encodes outgoing `RESPValue` data into a raw `ByteBuffer`
|
||||
/// according to the Redis Serialization Protocol (RESP).
|
||||
#if DEBUG
|
||||
// used only for debugging purposes where we build a formatted string for the encoded bytes
|
||||
private func getPrintableString(for buffer: inout ByteBuffer) -> String {
|
||||
return String(describing: buffer.getString(at: 0, length: buffer.readableBytes))
|
||||
.dropFirst(9)
|
||||
.dropLast()
|
||||
.description
|
||||
}
|
||||
#endif
|
||||
|
||||
/// Encodes outgoing `RESPValue` data into a raw `ByteBuffer` according to the Redis Serialization Protocol (RESP).
|
||||
///
|
||||
/// See `NIO.MessageToByteEncoder`, `RESPTranslator`, and [https://redis.io/topics/protocol](https://redis.io/topics/protocol)
|
||||
public final class RedisMessageEncoder: MessageToByteEncoder {
|
||||
/// See `MessageToByteEncoder.OutboundIn`
|
||||
public typealias OutboundIn = RESPValue
|
||||
|
||||
private let logger: Logger
|
||||
|
||||
public init(logger: Logger = Logger(label: "RediStack.RedisMessageEncoder")) {
|
||||
self.logger = logger
|
||||
}
|
||||
public init() { }
|
||||
|
||||
/// Encodes the `RedisValue` to bytes, following the RESP specification.
|
||||
///
|
||||
/// See [https://redis.io/topics/protocol](https://redis.io/topics/protocol) and `RESPEncoder.encode(data:out:)`
|
||||
public func encode(data: RESPValue, out: inout ByteBuffer) throws {
|
||||
out.writeRESPValue(data)
|
||||
|
||||
logger.debug("Encoded \(data) to \(getPrintableString(for: &out))")
|
||||
}
|
||||
|
||||
// used only for debugging purposes where we build a formatted string for the encoded bytes
|
||||
private func getPrintableString(for buffer: inout ByteBuffer) -> String {
|
||||
return String(describing: buffer.getString(at: 0, length: buffer.readableBytes))
|
||||
.dropFirst(9)
|
||||
.dropLast()
|
||||
.description
|
||||
// if you're looking to debug the value, set a breakpoint on the return and use `getPrintableString(for:)`
|
||||
// e.g. `po getPrintableString(for: &out)`
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import struct Logging.Logger
|
||||
import NIO
|
||||
|
||||
/// An object capable of sending commands and receiving responses.
|
||||
@@ -24,6 +25,9 @@ import NIO
|
||||
public protocol RedisClient {
|
||||
/// The `EventLoop` that this client operates on.
|
||||
var eventLoop: EventLoop { get }
|
||||
|
||||
/// The `Logging.Logger` that this client uses.
|
||||
var logger: Logger { get }
|
||||
|
||||
/// Sends the desired command with the specified arguments.
|
||||
/// - Parameters:
|
||||
@@ -31,6 +35,10 @@ public protocol RedisClient {
|
||||
/// - arguments: The arguments, if any, to be sent with the command.
|
||||
/// - Returns: An `EventLoopFuture` that will resolve with the Redis command response.
|
||||
func send(command: String, with arguments: [RESPValue]) -> EventLoopFuture<RESPValue>
|
||||
|
||||
/// Updates the client's logger.
|
||||
/// - Parameter logger: The `Logging.Logger` that is desired to receive all client logs.
|
||||
func setLogging(to logger: Logger)
|
||||
}
|
||||
|
||||
extension RedisClient {
|
||||
@@ -41,3 +49,14 @@ extension RedisClient {
|
||||
return self.send(command: command, with: [])
|
||||
}
|
||||
}
|
||||
|
||||
extension RedisClient {
|
||||
/// Updates the client's logger and returns a reference to itself for chaining method calls.
|
||||
/// - Parameter logger: The `Logging.Logger` that is desired to receive all client logs.
|
||||
/// - Returns: A reference to the client for chaining method calls.
|
||||
@inlinable
|
||||
public func logging(to logger: Logger) -> Self {
|
||||
self.setLogging(to: logger)
|
||||
return self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,8 +19,6 @@ import Metrics
|
||||
import NIO
|
||||
import NIOConcurrencyHelpers
|
||||
|
||||
private let loggingKeyID = String(describing: RedisConnection.self)
|
||||
|
||||
extension RedisConnection {
|
||||
/// The documented default port that Redis connects through.
|
||||
///
|
||||
@@ -54,15 +52,15 @@ extension RedisConnection {
|
||||
/// - socket: The `NIO.SocketAddress` information of the Redis instance to connect to.
|
||||
/// - eventLoop: The `NIO.EventLoop` that this connection will execute all tasks on.
|
||||
/// - password: The optional password to use for authorizing the connection with Redis.
|
||||
/// - logger: The `Logging.Logger` instance to use for all logging purposes. If one is not provided, one will be created.
|
||||
/// - logger: The `Logging.Logger` instance to use for all client logging purposes. If one is not provided, one will be created.
|
||||
/// A `Foundation.UUID` will be attached to the metadata to uniquely identify this connection instance's logs.
|
||||
/// - tcpClient: If you have chosen to configure a `NIO.ClientBootstrap` yourself, this will.
|
||||
/// - tcpClient: If you have chosen to configure a `NIO.ClientBootstrap` yourself, this will be used instead of the `makeRedisTCPClient` instance.
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves with the new connection after it has been opened, and if a `password` is provided, authenticated.
|
||||
public static func connect(
|
||||
to socket: SocketAddress,
|
||||
on eventLoop: EventLoop,
|
||||
password: String? = nil,
|
||||
logger: Logger = Logger(label: "RediStack.RedisConnection"),
|
||||
logger: Logger = .init(label: "RediStack.RedisConnection"),
|
||||
tcpClient: ClientBootstrap? = nil
|
||||
) -> EventLoopFuture<RedisConnection> {
|
||||
let client = tcpClient ?? ClientBootstrap.makeRedisTCPClient(group: eventLoop)
|
||||
@@ -101,7 +99,13 @@ extension RedisConnection {
|
||||
///
|
||||
/// See `NIO.SocketAddress`, `NIO.EventLoop`, and `RedisClient`
|
||||
public final class RedisConnection: RedisClient {
|
||||
/// See `RedisClient.eventLoop`
|
||||
/// A unique identifer to represent this connection.
|
||||
public let id = UUID()
|
||||
public private(set) var logger: Logger {
|
||||
didSet {
|
||||
self.logger[metadataKey: String(describing: RedisConnection.self)] = .string(self.id.description)
|
||||
}
|
||||
}
|
||||
public var eventLoop: EventLoop { return self.channel.eventLoop }
|
||||
/// Is the connection to Redis still open?
|
||||
public var isConnected: Bool {
|
||||
@@ -124,11 +128,10 @@ public final class RedisConnection: RedisClient {
|
||||
}
|
||||
|
||||
internal let channel: Channel
|
||||
private var logger: Logger
|
||||
|
||||
private let autoflush = Atomic<Bool>(value: true)
|
||||
private let _stateLock = Lock()
|
||||
private var _state: ConnectionState
|
||||
private var _state = ConnectionState.open
|
||||
private var state: ConnectionState {
|
||||
get { return _stateLock.withLock { self._state } }
|
||||
set(newValue) { _stateLock.withLockVoid { self._state = newValue } }
|
||||
@@ -144,10 +147,7 @@ public final class RedisConnection: RedisClient {
|
||||
internal init(configuredRESPChannel: Channel, logger: Logger) {
|
||||
self.channel = configuredRESPChannel
|
||||
self.logger = logger
|
||||
|
||||
self.logger[metadataKey: loggingKeyID] = "\(UUID())"
|
||||
self._state = .open
|
||||
self.logger.debug("Connection created.")
|
||||
|
||||
RedisMetrics.activeConnectionCount.increment()
|
||||
RedisMetrics.totalConnectionCount.increment()
|
||||
|
||||
@@ -159,9 +159,11 @@ public final class RedisConnection: RedisClient {
|
||||
guard self.state == .open else { return }
|
||||
|
||||
self.state = .closed
|
||||
self.logger.warning("Channel was closed unexpectedly.")
|
||||
self.logger.error("Channel was closed unexpectedly.")
|
||||
RedisMetrics.activeConnectionCount.decrement()
|
||||
}
|
||||
|
||||
self.logger.trace("Connection created.")
|
||||
}
|
||||
|
||||
internal enum ConnectionState {
|
||||
@@ -182,6 +184,8 @@ extension RedisConnection {
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves with the command's result stored in a `RESPValue`.
|
||||
/// If a `RedisError` is returned, the future will be failed instead.
|
||||
public func send(command: String, with arguments: [RESPValue]) -> EventLoopFuture<RESPValue> {
|
||||
self.logger.trace("Received command")
|
||||
|
||||
guard self.isConnected else {
|
||||
let error = RedisClientError.connectionClosed
|
||||
logger.warning("\(error.localizedDescription)")
|
||||
@@ -203,12 +207,17 @@ extension RedisConnection {
|
||||
RedisMetrics.commandRoundTripTime.recordNanoseconds(duration)
|
||||
|
||||
// log the error here instead
|
||||
guard case let .failure(error) = result else { return }
|
||||
guard case let .failure(error) = result else {
|
||||
self.logger.trace("Command completed.")
|
||||
return
|
||||
}
|
||||
self.logger.error("\(error.localizedDescription)")
|
||||
}
|
||||
|
||||
self.logger.debug("Sending command \"\(command)\"\(arguments.count > 0 ? " with \(arguments)" : "")")
|
||||
|
||||
defer { self.logger.trace("Command sent through channel.") }
|
||||
|
||||
if self.sendCommandsImmediately {
|
||||
return channel.writeAndFlush(command).flatMap { promise.futureResult }
|
||||
} else {
|
||||
@@ -228,6 +237,8 @@ extension RedisConnection {
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves when the connection has been closed.
|
||||
@discardableResult
|
||||
public func close() -> EventLoopFuture<Void> {
|
||||
self.logger.trace("Received request to close the connection.")
|
||||
|
||||
guard self.isConnected else {
|
||||
// return the channel's close future, which is resolved as the last step in channel shutdown
|
||||
return self.channel.closeFuture
|
||||
@@ -257,9 +268,9 @@ extension RedisConnection {
|
||||
message: .array([RESPValue(bulk: "QUIT")]),
|
||||
responsePromise: promise
|
||||
)
|
||||
|
||||
logger.debug("Sending QUIT command.")
|
||||
|
||||
|
||||
logger.trace("Sending QUIT command.")
|
||||
|
||||
return channel.writeAndFlush(command) // write the command
|
||||
.flatMap { promise.futureResult } // chain the callback to the response's
|
||||
.map { _ in () } // ignore the result's value
|
||||
@@ -290,3 +301,15 @@ extension RedisConnection {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Logging
|
||||
|
||||
extension RedisConnection {
|
||||
/// Updates the client's logger, attaching this connection's ID to the metadata.
|
||||
///
|
||||
/// See `RedisClient.setLogging(to:)` and `RedisConnection.id`.
|
||||
/// - Parameter logger: The `Logging.Logger` that is desired to receive all client logs.
|
||||
public func setLogging(to logger: Logger) {
|
||||
self.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,16 +12,13 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Logging
|
||||
@testable import RediStack
|
||||
import RediStackTestUtils
|
||||
import XCTest
|
||||
|
||||
final class RedisConnectionTests: RediStackIntegrationTestCase {
|
||||
static let expectedLogsMessage = "The following log(s) in this test are expected."
|
||||
|
||||
func test_unexpectedChannelClose() throws {
|
||||
print(RedisConnectionTests.expectedLogsMessage)
|
||||
|
||||
XCTAssertTrue(self.connection.isConnected)
|
||||
try self.connection.channel.close().wait()
|
||||
XCTAssertFalse(self.connection.isConnected)
|
||||
@@ -35,8 +32,6 @@ final class RedisConnectionTests: RediStackIntegrationTestCase {
|
||||
}
|
||||
|
||||
func test_sendingCommandAfterClosing() throws {
|
||||
print(RedisConnectionTests.expectedLogsMessage)
|
||||
|
||||
self.connection.close()
|
||||
do {
|
||||
_ = try self.connection.ping().wait()
|
||||
@@ -46,3 +41,47 @@ final class RedisConnectionTests: RediStackIntegrationTestCase {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Logging
|
||||
|
||||
extension RedisConnectionTests {
|
||||
final class TestLogHandler: LogHandler {
|
||||
var messages: [Logger.Message]
|
||||
var metadata: Logger.Metadata
|
||||
var logLevel: Logger.Level
|
||||
|
||||
init() {
|
||||
self.messages = []
|
||||
self.metadata = [:]
|
||||
self.logLevel = .trace
|
||||
}
|
||||
|
||||
func log(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, file: String, function: String, line: UInt) {
|
||||
self.messages.append(message)
|
||||
}
|
||||
|
||||
subscript(metadataKey key: String) -> Logger.Metadata.Value? {
|
||||
get { return self.metadata[key] }
|
||||
set(newValue) { self.metadata[key] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
func test_customLogging() throws {
|
||||
let handler = TestLogHandler()
|
||||
let logger = Logger(label: "test", factory: { _ in return handler })
|
||||
_ = try self.connection.logging(to: logger).ping().wait()
|
||||
XCTAssert(!handler.messages.isEmpty)
|
||||
}
|
||||
|
||||
func test_loggingMetadata() throws {
|
||||
let handler = TestLogHandler()
|
||||
let logger = Logger(label: #function, factory: { _ in return handler })
|
||||
self.connection.setLogging(to: logger)
|
||||
let metadataKey = String(describing: RedisConnection.self)
|
||||
XCTAssertTrue(handler.metadata.keys.contains(metadataKey))
|
||||
XCTAssertEqual(
|
||||
handler.metadata[metadataKey],
|
||||
.string(self.connection.id.description)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user