diff --git a/2025-05/spring-38-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java b/2025-05/spring-38-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java index dc63a5a8..a44ba11f 100644 --- a/2025-05/spring-38-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java +++ b/2025-05/spring-38-kafka/consumer/src/main/java/ru/demo/config/ApplicationConfig.java @@ -50,7 +50,9 @@ public class ApplicationConfig { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(TYPE_MAPPINGS, "ru.demo.model.StringValue:ru.demo.model.StringValue"); + // Максимальное количество записей, возвращаемых за один вызов poll() props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3); + // Максимальный интервал в миллисекундах между вызовами poll() (после превышения consumer считается неактивным) props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3_000); var kafkaConsumerFactory = new DefaultKafkaConsumerFactory(props); @@ -65,7 +67,9 @@ public class ApplicationConfig { factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); factory.setConcurrency(1); + // Время в миллисекундах между вызовами poll() к Kafka брокеру factory.getContainerProperties().setIdleBetweenPolls(1_000); + // Таймаут в миллисекундах для операции poll() (время ожидания новых сообщений) factory.getContainerProperties().setPollTimeout(1_000); var executor = new SimpleAsyncTaskExecutor("k-consumer-"); diff --git a/2025-05/spring-38-kafka/consumer/src/main/resources/application.yml b/2025-05/spring-38-kafka/consumer/src/main/resources/application.yml index 32db801b..23adc138 100644 --- a/2025-05/spring-38-kafka/consumer/src/main/resources/application.yml +++ b/2025-05/spring-38-kafka/consumer/src/main/resources/application.yml @@ -9,4 +9,5 @@ spring: group-id: "test-group" bootstrap-servers: "localhost:9092" client-id: "demo-consumer" + # auto-offset-reset - стратегия чтения при отсутствии сохраненного offset: earliest (с начала), latest (с конца) auto-offset-reset: earliest diff --git a/2025-05/spring-38-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java b/2025-05/spring-38-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java index 11858f6f..5e60699e 100644 --- a/2025-05/spring-38-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java +++ b/2025-05/spring-38-kafka/producer/src/main/java/ru/demo/config/ApplicationConfig.java @@ -58,6 +58,7 @@ public class ApplicationConfig { @Bean public NewTopic topic() { + // Создание топика с указанным количеством партиций и реплик return TopicBuilder.name(topicName).partitions(1).replicas(1).build(); }