From d0086298b73254651c8643700a298dfcd72eb817 Mon Sep 17 00:00:00 2001 From: petrelevich Date: Fri, 8 Nov 2024 09:55:22 +0300 Subject: [PATCH] spring-41 kafka-webflux --- 2024-05/spring-41-kafka-webflux/.gitignore | 39 +++++ .../client/HttpRequests.http | 11 ++ .../client/curlLoop.sh | 5 + .../spring-41-kafka-webflux/client/pom.xml | 64 ++++++++ .../src/main/java/com/datasrc/ClientData.java | 12 ++ .../com/datasrc/ClientDataController.java | 58 ++++++++ .../java/com/datasrc/StringValueStorage.java | 47 ++++++ .../java/com/datasrc/config/ApplConfig.java | 117 +++++++++++++++ .../client/src/main/resources/application.yml | 10 ++ .../client/src/main/resources/logback.xml | 11 ++ .../src/main/resources/static/index.html | 16 ++ .../src/main/resources/static/webclient.js | 24 +++ .../common/HttpRequests.http | 11 ++ .../common/curlLoop.sh | 5 + .../spring-41-kafka-webflux/common/pom.xml | 39 +++++ .../com/datasrc/config/ConsumerException.java | 7 + .../com/datasrc/config/JsonDeserializer.java | 42 ++++++ .../com/datasrc/config/JsonSerializer.java | 34 +++++ .../com/datasrc/config/ReactiveReceiver.java | 127 ++++++++++++++++ .../com/datasrc/config/ReactiveSender.java | 92 ++++++++++++ .../com/datasrc/model/DataForSending.java | 7 + .../main/java/com/datasrc/model/Request.java | 34 +++++ .../java/com/datasrc/model/RequestId.java | 4 + .../main/java/com/datasrc/model/Response.java | 33 +++++ .../java/com/datasrc/model/ResponseId.java | 4 + .../java/com/datasrc/model/StreamData.java | 4 + .../java/com/datasrc/model/StringValue.java | 4 + .../common/src/main/resources/logback.xml | 11 ++ .../docker/docker-compose.yml | 24 +++ 2024-05/spring-41-kafka-webflux/pom.xml | 105 ++++++++++++++ .../spring-41-kafka-webflux/processor/pom.xml | 46 ++++++ .../main/java/com/datasrc/ProcessorData.java | 12 ++ .../com/datasrc/ProcessorDataController.java | 40 +++++ .../java/com/datasrc/config/ApplConfig.java | 137 ++++++++++++++++++ .../com/datasrc/processor/DataProcessor.java | 6 + .../DataProcessorStringReactorFlux.java | 33 +++++ .../DataProcessorStringReactorMono.java | 16 ++ .../src/main/resources/application.yml | 10 ++ .../processor/src/main/resources/logback.xml | 11 ++ .../source/HttpRequests.http | 12 ++ .../spring-41-kafka-webflux/source/pom.xml | 41 ++++++ .../src/main/java/com/datasrc/SourceData.java | 12 ++ .../com/datasrc/SourceDataController.java | 54 +++++++ .../java/com/datasrc/config/ApplConfig.java | 56 +++++++ .../com/datasrc/producer/DataProducer.java | 6 + .../producer/DataProducerStringBlocked.java | 26 ++++ .../producer/DataProducerStringFlux.java | 39 +++++ .../source/src/main/resources/application.yml | 2 + .../source/src/main/resources/logback.xml | 11 ++ 49 files changed, 1571 insertions(+) create mode 100755 2024-05/spring-41-kafka-webflux/.gitignore create mode 100755 2024-05/spring-41-kafka-webflux/client/HttpRequests.http create mode 100755 2024-05/spring-41-kafka-webflux/client/curlLoop.sh create mode 100755 2024-05/spring-41-kafka-webflux/client/pom.xml create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/resources/application.yml create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/resources/logback.xml create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/resources/static/index.html create mode 100755 2024-05/spring-41-kafka-webflux/client/src/main/resources/static/webclient.js create mode 100755 2024-05/spring-41-kafka-webflux/common/HttpRequests.http create mode 100755 2024-05/spring-41-kafka-webflux/common/curlLoop.sh create mode 100755 2024-05/spring-41-kafka-webflux/common/pom.xml create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java create mode 100755 2024-05/spring-41-kafka-webflux/common/src/main/resources/logback.xml create mode 100755 2024-05/spring-41-kafka-webflux/docker/docker-compose.yml create mode 100755 2024-05/spring-41-kafka-webflux/pom.xml create mode 100755 2024-05/spring-41-kafka-webflux/processor/pom.xml create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/resources/application.yml create mode 100755 2024-05/spring-41-kafka-webflux/processor/src/main/resources/logback.xml create mode 100755 2024-05/spring-41-kafka-webflux/source/HttpRequests.http create mode 100755 2024-05/spring-41-kafka-webflux/source/pom.xml create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/resources/application.yml create mode 100755 2024-05/spring-41-kafka-webflux/source/src/main/resources/logback.xml diff --git a/2024-05/spring-41-kafka-webflux/.gitignore b/2024-05/spring-41-kafka-webflux/.gitignore new file mode 100755 index 00000000..dae2cf99 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/.gitignore @@ -0,0 +1,39 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +!gradle-wrapper.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# Ignore Gradle project-specific cache directory +.gradle +/buildSrc/.gradle/ + +# Ignore Gradle build output directory +build +out + +#Idea +*.iml +*.iws +*.ipr +*.idea + diff --git a/2024-05/spring-41-kafka-webflux/client/HttpRequests.http b/2024-05/spring-41-kafka-webflux/client/HttpRequests.http new file mode 100755 index 00000000..3432cafa --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/HttpRequests.http @@ -0,0 +1,11 @@ +### +GET http://localhost:8082/data-mono/16 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8082/data/5 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache diff --git a/2024-05/spring-41-kafka-webflux/client/curlLoop.sh b/2024-05/spring-41-kafka-webflux/client/curlLoop.sh new file mode 100755 index 00000000..32d83a38 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/curlLoop.sh @@ -0,0 +1,5 @@ +date +for run in {1..100000}; do + curl -s "http://localhost:8082/data-mono/$run" > /dev/null +done +date \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/client/pom.xml b/2024-05/spring-41-kafka-webflux/client/pom.xml new file mode 100755 index 00000000..dd59e1e9 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + + ru.otus + spring-41-kafka-webflux + 1.0 + + + client + 1.0 + + + 17 + + + + + ru.otus + common + 1.0 + + + + org.springframework.boot + spring-boot-starter-webflux + + + + io.projectreactor.kafka + reactor-kafka + + + + org.springframework.boot + spring-boot-starter-test + test + + + + io.projectreactor + reactor-test + test + + + + com.github.tomakehurst + wiremock + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java new file mode 100755 index 00000000..341f9828 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java @@ -0,0 +1,12 @@ +package com.datasrc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ClientData { + + public static void main(String[] args) { + SpringApplication.run(ClientData.class, args); + } +} \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java new file mode 100755 index 00000000..1761ca74 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java @@ -0,0 +1,58 @@ +package com.datasrc; + + +import com.datasrc.config.ReactiveSender; +import com.datasrc.model.Request; +import com.datasrc.model.RequestId; +import com.datasrc.model.StreamData; +import com.datasrc.model.StringValue; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + + +// http://localhost:8082/data/5 +@RestController +public class ClientDataController { + private static final Logger log = LoggerFactory.getLogger(ClientDataController.class); + private final AtomicLong idGenerator = new AtomicLong(0); + + private final WebClient webClient; + private final ReactiveSender requestSender; + + private final StringValueStorage stringValueStorage; + + public ClientDataController(WebClient webClient, ReactiveSender requestSender, StringValueStorage stringValueStorage) { + this.webClient = webClient; + this.requestSender = requestSender; + this.stringValueStorage = stringValueStorage; + } + + @GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux data(@PathVariable("seed") long seed) { + log.info("request for data, seed:{}", seed); + + return webClient.get().uri(String.format("/data/%d", seed)) + .accept(MediaType.APPLICATION_NDJSON) + .retrieve() + .bodyToFlux(StreamData.class) + .doOnNext(val -> log.info("val:{}", val)); + } + + @GetMapping(value = "/data-mono/{seed}", produces = MediaType.APPLICATION_JSON_VALUE) + public Mono dataMono(@PathVariable("seed") long seed) { + log.info("request for string data-mono, seed:{}", seed); + var request = new Request(new RequestId(idGenerator.incrementAndGet()), seed); + + return requestSender.send(request, requestSend -> log.info("send ok: {}", requestSend)) + .flatMap(v -> stringValueStorage.get(new RequestId(v.correlationMetadata().id()))) + .doOnNext(stringValue -> log.info("value for client:{}", stringValue)); + } +} diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java new file mode 100755 index 00000000..8ffdd2a9 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java @@ -0,0 +1,47 @@ +package com.datasrc; + +import com.datasrc.model.RequestId; +import com.datasrc.model.StringValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.util.annotation.NonNull; + +public class StringValueStorage implements Sinks.EmitFailureHandler { + + private final Sinks.Many sink; + private final ConnectableFlux sinkConnectable; + private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class); + + public StringValueStorage() { + sink = Sinks.many().multicast().onBackpressureBuffer(); + sinkConnectable = sink.asFlux().publish(); + sinkConnectable.connect(); + } + + public void put(RequestId requestId, StringValue value) { + log.info("put. requestId:{}, value:{}", requestId, value); + sink.emitNext(new ResponseData(requestId, value), this); + } + + public Mono get(RequestId requestId) { + return Mono.from(sinkConnectable + .filter(responseData -> { + log.info("waiting:{}, fact:{}", requestId, responseData.requestId); + return responseData.requestId.id() == requestId.id(); + }) + .map(responseData -> responseData.stringValue)); + } + + + @Override + public boolean onEmitFailure(@NonNull SignalType signalType, @NonNull Sinks.EmitResult emitResult) { + return false; + } + + private record ResponseData(RequestId requestId, StringValue stringValue) { + } +} diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java new file mode 100755 index 00000000..96b3f55b --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java @@ -0,0 +1,117 @@ +package com.datasrc.config; + +import com.datasrc.StringValueStorage; +import com.datasrc.model.Request; +import com.datasrc.model.Response; +import io.netty.channel.nio.NioEventLoopGroup; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.client.reactive.ReactorResourceFactory; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.NonNull; + +@Configuration +@SuppressWarnings("java:S2095") +public class ApplConfig { + private static final int THREAD_POOL_SIZE = 4; + private static final int RESPONSE_RECEIVER_POOL_SIZE = 1; + private static final int KAFKA_POOL_SIZE = 1; + + @Bean(name ="serverThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup serverThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) { + var factory = new NettyReactiveWebServerFactory(); + factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop)); + return factory; + } + + @Bean(name ="clientThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup clientThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "client-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactorResourceFactory reactorResourceFactory(@Qualifier("clientThreadEventLoop") NioEventLoopGroup clientThreadEventLoop) { + var resourceFactory = new ReactorResourceFactory(); + resourceFactory.setLoopResources(b -> clientThreadEventLoop); + resourceFactory.setUseGlobalResources(false); + return resourceFactory; + } + + @Bean + public ReactorClientHttpConnector reactorClientHttpConnector(ReactorResourceFactory resourceFactory) { + return new ReactorClientHttpConnector(resourceFactory, mapper -> mapper); + } + + @Bean("responseReceiverScheduler") + public Scheduler responseReceiverScheduler() { + return Schedulers.newParallel("response-receiver", RESPONSE_RECEIVER_POOL_SIZE); + } + + @Bean("kafkaScheduler") + public Scheduler kafkaScheduler() { + return Schedulers.newParallel("kafka-scheduler", KAFKA_POOL_SIZE); + } + + @Bean + public WebClient webClient(WebClient.Builder builder, + @Value("${application.processor.url}") String url) { + return builder + .baseUrl(url) + .build(); + } + + @Bean(destroyMethod = "close") + public ReactiveSender requestSender(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-request}") String topicRequest, + @Qualifier("kafkaScheduler") Scheduler kafkaScheduler + ) { + return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest); + } + + @Bean + public StringValueStorage stringValueStorage() { + return new StringValueStorage(); + } + + @Bean(destroyMethod = "close") + public ReactiveReceiver responseReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-response}") String topicResponse, + @Value("${application.kafka-group-id}") String groupId, + @Qualifier("responseReceiverScheduler") Scheduler responseReceiverScheduler, + StringValueStorage stringValueStorage) { + return new ReactiveReceiver<>(bootstrapServers, Response.class, topicResponse, responseReceiverScheduler, groupId, + response -> stringValueStorage.put(response.data().requestId(), response.data())); + } + +} diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/resources/application.yml b/2024-05/spring-41-kafka-webflux/client/src/main/resources/application.yml new file mode 100755 index 00000000..29b56723 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 8082 + +application: + processor: + url: http://localhost:8081 + kafka-bootstrap-servers: localhost:9092 + kafka-group-id: clientConsumerGroup + topic-request: request + topic-response: response diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/resources/logback.xml b/2024-05/spring-41-kafka-webflux/client/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/resources/static/index.html b/2024-05/spring-41-kafka-webflux/client/src/main/resources/static/index.html new file mode 100755 index 00000000..d42e4d95 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/resources/static/index.html @@ -0,0 +1,16 @@ + + + + + + + + Stream Demo + + +

