L40-kafka-webflux

This commit is contained in:
petrelevich
2023-12-27 23:19:08 +03:00
parent 2b6b7d3b0b
commit de4eb8e34c
2 changed files with 26 additions and 23 deletions
@@ -2,41 +2,45 @@ package com.datasrc;
import com.datasrc.model.RequestId;
import com.datasrc.model.StringValue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
public class StringValueStorage implements AutoCloseable{
public class StringValueStorage implements Sinks.EmitFailureHandler {
private final ScheduledExecutorService executor;
private final Sinks.Many<ResponseData> sink;
private final ConnectableFlux<ResponseData> sinkConnectable;
private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class);
private final Map<RequestId, StringValue> storage = new ConcurrentHashMap<>();
public StringValueStorage(ScheduledExecutorService executor) {
this.executor = executor;
public StringValueStorage() {
sink = Sinks.many().multicast().onBackpressureBuffer();
sinkConnectable = sink.asFlux().publish();
sinkConnectable.connect();
}
public void put(RequestId requestId, StringValue value) {
log.info("put. requestId:{}, value:{}", requestId, value);
storage.put(requestId, value);
sink.emitNext(new ResponseData(requestId, value), this);
}
public Mono<StringValue> get(RequestId requestId) {
return Mono.fromCallable(() -> {
StringValue value;
do
value = executor.schedule(() -> storage.get(requestId), 100, TimeUnit.MILLISECONDS).get();
while (value == null);
return value;
});
return Mono.from(sinkConnectable
.filter(responseData -> {
log.info("waiting:{}, fact:{}", requestId, responseData.requestId);
return responseData.requestId.id() == requestId.id();
})
.map(responseData -> responseData.stringValue));
}
@Override
public void close() {
executor.shutdownNow();
public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
return false;
}
private record ResponseData(RequestId requestId, StringValue stringValue) {
}
}
@@ -13,10 +13,9 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -96,9 +95,9 @@ public class ApplConfig {
return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest);
}
@Bean(destroyMethod = "close")
@Bean
public StringValueStorage stringValueStorage() {
return new StringValueStorage(new ScheduledThreadPoolExecutor(1));
return new StringValueStorage();
}
@Bean(destroyMethod = "close")