diff --git a/2024-01/spring-39-kafka/.gitignore b/2024-01/spring-39-kafka/.gitignore
new file mode 100644
index 00000000..dae2cf99
--- /dev/null
+++ b/2024-01/spring-39-kafka/.gitignore
@@ -0,0 +1,39 @@
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+!gradle-wrapper.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# Ignore Gradle project-specific cache directory
+.gradle
+/buildSrc/.gradle/
+
+# Ignore Gradle build output directory
+build
+out
+
+#Idea
+*.iml
+*.iws
+*.ipr
+*.idea
+
diff --git a/2024-01/spring-39-kafka/consumer/pom.xml b/2024-01/spring-39-kafka/consumer/pom.xml
new file mode 100644
index 00000000..5d7aa504
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/pom.xml
@@ -0,0 +1,49 @@
+
+
+ 4.0.0
+
+
+ ru.otus
+ spring-39-kafka
+ 1.0
+
+
+ consumer
+ 1.0
+
+
+ 17
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/ConsumerApp.java b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/ConsumerApp.java
new file mode 100644
index 00000000..a9e6aa58
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/ConsumerApp.java
@@ -0,0 +1,11 @@
+package ru.demo;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ConsumerApp {
+ public static void main(String[] args) {
+ SpringApplication.run(ConsumerApp.class, args);
+ }
+}
diff --git a/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java
new file mode 100644
index 00000000..dc63a5a8
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java
@@ -0,0 +1,108 @@
+package ru.demo.config;
+
+import static org.springframework.kafka.support.serializer.JsonDeserializer.TYPE_MAPPINGS;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.support.JacksonUtils;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
+import ru.demo.model.StringValue;
+import ru.demo.service.StringValueConsumer;
+import ru.demo.service.StringValueConsumerLogger;
+
+@Configuration
+public class ApplicationConfig {
+ private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
+ public final String topicName;
+
+ public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) {
+ this.topicName = topicName;
+ }
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ return JacksonUtils.enhancedObjectMapper();
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory(
+ KafkaProperties kafkaProperties, ObjectMapper mapper) {
+ var props = kafkaProperties.buildConsumerProperties();
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ props.put(TYPE_MAPPINGS, "ru.demo.model.StringValue:ru.demo.model.StringValue");
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3_000);
+
+ var kafkaConsumerFactory = new DefaultKafkaConsumerFactory(props);
+ kafkaConsumerFactory.setValueDeserializer(new JsonDeserializer<>(mapper));
+ return kafkaConsumerFactory;
+ }
+
+ @Bean("listenerContainerFactory")
+ public KafkaListenerContainerFactory>
+ listenerContainerFactory(ConsumerFactory consumerFactory) {
+ var factory = new ConcurrentKafkaListenerContainerFactory();
+ factory.setConsumerFactory(consumerFactory);
+ factory.setBatchListener(true);
+ factory.setConcurrency(1);
+ factory.getContainerProperties().setIdleBetweenPolls(1_000);
+ factory.getContainerProperties().setPollTimeout(1_000);
+
+ var executor = new SimpleAsyncTaskExecutor("k-consumer-");
+ executor.setConcurrencyLimit(10);
+ var listenerTaskExecutor = new ConcurrentTaskExecutor(executor);
+ factory.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor);
+ return factory;
+ }
+
+ @Bean
+ public NewTopic topic() {
+ return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
+ }
+
+ @Bean
+ public StringValueConsumer stringValueConsumerLogger() {
+ return new StringValueConsumerLogger();
+ }
+
+ @Bean
+ public KafkaClient stringValueConsumer(StringValueConsumer stringValueConsumer) {
+ return new KafkaClient(stringValueConsumer);
+ }
+
+ public static class KafkaClient {
+ private final StringValueConsumer stringValueConsumer;
+
+ public KafkaClient(StringValueConsumer stringValueConsumer) {
+ this.stringValueConsumer = stringValueConsumer;
+ }
+
+ @KafkaListener(
+ topics = "${application.kafka.topic}",
+ containerFactory = "listenerContainerFactory")
+ public void listen(@Payload List values) {
+ log.info("values, values.size:{}", values.size());
+ stringValueConsumer.accept(values);
+ }
+ }
+}
diff --git a/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/model/StringValue.java b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/model/StringValue.java
new file mode 100644
index 00000000..213f1309
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/model/StringValue.java
@@ -0,0 +1,3 @@
+package ru.demo.model;
+
+public record StringValue(long id, String value) {}
diff --git a/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumer.java b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumer.java
new file mode 100644
index 00000000..ff5cd296
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumer.java
@@ -0,0 +1,9 @@
+package ru.demo.service;
+
+import java.util.List;
+import ru.demo.model.StringValue;
+
+public interface StringValueConsumer {
+
+ void accept(List value);
+}
diff --git a/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumerLogger.java b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumerLogger.java
new file mode 100644
index 00000000..c791cbea
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumerLogger.java
@@ -0,0 +1,17 @@
+package ru.demo.service;
+
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.demo.model.StringValue;
+
+public class StringValueConsumerLogger implements StringValueConsumer {
+ private static final Logger log = LoggerFactory.getLogger(StringValueConsumerLogger.class);
+
+ @Override
+ public void accept(List values) {
+ for (var value : values) {
+ log.info("log:{}", value);
+ }
+ }
+}
diff --git a/2024-01/spring-39-kafka/consumer/src/main/resources/application.yml b/2024-01/spring-39-kafka/consumer/src/main/resources/application.yml
new file mode 100644
index 00000000..32db801b
--- /dev/null
+++ b/2024-01/spring-39-kafka/consumer/src/main/resources/application.yml
@@ -0,0 +1,12 @@
+application:
+ kafka:
+ topic: "demo-topic"
+
+
+spring:
+ kafka:
+ consumer:
+ group-id: "test-group"
+ bootstrap-servers: "localhost:9092"
+ client-id: "demo-consumer"
+ auto-offset-reset: earliest
diff --git a/2024-01/spring-39-kafka/docker/docker-compose.yml b/2024-01/spring-39-kafka/docker/docker-compose.yml
new file mode 100644
index 00000000..b16128f3
--- /dev/null
+++ b/2024-01/spring-39-kafka/docker/docker-compose.yml
@@ -0,0 +1,24 @@
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:6.2.0
+ container_name: zookeeper
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ broker:
+ image: confluentinc/cp-kafka:7.0.0
+ container_name: broker
+ ports:
+ - "9092:9092"
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
\ No newline at end of file
diff --git a/2024-01/spring-39-kafka/pom.xml b/2024-01/spring-39-kafka/pom.xml
new file mode 100644
index 00000000..cab976ba
--- /dev/null
+++ b/2024-01/spring-39-kafka/pom.xml
@@ -0,0 +1,92 @@
+
+
+ 4.0.0
+
+ ru.otus
+ spring-39-kafka
+ 1.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 3.1.1
+
+
+ pom
+
+
+ consumer
+ producer
+
+
+
+ UTF-8
+ 17
+ 17
+ 3.0.0-M3
+ 3.1.1
+ 3.3.9
+ 1.0.12.RELEASE
+ 3.0.5
+ 3.0.2
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${springframeworkBoot.version}
+ pom
+ import
+
+
+ com.google.code.findbugs
+ jsr305
+ ${jsr305.version}
+
+
+
+
+
+
+
+
+
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ enforce-maven
+
+ enforce
+
+
+
+
+
+ ${minimal.maven.version}
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ ${maven-shade-plugin.version}
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ ${maven-assembly-plugin.version}
+
+
+
+
+
diff --git a/2024-01/spring-39-kafka/producer/pom.xml b/2024-01/spring-39-kafka/producer/pom.xml
new file mode 100644
index 00000000..9485620b
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/pom.xml
@@ -0,0 +1,45 @@
+
+
+ 4.0.0
+
+
+ ru.otus
+ spring-39-kafka
+ 1.0
+
+
+ producer
+ 1.0
+
+
+ 17
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/ProducerApp.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/ProducerApp.java
new file mode 100644
index 00000000..e9d022bb
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/ProducerApp.java
@@ -0,0 +1,11 @@
+package ru.demo;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ProducerApp {
+ public static void main(String[] args) {
+ SpringApplication.run(ProducerApp.class, args);
+ }
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java
new file mode 100644
index 00000000..e0bc3346
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java
@@ -0,0 +1,73 @@
+package ru.demo.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.JacksonUtils;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import ru.demo.model.StringValue;
+import ru.demo.service.DataSender;
+import ru.demo.service.DataSenderKafka;
+import ru.demo.service.StringValueSource;
+
+@Configuration
+public class ApplicationConfig {
+ private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
+ public final String topicName;
+
+ public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) {
+ this.topicName = topicName;
+ }
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ return JacksonUtils.enhancedObjectMapper();
+ }
+
+ @Bean
+ public ProducerFactory producerFactory(
+ KafkaProperties kafkaProperties, ObjectMapper mapper) {
+ var props = kafkaProperties.buildProducerProperties();
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ var kafkaProducerFactory = new DefaultKafkaProducerFactory(props);
+ kafkaProducerFactory.setValueSerializer(new JsonSerializer<>(mapper));
+ return kafkaProducerFactory;
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate(
+ ProducerFactory producerFactory) {
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ public NewTopic topic() {
+ return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
+ }
+
+ @Bean
+ public DataSender dataSender(NewTopic topic, KafkaTemplate kafkaTemplate) {
+ return new DataSenderKafka(
+ topic.name(),
+ kafkaTemplate,
+ stringValue -> log.info("asked, value:{}", stringValue));
+ }
+
+ @Bean
+ public StringValueSource stringValueSource(DataSender dataSender) {
+ return new StringValueSource(dataSender);
+ }
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/model/StringValue.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/model/StringValue.java
new file mode 100644
index 00000000..213f1309
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/model/StringValue.java
@@ -0,0 +1,3 @@
+package ru.demo.model;
+
+public record StringValue(long id, String value) {}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSender.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSender.java
new file mode 100644
index 00000000..321e3f5f
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSender.java
@@ -0,0 +1,7 @@
+package ru.demo.service;
+
+import ru.demo.model.StringValue;
+
+public interface DataSender {
+ void send(StringValue value);
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSenderKafka.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSenderKafka.java
new file mode 100644
index 00000000..a1b07f80
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSenderKafka.java
@@ -0,0 +1,48 @@
+package ru.demo.service;
+
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import ru.demo.model.StringValue;
+
+public class DataSenderKafka implements DataSender {
+ private static final Logger log = LoggerFactory.getLogger(DataSenderKafka.class);
+
+ private final KafkaTemplate template;
+
+ private final Consumer sendAsk;
+
+ private final String topic;
+
+ public DataSenderKafka(
+ String topic,
+ KafkaTemplate template,
+ Consumer sendAsk) {
+ this.topic = topic;
+ this.template = template;
+ this.sendAsk = sendAsk;
+ }
+
+ @Override
+ public void send(StringValue value) {
+ try {
+ log.info("value:{}", value);
+ template.send(topic, value)
+ .whenComplete(
+ (result, ex) -> {
+ if (ex == null) {
+ log.info(
+ "message id:{} was sent, offset:{}",
+ value.id(),
+ result.getRecordMetadata().offset());
+ sendAsk.accept(value);
+ } else {
+ log.error("message id:{} was not sent", value.id(), ex);
+ }
+ });
+ } catch (Exception ex) {
+ log.error("send error, value:{}", value, ex);
+ }
+ }
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/Runner.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/Runner.java
new file mode 100644
index 00000000..16c25602
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/Runner.java
@@ -0,0 +1,18 @@
+package ru.demo.service;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Service;
+
+@Service
+public class Runner implements CommandLineRunner {
+ private final ValueSource valueSource;
+
+ public Runner(ValueSource valueSource) {
+ this.valueSource = valueSource;
+ }
+
+ @Override
+ public void run(String... args) {
+ valueSource.generate();
+ }
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/StringValueSource.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/StringValueSource.java
new file mode 100644
index 00000000..6554db75
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/StringValueSource.java
@@ -0,0 +1,30 @@
+package ru.demo.service;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.demo.model.StringValue;
+
+public class StringValueSource implements ValueSource {
+ private static final Logger log = LoggerFactory.getLogger(StringValueSource.class);
+ private final AtomicLong nextValue = new AtomicLong(1);
+ private final DataSender valueConsumer;
+
+ public StringValueSource(DataSender dataSender) {
+ this.valueConsumer = dataSender;
+ }
+
+ @Override
+ public void generate() {
+ var executor = Executors.newScheduledThreadPool(1);
+ executor.scheduleAtFixedRate(() -> valueConsumer.send(makeValue()), 0, 1, TimeUnit.SECONDS);
+ log.info("generation started");
+ }
+
+ private StringValue makeValue() {
+ var id = nextValue.getAndIncrement();
+ return new StringValue(id, "stVal:" + id);
+ }
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/ValueSource.java b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/ValueSource.java
new file mode 100644
index 00000000..1204f5e6
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/java/ru/demo/service/ValueSource.java
@@ -0,0 +1,5 @@
+package ru.demo.service;
+
+public interface ValueSource {
+ void generate();
+}
diff --git a/2024-01/spring-39-kafka/producer/src/main/resources/application.yml b/2024-01/spring-39-kafka/producer/src/main/resources/application.yml
new file mode 100644
index 00000000..72fa1f46
--- /dev/null
+++ b/2024-01/spring-39-kafka/producer/src/main/resources/application.yml
@@ -0,0 +1,11 @@
+application:
+ kafka:
+ topic: "demo-topic"
+
+spring:
+ kafka:
+ producer:
+ bootstrap-servers: "127.0.0.1:9092"
+ client-id: "demo-producer"
+
+