diff --git a/2025-11/spring-39-kafka-webflux/.gitignore b/2025-11/spring-39-kafka-webflux/.gitignore new file mode 100755 index 00000000..d8762a8d --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/.gitignore @@ -0,0 +1,35 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +!gradle-wrapper.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# Ignore Gradle build output directory +build +out + +#Idea +*.iml +*.iws +*.ipr +*.idea + diff --git a/2025-11/spring-39-kafka-webflux/.mvn/wrapper/MavenWrapperDownloader.java b/2025-11/spring-39-kafka-webflux/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 00000000..e76d1f32 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,117 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/2025-11/spring-39-kafka-webflux/.mvn/wrapper/maven-wrapper.properties b/2025-11/spring-39-kafka-webflux/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 00000000..642d572c --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/2025-11/spring-39-kafka-webflux/client/HttpRequests.http b/2025-11/spring-39-kafka-webflux/client/HttpRequests.http new file mode 100755 index 00000000..3432cafa --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/HttpRequests.http @@ -0,0 +1,11 @@ +### +GET http://localhost:8082/data-mono/16 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8082/data/5 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache diff --git a/2025-11/spring-39-kafka-webflux/client/curlLoop.sh b/2025-11/spring-39-kafka-webflux/client/curlLoop.sh new file mode 100755 index 00000000..32d83a38 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/curlLoop.sh @@ -0,0 +1,5 @@ +date +for run in {1..100000}; do + curl -s "http://localhost:8082/data-mono/$run" > /dev/null +done +date \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/client/pom.xml b/2025-11/spring-39-kafka-webflux/client/pom.xml new file mode 100755 index 00000000..b5f78173 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + + ru.otus + spring-39-kafka-webflux + 1.0 + + + client + 1.0 + + + 21 + + + + + ru.otus + common + 1.0 + + + + org.springframework.boot + spring-boot-starter-webflux + + + + io.projectreactor.kafka + reactor-kafka + + + + org.springframework.boot + spring-boot-starter-test + test + + + + io.projectreactor + reactor-test + test + + + + com.github.tomakehurst + wiremock + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java new file mode 100755 index 00000000..341f9828 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/ClientData.java @@ -0,0 +1,12 @@ +package com.datasrc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ClientData { + + public static void main(String[] args) { + SpringApplication.run(ClientData.class, args); + } +} \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java new file mode 100755 index 00000000..1761ca74 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/ClientDataController.java @@ -0,0 +1,58 @@ +package com.datasrc; + + +import com.datasrc.config.ReactiveSender; +import com.datasrc.model.Request; +import com.datasrc.model.RequestId; +import com.datasrc.model.StreamData; +import com.datasrc.model.StringValue; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + + +// http://localhost:8082/data/5 +@RestController +public class ClientDataController { + private static final Logger log = LoggerFactory.getLogger(ClientDataController.class); + private final AtomicLong idGenerator = new AtomicLong(0); + + private final WebClient webClient; + private final ReactiveSender requestSender; + + private final StringValueStorage stringValueStorage; + + public ClientDataController(WebClient webClient, ReactiveSender requestSender, StringValueStorage stringValueStorage) { + this.webClient = webClient; + this.requestSender = requestSender; + this.stringValueStorage = stringValueStorage; + } + + @GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux data(@PathVariable("seed") long seed) { + log.info("request for data, seed:{}", seed); + + return webClient.get().uri(String.format("/data/%d", seed)) + .accept(MediaType.APPLICATION_NDJSON) + .retrieve() + .bodyToFlux(StreamData.class) + .doOnNext(val -> log.info("val:{}", val)); + } + + @GetMapping(value = "/data-mono/{seed}", produces = MediaType.APPLICATION_JSON_VALUE) + public Mono dataMono(@PathVariable("seed") long seed) { + log.info("request for string data-mono, seed:{}", seed); + var request = new Request(new RequestId(idGenerator.incrementAndGet()), seed); + + return requestSender.send(request, requestSend -> log.info("send ok: {}", requestSend)) + .flatMap(v -> stringValueStorage.get(new RequestId(v.correlationMetadata().id()))) + .doOnNext(stringValue -> log.info("value for client:{}", stringValue)); + } +} diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java new file mode 100755 index 00000000..8ffdd2a9 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/StringValueStorage.java @@ -0,0 +1,47 @@ +package com.datasrc; + +import com.datasrc.model.RequestId; +import com.datasrc.model.StringValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.util.annotation.NonNull; + +public class StringValueStorage implements Sinks.EmitFailureHandler { + + private final Sinks.Many sink; + private final ConnectableFlux sinkConnectable; + private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class); + + public StringValueStorage() { + sink = Sinks.many().multicast().onBackpressureBuffer(); + sinkConnectable = sink.asFlux().publish(); + sinkConnectable.connect(); + } + + public void put(RequestId requestId, StringValue value) { + log.info("put. requestId:{}, value:{}", requestId, value); + sink.emitNext(new ResponseData(requestId, value), this); + } + + public Mono get(RequestId requestId) { + return Mono.from(sinkConnectable + .filter(responseData -> { + log.info("waiting:{}, fact:{}", requestId, responseData.requestId); + return responseData.requestId.id() == requestId.id(); + }) + .map(responseData -> responseData.stringValue)); + } + + + @Override + public boolean onEmitFailure(@NonNull SignalType signalType, @NonNull Sinks.EmitResult emitResult) { + return false; + } + + private record ResponseData(RequestId requestId, StringValue stringValue) { + } +} diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java new file mode 100755 index 00000000..70edb4aa --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/java/com/datasrc/config/ApplConfig.java @@ -0,0 +1,117 @@ +package com.datasrc.config; + +import com.datasrc.StringValueStorage; +import com.datasrc.model.Request; +import com.datasrc.model.Response; +import io.netty.channel.nio.NioEventLoopGroup; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ReactorResourceFactory; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.NonNull; + +@Configuration +@SuppressWarnings("java:S2095") +public class ApplConfig { + private static final int THREAD_POOL_SIZE = 4; + private static final int RESPONSE_RECEIVER_POOL_SIZE = 1; + private static final int KAFKA_POOL_SIZE = 1; + + @Bean(name ="serverThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup serverThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) { + var factory = new NettyReactiveWebServerFactory(); + factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop)); + return factory; + } + + @Bean(name ="clientThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup clientThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "client-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactorResourceFactory reactorResourceFactory(@Qualifier("clientThreadEventLoop") NioEventLoopGroup clientThreadEventLoop) { + var resourceFactory = new org.springframework.http.client.ReactorResourceFactory(); + resourceFactory.setLoopResources(b -> clientThreadEventLoop); + resourceFactory.setUseGlobalResources(false); + return resourceFactory; + } + + @Bean + public ReactorClientHttpConnector reactorClientHttpConnector(ReactorResourceFactory resourceFactory) { + return new ReactorClientHttpConnector(resourceFactory, mapper -> mapper); + } + + @Bean("responseReceiverScheduler") + public Scheduler responseReceiverScheduler() { + return Schedulers.newParallel("response-receiver", RESPONSE_RECEIVER_POOL_SIZE); + } + + @Bean("kafkaScheduler") + public Scheduler kafkaScheduler() { + return Schedulers.newParallel("kafka-scheduler", KAFKA_POOL_SIZE); + } + + @Bean + public WebClient webClient(WebClient.Builder builder, + @Value("${application.processor.url}") String url) { + return builder + .baseUrl(url) + .build(); + } + + @Bean(destroyMethod = "close") + public ReactiveSender requestSender(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-request}") String topicRequest, + @Qualifier("kafkaScheduler") Scheduler kafkaScheduler + ) { + return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest); + } + + @Bean + public StringValueStorage stringValueStorage() { + return new StringValueStorage(); + } + + @Bean(destroyMethod = "close") + public ReactiveReceiver responseReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-response}") String topicResponse, + @Value("${application.kafka-group-id}") String groupId, + @Qualifier("responseReceiverScheduler") Scheduler responseReceiverScheduler, + StringValueStorage stringValueStorage) { + return new ReactiveReceiver<>(bootstrapServers, Response.class, topicResponse, responseReceiverScheduler, groupId, + response -> stringValueStorage.put(response.data().requestId(), response.data())); + } + +} diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/resources/application.yml b/2025-11/spring-39-kafka-webflux/client/src/main/resources/application.yml new file mode 100755 index 00000000..29b56723 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 8082 + +application: + processor: + url: http://localhost:8081 + kafka-bootstrap-servers: localhost:9092 + kafka-group-id: clientConsumerGroup + topic-request: request + topic-response: response diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/resources/logback.xml b/2025-11/spring-39-kafka-webflux/client/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/resources/static/index.html b/2025-11/spring-39-kafka-webflux/client/src/main/resources/static/index.html new file mode 100755 index 00000000..d42e4d95 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/resources/static/index.html @@ -0,0 +1,16 @@ + + + + + + + + Stream Demo + + +

