L40-kafka-webflux

This commit is contained in:
petrelevich
2023-12-27 21:39:33 +03:00
parent 1fd5724d77
commit 2b6b7d3b0b
4 changed files with 21 additions and 5 deletions
@@ -4,14 +4,22 @@ import com.datasrc.model.RequestId;
import com.datasrc.model.StringValue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
public class StringValueStorage {
public class StringValueStorage implements AutoCloseable{
private final ScheduledExecutorService executor;
private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class);
private final Map<RequestId, StringValue> storage = new ConcurrentHashMap<>();
public StringValueStorage(ScheduledExecutorService executor) {
this.executor = executor;
}
public void put(RequestId requestId, StringValue value) {
log.info("put. requestId:{}, value:{}", requestId, value);
storage.put(requestId, value);
@@ -21,10 +29,14 @@ public class StringValueStorage {
return Mono.fromCallable(() -> {
StringValue value;
do
value = storage.get(requestId);
value = executor.schedule(() -> storage.get(requestId), 100, TimeUnit.MILLISECONDS).get();
while (value == null);
return value;
});
}
@Override
public void close() {
executor.shutdownNow();
}
}
@@ -14,6 +14,7 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.reactive.function.client.WebClient;
@@ -22,6 +23,7 @@ import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
@Configuration
@SuppressWarnings("java:S2095")
public class ApplConfig {
private static final int THREAD_POOL_SIZE = 4;
private static final int RESPONSE_RECEIVER_POOL_SIZE = 1;
@@ -94,12 +96,12 @@ public class ApplConfig {
return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest);
}
@Bean
@Bean(destroyMethod = "close")
public StringValueStorage stringValueStorage() {
return new StringValueStorage();
return new StringValueStorage(new ScheduledThreadPoolExecutor(1));
}
@Bean
@Bean(destroyMethod = "close")
public ReactiveReceiver<Response> responseReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers,
@Value("${application.topic-response}") String topicResponse,
@Value("${application.kafka-group-id}") String groupId,
@@ -28,6 +28,7 @@ import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
@Configuration
@SuppressWarnings("java:S2095")
public class ApplConfig {
private static final Logger log = LoggerFactory.getLogger(ApplConfig.class);
private static final int THREAD_POOL_SIZE = 2;
@@ -16,6 +16,7 @@ import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
@Configuration
@SuppressWarnings("java:S2095")
public class ApplConfig {
private static final int THREAD_POOL_SIZE = 2;
private static final int BLOCKING_THREAD_POOL_SIZE = 2;