diff --git a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java index ee2fa492..08b618f5 100644 --- a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java +++ b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java @@ -4,14 +4,22 @@ import com.datasrc.model.RequestId; import com.datasrc.model.StringValue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -public class StringValueStorage { +public class StringValueStorage implements AutoCloseable{ + + private final ScheduledExecutorService executor; private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class); private final Map storage = new ConcurrentHashMap<>(); + public StringValueStorage(ScheduledExecutorService executor) { + this.executor = executor; + } + public void put(RequestId requestId, StringValue value) { log.info("put. requestId:{}, value:{}", requestId, value); storage.put(requestId, value); @@ -21,10 +29,14 @@ public class StringValueStorage { return Mono.fromCallable(() -> { StringValue value; do - value = storage.get(requestId); + value = executor.schedule(() -> storage.get(requestId), 100, TimeUnit.MILLISECONDS).get(); while (value == null); return value; }); } + @Override + public void close() { + executor.shutdownNow(); + } } diff --git a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java index 9ebd975a..7b73a22b 100644 --- a/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java +++ b/2023-07/spring-40-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java @@ -14,6 +14,7 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.client.reactive.ReactorResourceFactory; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import org.springframework.web.reactive.function.client.WebClient; @@ -22,6 +23,7 @@ import reactor.core.scheduler.Schedulers; import reactor.util.annotation.NonNull; @Configuration +@SuppressWarnings("java:S2095") public class ApplConfig { private static final int THREAD_POOL_SIZE = 4; private static final int RESPONSE_RECEIVER_POOL_SIZE = 1; @@ -94,12 +96,12 @@ public class ApplConfig { return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest); } - @Bean + @Bean(destroyMethod = "close") public StringValueStorage stringValueStorage() { - return new StringValueStorage(); + return new StringValueStorage(new ScheduledThreadPoolExecutor(1)); } - @Bean + @Bean(destroyMethod = "close") public ReactiveReceiver responseReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, @Value("${application.topic-response}") String topicResponse, @Value("${application.kafka-group-id}") String groupId, diff --git a/2023-07/spring-40-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java b/2023-07/spring-40-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java index 4e849a03..de936ef3 100644 --- a/2023-07/spring-40-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java +++ b/2023-07/spring-40-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java @@ -28,6 +28,7 @@ import reactor.util.annotation.NonNull; import reactor.util.annotation.Nullable; @Configuration +@SuppressWarnings("java:S2095") public class ApplConfig { private static final Logger log = LoggerFactory.getLogger(ApplConfig.class); private static final int THREAD_POOL_SIZE = 2; diff --git a/2023-07/spring-40-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java b/2023-07/spring-40-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java index ab1e6a3a..f02e8773 100644 --- a/2023-07/spring-40-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java +++ b/2023-07/spring-40-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java @@ -16,6 +16,7 @@ import reactor.core.scheduler.Schedulers; import reactor.util.annotation.NonNull; @Configuration +@SuppressWarnings("java:S2095") public class ApplConfig { private static final int THREAD_POOL_SIZE = 2; private static final int BLOCKING_THREAD_POOL_SIZE = 2;