Stream Demo

+
+ + + + \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/client/src/main/resources/static/webclient.js b/2025-11/spring-39-kafka-webflux/client/src/main/resources/static/webclient.js new file mode 100755 index 00000000..9287c9b8 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/client/src/main/resources/static/webclient.js @@ -0,0 +1,24 @@ +const streamErr = e => { + console.warn("error"); + console.warn(e); +} + +fetch("http://localhost:8082/data/5").then((response) => { + return can.ndjsonStream(response.body); +}).then(dataStream => { + const reader = dataStream.getReader(); + const read = result => { + if (result.done) { + return; + } + render(result.value); + reader.read().then(read, streamErr); + } + reader.read().then(read, streamErr); +}); + +const render = value => { + const div = document.createElement('div'); + div.append(new Date() + ' stringValue:', JSON.stringify(value)); + document.getElementById('dataBlock').append(div); +}; \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/common/HttpRequests.http b/2025-11/spring-39-kafka-webflux/common/HttpRequests.http new file mode 100755 index 00000000..83afa6d0 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/HttpRequests.http @@ -0,0 +1,11 @@ +### +GET http://localhost:8082/data-mono/13 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8082/data/5 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache diff --git a/2025-11/spring-39-kafka-webflux/common/curlLoop.sh b/2025-11/spring-39-kafka-webflux/common/curlLoop.sh new file mode 100755 index 00000000..32d83a38 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/curlLoop.sh @@ -0,0 +1,5 @@ +date +for run in {1..100000}; do + curl -s "http://localhost:8082/data-mono/$run" > /dev/null +done +date \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/common/pom.xml b/2025-11/spring-39-kafka-webflux/common/pom.xml new file mode 100755 index 00000000..0091a0e8 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + ru.otus + spring-39-kafka-webflux + 1.0 + + + common + 1.0 + + + 21 + + + + + ch.qos.logback + logback-classic + provided + + + + io.projectreactor.kafka + reactor-kafka + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java new file mode 100755 index 00000000..1df2c495 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ConsumerException.java @@ -0,0 +1,7 @@ +package com.datasrc.config; + +public class ConsumerException extends RuntimeException { + public ConsumerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java new file mode 100755 index 00000000..fa76ab49 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/JsonDeserializer.java @@ -0,0 +1,42 @@ +package com.datasrc.config; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +public class JsonDeserializer implements Deserializer { + public static final String OBJECT_MAPPER = "objectMapper"; + public static final String TYPE_REFERENCE = "typeReference"; + private final String encoding = StandardCharsets.UTF_8.name(); + private ObjectMapper mapper; + private JavaType javaType; + + @Override + public void configure(Map configs, boolean isKey) { + mapper = (ObjectMapper) configs.get(OBJECT_MAPPER); + if (mapper == null) { + throw new IllegalArgumentException("config property OBJECT_MAPPER was not set"); + } + javaType = (JavaType) configs.get(TYPE_REFERENCE); + if (javaType == null) { + throw new IllegalArgumentException("config property TYPE_REFERENCE was not set"); + } + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + if (data == null) { + return null; + } else { + var valueAsString = new String(data, encoding); + return mapper.readValue(valueAsString, javaType); + } + } catch (Exception e) { + throw new SerializationException("Error when deserializing byte[] to StringValue", e); + } + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java new file mode 100755 index 00000000..84b48d44 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/JsonSerializer.java @@ -0,0 +1,34 @@ +package com.datasrc.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +public class JsonSerializer implements Serializer { + public static final String OBJECT_MAPPER = "objectMapper"; + private final String encoding = StandardCharsets.UTF_8.name(); + private ObjectMapper mapper; + + @Override + public void configure(Map configs, boolean isKey) { + mapper = (ObjectMapper) configs.get(OBJECT_MAPPER); + if (mapper == null) { + throw new IllegalArgumentException("config property OBJECT_MAPPER was not set"); + } + } + + @Override + public byte[] serialize(String topic, T data) { + try { + if (data == null) { + return new byte[]{}; + } else { + return mapper.writeValueAsString(data).getBytes(encoding); + } + } catch (Exception e) { + throw new SerializationException("Error when serializing StringValue to byte[] ", e); + } + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java new file mode 100755 index 00000000..02cd1d35 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveReceiver.java @@ -0,0 +1,127 @@ +package com.datasrc.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.InetAddress; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.CancellationException; +import java.util.function.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.scheduler.Scheduler; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.util.retry.Retry; + +import static com.datasrc.config.JsonDeserializer.OBJECT_MAPPER; +import static com.datasrc.config.JsonDeserializer.TYPE_REFERENCE; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +public class ReactiveReceiver { + private static final Logger log = LoggerFactory.getLogger(ReactiveReceiver.class); + private final Random random = new Random(); + + private final Disposable kafkaSubscriber; + private final Disposable kafkaConnection; + + public static final int MAX_POLL_INTERVAL_MS = 1_000; + + public ReactiveReceiver( + String bootstrapServers, Class valueClass, String topicName, Scheduler schedulerValueReceiver, String groupId, Consumer valueConsumer + ) { + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(GROUP_ID_CONFIG, groupId); + props.put(GROUP_INSTANCE_ID_CONFIG, makeGroupInstanceIdConfig(groupId)); + props.put(ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); + props.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + + var objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + props.put(OBJECT_MAPPER, objectMapper); + props.put(TYPE_REFERENCE, objectMapper.getTypeFactory().constructType(valueClass)); + + props.put(MAX_POLL_RECORDS_CONFIG, 3); + props.put(MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL_MS); + + ReceiverOptions receiverOptions = + ReceiverOptions.create(props) + .pollTimeout(Duration.ofSeconds(500)) + .schedulerSupplier(() -> schedulerValueReceiver) + .subscription(Collections.singleton(topicName)); + + Flux> inboundFlux = KafkaReceiver.create(receiverOptions) + .receiveAutoAck() + .concatMap( + consumerRecordFlux -> { + log.info("consumerRecordFlux done, commit"); + return consumerRecordFlux; + }) + .retryWhen(Retry.backoff(3, Duration.of(5, ChronoUnit.SECONDS))); + + Hooks.onErrorDropped( + error -> { + if (error instanceof CancellationException) { + log.info("Cancellation event:", error); + } else { + log.error("error:", error); + } + }); + + var kafkaFlow = inboundFlux.doOnCancel(() -> log.info("connection canceled")) + .doOnError(error -> log.error("Consuming error", error)) + .publish(); + + log.info("start consuming"); + kafkaConnection = kafkaFlow.connect(); + kafkaSubscriber = + kafkaFlow.subscribe( + receiverRecord -> { + var key = receiverRecord.key(); + var value = receiverRecord.value(); + log.info("key:{}, value:{}, record:{}", key, value, receiverRecord); + valueConsumer.accept(value); + }); + } + + public void close() { + log.info("stop consuming"); + kafkaSubscriber.dispose(); + kafkaConnection.dispose(); + } + + private String makeGroupInstanceIdConfig(String groupId) { + try { + var hostName = InetAddress.getLocalHost().getHostName(); + return String.join( + "-", + groupId, + hostName, + String.valueOf(random.nextInt(100_999_999))); + } catch (Exception ex) { + throw new ConsumerException("can't make GroupInstanceIdConfig", ex); + } + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java new file mode 100755 index 00000000..362708dd --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/config/ReactiveSender.java @@ -0,0 +1,92 @@ +package com.datasrc.config; + +import static com.datasrc.config.JsonSerializer.OBJECT_MAPPER; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.RETRIES_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +import com.datasrc.model.DataForSending; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Properties; +import java.util.function.Consumer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; +import reactor.kafka.sender.SenderResult; + +public class ReactiveSender> { + private static final Logger log = LoggerFactory.getLogger(ReactiveSender.class); + private final KafkaSender sender; + private final String topicName; + + public ReactiveSender(String bootstrapServers, Scheduler schedulerKafka, String topicName) { + this.topicName = topicName; + var props = new Properties(); + props.put(CLIENT_ID_CONFIG, "KafkaReactiveSender"); + props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ACKS_CONFIG, "1"); + props.put(RETRIES_CONFIG, 1); + props.put(BATCH_SIZE_CONFIG, 16384); + props.put(LINGER_MS_CONFIG, 10); + props.put(BUFFER_MEMORY_CONFIG, 26384); // bytes + props.put(MAX_BLOCK_MS_CONFIG, 1_000); // ms + props.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + var objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + props.put(OBJECT_MAPPER, objectMapper); + + SenderOptions senderOptions = + SenderOptions.create(props) + .maxInFlight(10) + .scheduler(schedulerKafka); + + sender = KafkaSender.create(senderOptions); + + var shutdownHook = + new Thread( + () -> { + log.info("closing kafka sender"); + sender.close(); + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + public Mono> send(T data, Consumer sendAsk) { + return sender.send(Mono.just(SenderRecord.create( + topicName, + null, + null, + data.id(), + data, + data))) + .doOnError(error -> log.error("Send failed", error)) + .doOnNext( + senderResult -> { + log.info( + "message id:{} was sent, offset:{}", + senderResult.correlationMetadata().id(), + senderResult.recordMetadata().offset()); + sendAsk.accept(senderResult.correlationMetadata()); + }).next(); + } + + public void close() { + sender.close(); + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java new file mode 100755 index 00000000..6aa91026 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/DataForSending.java @@ -0,0 +1,7 @@ +package com.datasrc.model; + +public interface DataForSending { + long id(); + + T data(); +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java new file mode 100755 index 00000000..00623e66 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/Request.java @@ -0,0 +1,34 @@ +package com.datasrc.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Request implements DataForSending { + + private final RequestId id; + private final long seed; + + @JsonCreator + public Request(@JsonProperty("id") RequestId id, @JsonProperty("seed") long seed) { + this.id = id; + this.seed = seed; + } + + @Override + public long id() { + return id.id(); + } + + @Override + public Long data() { + return seed; + } + + @Override + public String toString() { + return "Request{" + + "id=" + id + + ", seed=" + seed + + '}'; + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java new file mode 100755 index 00000000..5a602848 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/RequestId.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record RequestId(long id) { +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java new file mode 100755 index 00000000..a224a08d --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/Response.java @@ -0,0 +1,33 @@ +package com.datasrc.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Response implements DataForSending { + private final ResponseId id; + private final StringValue stringValue; + + @JsonCreator + public Response(@JsonProperty("id") ResponseId id, @JsonProperty("stringValue") StringValue stringValue) { + this.id = id; + this.stringValue = stringValue; + } + + @Override + public long id() { + return id.id(); + } + + @Override + public StringValue data() { + return stringValue; + } + + @Override + public String toString() { + return "Response{" + + "id=" + id + + ", stringValue=" + stringValue + + '}'; + } +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java new file mode 100755 index 00000000..bf7d7d73 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/ResponseId.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record ResponseId(long id) { +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java new file mode 100755 index 00000000..cc199ad4 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/StreamData.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record StreamData(String value) { +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java new file mode 100755 index 00000000..43568176 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/java/com/datasrc/model/StringValue.java @@ -0,0 +1,4 @@ +package com.datasrc.model; + +public record StringValue(RequestId requestId, String value) { +} diff --git a/2025-11/spring-39-kafka-webflux/common/src/main/resources/logback.xml b/2025-11/spring-39-kafka-webflux/common/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/common/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/2025-11/spring-39-kafka-webflux/docker/docker-compose.yml b/2025-11/spring-39-kafka-webflux/docker/docker-compose.yml new file mode 100755 index 00000000..c264d84e --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/docker/docker-compose.yml @@ -0,0 +1,23 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.2.0 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.0.0 + container_name: broker + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/mvnw b/2025-11/spring-39-kafka-webflux/mvnw new file mode 100755 index 00000000..a16b5431 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/2025-11/spring-39-kafka-webflux/mvnw.cmd b/2025-11/spring-39-kafka-webflux/mvnw.cmd new file mode 100644 index 00000000..c8d43372 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/2025-11/spring-39-kafka-webflux/pom.xml b/2025-11/spring-39-kafka-webflux/pom.xml new file mode 100755 index 00000000..e24e7f10 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + ru.otus + spring-39-kafka-webflux + 1.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.4.3 + + + pom + + + client + processor + source + common + + + + UTF-8 + 21 + 21 + 3.0.0-M3 + 3.1.1 + 3.3.9 + 1.0.12.RELEASE + 3.4.3 + 3.0.2 + 3.0.0-beta-10 + 1.3.23 + + + + + + org.springframework.boot + spring-boot-dependencies + ${springframeworkBoot.version} + pom + import + + + com.google.code.findbugs + jsr305 + ${jsr305.version} + + + io.projectreactor.kafka + reactor-kafka + ${reactorKafka.version} + + + com.github.tomakehurst + wiremock + ${wiremock.version} + + + + + + + + + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + enforce-maven + + enforce + + + + + + ${minimal.maven.version} + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + + org.apache.maven.plugins + maven-assembly-plugin + ${maven-assembly-plugin.version} + + + + + diff --git a/2025-11/spring-39-kafka-webflux/processor/pom.xml b/2025-11/spring-39-kafka-webflux/processor/pom.xml new file mode 100755 index 00000000..d266d2df --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + + + ru.otus + spring-39-kafka-webflux + 1.0 + + + processor + 1.0 + + + 21 + + + + + ru.otus + common + 1.0 + + + + org.springframework.boot + spring-boot-starter-webflux + + + + io.projectreactor.kafka + reactor-kafka + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java new file mode 100755 index 00000000..6d911292 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorData.java @@ -0,0 +1,12 @@ +package com.datasrc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ProcessorData { + + public static void main(String[] args) { + SpringApplication.run(ProcessorData.class, args); + } +} \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java new file mode 100755 index 00000000..0b89514f --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/ProcessorDataController.java @@ -0,0 +1,40 @@ +package com.datasrc; + + +import com.datasrc.model.StreamData; +import com.datasrc.processor.DataProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +@RestController +public class ProcessorDataController { + private static final Logger log = LoggerFactory.getLogger(ProcessorDataController.class); + + private final DataProcessor> dataProcessorStringReactorFlux; + private final WebClient client; + + public ProcessorDataController(WebClient client, + @Qualifier("dataProcessorFlux") DataProcessor> dataProcessorFlux) { + this.dataProcessorStringReactorFlux = dataProcessorFlux; + this.client = client; + } + + @GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux data(@PathVariable("seed") long seed) { + log.info("request for data, seed:{}", seed); + + var srcRequest = client.get().uri(String.format("/data/%d", seed)) + .accept(MediaType.APPLICATION_NDJSON) + .retrieve() + .bodyToFlux(StreamData.class); + + return dataProcessorStringReactorFlux.process(srcRequest); + } +} diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java new file mode 100755 index 00000000..d091bfc7 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/config/ApplConfig.java @@ -0,0 +1,136 @@ +package com.datasrc.config; + +import com.datasrc.model.Request; +import com.datasrc.model.RequestId; +import com.datasrc.model.Response; +import com.datasrc.model.ResponseId; +import com.datasrc.model.StringValue; +import com.datasrc.processor.DataProcessor; +import io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.client.ReactorResourceFactory; + + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.NonNull; + +@Configuration +@SuppressWarnings("java:S2095") +public class ApplConfig { + private static final Logger log = LoggerFactory.getLogger(ApplConfig.class); + private static final int THREAD_POOL_SIZE = 2; + private static final int REQUEST_RECEIVER_POOL_SIZE = 1; + private static final int KAFKA_POOL_SIZE = 1; + + private final AtomicLong responseIdGenerator = new AtomicLong(0); + + @Bean(name ="serverThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup serverThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) { + var factory = new NettyReactiveWebServerFactory(); + factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop)); + return factory; + } + + @Bean(name ="clientThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup clientThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "client-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactorResourceFactory reactorResourceFactory(@Qualifier("clientThreadEventLoop") NioEventLoopGroup clientThreadEventLoop) { + var resourceFactory = new ReactorResourceFactory(); + resourceFactory.setLoopResources(b -> clientThreadEventLoop); + resourceFactory.setUseGlobalResources(false); + return resourceFactory; + } + + @Bean + public ReactorClientHttpConnector reactorClientHttpConnector(ReactorResourceFactory resourceFactory) { + return new ReactorClientHttpConnector(resourceFactory, mapper -> mapper); + } + + @Bean + public Scheduler timer() { + return Schedulers.newParallel("processor-thread", 2); + } + + + @Bean("requestReceiverScheduler") + public Scheduler requestReceiverScheduler() { + return Schedulers.newParallel("request-receiver", REQUEST_RECEIVER_POOL_SIZE); + } + + @Bean("kafkaScheduler") + public Scheduler kafkaScheduler() { + return Schedulers.newParallel("kafka-scheduler", KAFKA_POOL_SIZE); + } + + @Bean + public WebClient webClient(WebClient.Builder builder, + @Value("${application.source.url}") String url) { + return builder + .baseUrl(url) + .build(); + } + + @Bean(destroyMethod = "close") + public ReactiveSender responseSender(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-response}") String topicResponse, + @Qualifier("kafkaScheduler") Scheduler kafkaScheduler + ) { + return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicResponse); + } + + @Bean(destroyMethod = "close") + public ReactiveReceiver requestReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers, + @Value("${application.topic-request}") String topicRequest, + @Value("${application.kafka-group-id}") String groupId, + @Qualifier("requestReceiverScheduler") Scheduler responseReceiverScheduler, + ReactiveSender responseSender, + @Qualifier("dataProcessorMono") DataProcessor dataProcessor, + WebClient webClient) { + + return new ReactiveReceiver<>(bootstrapServers, Request.class, topicRequest, responseReceiverScheduler, groupId, + request -> webClient.get().uri(String.format("/data-mono/%d", request.data())) + .retrieve() + .bodyToMono(String.class) + .map(dataProcessor::process) + .flatMap(stringValue -> + responseSender.send(new Response(new ResponseId(responseIdGenerator.incrementAndGet()), + new StringValue(new RequestId(request.id()), stringValue)), + stringValueDataForSending -> log.info("response send:{}", stringValueDataForSending))) + .subscribe()); + } +} diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java new file mode 100755 index 00000000..55b9c4cd --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessor.java @@ -0,0 +1,6 @@ +package com.datasrc.processor; + +public interface DataProcessor { + + T process(T data); +} diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java new file mode 100755 index 00000000..c0a5155a --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorFlux.java @@ -0,0 +1,33 @@ +package com.datasrc.processor; + +import com.datasrc.model.StreamData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; + +import java.time.Duration; + +@Service("dataProcessorFlux") +public class DataProcessorStringReactorFlux implements DataProcessor> { + private static final Logger log = LoggerFactory.getLogger(DataProcessorStringReactorFlux.class); + private final Scheduler timer; + + public DataProcessorStringReactorFlux(Scheduler timer) { + this.timer = timer; + } + + @Override + public Flux process(Flux dataflow) { + log.info("processor"); + var dataSeq = dataflow + .doOnNext(val -> log.info("in val:{}", val)) + .delayElements(Duration.ofSeconds(5), timer) + .map(data -> new StreamData(data.value().toUpperCase())) + .doOnNext(val -> log.info("out val:{}", val)); + + log.info("processor method finished"); + return dataSeq; + } +} diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java new file mode 100755 index 00000000..f102c368 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/java/com/datasrc/processor/DataProcessorStringReactorMono.java @@ -0,0 +1,16 @@ +package com.datasrc.processor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service("dataProcessorMono") +public class DataProcessorStringReactorMono implements DataProcessor { + private static final Logger log = LoggerFactory.getLogger(DataProcessorStringReactorMono.class); + + @Override + public String process(String value) { + log.info("processor"); + return value.toUpperCase(); + } +} diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/resources/application.yml b/2025-11/spring-39-kafka-webflux/processor/src/main/resources/application.yml new file mode 100755 index 00000000..ffa5de55 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 8081 + +application: + source: + url: http://localhost:8080 + kafka-bootstrap-servers: localhost:9092 + kafka-group-id: processorConsumerGroup + topic-request: request + topic-response: response diff --git a/2025-11/spring-39-kafka-webflux/processor/src/main/resources/logback.xml b/2025-11/spring-39-kafka-webflux/processor/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/processor/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/2025-11/spring-39-kafka-webflux/source/HttpRequests.http b/2025-11/spring-39-kafka-webflux/source/HttpRequests.http new file mode 100755 index 00000000..be988df3 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/HttpRequests.http @@ -0,0 +1,12 @@ +### +GET http://localhost:8080/data-mono/13 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + + +### +GET http://localhost:8080/data/5 +Accept: */* +Content-Type: application/json +Cache-Control: no-cache \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/source/pom.xml b/2025-11/spring-39-kafka-webflux/source/pom.xml new file mode 100755 index 00000000..9e828243 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + + ru.otus + spring-39-kafka-webflux + 1.0 + + + source + 1.0 + + + 21 + + + + + ru.otus + common + 1.0 + + + + org.springframework.boot + spring-boot-starter-webflux + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java new file mode 100755 index 00000000..a321fbde --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/SourceData.java @@ -0,0 +1,12 @@ +package com.datasrc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SourceData { + + public static void main(String[] args) { + SpringApplication.run(SourceData.class, args); + } +} \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java new file mode 100755 index 00000000..feb4291a --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/SourceDataController.java @@ -0,0 +1,54 @@ +package com.datasrc; + + +import com.datasrc.model.StreamData; +import com.datasrc.producer.DataProducer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +public class SourceDataController { + private static final Logger log = LoggerFactory.getLogger(SourceDataController.class); + + private final DataProducer> dataProducerFlux; + private final DataProducer dataProducerStringBlocked; + + private final Executor blockingExecutor; + + public SourceDataController(@Qualifier("dataProducerFlux") DataProducer> dataProducerFlux, + @Qualifier("dataProducerStringBlocked") DataProducer dataProducerStringBlocked, + @Qualifier("blockingExecutor") Executor blockingExecutor) { + this.dataProducerFlux = dataProducerFlux; + this.dataProducerStringBlocked = dataProducerStringBlocked; + this.blockingExecutor = blockingExecutor; + } + + @GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux data(@PathVariable("seed") long seed) { + log.info("request for string data, seed:{}", seed); + var stringData = dataProducerFlux.produce(seed); + + log.info("Method request for string data done"); + return stringData; + } + + @GetMapping(value = "/data-mono/{seed}", produces = MediaType.APPLICATION_JSON_VALUE) + public Mono dataMono(@PathVariable("seed") long seed) { + log.info("request for string data-mono, seed:{}", seed); + + var future = CompletableFuture + .supplyAsync(() -> dataProducerStringBlocked.produce(seed), blockingExecutor); + var mono = Mono.fromFuture(future); + log.info("Method request for string data done"); + return mono; + } +} diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java new file mode 100755 index 00000000..7186ef9a --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/config/ApplConfig.java @@ -0,0 +1,56 @@ +package com.datasrc.config; + +import io.netty.channel.nio.NioEventLoopGroup; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.NonNull; + +@Configuration +@SuppressWarnings("java:S2095") +public class ApplConfig { + private static final int THREAD_POOL_SIZE = 2; + private static final int BLOCKING_THREAD_POOL_SIZE = 2; + + @Bean(name ="serverThreadEventLoop", destroyMethod = "close") + public NioEventLoopGroup serverThreadEventLoop() { + return new NioEventLoopGroup(THREAD_POOL_SIZE, + new ThreadFactory() { + private final AtomicLong threadIdGenerator = new AtomicLong(0); + @Override + public Thread newThread(@NonNull Runnable task) { + return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet()); + } + }); + } + + @Bean + public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) { + var factory = new NettyReactiveWebServerFactory(); + factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop)); + return factory; + } + + @Bean(name= "blockingExecutor", destroyMethod = "close") + public ExecutorService blockingExecutor() { + var id = new AtomicLong(0); + return Executors.newFixedThreadPool(BLOCKING_THREAD_POOL_SIZE, + task -> new Thread(task, String.format("blocking-thread-%d", id.incrementAndGet()))); + } + + @Bean + public Scheduler timer() { + return Schedulers.newParallel("processor-thread", 2); + } +} diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java new file mode 100755 index 00000000..e205bb5e --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducer.java @@ -0,0 +1,6 @@ +package com.datasrc.producer; + +public interface DataProducer { + + T produce(long seed); +} diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java new file mode 100755 index 00000000..d9cf933b --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringBlocked.java @@ -0,0 +1,26 @@ +package com.datasrc.producer; + +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service("dataProducerStringBlocked") +public class DataProducerStringBlocked implements DataProducer { + private static final Logger log = LoggerFactory.getLogger(DataProducerStringBlocked.class); + + @Override + public String produce(long seed) { + log.info("produce using seed:{}", seed); + sleep(); + return String.format("someDataStr:%s", seed); + } + + private void sleep() { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java new file mode 100755 index 00000000..b8250eda --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/java/com/datasrc/producer/DataProducerStringFlux.java @@ -0,0 +1,39 @@ +package com.datasrc.producer; + +import com.datasrc.model.StreamData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SynchronousSink; +import java.time.Duration; +import java.util.function.BiFunction; +import reactor.core.scheduler.Scheduler; + +@Service("dataProducerFlux") +public class DataProducerStringFlux implements DataProducer> { + private static final Logger log = LoggerFactory.getLogger(DataProducerStringFlux.class); + private final Scheduler timer; + + public DataProducerStringFlux(Scheduler timer) { + this.timer = timer; + } + + @Override + public Flux produce(long seed) { + log.info("produce using seed:{}", seed); + var stringSeed = "someDataStr:"; + var dataSeq = Flux.generate(() -> seed, + (BiFunction, Long>) (prev, sink) -> { + var newValue = prev + 1; + sink.next(newValue); + return newValue; + }) + .delayElements(Duration.ofSeconds(3), timer) + .map(val -> new StreamData(stringSeed + val)) + .doOnNext(val -> log.info("val:{}", val)); + + log.info("produce method finished"); + return dataSeq; + } +} diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/resources/application.yml b/2025-11/spring-39-kafka-webflux/source/src/main/resources/application.yml new file mode 100755 index 00000000..47fbb02d --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/resources/application.yml @@ -0,0 +1,2 @@ +server: + port: 8080 \ No newline at end of file diff --git a/2025-11/spring-39-kafka-webflux/source/src/main/resources/logback.xml b/2025-11/spring-39-kafka-webflux/source/src/main/resources/logback.xml new file mode 100755 index 00000000..b1f9bfe2 --- /dev/null +++ b/2025-11/spring-39-kafka-webflux/source/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + +