From de4eb8e34c72e660d2c65564370f5006f4c97ffc Mon Sep 17 00:00:00 2001 From: petrelevich Date: Wed, 27 Dec 2023 23:19:08 +0300 Subject: [PATCH] L40-kafka-webflux --- .../java/com/datasrc/StringValueStorage.java | 42 ++++++++++--------- .../java/com/datasrc/config/ApplConfig.java | 7 ++-- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java index 08b618f5..78d672b1 100644 --- a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java +++ b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java @@ -2,41 +2,45 @@ package com.datasrc; import com.datasrc.model.RequestId; import com.datasrc.model.StringValue; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.ConnectableFlux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; -public class StringValueStorage implements AutoCloseable{ +public class StringValueStorage implements Sinks.EmitFailureHandler { - private final ScheduledExecutorService executor; + private final Sinks.Many sink; + private final ConnectableFlux sinkConnectable; private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class); - private final Map storage = new ConcurrentHashMap<>(); - public StringValueStorage(ScheduledExecutorService executor) { - this.executor = executor; + public StringValueStorage() { + sink = Sinks.many().multicast().onBackpressureBuffer(); + sinkConnectable = sink.asFlux().publish(); + sinkConnectable.connect(); } public void put(RequestId requestId, StringValue value) { log.info("put. requestId:{}, value:{}", requestId, value); - storage.put(requestId, value); + sink.emitNext(new ResponseData(requestId, value), this); } public Mono get(RequestId requestId) { - return Mono.fromCallable(() -> { - StringValue value; - do - value = executor.schedule(() -> storage.get(requestId), 100, TimeUnit.MILLISECONDS).get(); - while (value == null); - return value; - }); + return Mono.from(sinkConnectable + .filter(responseData -> { + log.info("waiting:{}, fact:{}", requestId, responseData.requestId); + return responseData.requestId.id() == requestId.id(); + }) + .map(responseData -> responseData.stringValue)); } + @Override - public void close() { - executor.shutdownNow(); + public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) { + return false; + } + + private record ResponseData(RequestId requestId, StringValue stringValue) { } } diff --git a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java index 7b73a22b..3224b6b8 100644 --- a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java +++ b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java @@ -13,10 +13,9 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.client.reactive.ReactorResourceFactory; - -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; + import org.springframework.web.reactive.function.client.WebClient; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -96,9 +95,9 @@ public class ApplConfig { return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest); } - @Bean(destroyMethod = "close") + @Bean public StringValueStorage stringValueStorage() { - return new StringValueStorage(new ScheduledThreadPoolExecutor(1)); + return new StringValueStorage(); } @Bean(destroyMethod = "close")