mirror of
https://github.com/OtusTeam/Spring.git
synced 2026-05-23 10:50:34 +00:00
41 kafka
This commit is contained in:
@@ -0,0 +1,39 @@
|
||||
# Compiled class file
|
||||
*.class
|
||||
|
||||
# Log file
|
||||
*.log
|
||||
|
||||
# BlueJ files
|
||||
*.ctxt
|
||||
|
||||
# Mobile Tools for Java (J2ME)
|
||||
.mtj.tmp/
|
||||
|
||||
# Package Files #
|
||||
*.jar
|
||||
!gradle-wrapper.jar
|
||||
*.war
|
||||
*.nar
|
||||
*.ear
|
||||
*.zip
|
||||
*.tar.gz
|
||||
*.rar
|
||||
|
||||
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
|
||||
hs_err_pid*
|
||||
|
||||
# Ignore Gradle project-specific cache directory
|
||||
.gradle
|
||||
/buildSrc/.gradle/
|
||||
|
||||
# Ignore Gradle build output directory
|
||||
build
|
||||
out
|
||||
|
||||
#Idea
|
||||
*.iml
|
||||
*.iws
|
||||
*.ipr
|
||||
*.idea
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>ru.otus</groupId>
|
||||
<artifactId>spring-41-kafka</artifactId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>consumer</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<properties>
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@@ -0,0 +1,11 @@
|
||||
package ru.demo;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ConsumerApp {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ConsumerApp.class, args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
package ru.demo.config;
|
||||
|
||||
import static org.springframework.kafka.support.serializer.JsonDeserializer.TYPE_MAPPINGS;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.util.List;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.config.TopicBuilder;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.support.JacksonUtils;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
|
||||
import ru.demo.model.StringValue;
|
||||
import ru.demo.service.StringValueConsumer;
|
||||
import ru.demo.service.StringValueConsumerLogger;
|
||||
|
||||
@Configuration
|
||||
public class ApplicationConfig {
|
||||
private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
|
||||
public final String topicName;
|
||||
|
||||
public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) {
|
||||
this.topicName = topicName;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ObjectMapper objectMapper() {
|
||||
return JacksonUtils.enhancedObjectMapper();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, StringValue> consumerFactory(
|
||||
KafkaProperties kafkaProperties, ObjectMapper mapper) {
|
||||
var props = kafkaProperties.buildConsumerProperties();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||
props.put(TYPE_MAPPINGS, "ru.demo.model.StringValue:ru.demo.model.StringValue");
|
||||
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
|
||||
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3_000);
|
||||
|
||||
var kafkaConsumerFactory = new DefaultKafkaConsumerFactory<String, StringValue>(props);
|
||||
kafkaConsumerFactory.setValueDeserializer(new JsonDeserializer<>(mapper));
|
||||
return kafkaConsumerFactory;
|
||||
}
|
||||
|
||||
@Bean("listenerContainerFactory")
|
||||
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, StringValue>>
|
||||
listenerContainerFactory(ConsumerFactory<String, StringValue> consumerFactory) {
|
||||
var factory = new ConcurrentKafkaListenerContainerFactory<String, StringValue>();
|
||||
factory.setConsumerFactory(consumerFactory);
|
||||
factory.setBatchListener(true);
|
||||
factory.setConcurrency(1);
|
||||
factory.getContainerProperties().setIdleBetweenPolls(1_000);
|
||||
factory.getContainerProperties().setPollTimeout(1_000);
|
||||
|
||||
var executor = new SimpleAsyncTaskExecutor("k-consumer-");
|
||||
executor.setConcurrencyLimit(10);
|
||||
var listenerTaskExecutor = new ConcurrentTaskExecutor(executor);
|
||||
factory.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public NewTopic topic() {
|
||||
return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public StringValueConsumer stringValueConsumerLogger() {
|
||||
return new StringValueConsumerLogger();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaClient stringValueConsumer(StringValueConsumer stringValueConsumer) {
|
||||
return new KafkaClient(stringValueConsumer);
|
||||
}
|
||||
|
||||
public static class KafkaClient {
|
||||
private final StringValueConsumer stringValueConsumer;
|
||||
|
||||
public KafkaClient(StringValueConsumer stringValueConsumer) {
|
||||
this.stringValueConsumer = stringValueConsumer;
|
||||
}
|
||||
|
||||
@KafkaListener(
|
||||
topics = "${application.kafka.topic}",
|
||||
containerFactory = "listenerContainerFactory")
|
||||
public void listen(@Payload List<StringValue> values) {
|
||||
log.info("values, values.size:{}", values.size());
|
||||
stringValueConsumer.accept(values);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
package ru.demo.model;
|
||||
|
||||
public record StringValue(long id, String value) {}
|
||||
+9
@@ -0,0 +1,9 @@
|
||||
package ru.demo.service;
|
||||
|
||||
import java.util.List;
|
||||
import ru.demo.model.StringValue;
|
||||
|
||||
public interface StringValueConsumer {
|
||||
|
||||
void accept(List<StringValue> value);
|
||||
}
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
package ru.demo.service;
|
||||
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import ru.demo.model.StringValue;
|
||||
|
||||
public class StringValueConsumerLogger implements StringValueConsumer {
|
||||
private static final Logger log = LoggerFactory.getLogger(StringValueConsumerLogger.class);
|
||||
|
||||
@Override
|
||||
public void accept(List<StringValue> values) {
|
||||
for (var value : values) {
|
||||
log.info("log:{}", value);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
application:
|
||||
kafka:
|
||||
topic: "demo-topic"
|
||||
|
||||
|
||||
spring:
|
||||
kafka:
|
||||
consumer:
|
||||
group-id: "test-group"
|
||||
bootstrap-servers: "localhost:9092"
|
||||
client-id: "demo-consumer"
|
||||
auto-offset-reset: earliest
|
||||
@@ -0,0 +1,24 @@
|
||||
version: "3.9"
|
||||
services:
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:6.2.0
|
||||
container_name: zookeeper
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_TICK_TIME: 2000
|
||||
|
||||
broker:
|
||||
image: confluentinc/cp-kafka:7.0.0
|
||||
container_name: broker
|
||||
ports:
|
||||
- "9092:9092"
|
||||
depends_on:
|
||||
- zookeeper
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
@@ -0,0 +1,92 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>ru.otus</groupId>
|
||||
<artifactId>spring-41-kafka</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</parent>
|
||||
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
<module>consumer</module>
|
||||
<module>producer</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
|
||||
<maven-assembly-plugin.version>3.1.1</maven-assembly-plugin.version>
|
||||
<minimal.maven.version>3.3.9</minimal.maven.version>
|
||||
<dependencyManagement.version>1.0.12.RELEASE</dependencyManagement.version>
|
||||
<springframeworkBoot.version>3.0.5</springframeworkBoot.version>
|
||||
<jsr305.version>3.0.2</jsr305.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>${springframeworkBoot.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
<version>${jsr305.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>${maven-enforcer-plugin.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce-maven</id>
|
||||
<goals>
|
||||
<goal>enforce</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<rules>
|
||||
<dependencyConvergence/>
|
||||
<requireMavenVersion>
|
||||
<version>${minimal.maven.version}</version>
|
||||
</requireMavenVersion>
|
||||
</rules>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>${maven-shade-plugin.version}</version>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>${maven-assembly-plugin.version}</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
</project>
|
||||
@@ -0,0 +1,45 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>ru.otus</groupId>
|
||||
<artifactId>spring-41-kafka</artifactId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>producer</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<properties>
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@@ -0,0 +1,11 @@
|
||||
package ru.demo;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ProducerApp {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ProducerApp.class, args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package ru.demo.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.config.TopicBuilder;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.JacksonUtils;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
import ru.demo.model.StringValue;
|
||||
import ru.demo.service.DataSender;
|
||||
import ru.demo.service.DataSenderKafka;
|
||||
import ru.demo.service.StringValueSource;
|
||||
|
||||
@Configuration
|
||||
public class ApplicationConfig {
|
||||
private static final Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
|
||||
public final String topicName;
|
||||
|
||||
public ApplicationConfig(@Value("${application.kafka.topic}") String topicName) {
|
||||
this.topicName = topicName;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ObjectMapper objectMapper() {
|
||||
return JacksonUtils.enhancedObjectMapper();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, StringValue> producerFactory(
|
||||
KafkaProperties kafkaProperties, ObjectMapper mapper) {
|
||||
var props = kafkaProperties.buildProducerProperties();
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
|
||||
var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, StringValue>(props);
|
||||
kafkaProducerFactory.setValueSerializer(new JsonSerializer<>(mapper));
|
||||
return kafkaProducerFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, StringValue> kafkaTemplate(
|
||||
ProducerFactory<String, StringValue> producerFactory) {
|
||||
return new KafkaTemplate<>(producerFactory);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public NewTopic topic() {
|
||||
return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DataSender dataSender(NewTopic topic, KafkaTemplate<String, StringValue> kafkaTemplate) {
|
||||
return new DataSenderKafka(
|
||||
topic.name(),
|
||||
kafkaTemplate,
|
||||
stringValue -> log.info("asked, value:{}", stringValue));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public StringValueSource stringValueSource(DataSender dataSender) {
|
||||
return new StringValueSource(dataSender);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
package ru.demo.model;
|
||||
|
||||
public record StringValue(long id, String value) {}
|
||||
@@ -0,0 +1,7 @@
|
||||
package ru.demo.service;
|
||||
|
||||
import ru.demo.model.StringValue;
|
||||
|
||||
public interface DataSender {
|
||||
void send(StringValue value);
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package ru.demo.service;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import ru.demo.model.StringValue;
|
||||
|
||||
public class DataSenderKafka implements DataSender {
|
||||
private static final Logger log = LoggerFactory.getLogger(DataSenderKafka.class);
|
||||
|
||||
private final KafkaTemplate<String, StringValue> template;
|
||||
|
||||
private final Consumer<StringValue> sendAsk;
|
||||
|
||||
private final String topic;
|
||||
|
||||
public DataSenderKafka(
|
||||
String topic,
|
||||
KafkaTemplate<String, StringValue> template,
|
||||
Consumer<StringValue> sendAsk) {
|
||||
this.topic = topic;
|
||||
this.template = template;
|
||||
this.sendAsk = sendAsk;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(StringValue value) {
|
||||
try {
|
||||
log.info("value:{}", value);
|
||||
template.send(topic, value)
|
||||
.whenComplete(
|
||||
(result, ex) -> {
|
||||
if (ex == null) {
|
||||
log.info(
|
||||
"message id:{} was sent, offset:{}",
|
||||
value.id(),
|
||||
result.getRecordMetadata().offset());
|
||||
sendAsk.accept(value);
|
||||
} else {
|
||||
log.error("message id:{} was not sent", value.id(), ex);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
log.error("send error, value:{}", value, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package ru.demo.service;
|
||||
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class Runner implements CommandLineRunner {
|
||||
private final ValueSource valueSource;
|
||||
|
||||
public Runner(ValueSource valueSource) {
|
||||
this.valueSource = valueSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) {
|
||||
valueSource.generate();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package ru.demo.service;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import ru.demo.model.StringValue;
|
||||
|
||||
public class StringValueSource implements ValueSource {
|
||||
private static final Logger log = LoggerFactory.getLogger(StringValueSource.class);
|
||||
private final AtomicLong nextValue = new AtomicLong(1);
|
||||
private final DataSender valueConsumer;
|
||||
|
||||
public StringValueSource(DataSender dataSender) {
|
||||
this.valueConsumer = dataSender;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void generate() {
|
||||
var executor = Executors.newScheduledThreadPool(1);
|
||||
executor.scheduleAtFixedRate(() -> valueConsumer.send(makeValue()), 0, 1, TimeUnit.SECONDS);
|
||||
log.info("generation started");
|
||||
}
|
||||
|
||||
private StringValue makeValue() {
|
||||
var id = nextValue.getAndIncrement();
|
||||
return new StringValue(id, "stVal:" + id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package ru.demo.service;
|
||||
|
||||
public interface ValueSource {
|
||||
void generate();
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
application:
|
||||
kafka:
|
||||
topic: "demo-topic"
|
||||
|
||||
spring:
|
||||
kafka:
|
||||
producer:
|
||||
bootstrap-servers: "127.0.0.1:9092"
|
||||
client-id: "demo-producer"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user