diff --git a/2023-09/spring-39-kafka/.gitignore b/2023-09/spring-39-kafka/.gitignore new file mode 100644 index 00000000..dae2cf99 --- /dev/null +++ b/2023-09/spring-39-kafka/.gitignore @@ -0,0 +1,39 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +!gradle-wrapper.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# Ignore Gradle project-specific cache directory +.gradle +/buildSrc/.gradle/ + +# Ignore Gradle build output directory +build +out + +#Idea +*.iml +*.iws +*.ipr +*.idea + diff --git a/2023-09/spring-39-kafka/consumer/pom.xml b/2023-09/spring-39-kafka/consumer/pom.xml new file mode 100644 index 00000000..5d7aa504 --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + + ru.otus + spring-39-kafka + 1.0 + + + consumer + 1.0 + + + 17 + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.kafka + spring-kafka + + + + com.fasterxml.jackson.core + jackson-databind + + + com.google.code.findbugs + jsr305 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/ConsumerApp.java b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/ConsumerApp.java new file mode 100644 index 00000000..a9e6aa58 --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/ConsumerApp.java @@ -0,0 +1,11 @@ +package ru.demo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ConsumerApp { + public static void main(String[] args) { + SpringApplication.run(ConsumerApp.class, args); + } +} diff --git a/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java new file mode 100644 index 00000000..dc63a5a8 --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java @@ -0,0 +1,108 @@ +package ru.demo.config; + +import static org.springframework.kafka.support.serializer.JsonDeserializer.TYPE_MAPPINGS; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.support.JacksonUtils; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import ru.demo.model.StringValue; +import ru.demo.service.StringValueConsumer; +import ru.demo.service.StringValueConsumerLogger; + +@Configuration +public class ApplicationConfig { + private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class); + public final String topicName; + + public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) { + this.topicName = topicName; + } + + @Bean + public ObjectMapper objectMapper() { + return JacksonUtils.enhancedObjectMapper(); + } + + @Bean + public ConsumerFactory consumerFactory( + KafkaProperties kafkaProperties, ObjectMapper mapper) { + var props = kafkaProperties.buildConsumerProperties(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(TYPE_MAPPINGS, "ru.demo.model.StringValue:ru.demo.model.StringValue"); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3_000); + + var kafkaConsumerFactory = new DefaultKafkaConsumerFactory(props); + kafkaConsumerFactory.setValueDeserializer(new JsonDeserializer<>(mapper)); + return kafkaConsumerFactory; + } + + @Bean("listenerContainerFactory") + public KafkaListenerContainerFactory> + listenerContainerFactory(ConsumerFactory consumerFactory) { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setBatchListener(true); + factory.setConcurrency(1); + factory.getContainerProperties().setIdleBetweenPolls(1_000); + factory.getContainerProperties().setPollTimeout(1_000); + + var executor = new SimpleAsyncTaskExecutor("k-consumer-"); + executor.setConcurrencyLimit(10); + var listenerTaskExecutor = new ConcurrentTaskExecutor(executor); + factory.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor); + return factory; + } + + @Bean + public NewTopic topic() { + return TopicBuilder.name(topicName).partitions(1).replicas(1).build(); + } + + @Bean + public StringValueConsumer stringValueConsumerLogger() { + return new StringValueConsumerLogger(); + } + + @Bean + public KafkaClient stringValueConsumer(StringValueConsumer stringValueConsumer) { + return new KafkaClient(stringValueConsumer); + } + + public static class KafkaClient { + private final StringValueConsumer stringValueConsumer; + + public KafkaClient(StringValueConsumer stringValueConsumer) { + this.stringValueConsumer = stringValueConsumer; + } + + @KafkaListener( + topics = "${application.kafka.topic}", + containerFactory = "listenerContainerFactory") + public void listen(@Payload List values) { + log.info("values, values.size:{}", values.size()); + stringValueConsumer.accept(values); + } + } +} diff --git a/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/model/StringValue.java b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/model/StringValue.java new file mode 100644 index 00000000..213f1309 --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/model/StringValue.java @@ -0,0 +1,3 @@ +package ru.demo.model; + +public record StringValue(long id, String value) {} diff --git a/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumer.java b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumer.java new file mode 100644 index 00000000..ff5cd296 --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumer.java @@ -0,0 +1,9 @@ +package ru.demo.service; + +import java.util.List; +import ru.demo.model.StringValue; + +public interface StringValueConsumer { + + void accept(List value); +} diff --git a/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumerLogger.java b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumerLogger.java new file mode 100644 index 00000000..c791cbea --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/src/main/java/ru/demo/service/StringValueConsumerLogger.java @@ -0,0 +1,17 @@ +package ru.demo.service; + +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.demo.model.StringValue; + +public class StringValueConsumerLogger implements StringValueConsumer { + private static final Logger log = LoggerFactory.getLogger(StringValueConsumerLogger.class); + + @Override + public void accept(List values) { + for (var value : values) { + log.info("log:{}", value); + } + } +} diff --git a/2023-09/spring-39-kafka/consumer/src/main/resources/application.yml b/2023-09/spring-39-kafka/consumer/src/main/resources/application.yml new file mode 100644 index 00000000..32db801b --- /dev/null +++ b/2023-09/spring-39-kafka/consumer/src/main/resources/application.yml @@ -0,0 +1,12 @@ +application: + kafka: + topic: "demo-topic" + + +spring: + kafka: + consumer: + group-id: "test-group" + bootstrap-servers: "localhost:9092" + client-id: "demo-consumer" + auto-offset-reset: earliest diff --git a/2023-09/spring-39-kafka/docker/docker-compose.yml b/2023-09/spring-39-kafka/docker/docker-compose.yml new file mode 100644 index 00000000..2f5b20cf --- /dev/null +++ b/2023-09/spring-39-kafka/docker/docker-compose.yml @@ -0,0 +1,24 @@ +version: "3.9" +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.2.0 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.0.0 + container_name: broker + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/2023-09/spring-39-kafka/pom.xml b/2023-09/spring-39-kafka/pom.xml new file mode 100644 index 00000000..cab976ba --- /dev/null +++ b/2023-09/spring-39-kafka/pom.xml @@ -0,0 +1,92 @@ + + + 4.0.0 + + ru.otus + spring-39-kafka + 1.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.1.1 + + + pom + + + consumer + producer + + + + UTF-8 + 17 + 17 + 3.0.0-M3 + 3.1.1 + 3.3.9 + 1.0.12.RELEASE + 3.0.5 + 3.0.2 + + + + + + org.springframework.boot + spring-boot-dependencies + ${springframeworkBoot.version} + pom + import + + + com.google.code.findbugs + jsr305 + ${jsr305.version} + + + + + + + + + + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + enforce-maven + + enforce + + + + + + ${minimal.maven.version} + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + + org.apache.maven.plugins + maven-assembly-plugin + ${maven-assembly-plugin.version} + + + + + diff --git a/2023-09/spring-39-kafka/producer/pom.xml b/2023-09/spring-39-kafka/producer/pom.xml new file mode 100644 index 00000000..9485620b --- /dev/null +++ b/2023-09/spring-39-kafka/producer/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + + ru.otus + spring-39-kafka + 1.0 + + + producer + 1.0 + + + 17 + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.kafka + spring-kafka + + + + com.fasterxml.jackson.core + jackson-databind + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/ProducerApp.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/ProducerApp.java new file mode 100644 index 00000000..e9d022bb --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/ProducerApp.java @@ -0,0 +1,11 @@ +package ru.demo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ProducerApp { + public static void main(String[] args) { + SpringApplication.run(ProducerApp.class, args); + } +} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java new file mode 100644 index 00000000..e0bc3346 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java @@ -0,0 +1,73 @@ +package ru.demo.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.JacksonUtils; +import org.springframework.kafka.support.serializer.JsonSerializer; +import ru.demo.model.StringValue; +import ru.demo.service.DataSender; +import ru.demo.service.DataSenderKafka; +import ru.demo.service.StringValueSource; + +@Configuration +public class ApplicationConfig { + private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class); + public final String topicName; + + public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) { + this.topicName = topicName; + } + + @Bean + public ObjectMapper objectMapper() { + return JacksonUtils.enhancedObjectMapper(); + } + + @Bean + public ProducerFactory producerFactory( + KafkaProperties kafkaProperties, ObjectMapper mapper) { + var props = kafkaProperties.buildProducerProperties(); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + var kafkaProducerFactory = new DefaultKafkaProducerFactory(props); + kafkaProducerFactory.setValueSerializer(new JsonSerializer<>(mapper)); + return kafkaProducerFactory; + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public NewTopic topic() { + return TopicBuilder.name(topicName).partitions(1).replicas(1).build(); + } + + @Bean + public DataSender dataSender(NewTopic topic, KafkaTemplate kafkaTemplate) { + return new DataSenderKafka( + topic.name(), + kafkaTemplate, + stringValue -> log.info("asked, value:{}", stringValue)); + } + + @Bean + public StringValueSource stringValueSource(DataSender dataSender) { + return new StringValueSource(dataSender); + } +} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/model/StringValue.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/model/StringValue.java new file mode 100644 index 00000000..213f1309 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/model/StringValue.java @@ -0,0 +1,3 @@ +package ru.demo.model; + +public record StringValue(long id, String value) {} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSender.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSender.java new file mode 100644 index 00000000..321e3f5f --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSender.java @@ -0,0 +1,7 @@ +package ru.demo.service; + +import ru.demo.model.StringValue; + +public interface DataSender { + void send(StringValue value); +} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSenderKafka.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSenderKafka.java new file mode 100644 index 00000000..a1b07f80 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/DataSenderKafka.java @@ -0,0 +1,48 @@ +package ru.demo.service; + +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import ru.demo.model.StringValue; + +public class DataSenderKafka implements DataSender { + private static final Logger log = LoggerFactory.getLogger(DataSenderKafka.class); + + private final KafkaTemplate template; + + private final Consumer sendAsk; + + private final String topic; + + public DataSenderKafka( + String topic, + KafkaTemplate template, + Consumer sendAsk) { + this.topic = topic; + this.template = template; + this.sendAsk = sendAsk; + } + + @Override + public void send(StringValue value) { + try { + log.info("value:{}", value); + template.send(topic, value) + .whenComplete( + (result, ex) -> { + if (ex == null) { + log.info( + "message id:{} was sent, offset:{}", + value.id(), + result.getRecordMetadata().offset()); + sendAsk.accept(value); + } else { + log.error("message id:{} was not sent", value.id(), ex); + } + }); + } catch (Exception ex) { + log.error("send error, value:{}", value, ex); + } + } +} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/Runner.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/Runner.java new file mode 100644 index 00000000..16c25602 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/Runner.java @@ -0,0 +1,18 @@ +package ru.demo.service; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Service; + +@Service +public class Runner implements CommandLineRunner { + private final ValueSource valueSource; + + public Runner(ValueSource valueSource) { + this.valueSource = valueSource; + } + + @Override + public void run(String... args) { + valueSource.generate(); + } +} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/StringValueSource.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/StringValueSource.java new file mode 100644 index 00000000..6554db75 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/StringValueSource.java @@ -0,0 +1,30 @@ +package ru.demo.service; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.demo.model.StringValue; + +public class StringValueSource implements ValueSource { + private static final Logger log = LoggerFactory.getLogger(StringValueSource.class); + private final AtomicLong nextValue = new AtomicLong(1); + private final DataSender valueConsumer; + + public StringValueSource(DataSender dataSender) { + this.valueConsumer = dataSender; + } + + @Override + public void generate() { + var executor = Executors.newScheduledThreadPool(1); + executor.scheduleAtFixedRate(() -> valueConsumer.send(makeValue()), 0, 1, TimeUnit.SECONDS); + log.info("generation started"); + } + + private StringValue makeValue() { + var id = nextValue.getAndIncrement(); + return new StringValue(id, "stVal:" + id); + } +} diff --git a/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/ValueSource.java b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/ValueSource.java new file mode 100644 index 00000000..1204f5e6 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/java/ru/demo/service/ValueSource.java @@ -0,0 +1,5 @@ +package ru.demo.service; + +public interface ValueSource { + void generate(); +} diff --git a/2023-09/spring-39-kafka/producer/src/main/resources/application.yml b/2023-09/spring-39-kafka/producer/src/main/resources/application.yml new file mode 100644 index 00000000..72fa1f46 --- /dev/null +++ b/2023-09/spring-39-kafka/producer/src/main/resources/application.yml @@ -0,0 +1,11 @@ +application: + kafka: + topic: "demo-topic" + +spring: + kafka: + producer: + bootstrap-servers: "127.0.0.1:9092" + client-id: "demo-producer" + +