mirror of
https://github.com/swift-server/RediStack.git
synced 2026-05-03 07:32:28 +00:00
Refactor RedisConnection
Motivation: During proposal review, and while working within the codebase, several issues were identified with how `RedisConnection` was architectured. Modifications: - Change implementation of `RedisConnection` in some areas for new logic of internal `ConnectionState` - Change behavior of logging in a few places - The initializer for `RedisConnection` is now **internal** - How users can override the default `ClientBootstrap` for a connection is by passing an instance to the `.connect` static method - Change unit tests to inherit from a common XCTestCase class that handles creation and cleanup of `RedisConnection` in tests - Remove Redis namespace enum Result: The API for `RedisConnection` should be much simpler, with the implementation being less buggy. This resolves issues #49, #54, and #57.
This commit is contained in:
@@ -24,7 +24,7 @@ To install **RedisNIO**, just add the package as a dependency in your [**Package
|
||||
|
||||
```swift
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/Mordil/swift-redis-nio-client.git", from: "1.0.0-alpha.1")
|
||||
.package(url: "https://github.com/Mordil/swift-redis-nio-client.git", from: "1.0.0-alpha.4")
|
||||
]
|
||||
```
|
||||
|
||||
@@ -32,18 +32,20 @@ and run the following command: `swift package resolve`
|
||||
|
||||
## :zap: Getting Started
|
||||
|
||||
**RedisNIO** is ready to use right after installation.
|
||||
**RedisNIO** is quick to use - all you need is an [`EventLoop`](https://apple.github.io/swift-nio/docs/current/NIO/Protocols/EventLoop.html) from **SwiftNIO**.
|
||||
|
||||
```swift
|
||||
import NIO
|
||||
import RedisNIO
|
||||
|
||||
let connection = Redis.makeConnection(
|
||||
let eventLoop: EventLoop = ...
|
||||
let connection = RedisConnection.connect(
|
||||
to: try .init(ipAddress: "127.0.0.1", port: RedisConnection.defaultPort),
|
||||
password: "my_pass"
|
||||
on: eventLoop
|
||||
).wait()
|
||||
|
||||
let result = try connection.set("my_key", to: "some value")
|
||||
.flatMap { return connection.get("my_key" }
|
||||
.flatMap { return connection.get("my_key") }
|
||||
.wait()
|
||||
|
||||
print(result) // Optional("some value")
|
||||
|
||||
@@ -1,69 +0,0 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the RedisNIO open source project
|
||||
//
|
||||
// Copyright (c) 2019 RedisNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import struct Logging.Logger
|
||||
import NIO
|
||||
|
||||
/// Top-level namespace for the `RedisNIO` package.
|
||||
///
|
||||
/// To avoid a cluttered global namespace, named definitions that do not start with a `Redis` prefix
|
||||
/// are scoped within this namespace.
|
||||
public enum Redis { }
|
||||
|
||||
// MARK: Connection Factory
|
||||
|
||||
extension Redis {
|
||||
/// Makes a new connection to a Redis instance.
|
||||
///
|
||||
/// As soon as the connection has been opened on the host, an "AUTH" command will be sent to
|
||||
/// Redis to authorize use of additional commands on this new connection.
|
||||
///
|
||||
/// See [https://redis.io/commands/auth](https://redis.io/commands/auth)
|
||||
///
|
||||
/// Example:
|
||||
///
|
||||
/// let elg = MultiThreadedEventLoopGroup(numberOfThreads: 3)
|
||||
/// let connection = Redis.makeConnection(
|
||||
/// to: .init(ipAddress: "127.0.0.1", port: RedisConnection.defaultPort),
|
||||
/// using: elg,
|
||||
/// password: "my_pass"
|
||||
/// )
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - socket: The `SocketAddress` information of the Redis instance to connect to.
|
||||
/// - group: The `EventLoopGroup` to build the connection on. Default is a single threaded `EventLoopGroup`.
|
||||
/// - password: The optional password to authorize the client with.
|
||||
/// - logger: The `Logger` instance to log with.
|
||||
/// - Returns: A `RedisConnection` instance representing this new connection.
|
||||
public static func makeConnection(
|
||||
to socket: SocketAddress,
|
||||
using group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1),
|
||||
password: String? = nil,
|
||||
logger: Logger = Logger(label: "RedisNIO.RedisConnection")
|
||||
) -> EventLoopFuture<RedisConnection> {
|
||||
let client = ClientBootstrap.makeRedisTCPClient(group: group)
|
||||
|
||||
return client.connect(to: socket)
|
||||
.map { return RedisConnection(channel: $0, logger: logger) }
|
||||
.flatMap { client in
|
||||
guard let pw = password else {
|
||||
return group.next().makeSucceededFuture(client)
|
||||
}
|
||||
|
||||
let args = [RESPValue(bulk: pw)]
|
||||
return client.send(command: "AUTH", with: args)
|
||||
.map { _ in return client }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -12,12 +12,7 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import struct Foundation.UUID
|
||||
import struct Dispatch.DispatchTime
|
||||
import Logging
|
||||
import Metrics
|
||||
import NIO
|
||||
import NIOConcurrencyHelpers
|
||||
|
||||
/// An object capable of sending commands and receiving responses.
|
||||
///
|
||||
@@ -46,144 +41,3 @@ extension RedisClient {
|
||||
return self.send(command: command, with: [])
|
||||
}
|
||||
}
|
||||
|
||||
private let loggingKeyID = "RedisConnection"
|
||||
|
||||
/// A `RedisClient` implementation that represents an individual connection
|
||||
/// to a Redis database instance.
|
||||
///
|
||||
/// `RedisConnection` comes with logging by default.
|
||||
///
|
||||
/// See `RedisClient`
|
||||
public final class RedisConnection: RedisClient {
|
||||
public static let defaultPort = 6379
|
||||
|
||||
private enum ConnectionState {
|
||||
case open
|
||||
case closed
|
||||
}
|
||||
|
||||
/// See `RedisClient.eventLoop`
|
||||
public var eventLoop: EventLoop { return channel.eventLoop }
|
||||
/// Is the client still connected to Redis?
|
||||
public var isConnected: Bool { return state != .closed }
|
||||
/// Controls the timing behavior of sending commands over this connection. The default is `true`.
|
||||
///
|
||||
/// When set to `false`, the host will "queue" commands and determine when to send all at once,
|
||||
/// while `true` will force each command to be sent as soon as they are "queued".
|
||||
/// - Note: Setting this to `true` will trigger all "queued" commands to be sent.
|
||||
public var sendCommandsImmediately: Bool {
|
||||
get { return autoflush.load() }
|
||||
set(newValue) {
|
||||
if newValue { channel.flush() }
|
||||
autoflush.store(newValue)
|
||||
}
|
||||
}
|
||||
|
||||
let channel: Channel
|
||||
private var logger: Logger
|
||||
|
||||
private let autoflush = Atomic<Bool>(value: true)
|
||||
private let _stateLock = Lock()
|
||||
private var _state: ConnectionState
|
||||
private var state: ConnectionState {
|
||||
get { return _stateLock.withLock { self._state } }
|
||||
set(newValue) { _stateLock.withLockVoid { self._state = newValue } }
|
||||
}
|
||||
|
||||
deinit {
|
||||
if isConnected {
|
||||
assertionFailure("close() was not called before deinit!")
|
||||
logger.warning("RedisConnection did not properly shutdown before deinit!")
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new connection on the provided `Channel`.
|
||||
/// - Important: Call `close()` before deinitializing to properly cleanup resources.
|
||||
/// - Note: This connection will take ownership of the channel.
|
||||
/// - Parameters:
|
||||
/// - channel: The `Channel` to read and write from.
|
||||
/// - logger: The `Logger` instance to use for all logging purposes.
|
||||
public init(channel: Channel, logger: Logger = Logger(label: "RedisNIO.RedisConnection")) {
|
||||
self.channel = channel
|
||||
self.logger = logger
|
||||
|
||||
self.logger[metadataKey: loggingKeyID] = "\(UUID())"
|
||||
self.logger.debug("Connection created.")
|
||||
self._state = .open
|
||||
RedisMetrics.activeConnectionCount += 1
|
||||
RedisMetrics.totalConnectionCount.increment()
|
||||
}
|
||||
|
||||
/// Sends a `QUIT` command, then closes the `Channel` this instance was initialized with.
|
||||
///
|
||||
/// See [https://redis.io/commands/quit](https://redis.io/commands/quit)
|
||||
/// - Returns: An `EventLoopFuture` that resolves when the connection has been closed.
|
||||
@discardableResult
|
||||
public func close() -> EventLoopFuture<Void> {
|
||||
guard isConnected else {
|
||||
logger.notice("Connection received more than one close() request.")
|
||||
return channel.eventLoop.makeSucceededFuture(())
|
||||
}
|
||||
|
||||
let result = send(command: "QUIT")
|
||||
.flatMap { _ in
|
||||
let promise = self.channel.eventLoop.makePromise(of: Void.self)
|
||||
self.channel.close(promise: promise)
|
||||
return promise.futureResult
|
||||
}
|
||||
.map {
|
||||
self.logger.debug("Connection closed.")
|
||||
RedisMetrics.activeConnectionCount -= 1
|
||||
}
|
||||
.recover {
|
||||
self.logger.error("Encountered error during close(): \($0)")
|
||||
self.state = .open
|
||||
}
|
||||
|
||||
// setting it to closed now prevents multiple close() chains, but doesn't stop the QUIT command
|
||||
// if the connection wasn't closed, it's reset in the callback chain
|
||||
state = .closed
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
/// Sends commands to the Redis instance this connection is tied to.
|
||||
///
|
||||
/// See `RedisClient.send(command:with:)`
|
||||
///
|
||||
/// - Note: The timing of when commands are actually sent to Redis are controlled by
|
||||
/// the `sendCommandsImmediately` property.
|
||||
public func send(
|
||||
command: String,
|
||||
with arguments: [RESPValue]
|
||||
) -> EventLoopFuture<RESPValue> {
|
||||
guard isConnected else {
|
||||
logger.error("\(RedisNIOError.connectionClosed.localizedDescription)")
|
||||
return channel.eventLoop.makeFailedFuture(RedisNIOError.connectionClosed)
|
||||
}
|
||||
|
||||
var commandParts: [RESPValue] = [.init(bulk: command)]
|
||||
commandParts.append(contentsOf: arguments)
|
||||
|
||||
let promise = channel.eventLoop.makePromise(of: RESPValue.self)
|
||||
let context = RedisCommand(
|
||||
command: .array(commandParts),
|
||||
promise: promise
|
||||
)
|
||||
|
||||
let startTime = DispatchTime.now().uptimeNanoseconds
|
||||
promise.futureResult.whenComplete { result in
|
||||
let duration = DispatchTime.now().uptimeNanoseconds - startTime
|
||||
RedisMetrics.commandRoundTripTime.recordNanoseconds(duration)
|
||||
guard case let .failure(error) = result else { return }
|
||||
self.logger.error("\(error.localizedDescription)")
|
||||
}
|
||||
logger.debug("Sending command \"\(command)\" with \(arguments)")
|
||||
|
||||
guard sendCommandsImmediately else {
|
||||
return channel.write(context).flatMap { promise.futureResult }
|
||||
}
|
||||
return channel.writeAndFlush(context).flatMap { promise.futureResult }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,292 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the RedisNIO open source project
|
||||
//
|
||||
// Copyright (c) 2019 RedisNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import struct Foundation.UUID
|
||||
import struct Dispatch.DispatchTime
|
||||
import Logging
|
||||
import Metrics
|
||||
import NIO
|
||||
import NIOConcurrencyHelpers
|
||||
|
||||
private let loggingKeyID = String(describing: RedisConnection.self)
|
||||
|
||||
extension RedisConnection {
|
||||
/// The documented default port that Redis connects through.
|
||||
///
|
||||
/// See [https://redis.io/topics/quickstart](https://redis.io/topics/quickstart)
|
||||
public static let defaultPort = 6379
|
||||
|
||||
/// Creates a new connection to a Redis instance.
|
||||
///
|
||||
/// If you would like to specialize the `NIO.ClientBootstrap` that the connection communicates on, override the default by passing it in as `tcpClient`.
|
||||
///
|
||||
/// let eventLoopGroup: EventLoopGroup = ...
|
||||
/// var customTCPClient = ClientBootstrap.makeRedisTCPClient(group: eventLoopGroup)
|
||||
/// customTCPClient.channelInitializer { channel in
|
||||
/// // channel customizations
|
||||
/// }
|
||||
/// let connection = RedisConnection.connect(
|
||||
/// to: ...,
|
||||
/// on: eventLoopGroup.next(),
|
||||
/// password: ...,
|
||||
/// tcpClient: customTCPClient
|
||||
/// ).wait()
|
||||
///
|
||||
/// It is recommended that you be familiar with `ClientBootstrap.makeRedisTCPClient(group:)` and `NIO.ClientBootstrap` in general before doing so.
|
||||
///
|
||||
/// Note: Use of `wait()` in the example is for simplicity. Never call `wait()` on an event loop.
|
||||
///
|
||||
/// - Important: Call `close()` on the connection before letting the instance deinit to properly cleanup resources.
|
||||
/// - Note: If a `password` is provided, the connection will send an "AUTH" command to Redis as soon as it has been opened.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - socket: The `NIO.SocketAddress` information of the Redis instance to connect to.
|
||||
/// - eventLoop: The `NIO.EventLoop` that this connection will execute all tasks on.
|
||||
/// - password: The optional password to use for authorizing the connection with Redis.
|
||||
/// - logger: The `Logging.Logger` instance to use for all logging purposes. If one is not provided, one will be created.
|
||||
/// A `Foundation.UUID` will be attached to the metadata to uniquely identify this connection instance's logs.
|
||||
/// - tcpClient: If you have chosen to configure a `NIO.ClientBootstrap` yourself, this will.
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves with the new connection after it has been opened, and if a `password` is provided, authenticated.
|
||||
public static func connect(
|
||||
to socket: SocketAddress,
|
||||
on eventLoop: EventLoop,
|
||||
password: String? = nil,
|
||||
logger: Logger = Logger(label: "RedisNIO.RedisConnection"),
|
||||
tcpClient: ClientBootstrap? = nil
|
||||
) -> EventLoopFuture<RedisConnection> {
|
||||
let client = tcpClient ?? ClientBootstrap.makeRedisTCPClient(group: eventLoop)
|
||||
|
||||
return client.connect(to: socket)
|
||||
.map { return RedisConnection(configuredRESPChannel: $0, logger: logger) }
|
||||
.flatMap { connection in
|
||||
guard let pw = password else {
|
||||
return connection.eventLoop.makeSucceededFuture(connection)
|
||||
}
|
||||
|
||||
let args = [RESPValue(bulk: pw)]
|
||||
return connection.send(command: "AUTH", with: args)
|
||||
.map { _ in return connection }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A concrete `RedisClient` implementation that represents an individual connection to a Redis database instance.
|
||||
///
|
||||
/// For basic setups, you will just need a `NIO.SocketAddress` and a `NIO.EventLoop` and perhaps a `password`.
|
||||
///
|
||||
/// let eventLoop: EventLoop = ...
|
||||
/// let connection = RedisConnection.connect(
|
||||
/// to: try .makeAddressResolvingHost("my.redis.url", port: RedisConnection.defaultPort),
|
||||
/// on: eventLoop
|
||||
/// ).wait()
|
||||
///
|
||||
/// let result = try connection.set("my_key", to: "some value")
|
||||
/// .flatMap { return connection.get("my_key") }
|
||||
/// .wait()
|
||||
///
|
||||
/// print(result) // Optional("some value")
|
||||
///
|
||||
/// Note: `wait()` is used in the example for simplicity. Never call `wait()` on an event loop.
|
||||
///
|
||||
/// See `NIO.SocketAddress`, `NIO.EventLoop`, and `RedisClient`
|
||||
public final class RedisConnection: RedisClient {
|
||||
/// See `RedisClient.eventLoop`
|
||||
public var eventLoop: EventLoop { return self.channel.eventLoop }
|
||||
/// Is the connection to Redis still open?
|
||||
public var isConnected: Bool {
|
||||
// `Channel.isActive` is set to false before the `closeFuture` resolves in cases where the channel might be
|
||||
// closed, or closing, before our state has been updated
|
||||
return self.channel.isActive && self.state == .open
|
||||
}
|
||||
/// Controls the behavior of when sending commands over this connection. The default is `true.
|
||||
///
|
||||
/// When set to `false`, the commands will be placed into a buffer, and the host machine will determine when to drain the buffer.
|
||||
/// When set to `true`, the buffer will be drained as soon as commands are added.
|
||||
/// - Important: Even when set to `true`, the host machine may still choose to delay sending commands.
|
||||
/// - Note: Setting this to `true` will immediately drain the buffer.
|
||||
var sendCommandsImmediately: Bool {
|
||||
get { return autoflush.load() }
|
||||
set(newValue) {
|
||||
if newValue { self.channel.flush() }
|
||||
autoflush.store(newValue)
|
||||
}
|
||||
}
|
||||
|
||||
internal let channel: Channel
|
||||
private var logger: Logger
|
||||
|
||||
private let autoflush = Atomic<Bool>(value: true)
|
||||
private let _stateLock = Lock()
|
||||
private var _state: ConnectionState
|
||||
private var state: ConnectionState {
|
||||
get { return _stateLock.withLock { self._state } }
|
||||
set(newValue) { _stateLock.withLockVoid { self._state = newValue } }
|
||||
}
|
||||
|
||||
deinit {
|
||||
if isConnected {
|
||||
assertionFailure("close() was not called before deinit!")
|
||||
logger.warning("RedisConnection did not properly shutdown before deinit!")
|
||||
}
|
||||
}
|
||||
|
||||
internal init(configuredRESPChannel: Channel, logger: Logger) {
|
||||
self.channel = configuredRESPChannel
|
||||
self.logger = logger
|
||||
|
||||
self.logger[metadataKey: loggingKeyID] = "\(UUID())"
|
||||
self._state = .open
|
||||
self.logger.debug("Connection created.")
|
||||
RedisMetrics.activeConnectionCount += 1
|
||||
RedisMetrics.totalConnectionCount.increment()
|
||||
|
||||
// attach a callback to the channel to capture situations where the channel might be closed out from under
|
||||
// the connection
|
||||
self.channel.closeFuture.whenSuccess {
|
||||
// if our state is still open, that means we didn't cause the closeFuture to resolve.
|
||||
// update state, metrics, and logging
|
||||
guard self.state == .open else { return }
|
||||
|
||||
self.state = .closed
|
||||
self.logger.warning("Channel was closed unexpectedly.")
|
||||
RedisMetrics.activeConnectionCount -= 1
|
||||
}
|
||||
}
|
||||
|
||||
internal enum ConnectionState {
|
||||
case open
|
||||
case shuttingDown
|
||||
case closed
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Sending Commands
|
||||
|
||||
extension RedisConnection {
|
||||
/// Sends the command with the provided arguments to Redis.
|
||||
///
|
||||
/// See `RedisClient.send(command:with:)`.
|
||||
///
|
||||
/// - Note: The timing of when commands are actually sent to Redis can be controlled with the `RedisConnection.sendCommandsImmediately` property.
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves with the command's result stored in a `RESPValue`.
|
||||
/// If a `RedisError` is returned, the future will be failed instead.
|
||||
public func send(command: String, with arguments: [RESPValue]) -> EventLoopFuture<RESPValue> {
|
||||
guard self.isConnected else {
|
||||
let error = RedisNIOError.connectionClosed
|
||||
logger.warning("\(error.localizedDescription)")
|
||||
return self.channel.eventLoop.makeFailedFuture(error)
|
||||
}
|
||||
|
||||
var message: [RESPValue] = [.init(bulk: command)]
|
||||
message.append(contentsOf: arguments)
|
||||
|
||||
let promise = channel.eventLoop.makePromise(of: RESPValue.self)
|
||||
let command = RedisCommand(
|
||||
command: .array(message),
|
||||
promise: promise
|
||||
)
|
||||
|
||||
let startTime = DispatchTime.now().uptimeNanoseconds
|
||||
promise.futureResult.whenComplete { result in
|
||||
let duration = DispatchTime.now().uptimeNanoseconds - startTime
|
||||
RedisMetrics.commandRoundTripTime.recordNanoseconds(duration)
|
||||
|
||||
// log the error here instead
|
||||
guard case let .failure(error) = result else { return }
|
||||
self.logger.error("\(error.localizedDescription)")
|
||||
}
|
||||
|
||||
self.logger.debug("Sending command \"\(command)\"\(arguments.count > 0 ? " with \(arguments)" : "")")
|
||||
|
||||
if self.sendCommandsImmediately {
|
||||
return channel.writeAndFlush(command).flatMap { promise.futureResult }
|
||||
} else {
|
||||
return channel.write(command).flatMap { promise.futureResult }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Closing a Connection
|
||||
|
||||
extension RedisConnection {
|
||||
/// Sends a `QUIT` command to Redis, then closes the `NIO.Channel` that supports this connection.
|
||||
///
|
||||
/// See [https://redis.io/commands/quit](https://redis.io/commands/quit)
|
||||
/// - Important: Regardless if the returned `NIO.EventLoopFuture` fails or succeeds - after calling this method the connection should no longer be
|
||||
/// used for sending commands to Redis.
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves when the connection has been closed.
|
||||
@discardableResult
|
||||
public func close() -> EventLoopFuture<Void> {
|
||||
guard self.isConnected else {
|
||||
// return the channel's close future, which is resolved as the last step in channel shutdown
|
||||
return self.channel.closeFuture
|
||||
}
|
||||
|
||||
// we're now in a shutdown state, starting with the command queue.
|
||||
self.state = .shuttingDown
|
||||
|
||||
let notification = self.sendQuitCommand() // send "QUIT" so that all the responses are written out
|
||||
.flatMap { self.closeChannel() } // close the channel from our end
|
||||
|
||||
notification.whenFailure { self.logger.error("Encountered error during close(): \($0)") }
|
||||
notification.whenSuccess {
|
||||
self.state = .closed
|
||||
self.logger.debug("Connection closed.")
|
||||
RedisMetrics.activeConnectionCount -= 1
|
||||
}
|
||||
|
||||
return notification
|
||||
}
|
||||
|
||||
/// Bypasses everything for a normal command and explicitly just sends a "QUIT" command to Redis.
|
||||
/// - Note: If the command fails, the `NIO.EventLoopFuture` will still succeed - as it's not critical for the command to succeed.
|
||||
private func sendQuitCommand() -> EventLoopFuture<Void> {
|
||||
let promise = channel.eventLoop.makePromise(of: RESPValue.self)
|
||||
let command = RedisCommand(
|
||||
command: .array([RESPValue(bulk: "QUIT")]),
|
||||
promise: promise
|
||||
)
|
||||
|
||||
logger.debug("Sending QUIT command.")
|
||||
|
||||
return channel.writeAndFlush(command) // write the command
|
||||
.flatMap { promise.futureResult } // chain the callback to the response's
|
||||
.map { _ in () } // ignore the result's value
|
||||
.recover { _ in () } // if there's an error, just return to void
|
||||
}
|
||||
|
||||
/// Attempts to close the `NIO.Channel`.
|
||||
/// SwiftNIO throws a `NIO.EventLoopError.shutdown` if the channel is already closed,
|
||||
/// so that case is captured to let this method's `NIO.EventLoopFuture` still succeed.
|
||||
private func closeChannel() -> EventLoopFuture<Void> {
|
||||
let promise = self.channel.eventLoop.makePromise(of: Void.self)
|
||||
|
||||
self.channel.close(promise: promise)
|
||||
|
||||
// if we succeed, great, if not - check the error that happened
|
||||
return promise.futureResult
|
||||
.flatMapError { error in
|
||||
guard let e = error as? EventLoopError else {
|
||||
return self.eventLoop.makeFailedFuture(error)
|
||||
}
|
||||
|
||||
// if the error is that the channel is already closed, great - just succeed.
|
||||
// otherwise, fail the chain
|
||||
switch e {
|
||||
case .shutdown: return self.eventLoop.makeSucceededFuture(())
|
||||
default: return self.eventLoop.makeFailedFuture(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,20 +16,30 @@ import Foundation
|
||||
import NIO
|
||||
import RedisNIO
|
||||
|
||||
extension Redis {
|
||||
/// Creates a `RedisConnection` using `REDIS_URL` and `REDIS_PW` environment variables if available.
|
||||
extension RedisConnection {
|
||||
/// Creates a connection intended for tests using `REDIS_URL` and `REDIS_PW` environment variables if available.
|
||||
///
|
||||
/// The default URL is `127.0.0.1` while the default port is `RedisConnection.defaultPort`.
|
||||
///
|
||||
/// If `REDIS_PW` is not defined, no authentication will happen on the connection.
|
||||
public static func makeConnection() throws -> EventLoopFuture<RedisConnection> {
|
||||
/// - Parameters:
|
||||
/// - eventLoop: The event loop that the connection should execute on.
|
||||
/// - port: The port to connect on.
|
||||
/// - Returns: A `NIO.EventLoopFuture` that resolves with the new connection.
|
||||
public static func connect(
|
||||
on eventLoop: EventLoop,
|
||||
port: Int = RedisConnection.defaultPort
|
||||
) -> EventLoopFuture<RedisConnection> {
|
||||
let env = ProcessInfo.processInfo.environment
|
||||
return Redis.makeConnection(
|
||||
to: try .makeAddressResolvingHost(
|
||||
env["REDIS_URL"] ?? "127.0.0.1",
|
||||
port: RedisConnection.defaultPort
|
||||
),
|
||||
password: env["REDIS_PW"]
|
||||
)
|
||||
let host = env["REDIS_URL"] ?? "127.0.0.1"
|
||||
|
||||
let address: SocketAddress
|
||||
do {
|
||||
address = try SocketAddress.makeAddressResolvingHost(host, port: port)
|
||||
} catch {
|
||||
return eventLoop.makeFailedFuture(error)
|
||||
}
|
||||
|
||||
return RedisConnection.connect(to: address, on: eventLoop, password: env["REDIS_PW"])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the RedisNIO open source project
|
||||
//
|
||||
// Copyright (c) 2019 RedisNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import NIO
|
||||
import RedisNIO
|
||||
import XCTest
|
||||
|
||||
/// A helper `XCTestCase` subclass that does the standard work of creating a connection to use in test cases.
|
||||
///
|
||||
/// See `RedisConnection.connect(to:port:)` to understand how connections are made.
|
||||
open class RedisIntegrationTestCase: XCTestCase {
|
||||
public var connection: RedisConnection!
|
||||
|
||||
private let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
|
||||
|
||||
deinit {
|
||||
do {
|
||||
try self.eventLoopGroup.syncShutdownGracefully()
|
||||
} catch {
|
||||
print("Failed to gracefully shutdown ELG: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a `RedisNIO.RedisConnection` for the next test case, calling `fatalError` if it was not successful.
|
||||
///
|
||||
/// See `XCTest.XCTestCase.setUp()`
|
||||
open override func setUp() {
|
||||
do {
|
||||
connection = try self.makeNewConnection()
|
||||
} catch {
|
||||
fatalError("Failed to make a RedisConnection: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a "FLUSHALL" command to Redis to clear it of any data from the previous test, then closes the connection.
|
||||
///
|
||||
/// If any steps fail, a `fatalError` is thrown.
|
||||
///
|
||||
/// See `XCTest.XCTestCase.tearDown()`
|
||||
open override func tearDown() {
|
||||
do {
|
||||
if self.connection.isConnected {
|
||||
_ = try self.connection.send(command: "FLUSHALL")
|
||||
.flatMap { _ in self.connection.close() }
|
||||
.wait()
|
||||
}
|
||||
|
||||
self.connection = nil
|
||||
} catch {
|
||||
fatalError("Failed to properly cleanup connection: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new connection for use in tests.
|
||||
///
|
||||
/// See `RedisConnection.connect(to:port:)`
|
||||
/// - Returns: The new `RedisNIO.RedisConnection`.
|
||||
public func makeNewConnection() throws -> RedisConnection {
|
||||
return try RedisConnection.connect(on: eventLoopGroup.next()).wait()
|
||||
}
|
||||
}
|
||||
@@ -16,31 +16,13 @@
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class BasicCommandsTests: XCTestCase {
|
||||
private var connection: RedisConnection!
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
connection = try Redis.makeConnection().wait()
|
||||
} catch {
|
||||
XCTFail("Failed to create RedisConnection! \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
_ = try? connection.send(command: "FLUSHALL").wait()
|
||||
try? connection.close().wait()
|
||||
connection = nil
|
||||
}
|
||||
|
||||
final class BasicCommandsTests: RedisIntegrationTestCase {
|
||||
func test_select() {
|
||||
XCTAssertNoThrow(try connection.select(database: 3).wait())
|
||||
}
|
||||
|
||||
func test_delete() {
|
||||
do {
|
||||
func test_delete() throws {
|
||||
let keys = [ #function + "1", #function + "2", #function + "3" ]
|
||||
try connection.close().wait()
|
||||
try connection.set(keys[0], to: "value").wait()
|
||||
try connection.set(keys[1], to: "value").wait()
|
||||
try connection.set(keys[2], to: "value").wait()
|
||||
@@ -53,10 +35,6 @@ final class BasicCommandsTests: XCTestCase {
|
||||
|
||||
let third = try connection.delete([keys[1], keys[2]]).wait()
|
||||
XCTAssertEqual(third, 2)
|
||||
}
|
||||
catch {
|
||||
print("failed")
|
||||
}
|
||||
}
|
||||
|
||||
func test_expire() throws {
|
||||
|
||||
@@ -13,25 +13,10 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
@testable import RedisNIO
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class HashCommandsTests: XCTestCase {
|
||||
private var connection: RedisConnection!
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
connection = try Redis.makeConnection().wait()
|
||||
} catch {
|
||||
XCTFail("Failed to create RedisConnection! \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
_ = try? connection.send(command: "FLUSHALL").wait()
|
||||
try? connection.close().wait()
|
||||
connection = nil
|
||||
}
|
||||
|
||||
final class HashCommandsTests: RedisIntegrationTestCase {
|
||||
func test_hset() throws {
|
||||
var result = try connection.hset("test", to: "\(#line)", in: #function).wait()
|
||||
XCTAssertTrue(result)
|
||||
|
||||
@@ -13,25 +13,10 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
@testable import RedisNIO
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class ListCommandsTests: XCTestCase {
|
||||
private var connection: RedisConnection!
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
connection = try Redis.makeConnection().wait()
|
||||
} catch {
|
||||
XCTFail("Failed to create RedisConnection! \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
_ = try? connection.send(command: "FLUSHALL").wait()
|
||||
try? connection.close().wait()
|
||||
connection = nil
|
||||
}
|
||||
|
||||
final class ListCommandsTests: RedisIntegrationTestCase {
|
||||
func test_llen() throws {
|
||||
var length = try connection.llen(of: #function).wait()
|
||||
XCTAssertEqual(length, 0)
|
||||
@@ -108,14 +93,16 @@ final class ListCommandsTests: XCTestCase {
|
||||
let element = try connection.brpoplpush(from: "first", to: "second").wait() ?? .null
|
||||
XCTAssertEqual(Int(fromRESP: element), 10)
|
||||
|
||||
let blockingConnection = try Redis.makeConnection().wait()
|
||||
let blockingConnection = try self.makeNewConnection()
|
||||
let expectation = XCTestExpectation(description: "brpoplpush should never return")
|
||||
_ = blockingConnection.bzpopmin(from: #function)
|
||||
.always { _ in expectation.fulfill() }
|
||||
.always { _ in
|
||||
expectation.fulfill()
|
||||
blockingConnection.close()
|
||||
}
|
||||
|
||||
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
|
||||
XCTAssertEqual(result, .timedOut)
|
||||
try blockingConnection.channel.close().wait()
|
||||
}
|
||||
|
||||
func test_linsert() throws {
|
||||
@@ -156,14 +143,16 @@ final class ListCommandsTests: XCTestCase {
|
||||
let pop2 = try connection.blpop(from: ["fake", "first"]).wait()
|
||||
XCTAssertEqual(pop2?.0, "first")
|
||||
|
||||
let blockingConnection = try Redis.makeConnection().wait()
|
||||
let blockingConnection = try self.makeNewConnection()
|
||||
let expectation = XCTestExpectation(description: "blpop should never return")
|
||||
_ = blockingConnection.bzpopmin(from: #function)
|
||||
.always { _ in expectation.fulfill() }
|
||||
.always { _ in
|
||||
expectation.fulfill()
|
||||
blockingConnection.close()
|
||||
}
|
||||
|
||||
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
|
||||
XCTAssertEqual(result, .timedOut)
|
||||
try blockingConnection.channel.close().wait()
|
||||
}
|
||||
|
||||
func test_lpush() throws {
|
||||
@@ -213,14 +202,16 @@ final class ListCommandsTests: XCTestCase {
|
||||
let pop2 = try connection.brpop(from: ["fake", "first"]).wait()
|
||||
XCTAssertEqual(pop2?.0, "first")
|
||||
|
||||
let blockingConnection = try Redis.makeConnection().wait()
|
||||
let blockingConnection = try self.makeNewConnection()
|
||||
let expectation = XCTestExpectation(description: "brpop should never return")
|
||||
_ = blockingConnection.bzpopmin(from: #function)
|
||||
.always { _ in expectation.fulfill() }
|
||||
.always { _ in
|
||||
expectation.fulfill()
|
||||
blockingConnection.close()
|
||||
}
|
||||
|
||||
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
|
||||
XCTAssertEqual(result, .timedOut)
|
||||
try blockingConnection.channel.close().wait()
|
||||
}
|
||||
|
||||
func test_rpush() throws {
|
||||
|
||||
@@ -13,25 +13,10 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
@testable import RedisNIO
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class SetCommandsTests: XCTestCase {
|
||||
private var connection: RedisConnection!
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
connection = try Redis.makeConnection().wait()
|
||||
} catch {
|
||||
XCTFail("Failed to create RedisConnection! \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
_ = try? connection.send(command: "FLUSHALL").wait()
|
||||
try? connection.close().wait()
|
||||
connection = nil
|
||||
}
|
||||
|
||||
final class SetCommandsTests: RedisIntegrationTestCase {
|
||||
func test_sadd() throws {
|
||||
var insertCount = try connection.sadd([1, 2, 3], to: #function).wait()
|
||||
XCTAssertEqual(insertCount, 3)
|
||||
|
||||
@@ -14,19 +14,17 @@
|
||||
|
||||
import NIO
|
||||
@testable import RedisNIO
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class SortedSetCommandsTests: XCTestCase {
|
||||
final class SortedSetCommandsTests: RedisIntegrationTestCase {
|
||||
private static let testKey = "SortedSetCommandsTests"
|
||||
|
||||
private var connection: RedisConnection!
|
||||
|
||||
private var key: String { return SortedSetCommandsTests.testKey }
|
||||
|
||||
override func setUp() {
|
||||
super.setUp()
|
||||
do {
|
||||
connection = try Redis.makeConnection().wait()
|
||||
|
||||
var dataset: [(Int, Double)] = []
|
||||
for index in 1...10 {
|
||||
dataset.append((index, Double(index)))
|
||||
@@ -38,12 +36,6 @@ final class SortedSetCommandsTests: XCTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
_ = try? connection.send(command: "FLUSHALL").wait()
|
||||
try? connection.close().wait()
|
||||
connection = nil
|
||||
}
|
||||
|
||||
func test_zadd() throws {
|
||||
_ = try connection.send(command: "FLUSHALL").wait()
|
||||
|
||||
@@ -168,14 +160,16 @@ final class SortedSetCommandsTests: XCTestCase {
|
||||
XCTAssertEqual(min2?.0, key)
|
||||
XCTAssertEqual(min2?.1, 2)
|
||||
|
||||
let blockingConnection = try Redis.makeConnection().wait()
|
||||
let blockingConnection = try self.makeNewConnection()
|
||||
let expectation = XCTestExpectation(description: "bzpopmin should never return")
|
||||
_ = blockingConnection.bzpopmin(from: #function)
|
||||
.always { _ in expectation.fulfill() }
|
||||
.always { _ in
|
||||
expectation.fulfill()
|
||||
blockingConnection.close()
|
||||
}
|
||||
|
||||
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
|
||||
XCTAssertEqual(result, .timedOut)
|
||||
try blockingConnection.channel.close().wait()
|
||||
}
|
||||
|
||||
func test_zpopmax() throws {
|
||||
@@ -200,14 +194,16 @@ final class SortedSetCommandsTests: XCTestCase {
|
||||
XCTAssertEqual(max2?.0, key)
|
||||
XCTAssertEqual(max2?.1, 9)
|
||||
|
||||
let blockingConnection = try Redis.makeConnection().wait()
|
||||
let blockingConnection = try self.makeNewConnection()
|
||||
let expectation = XCTestExpectation(description: "bzpopmax should never return")
|
||||
_ = blockingConnection.bzpopmax(from: #function)
|
||||
.always { _ in expectation.fulfill() }
|
||||
.always { _ in
|
||||
expectation.fulfill()
|
||||
blockingConnection.close()
|
||||
}
|
||||
|
||||
let result = XCTWaiter.wait(for: [expectation], timeout: 1)
|
||||
XCTAssertEqual(result, .timedOut)
|
||||
try blockingConnection.channel.close().wait()
|
||||
}
|
||||
|
||||
func test_zincrby() throws {
|
||||
|
||||
@@ -13,27 +13,12 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
@testable import RedisNIO
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class StringCommandsTests: XCTestCase {
|
||||
final class StringCommandsTests: RedisIntegrationTestCase {
|
||||
private static let testKey = "SortedSetCommandsTests"
|
||||
|
||||
private var connection: RedisConnection!
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
connection = try Redis.makeConnection().wait()
|
||||
} catch {
|
||||
XCTFail("Failed to create RedisConnection! \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
_ = try? connection.send(command: "FLUSHALL").wait()
|
||||
try? connection.close().wait()
|
||||
connection = nil
|
||||
}
|
||||
|
||||
func test_get() throws {
|
||||
try connection.set(#function, to: "value").wait()
|
||||
let result = try connection.get(#function).wait()
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the RedisNIO open source project
|
||||
//
|
||||
// Copyright (c) 2019 RedisNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of RedisNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
@testable import RedisNIO
|
||||
import RedisNIOTestUtils
|
||||
import XCTest
|
||||
|
||||
final class RedisConnectionTests: RedisIntegrationTestCase {
|
||||
static let expectedLogsMessage = "The following log(s) in this test are expected."
|
||||
|
||||
func test_unexpectedChannelClose() throws {
|
||||
print(RedisConnectionTests.expectedLogsMessage)
|
||||
|
||||
XCTAssertTrue(self.connection.isConnected)
|
||||
try self.connection.channel.close().wait()
|
||||
XCTAssertFalse(self.connection.isConnected)
|
||||
}
|
||||
|
||||
func test_callingCloseMultipleTimes() throws {
|
||||
let first = self.connection.close()
|
||||
let second = self.connection.close()
|
||||
XCTAssertNotEqual(first, self.connection.channel.closeFuture)
|
||||
XCTAssertEqual(second, self.connection.channel.closeFuture)
|
||||
}
|
||||
|
||||
func test_sendingCommandAfterClosing() throws {
|
||||
print(RedisConnectionTests.expectedLogsMessage)
|
||||
|
||||
self.connection.close()
|
||||
do {
|
||||
_ = try self.connection.ping().wait()
|
||||
XCTFail("ping() should throw when connection is closed.")
|
||||
} catch {
|
||||
XCTAssertTrue(error is RedisNIOError)
|
||||
}
|
||||
}
|
||||
|
||||
static var allTests = [
|
||||
("test_unexpectedChannelClose", test_unexpectedChannelClose),
|
||||
("test_callingCloseMultipleTimes", test_callingCloseMultipleTimes),
|
||||
("test_sendingCommandAfterClosing", test_sendingCommandAfterClosing),
|
||||
]
|
||||
}
|
||||
@@ -25,7 +25,8 @@ public func allTests() -> [XCTestCaseEntry] {
|
||||
testCase(HashCommandsTests.allTests),
|
||||
testCase(ListCommandsTests.allTests),
|
||||
testCase(SortedSetCommandsTests.allTests),
|
||||
testCase(StringCommandsTests.allTests)
|
||||
testCase(StringCommandsTests.allTests),
|
||||
testCase(RedisConnectionTests.allTests)
|
||||
]
|
||||
}
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user