Stream Demo

+
+ + + + \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/client/src/main/resources/static/webclient.js b/2024-05/spring-41-kafka-webflux/client/src/main/resources/static/webclient.js new file mode 100755 index 00000000..9287c9b8 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/client/src/main/resources/static/webclient.js @@ -0,0 +1,24 @@ +const streamErr = e => { + console.warn("error"); + console.warn(e); +} + +fetch("http://localhost:8082/data/5").then((response) => { + return can.ndjsonStream(response.body); +}).then(dataStream => { + const reader = dataStream.getReader(); + const read = result => { + if (result.done) { + return; + } + render(result.value); + reader.read().then(read, streamErr); + } + reader.read().then(read, streamErr); +}); + +const render = value => { + const div = document.createElement('div'); + div.append(new Date() + ' stringValue:', JSON.stringify(value)); + document.getElementById('dataBlock').append(div); +}; \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/common/HttpRequests.http b/2024-05/spring-41-kafka-webflux/common/HttpRequests.http new file mode 100755 index 00000000..83afa6d0 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/HttpRequests.http @@ -0,0 +1,11 @@ +### +GET http://localhost:8082/data-mono/13 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8082/data/5 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache diff --git a/2024-05/spring-41-kafka-webflux/common/curlLoop.sh b/2024-05/spring-41-kafka-webflux/common/curlLoop.sh new file mode 100755 index 00000000..32d83a38 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/curlLoop.sh @@ -0,0 +1,5 @@ +date +for run in {1..100000}; do + curl -s "http://localhost:8082/data-mono/$run" > /dev/null +done +date \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/common/pom.xml b/2024-05/spring-41-kafka-webflux/common/pom.xml new file mode 100755 index 00000000..07a1fec1 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + ru.otus + spring-41-kafka-webflux + 1.0 + + + common + 1.0 + + + 17 + + + + + ch.qos.logback + logback-classic + provided + + + + io.projectreactor.kafka + reactor-kafka + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java new file mode 100755 index 00000000..1df2c495 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java @@ -0,0 +1,7 @@ +package com.datasrc.config; + +public class ConsumerException extends RuntimeException { + public ConsumerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java new file mode 100755 index 00000000..fa76ab49 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java @@ -0,0 +1,42 @@ +package com.datasrc.config; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +public class JsonDeserializer implements Deserializer { + public static final String OBJECT_MAPPER = "objectMapper"; + public static final String TYPE_REFERENCE = "typeReference"; + private final String encoding = StandardCharsets.UTF_8.name(); + private ObjectMapper mapper; + private JavaType javaType; + + @Override + public void configure(Map configs, boolean isKey) { + mapper = (ObjectMapper) configs.get(OBJECT_MAPPER); + if (mapper == null) { + throw new IllegalArgumentException("config property OBJECT_MAPPER was not set"); + } + javaType = (JavaType) configs.get(TYPE_REFERENCE); + if (javaType == null) { + throw new IllegalArgumentException("config property TYPE_REFERENCE was not set"); + } + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + if (data == null) { + return null; + } else { + var valueAsString = new String(data, encoding); + return mapper.readValue(valueAsString, javaType); + } + } catch (Exception e) { + throw new SerializationException("Error when deserializing byte[] to StringValue", e); + } + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java new file mode 100755 index 00000000..84b48d44 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java @@ -0,0 +1,34 @@ +package com.datasrc.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +public class JsonSerializer implements Serializer { + public static final String OBJECT_MAPPER = "objectMapper"; + private final String encoding = StandardCharsets.UTF_8.name(); + private ObjectMapper mapper; + + @Override + public void configure(Map configs, boolean isKey) { + mapper = (ObjectMapper) configs.get(OBJECT_MAPPER); + if (mapper == null) { + throw new IllegalArgumentException("config property OBJECT_MAPPER was not set"); + } + } + + @Override + public byte[] serialize(String topic, T data) { + try { + if (data == null) { + return new byte[]{}; + } else { + return mapper.writeValueAsString(data).getBytes(encoding); + } + } catch (Exception e) { + throw new SerializationException("Error when serializing StringValue to byte[] ", e); + } + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java new file mode 100755 index 00000000..02cd1d35 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java @@ -0,0 +1,127 @@ +package com.datasrc.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.InetAddress; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.CancellationException; +import java.util.function.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.scheduler.Scheduler; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.util.retry.Retry; + +import static com.datasrc.config.JsonDeserializer.OBJECT_MAPPER; +import static com.datasrc.config.JsonDeserializer.TYPE_REFERENCE; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +public class ReactiveReceiver { + private static final Logger log = LoggerFactory.getLogger(ReactiveReceiver.class); + private final Random random = new Random(); + + private final Disposable kafkaSubscriber; + private final Disposable kafkaConnection; + + public static final int MAX_POLL_INTERVAL_MS = 1_000; + + public ReactiveReceiver( + String bootstrapServers, Class valueClass, String topicName, Scheduler schedulerValueReceiver, String groupId, Consumer valueConsumer + ) { + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(GROUP_ID_CONFIG, groupId); + props.put(GROUP_INSTANCE_ID_CONFIG, makeGroupInstanceIdConfig(groupId)); + props.put(ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); + props.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + + var objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + props.put(OBJECT_MAPPER, objectMapper); + props.put(TYPE_REFERENCE, objectMapper.getTypeFactory().constructType(valueClass)); + + props.put(MAX_POLL_RECORDS_CONFIG, 3); + props.put(MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL_MS); + + ReceiverOptions receiverOptions = + ReceiverOptions.create(props) + .pollTimeout(Duration.ofSeconds(500)) + .schedulerSupplier(() -> schedulerValueReceiver) + .subscription(Collections.singleton(topicName)); + + Flux> inboundFlux = KafkaReceiver.create(receiverOptions) + .receiveAutoAck() + .concatMap( + consumerRecordFlux -> { + log.info("consumerRecordFlux done, commit"); + return consumerRecordFlux; + }) + .retryWhen(Retry.backoff(3, Duration.of(5, ChronoUnit.SECONDS))); + + Hooks.onErrorDropped( + error -> { + if (error instanceof CancellationException) { + log.info("Cancellation event:", error); + } else { + log.error("error:", error); + } + }); + + var kafkaFlow = inboundFlux.doOnCancel(() -> log.info("connection canceled")) + .doOnError(error -> log.error("Consuming error", error)) + .publish(); + + log.info("start consuming"); + kafkaConnection = kafkaFlow.connect(); + kafkaSubscriber = + kafkaFlow.subscribe( + receiverRecord -> { + var key = receiverRecord.key(); + var value = receiverRecord.value(); + log.info("key:{}, value:{}, record:{}", key, value, receiverRecord); + valueConsumer.accept(value); + }); + } + + public void close() { + log.info("stop consuming"); + kafkaSubscriber.dispose(); + kafkaConnection.dispose(); + } + + private String makeGroupInstanceIdConfig(String groupId) { + try { + var hostName = InetAddress.getLocalHost().getHostName(); + return String.join( + "-", + groupId, + hostName, + String.valueOf(random.nextInt(100_999_999))); + } catch (Exception ex) { + throw new ConsumerException("can't make GroupInstanceIdConfig", ex); + } + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java new file mode 100755 index 00000000..362708dd --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java @@ -0,0 +1,92 @@ +package com.datasrc.config; + +import static com.datasrc.config.JsonSerializer.OBJECT_MAPPER; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.RETRIES_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +import com.datasrc.model.DataForSending; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Properties; +import java.util.function.Consumer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; +import reactor.kafka.sender.SenderResult; + +public class ReactiveSender> { + private static final Logger log = LoggerFactory.getLogger(ReactiveSender.class); + private final KafkaSender sender; + private final String topicName; + + public ReactiveSender(String bootstrapServers, Scheduler schedulerKafka, String topicName) { + this.topicName = topicName; + var props = new Properties(); + props.put(CLIENT_ID_CONFIG, "KafkaReactiveSender"); + props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ACKS_CONFIG, "1"); + props.put(RETRIES_CONFIG, 1); + props.put(BATCH_SIZE_CONFIG, 16384); + props.put(LINGER_MS_CONFIG, 10); + props.put(BUFFER_MEMORY_CONFIG, 26384); // bytes + props.put(MAX_BLOCK_MS_CONFIG, 1_000); // ms + props.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + var objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + props.put(OBJECT_MAPPER, objectMapper); + + SenderOptions senderOptions = + SenderOptions.create(props) + .maxInFlight(10) + .scheduler(schedulerKafka); + + sender = KafkaSender.create(senderOptions); + + var shutdownHook = + new Thread( + () -> { + log.info("closing kafka sender"); + sender.close(); + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + public Mono> send(T data, Consumer sendAsk) { + return sender.send(Mono.just(SenderRecord.create( + topicName, + null, + null, + data.id(), + data, + data))) + .doOnError(error -> log.error("Send failed", error)) + .doOnNext( + senderResult -> { + log.info( + "message id:{} was sent, offset:{}", + senderResult.correlationMetadata().id(), + senderResult.recordMetadata().offset()); + sendAsk.accept(senderResult.correlationMetadata()); + }).next(); + } + + public void close() { + sender.close(); + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java new file mode 100755 index 00000000..6aa91026 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java @@ -0,0 +1,7 @@ +package com.datasrc.model; + +public interface DataForSending { + long id(); + + T data(); +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java new file mode 100755 index 00000000..00623e66 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java @@ -0,0 +1,34 @@ +package com.datasrc.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Request implements DataForSending { + + private final RequestId id; + private final long seed; + + @JsonCreator + public Request(@JsonProperty("id") RequestId id, @JsonProperty("seed") long seed) { + this.id = id; + this.seed = seed; + } + + @Override + public long id() { + return id.id(); + } + + @Override + public Long data() { + return seed; + } + + @Override + public String toString() { + return "Request{" + + "id=" + id + + ", seed=" + seed + + '}'; + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java new file mode 100755 index 00000000..5a602848 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record RequestId(long id) { +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java new file mode 100755 index 00000000..a224a08d --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java @@ -0,0 +1,33 @@ +package com.datasrc.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Response implements DataForSending { + private final ResponseId id; + private final StringValue stringValue; + + @JsonCreator + public Response(@JsonProperty("id") ResponseId id, @JsonProperty("stringValue") StringValue stringValue) { + this.id = id; + this.stringValue = stringValue; + } + + @Override + public long id() { + return id.id(); + } + + @Override + public StringValue data() { + return stringValue; + } + + @Override + public String toString() { + return "Response{" + + "id=" + id + + ", stringValue=" + stringValue + + '}'; + } +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java new file mode 100755 index 00000000..bf7d7d73 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record ResponseId(long id) { +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java new file mode 100755 index 00000000..cc199ad4 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record StreamData(String value) { +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java new file mode 100755 index 00000000..43568176 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record StringValue(RequestId requestId, String value) { +} diff --git a/2024-05/spring-41-kafka-webflux/common/src/main/resources/logback.xml b/2024-05/spring-41-kafka-webflux/common/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/common/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/2024-05/spring-41-kafka-webflux/docker/docker-compose.yml b/2024-05/spring-41-kafka-webflux/docker/docker-compose.yml new file mode 100755 index 00000000..2f5b20cf --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/docker/docker-compose.yml @@ -0,0 +1,24 @@ +version: "3.9" +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.2.0 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.0.0 + container_name: broker + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/pom.xml b/2024-05/spring-41-kafka-webflux/pom.xml new file mode 100755 index 00000000..99c8c426 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + ru.otus + spring-41-kafka-webflux + 1.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.1.1 + + + pom + + + client + processor + source + common + + + + UTF-8 + 17 + 17 + 3.0.0-M3 + 3.1.1 + 3.3.9 + 1.0.12.RELEASE + 3.0.5 + 3.0.2 + 3.0.0-beta-10 + 1.3.18 + + + + + + org.springframework.boot + spring-boot-dependencies + ${springframeworkBoot.version} + pom + import + + + com.google.code.findbugs + jsr305 + ${jsr305.version} + + + io.projectreactor.kafka + reactor-kafka + ${reactorKafka.version} + + + com.github.tomakehurst + wiremock + ${wiremock.version} + + + + + + + + + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + enforce-maven + + enforce + + + + + + ${minimal.maven.version} + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + + org.apache.maven.plugins + maven-assembly-plugin + ${maven-assembly-plugin.version} + + + + + diff --git a/2024-05/spring-41-kafka-webflux/processor/pom.xml b/2024-05/spring-41-kafka-webflux/processor/pom.xml new file mode 100755 index 00000000..932570f1 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + + + ru.otus + spring-41-kafka-webflux + 1.0 + + + processor + 1.0 + + + 17 + + + + + ru.otus + common + 1.0 + + + + org.springframework.boot + spring-boot-starter-webflux + + + + io.projectreactor.kafka + reactor-kafka + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java new file mode 100755 index 00000000..6d911292 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java @@ -0,0 +1,12 @@ +package com.datasrc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ProcessorData { + + public static void main(String[] args) { + SpringApplication.run(ProcessorData.class, args); + } +} \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java new file mode 100755 index 00000000..0b89514f --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java @@ -0,0 +1,40 @@ +package com.datasrc; + + +import com.datasrc.model.StreamData; +import com.datasrc.processor.DataProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +@RestController +public class ProcessorDataController { + private static final Logger log = LoggerFactory.getLogger(ProcessorDataController.class); + + private final DataProcessor> dataProcessorStringReactorFlux; + private final WebClient client; + + public ProcessorDataController(WebClient client, + @Qualifier("dataProcessorFlux") DataProcessor> dataProcessorFlux) { + this.dataProcessorStringReactorFlux = dataProcessorFlux; + this.client = client; + } + + @GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux data(@PathVariable("seed") long seed) { + log.info("request for data, seed:{}", seed); + + var srcRequest = client.get().uri(String.format("/data/%d", seed)) + .accept(MediaType.APPLICATION_NDJSON) + .retrieve() + .bodyToFlux(StreamData.class); + + return dataProcessorStringReactorFlux.process(srcRequest); + } +} diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java new file mode 100755 index 00000000..14ca7d67 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java @@ -0,0 +1,137 @@ +package com.datasrc.config; + +import com.datasrc.model.Request; +import com.datasrc.model.RequestId; +import com.datasrc.model.Response; +import com.datasrc.model.ResponseId; +import com.datasrc.model.StringValue; +import com.datasrc.processor.DataProcessor; +import io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.client.reactive.ReactorResourceFactory; + + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.NonNull; +import reactor.util.annotation.Nullable; + +@Configuration +@SuppressWarnings("java:S2095") +public class ApplConfig { + private static final Logger log = LoggerFactory.getLogger(ApplConfig.class); + private static final int THREAD_POOL_SIZE = 2; + private static final int REQUEST_RECEIVER_POOL_SIZE = 1; + private static final int KAFKA_POOL_SIZE = 1; + + private final AtomicLong responseIdGenerator = new AtomicLong(0); + + @Bean(name ="serverThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup serverThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) { + var factory = new NettyReactiveWebServerFactory(); + factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop)); + return factory; + } + + @Bean(name ="clientThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup clientThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "client-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactorResourceFactory reactorResourceFactory(@Qualifier("clientThreadEventLoop") NioEventLoopGroup clientThreadEventLoop) { + var resourceFactory = new ReactorResourceFactory(); + resourceFactory.setLoopResources(b -> clientThreadEventLoop); + resourceFactory.setUseGlobalResources(false); + return resourceFactory; + } + + @Bean + public ReactorClientHttpConnector reactorClientHttpConnector(ReactorResourceFactory resourceFactory) { + return new ReactorClientHttpConnector(resourceFactory, mapper -> mapper); + } + + @Bean + public Scheduler timer() { + return Schedulers.newParallel("processor-thread", 2); + } + + + @Bean("requestReceiverScheduler") + public Scheduler requestReceiverScheduler() { + return Schedulers.newParallel("request-receiver", REQUEST_RECEIVER_POOL_SIZE); + } + + @Bean("kafkaScheduler") + public Scheduler kafkaScheduler() { + return Schedulers.newParallel("kafka-scheduler", KAFKA_POOL_SIZE); + } + + @Bean + public WebClient webClient(WebClient.Builder builder, + @Value("${application.source.url}") String url) { + return builder + .baseUrl(url) + .build(); + } + + @Bean(destroyMethod = "close") + public ReactiveSender responseSender(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-response}") String topicResponse, + @Qualifier("kafkaScheduler") Scheduler kafkaScheduler + ) { + return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicResponse); + } + + @Bean(destroyMethod = "close") + public ReactiveReceiver requestReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-request}") String topicRequest, + @Value("${application.kafka-group-id}") String groupId, + @Qualifier("requestReceiverScheduler") Scheduler responseReceiverScheduler, + ReactiveSender responseSender, + @Qualifier("dataProcessorMono") DataProcessor dataProcessor, + WebClient webClient) { + + return new ReactiveReceiver<>(bootstrapServers, Request.class, topicRequest, responseReceiverScheduler, groupId, + request -> webClient.get().uri(String.format("/data-mono/%d", request.data())) + .retrieve() + .bodyToMono(String.class) + .map(dataProcessor::process) + .flatMap(stringValue -> + responseSender.send(new Response(new ResponseId(responseIdGenerator.incrementAndGet()), + new StringValue(new RequestId(request.id()), stringValue)), + stringValueDataForSending -> log.info("response send:{}", stringValueDataForSending))) + .subscribe()); + } +} diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java new file mode 100755 index 00000000..55b9c4cd --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java @@ -0,0 +1,6 @@ +package com.datasrc.processor; + +public interface DataProcessor { + + T process(T data); +} diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java new file mode 100755 index 00000000..c0a5155a --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java @@ -0,0 +1,33 @@ +package com.datasrc.processor; + +import com.datasrc.model.StreamData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; + +import java.time.Duration; + +@Service("dataProcessorFlux") +public class DataProcessorStringReactorFlux implements DataProcessor> { + private static final Logger log = LoggerFactory.getLogger(DataProcessorStringReactorFlux.class); + private final Scheduler timer; + + public DataProcessorStringReactorFlux(Scheduler timer) { + this.timer = timer; + } + + @Override + public Flux process(Flux dataflow) { + log.info("processor"); + var dataSeq = dataflow + .doOnNext(val -> log.info("in val:{}", val)) + .delayElements(Duration.ofSeconds(5), timer) + .map(data -> new StreamData(data.value().toUpperCase())) + .doOnNext(val -> log.info("out val:{}", val)); + + log.info("processor method finished"); + return dataSeq; + } +} diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java new file mode 100755 index 00000000..f102c368 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java @@ -0,0 +1,16 @@ +package com.datasrc.processor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service("dataProcessorMono") +public class DataProcessorStringReactorMono implements DataProcessor { + private static final Logger log = LoggerFactory.getLogger(DataProcessorStringReactorMono.class); + + @Override + public String process(String value) { + log.info("processor"); + return value.toUpperCase(); + } +} diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/resources/application.yml b/2024-05/spring-41-kafka-webflux/processor/src/main/resources/application.yml new file mode 100755 index 00000000..ffa5de55 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 8081 + +application: + source: + url: http://localhost:8080 + kafka-bootstrap-servers: localhost:9092 + kafka-group-id: processorConsumerGroup + topic-request: request + topic-response: response diff --git a/2024-05/spring-41-kafka-webflux/processor/src/main/resources/logback.xml b/2024-05/spring-41-kafka-webflux/processor/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/processor/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/2024-05/spring-41-kafka-webflux/source/HttpRequests.http b/2024-05/spring-41-kafka-webflux/source/HttpRequests.http new file mode 100755 index 00000000..be988df3 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/HttpRequests.http @@ -0,0 +1,12 @@ +### +GET http://localhost:8080/data-mono/13 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + + +### +GET http://localhost:8080/data/5 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/source/pom.xml b/2024-05/spring-41-kafka-webflux/source/pom.xml new file mode 100755 index 00000000..b4c43a8e --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + + ru.otus + spring-41-kafka-webflux + 1.0 + + + source + 1.0 + + + 17 + + + + + ru.otus + common + 1.0 + + + + org.springframework.boot + spring-boot-starter-webflux + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java new file mode 100755 index 00000000..a321fbde --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java @@ -0,0 +1,12 @@ +package com.datasrc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SourceData { + + public static void main(String[] args) { + SpringApplication.run(SourceData.class, args); + } +} \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java new file mode 100755 index 00000000..feb4291a --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java @@ -0,0 +1,54 @@ +package com.datasrc; + + +import com.datasrc.model.StreamData; +import com.datasrc.producer.DataProducer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +public class SourceDataController { + private static final Logger log = LoggerFactory.getLogger(SourceDataController.class); + + private final DataProducer> dataProducerFlux; + private final DataProducer dataProducerStringBlocked; + + private final Executor blockingExecutor; + + public SourceDataController(@Qualifier("dataProducerFlux") DataProducer> dataProducerFlux, + @Qualifier("dataProducerStringBlocked") DataProducer dataProducerStringBlocked, + @Qualifier("blockingExecutor") Executor blockingExecutor) { + this.dataProducerFlux = dataProducerFlux; + this.dataProducerStringBlocked = dataProducerStringBlocked; + this.blockingExecutor = blockingExecutor; + } + + @GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux data(@PathVariable("seed") long seed) { + log.info("request for string data, seed:{}", seed); + var stringData = dataProducerFlux.produce(seed); + + log.info("Method request for string data done"); + return stringData; + } + + @GetMapping(value = "/data-mono/{seed}", produces = MediaType.APPLICATION_JSON_VALUE) + public Mono dataMono(@PathVariable("seed") long seed) { + log.info("request for string data-mono, seed:{}", seed); + + var future = CompletableFuture + .supplyAsync(() -> dataProducerStringBlocked.produce(seed), blockingExecutor); + var mono = Mono.fromFuture(future); + log.info("Method request for string data done"); + return mono; + } +} diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java new file mode 100755 index 00000000..c007fb55 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java @@ -0,0 +1,56 @@ +package com.datasrc.config; + +import io.netty.channel.nio.NioEventLoopGroup; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.NonNull; + +@Configuration +@SuppressWarnings("java:S2095") +public class ApplConfig { + private static final int THREAD_POOL_SIZE = 2; + private static final int BLOCKING_THREAD_POOL_SIZE = 2; + + @Bean(name ="serverThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup serverThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) { + var factory = new NettyReactiveWebServerFactory(); + factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop)); + return factory; + } + + @Bean(name= "blockingExecutor", destroyMethod = "close") + public Executor blockingExecutor() { + var id = new AtomicLong(0); + return Executors.newFixedThreadPool(BLOCKING_THREAD_POOL_SIZE, + task -> new Thread(task, String.format("blocking-thread-%d", id.incrementAndGet()))); + } + + @Bean + public Scheduler timer() { + return Schedulers.newParallel("processor-thread", 2); + } +} diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java new file mode 100755 index 00000000..e205bb5e --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java @@ -0,0 +1,6 @@ +package com.datasrc.producer; + +public interface DataProducer { + + T produce(long seed); +} diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java new file mode 100755 index 00000000..d9cf933b --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java @@ -0,0 +1,26 @@ +package com.datasrc.producer; + +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service("dataProducerStringBlocked") +public class DataProducerStringBlocked implements DataProducer { + private static final Logger log = LoggerFactory.getLogger(DataProducerStringBlocked.class); + + @Override + public String produce(long seed) { + log.info("produce using seed:{}", seed); + sleep(); + return String.format("someDataStr:%s", seed); + } + + private void sleep() { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java new file mode 100755 index 00000000..b8250eda --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java @@ -0,0 +1,39 @@ +package com.datasrc.producer; + +import com.datasrc.model.StreamData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SynchronousSink; +import java.time.Duration; +import java.util.function.BiFunction; +import reactor.core.scheduler.Scheduler; + +@Service("dataProducerFlux") +public class DataProducerStringFlux implements DataProducer> { + private static final Logger log = LoggerFactory.getLogger(DataProducerStringFlux.class); + private final Scheduler timer; + + public DataProducerStringFlux(Scheduler timer) { + this.timer = timer; + } + + @Override + public Flux produce(long seed) { + log.info("produce using seed:{}", seed); + var stringSeed = "someDataStr:"; + var dataSeq = Flux.generate(() -> seed, + (BiFunction, Long>) (prev, sink) -> { + var newValue = prev + 1; + sink.next(newValue); + return newValue; + }) + .delayElements(Duration.ofSeconds(3), timer) + .map(val -> new StreamData(stringSeed + val)) + .doOnNext(val -> log.info("val:{}", val)); + + log.info("produce method finished"); + return dataSeq; + } +} diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/resources/application.yml b/2024-05/spring-41-kafka-webflux/source/src/main/resources/application.yml new file mode 100755 index 00000000..47fbb02d --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/resources/application.yml @@ -0,0 +1,2 @@ +server: + port: 8080 \ No newline at end of file diff --git a/2024-05/spring-41-kafka-webflux/source/src/main/resources/logback.xml b/2024-05/spring-41-kafka-webflux/source/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2024-05/spring-41-kafka-webflux/source/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + +