l39 spring

This commit is contained in:
petrelevich
2024-03-04 13:04:59 +03:00
parent 7bce9b5c5d
commit 19dafbb503
20 changed files with 615 additions and 0 deletions
+39
View File
@@ -0,0 +1,39 @@
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
!gradle-wrapper.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
# Ignore Gradle project-specific cache directory
.gradle
/buildSrc/.gradle/
# Ignore Gradle build output directory
build
out
#Idea
*.iml
*.iws
*.ipr
*.idea
+49
View File
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus</groupId>
<artifactId>spring-39-kafka</artifactId>
<version>1.0</version>
</parent>
<artifactId>consumer</artifactId>
<version>1.0</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,11 @@
package ru.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApp {
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
}
@@ -0,0 +1,108 @@
package ru.demo.config;
import static org.springframework.kafka.support.serializer.JsonDeserializer.TYPE_MAPPINGS;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import ru.demo.model.StringValue;
import ru.demo.service.StringValueConsumer;
import ru.demo.service.StringValueConsumerLogger;
@Configuration
public class ApplicationConfig {
private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
public final String topicName;
public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) {
this.topicName = topicName;
}
@Bean
public ObjectMapper objectMapper() {
return JacksonUtils.enhancedObjectMapper();
}
@Bean
public ConsumerFactory<String, StringValue> consumerFactory(
KafkaProperties kafkaProperties, ObjectMapper mapper) {
var props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(TYPE_MAPPINGS, "ru.demo.model.StringValue:ru.demo.model.StringValue");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3_000);
var kafkaConsumerFactory = new DefaultKafkaConsumerFactory<String, StringValue>(props);
kafkaConsumerFactory.setValueDeserializer(new JsonDeserializer<>(mapper));
return kafkaConsumerFactory;
}
@Bean("listenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, StringValue>>
listenerContainerFactory(ConsumerFactory<String, StringValue> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, StringValue>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.setConcurrency(1);
factory.getContainerProperties().setIdleBetweenPolls(1_000);
factory.getContainerProperties().setPollTimeout(1_000);
var executor = new SimpleAsyncTaskExecutor("k-consumer-");
executor.setConcurrencyLimit(10);
var listenerTaskExecutor = new ConcurrentTaskExecutor(executor);
factory.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor);
return factory;
}
@Bean
public NewTopic topic() {
return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
}
@Bean
public StringValueConsumer stringValueConsumerLogger() {
return new StringValueConsumerLogger();
}
@Bean
public KafkaClient stringValueConsumer(StringValueConsumer stringValueConsumer) {
return new KafkaClient(stringValueConsumer);
}
public static class KafkaClient {
private final StringValueConsumer stringValueConsumer;
public KafkaClient(StringValueConsumer stringValueConsumer) {
this.stringValueConsumer = stringValueConsumer;
}
@KafkaListener(
topics = "${application.kafka.topic}",
containerFactory = "listenerContainerFactory")
public void listen(@Payload List<StringValue> values) {
log.info("values, values.size:{}", values.size());
stringValueConsumer.accept(values);
}
}
}
@@ -0,0 +1,3 @@
package ru.demo.model;
public record StringValue(long id, String value) {}
@@ -0,0 +1,9 @@
package ru.demo.service;
import java.util.List;
import ru.demo.model.StringValue;
public interface StringValueConsumer {
void accept(List<StringValue> value);
}
@@ -0,0 +1,17 @@
package ru.demo.service;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.demo.model.StringValue;
public class StringValueConsumerLogger implements StringValueConsumer {
private static final Logger log = LoggerFactory.getLogger(StringValueConsumerLogger.class);
@Override
public void accept(List<StringValue> values) {
for (var value : values) {
log.info("log:{}", value);
}
}
}
@@ -0,0 +1,12 @@
application:
kafka:
topic: "demo-topic"
spring:
kafka:
consumer:
group-id: "test-group"
bootstrap-servers: "localhost:9092"
client-id: "demo-consumer"
auto-offset-reset: earliest
@@ -0,0 +1,24 @@
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
container_name: broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+92
View File
@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>spring-39-kafka</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.1</version>
</parent>
<packaging>pom</packaging>
<modules>
<module>consumer</module>
<module>producer</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
<maven-assembly-plugin.version>3.1.1</maven-assembly-plugin.version>
<minimal.maven.version>3.3.9</minimal.maven.version>
<dependencyManagement.version>1.0.12.RELEASE</dependencyManagement.version>
<springframeworkBoot.version>3.0.5</springframeworkBoot.version>
<jsr305.version>3.0.2</jsr305.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${springframeworkBoot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>${maven-enforcer-plugin.version}</version>
<executions>
<execution>
<id>enforce-maven</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<dependencyConvergence/>
<requireMavenVersion>
<version>${minimal.maven.version}</version>
</requireMavenVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
+45
View File
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus</groupId>
<artifactId>spring-39-kafka</artifactId>
<version>1.0</version>
</parent>
<artifactId>producer</artifactId>
<version>1.0</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,11 @@
package ru.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApp {
public static void main(String[] args) {
SpringApplication.run(ProducerApp.class, args);
}
}
@@ -0,0 +1,73 @@
package ru.demo.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonSerializer;
import ru.demo.model.StringValue;
import ru.demo.service.DataSender;
import ru.demo.service.DataSenderKafka;
import ru.demo.service.StringValueSource;
@Configuration
public class ApplicationConfig {
private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
public final String topicName;
public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) {
this.topicName = topicName;
}
@Bean
public ObjectMapper objectMapper() {
return JacksonUtils.enhancedObjectMapper();
}
@Bean
public ProducerFactory<String, StringValue> producerFactory(
KafkaProperties kafkaProperties, ObjectMapper mapper) {
var props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, StringValue>(props);
kafkaProducerFactory.setValueSerializer(new JsonSerializer<>(mapper));
return kafkaProducerFactory;
}
@Bean
public KafkaTemplate<String, StringValue> kafkaTemplate(
ProducerFactory<String, StringValue> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
}
@Bean
public DataSender dataSender(NewTopic topic, KafkaTemplate<String, StringValue> kafkaTemplate) {
return new DataSenderKafka(
topic.name(),
kafkaTemplate,
stringValue -> log.info("asked, value:{}", stringValue));
}
@Bean
public StringValueSource stringValueSource(DataSender dataSender) {
return new StringValueSource(dataSender);
}
}
@@ -0,0 +1,3 @@
package ru.demo.model;
public record StringValue(long id, String value) {}
@@ -0,0 +1,7 @@
package ru.demo.service;
import ru.demo.model.StringValue;
public interface DataSender {
void send(StringValue value);
}
@@ -0,0 +1,48 @@
package ru.demo.service;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import ru.demo.model.StringValue;
public class DataSenderKafka implements DataSender {
private static final Logger log = LoggerFactory.getLogger(DataSenderKafka.class);
private final KafkaTemplate<String, StringValue> template;
private final Consumer<StringValue> sendAsk;
private final String topic;
public DataSenderKafka(
String topic,
KafkaTemplate<String, StringValue> template,
Consumer<StringValue> sendAsk) {
this.topic = topic;
this.template = template;
this.sendAsk = sendAsk;
}
@Override
public void send(StringValue value) {
try {
log.info("value:{}", value);
template.send(topic, value)
.whenComplete(
(result, ex) -> {
if (ex == null) {
log.info(
"message id:{} was sent, offset:{}",
value.id(),
result.getRecordMetadata().offset());
sendAsk.accept(value);
} else {
log.error("message id:{} was not sent", value.id(), ex);
}
});
} catch (Exception ex) {
log.error("send error, value:{}", value, ex);
}
}
}
@@ -0,0 +1,18 @@
package ru.demo.service;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
@Service
public class Runner implements CommandLineRunner {
private final ValueSource valueSource;
public Runner(ValueSource valueSource) {
this.valueSource = valueSource;
}
@Override
public void run(String... args) {
valueSource.generate();
}
}
@@ -0,0 +1,30 @@
package ru.demo.service;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.demo.model.StringValue;
public class StringValueSource implements ValueSource {
private static final Logger log = LoggerFactory.getLogger(StringValueSource.class);
private final AtomicLong nextValue = new AtomicLong(1);
private final DataSender valueConsumer;
public StringValueSource(DataSender dataSender) {
this.valueConsumer = dataSender;
}
@Override
public void generate() {
var executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> valueConsumer.send(makeValue()), 0, 1, TimeUnit.SECONDS);
log.info("generation started");
}
private StringValue makeValue() {
var id = nextValue.getAndIncrement();
return new StringValue(id, "stVal:" + id);
}
}
@@ -0,0 +1,5 @@
package ru.demo.service;
public interface ValueSource {
void generate();
}
@@ -0,0 +1,11 @@
application:
kafka:
topic: "demo-topic"
spring:
kafka:
producer:
bootstrap-servers: "127.0.0.1:9092"
client-id: "demo-producer"