mirror of
https://github.com/swift-server/RediStack.git
synced 2026-06-02 07:37:33 +00:00
432 lines
15 KiB
Swift
432 lines
15 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the RediStack open source project
|
|
//
|
|
// Copyright (c) 2020-2022 RediStack project authors
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of RediStack project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
import NIOCore
|
|
import NIOEmbedded
|
|
@testable import RediStack
|
|
import RediStackTestUtils
|
|
import XCTest
|
|
|
|
final class RedisPubSubCommandsTests: RediStackIntegrationTestCase {
|
|
func test_singleChannel() throws {
|
|
let subscribeExpectation = self.expectation(description: "subscriber receives initial subscription message")
|
|
let messageExpectation = self.expectation(description: "subscriber receives published message")
|
|
let unsubscribeExpectation = self.expectation(description: "subscriber receives unsubscribe message")
|
|
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let message = "Hello from Redis!"
|
|
|
|
try subscriber.subscribe(
|
|
to: #function,
|
|
{ event in
|
|
switch event {
|
|
case let .subscribed(key, currentSubscriptionCount):
|
|
guard
|
|
key == #function,
|
|
currentSubscriptionCount == 1
|
|
else { return }
|
|
subscribeExpectation.fulfill()
|
|
|
|
case let .unsubscribed(key, currentSubscriptionCount, source):
|
|
guard
|
|
case .userInitiated = source,
|
|
key == #function,
|
|
currentSubscriptionCount == 0
|
|
else { return }
|
|
unsubscribeExpectation.fulfill()
|
|
|
|
case let .message(key, body):
|
|
guard
|
|
key == #function,
|
|
body.string == message
|
|
else { return }
|
|
messageExpectation.fulfill()
|
|
}
|
|
}
|
|
).wait()
|
|
|
|
let subscribersCount = try self.connection.publish(message, to: #function).wait()
|
|
XCTAssertEqual(subscribersCount, 1)
|
|
|
|
try subscriber.unsubscribe(from: #function).wait()
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_multiChannel() throws {
|
|
let channelMessageExpectation = self.expectation(description: "subscriber receives channel message")
|
|
let patternMessageExpectation = self.expectation(description: "subscriber receives pattern message")
|
|
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let channel = RedisChannelName(#function)
|
|
let pattern = "\(channel.rawValue.dropLast(channel.rawValue.count / 2))*"
|
|
|
|
try subscriber
|
|
.subscribe(
|
|
to: channel,
|
|
{
|
|
guard case .message = $0 else { return }
|
|
channelMessageExpectation.fulfill()
|
|
}
|
|
)
|
|
.wait()
|
|
try subscriber
|
|
.psubscribe(
|
|
to: pattern,
|
|
{
|
|
guard case .message = $0 else { return }
|
|
patternMessageExpectation.fulfill()
|
|
}
|
|
)
|
|
.wait()
|
|
|
|
let subscriberCount = try self.connection.publish("hello!", to: channel).wait()
|
|
XCTAssertEqual(subscriberCount, 2)
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_unsubscribeWithoutSubscriptions() throws {
|
|
XCTAssertNoThrow(try self.connection.unsubscribe(from: #function).wait())
|
|
}
|
|
|
|
func test_blockedCommandsThrowInPubSubMode() throws {
|
|
try self.connection.subscribe(to: #function, { _ in }).wait()
|
|
defer { try? self.connection.unsubscribe(from: #function).wait() }
|
|
|
|
XCTAssertThrowsError(try self.connection.send(.lpush("value", into: "List")).wait()) {
|
|
XCTAssertTrue($0 is RedisError)
|
|
}
|
|
}
|
|
|
|
func test_pingInPubSub() throws {
|
|
try self.connection.subscribe(to: #function, { _ in }).wait()
|
|
defer { try? self.connection.unsubscribe(from: #function).wait() }
|
|
|
|
let pong = try self.connection.ping().wait()
|
|
XCTAssertEqual(pong, "PONG")
|
|
|
|
let message = try self.connection.ping(with: "Hello").wait()
|
|
XCTAssertEqual(message, "Hello")
|
|
}
|
|
|
|
func test_quitInPubSub() throws {
|
|
try self.connection.subscribe(to: #function, { _ in }).wait()
|
|
defer { try? self.connection.unsubscribe(from: #function).wait() }
|
|
|
|
let quit = RedisCommand<String>(keyword: "QUIT", arguments: [])
|
|
let result = try self.connection.send(quit).wait()
|
|
XCTAssertEqual(result, "OK")
|
|
}
|
|
|
|
func test_unsubscribeFromAllChannels() throws {
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let channels = (1...5).map { RedisChannelName("\(#function)\($0)") }
|
|
|
|
let expectation = self.expectation(description: "all channel subscriptions should be cancelled")
|
|
expectation.expectedFulfillmentCount = channels.count
|
|
|
|
try subscriber.subscribe(
|
|
to: channels,
|
|
{
|
|
guard case .unsubscribed = $0 else { return }
|
|
expectation.fulfill()
|
|
}
|
|
).wait()
|
|
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
try subscriber.unsubscribe().wait()
|
|
XCTAssertFalse(subscriber.isSubscribed)
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_unsubscribeFromAllPatterns() throws {
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let patterns = (1...3).map { ("*\(#function)\($0)") }
|
|
|
|
let expectation = self.expectation(description: "all pattern subscriptions should be cancelled")
|
|
expectation.expectedFulfillmentCount = patterns.count
|
|
|
|
try subscriber.psubscribe(
|
|
to: patterns,
|
|
{
|
|
guard case .unsubscribed = $0 else { return }
|
|
expectation.fulfill()
|
|
}
|
|
).wait()
|
|
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
try subscriber.punsubscribe().wait()
|
|
XCTAssertFalse(subscriber.isSubscribed)
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_unsubscribeFromAllMixed() throws {
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let expectation = self.expectation(description: "both unsubscribes should be completed")
|
|
expectation.expectedFulfillmentCount = 2
|
|
|
|
XCTAssertFalse(subscriber.isSubscribed)
|
|
|
|
try subscriber.subscribe(
|
|
to: #function,
|
|
{
|
|
guard case .unsubscribed = $0 else { return }
|
|
expectation.fulfill()
|
|
}
|
|
).wait()
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
|
|
try subscriber.psubscribe(
|
|
to: "*\(#function)",
|
|
{
|
|
guard case .unsubscribed = $0 else { return }
|
|
expectation.fulfill()
|
|
}
|
|
).wait()
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
|
|
try subscriber.unsubscribe().wait()
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
|
|
try subscriber.punsubscribe().wait()
|
|
XCTAssertFalse(subscriber.isSubscribed)
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_pubSubNumpat() throws {
|
|
let queryConnection = try self.makeNewConnection()
|
|
defer { try? queryConnection.close().wait() }
|
|
|
|
let numPat = try queryConnection.send(.pubsubNumpat()).wait()
|
|
XCTAssertGreaterThanOrEqual(numPat, 0)
|
|
}
|
|
|
|
func test_pubSubChannels() throws {
|
|
let fn = #function
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let channelNames = (1...10).map {
|
|
RedisChannelName("\(fn)\($0)\($0 % 2 == 0 ? "_even" : "_odd")")
|
|
}
|
|
|
|
for channelName in channelNames {
|
|
try subscriber
|
|
.subscribe(to: channelName, { _ in })
|
|
.wait()
|
|
}
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
defer {
|
|
// Unsubscribe (clean up)
|
|
try? subscriber.unsubscribe(from: channelNames).wait()
|
|
XCTAssertFalse(subscriber.isSubscribed)
|
|
}
|
|
|
|
// Make another connection to query on.
|
|
let queryConnection = try self.makeNewConnection()
|
|
defer { try? queryConnection.close().wait() }
|
|
|
|
let oddChannels = try queryConnection.send(.pubsubChannels(matching: "\(fn)*_odd")).wait()
|
|
XCTAssertEqual(oddChannels.count, channelNames.count / 2)
|
|
|
|
let allChannels = try queryConnection.send(.pubsubChannels()).wait()
|
|
XCTAssertGreaterThanOrEqual(allChannels.count, channelNames.count)
|
|
}
|
|
|
|
func test_pubSubNumsub() throws {
|
|
let fn = #function
|
|
let subscriber = try self.makeNewConnection()
|
|
defer { try? subscriber.close().wait() }
|
|
|
|
let channelNames = (1...5).map {
|
|
RedisChannelName("\(fn)\($0)")
|
|
}
|
|
|
|
for channelName in channelNames {
|
|
try subscriber
|
|
.subscribe(to: channelName, { _ in })
|
|
.wait()
|
|
}
|
|
XCTAssertTrue(subscriber.isSubscribed)
|
|
defer {
|
|
// Unsubscribe (clean up)
|
|
try? subscriber.unsubscribe(from: channelNames).wait()
|
|
XCTAssertFalse(subscriber.isSubscribed)
|
|
}
|
|
|
|
// Make another connection to query on.
|
|
let queryConnection = try self.makeNewConnection()
|
|
defer { try? queryConnection.close().wait() }
|
|
|
|
let notSubscribedChannel = RedisChannelName("\(fn)_notsubbed")
|
|
let numSubs = try queryConnection.send(.pubsubNumsub(forChannels: [channelNames[0], notSubscribedChannel])).wait()
|
|
XCTAssertEqual(numSubs.count, 2)
|
|
|
|
XCTAssertGreaterThanOrEqual(numSubs[channelNames[0]] ?? 0, 1)
|
|
XCTAssertEqual(numSubs[notSubscribedChannel], 0)
|
|
}
|
|
}
|
|
|
|
final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTestCase {
|
|
func test_pool_singleChannel() throws {
|
|
let subscribeExpectation = self.expectation(description: "subscriber receives initial subscription message")
|
|
let messageExpectation = self.expectation(description: "subscriber receives published message")
|
|
let unsubscribeExpectation = self.expectation(description: "subscriber receives unsubscribe message")
|
|
|
|
let subscriber = try self.makeNewPool()
|
|
defer { subscriber.close() }
|
|
|
|
let message = "Hello from Redis!"
|
|
|
|
try subscriber.subscribe(
|
|
to: #function,
|
|
{ event in
|
|
switch event {
|
|
case let .subscribed(key, count):
|
|
guard
|
|
key == #function,
|
|
count == 1
|
|
else { return }
|
|
subscribeExpectation.fulfill()
|
|
|
|
case let .unsubscribed(key, count, source):
|
|
guard
|
|
case .userInitiated = source,
|
|
key == #function,
|
|
count == 0
|
|
else { return }
|
|
unsubscribeExpectation.fulfill()
|
|
|
|
case let .message(key, body):
|
|
guard
|
|
key == #function,
|
|
body.string == message
|
|
else { return }
|
|
messageExpectation.fulfill()
|
|
}
|
|
}
|
|
).wait()
|
|
XCTAssertEqual(subscriber.leasedConnectionCount, 1)
|
|
|
|
let subscribersCount = try self.pool.publish(message, to: #function).wait()
|
|
XCTAssertEqual(subscribersCount, 1)
|
|
|
|
try subscriber.unsubscribe(from: #function).wait()
|
|
XCTAssertEqual(subscriber.leasedConnectionCount, 0)
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_pool_multiChannel() throws {
|
|
let channelMessageExpectation = self.expectation(description: "subscriber receives channel message")
|
|
let patternMessageExpectation = self.expectation(description: "subscriber receives pattern message")
|
|
|
|
let subscriber = try self.makeNewPool()
|
|
defer { subscriber.close() }
|
|
|
|
let channel = RedisChannelName(#function)
|
|
let pattern = "\(channel.rawValue.dropLast(channel.rawValue.count / 2))*"
|
|
|
|
try subscriber
|
|
.subscribe(to: channel, {
|
|
guard case .message = $0 else { return }
|
|
channelMessageExpectation.fulfill()
|
|
})
|
|
.wait()
|
|
XCTAssertEqual(subscriber.leasedConnectionCount, 1)
|
|
try subscriber
|
|
.psubscribe(to: pattern, {
|
|
guard case .message = $0 else { return }
|
|
patternMessageExpectation.fulfill()
|
|
})
|
|
.wait()
|
|
XCTAssertEqual(subscriber.leasedConnectionCount, 1)
|
|
|
|
let subscriberCount = try self.pool.publish("hello!", to: channel).wait()
|
|
XCTAssertEqual(subscriberCount, 2)
|
|
|
|
self.waitForExpectations(timeout: 1)
|
|
}
|
|
|
|
func test_unsubscribeWithoutSubscriptions() throws {
|
|
XCTAssertEqual(self.pool.leasedConnectionCount, 0)
|
|
XCTAssertNoThrow(try self.pool.unsubscribe(from: #function).wait())
|
|
XCTAssertEqual(self.pool.leasedConnectionCount, 0)
|
|
}
|
|
}
|
|
|
|
// MARK: - #103 tests
|
|
|
|
extension RedisPubSubCommandsTests {
|
|
func test_pubsub_calls_unsubscribe_whenUnexpectedClose() throws {
|
|
let channel = EmbeddedChannel()
|
|
try channel
|
|
.pipeline
|
|
.addBaseRedisHandlers()
|
|
.wait()
|
|
|
|
let subscribeExpectation = self.expectation(description: "should see subscribe")
|
|
let unsubscribeExpectation = self.expectation(description: "should see unsubscribe")
|
|
|
|
let connection = RedisConnection(configuredRESPChannel: channel, defaultLogger: .init(label: ""))
|
|
let subscribeFuture = connection
|
|
.subscribe(
|
|
to: [.init(#function)],
|
|
{ event in
|
|
switch event {
|
|
case .message: break
|
|
|
|
case .subscribed: subscribeExpectation.fulfill()
|
|
|
|
case let .unsubscribed(_, _, source):
|
|
guard case .clientError = source else { return }
|
|
unsubscribeExpectation.fulfill()
|
|
}
|
|
}
|
|
)
|
|
|
|
// mimics a successful subscription response from the server
|
|
let allocator = ByteBufferAllocator()
|
|
var buffer = allocator.buffer(capacity: 300)
|
|
buffer.writeRESPValue(.array([
|
|
.init(bulk: "subscribe"),
|
|
.init(bulk: "\(#function)"),
|
|
.integer(1)
|
|
]))
|
|
try channel.writeInbound(buffer)
|
|
|
|
// lets the initial subscription work finish
|
|
try subscribeFuture.wait()
|
|
|
|
// 'unexpected' close, should trigger expectations
|
|
try channel.close().wait()
|
|
|
|
self.waitForExpectations(timeout: 0.5)
|
|
}
|
|
}
|