From 76e3423102b4fc8b0e89407ffa4182068e923ff4 Mon Sep 17 00:00:00 2001 From: Yuriy Dvorzhetskiy Date: Sun, 14 Nov 2021 00:43:59 +0300 Subject: [PATCH] A bit of reactor updates --- .../spring-19-reactive-spring-data/pom.xml | 4 +- .../java/ru/otus/spring/domain/Account.java | 14 ++--- .../spring/repostory/AccountRepository.java | 1 - .../spring/repostory/PersonRepository.java | 2 +- 2021-08/spring-19/spring-19-reactor/pom.xml | 2 +- .../src/main/java/ru/otus/spring/Main.java | 4 +- .../ru/otus/spring/reactor/FluxService.java | 50 ---------------- .../reactor/ReactiveProcessingService.java | 58 +++++++++++++++++++ .../{reactor => service}/NonFluxService.java | 3 +- 2021-08/spring-19/spring-19-web-flux/pom.xml | 2 +- 10 files changed, 74 insertions(+), 66 deletions(-) delete mode 100644 2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/FluxService.java create mode 100644 2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java rename 2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/{reactor => service}/NonFluxService.java (90%) diff --git a/2021-08/spring-19/spring-19-reactive-spring-data/pom.xml b/2021-08/spring-19/spring-19-reactive-spring-data/pom.xml index 7f18f9ab..edf3be45 100644 --- a/2021-08/spring-19/spring-19-reactive-spring-data/pom.xml +++ b/2021-08/spring-19/spring-19-reactive-spring-data/pom.xml @@ -11,7 +11,7 @@ org.springframework.boot spring-boot-starter-parent - 2.2.6.RELEASE + 2.5.6 @@ -24,6 +24,7 @@ org.springframework.boot spring-boot-starter + org.springframework.boot spring-boot-starter-data-mongodb-reactive @@ -32,7 +33,6 @@ org.mongodb mongodb-driver-reactivestreams - de.flapdoodle.embed de.flapdoodle.embed.mongo diff --git a/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/domain/Account.java b/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/domain/Account.java index 2158e0b7..0f34a772 100644 --- a/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/domain/Account.java +++ b/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/domain/Account.java @@ -4,33 +4,33 @@ public class Account { private String id; private String personId; private Long amount; - + public Account(String id, String personId, Long amount) { this.id = id; this.personId = personId; this.amount = amount; } - + public String getId() { return id; } - + public void setId(String id) { this.id = id; } - + public String getPersonId() { return personId; } - + public void setPersonId(String personId) { this.personId = personId; } - + public Long getAmount() { return amount; } - + public void setAmount(Long amount) { this.amount = amount; } diff --git a/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/AccountRepository.java b/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/AccountRepository.java index 22bd9275..a06fcdb3 100644 --- a/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/AccountRepository.java +++ b/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/AccountRepository.java @@ -2,7 +2,6 @@ package ru.otus.spring.repostory; import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import ru.otus.spring.domain.Account; -import ru.otus.spring.domain.Person; public interface AccountRepository extends ReactiveMongoRepository { } diff --git a/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java b/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java index ddd15cef..26b562e2 100644 --- a/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java +++ b/2021-08/spring-19/spring-19-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java @@ -2,12 +2,12 @@ package ru.otus.spring.repostory; import org.springframework.data.mongodb.repository.Query; import org.springframework.data.mongodb.repository.ReactiveMongoRepository; -import org.springframework.data.repository.reactive.ReactiveCrudRepository; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import ru.otus.spring.domain.Person; public interface PersonRepository extends ReactiveMongoRepository { + Flux findByName(String name); @Query("{ 'name': ?0 }") diff --git a/2021-08/spring-19/spring-19-reactor/pom.xml b/2021-08/spring-19/spring-19-reactor/pom.xml index c76c7939..085139d5 100644 --- a/2021-08/spring-19/spring-19-reactor/pom.xml +++ b/2021-08/spring-19/spring-19-reactor/pom.xml @@ -11,7 +11,7 @@ org.springframework.boot spring-boot-starter-parent - 2.2.6.RELEASE + 2.5.6 diff --git a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/Main.java b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/Main.java index cc406c39..ff27392b 100644 --- a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/Main.java +++ b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/Main.java @@ -3,7 +3,7 @@ package ru.otus.spring; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; -import ru.otus.spring.reactor.FluxService; +import ru.otus.spring.reactor.ReactiveProcessingService; @SpringBootApplication public class Main { @@ -11,7 +11,7 @@ public class Main { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(Main.class); - FluxService service = context.getBean(FluxService.class); + ReactiveProcessingService service = context.getBean(ReactiveProcessingService.class); service.printHello("Ivan"); } diff --git a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/FluxService.java b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/FluxService.java deleted file mode 100644 index e2e67f71..00000000 --- a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/FluxService.java +++ /dev/null @@ -1,50 +0,0 @@ -package ru.otus.spring.reactor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import reactor.core.Disposable; -import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -@Service -public class FluxService { - - private final Logger logger = LoggerFactory.getLogger(FluxService.class); - - private final NonFluxService nonFluxService; - private final DirectProcessor processor; - private final Disposable flow; - - @Autowired - public FluxService(NonFluxService nonFluxService) { - this.nonFluxService = nonFluxService; - // Создаём процессор - это reactor-овская реализация reactive-stream интерфейса - // Direct processor, кстати - это простой последовательный вызов методов) - processor = DirectProcessor.create(); - // Здесь мы настриваем flow - flow = Mono.from(processor) - .map(nonFluxService::nonFluxSayHello) - .subscribe(this::printMessage); - } - - /** - * Этот метод будет инициировать асинзронную обрабтку сообщения - * - * @param name это имя будет приходить из не-reactor окружения - */ - public void printHello(String name) { - processor.onNext(new Message(name)); - } - - /** - * А это терминальный шаг для сообщения - * - * @param message а это финальный шаг для сообщения, отсюда можно вернуть рзультат в не-реактив окружение - */ - private void printMessage(Message message) { - logger.info("Message received: {}", message.getValue()); - } -} diff --git a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java new file mode 100644 index 00000000..99836fe0 --- /dev/null +++ b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java @@ -0,0 +1,58 @@ +package ru.otus.spring.reactor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.Disposable; +import reactor.core.publisher.Sinks; +import ru.otus.spring.service.NonFluxService; + +import javax.annotation.PreDestroy; + +@Service +public class ReactiveProcessingService { + + private final Logger logger = LoggerFactory.getLogger(ReactiveProcessingService.class); + + private final Sinks.Many sink; + private final Disposable flow; + + public ReactiveProcessingService(NonFluxService nonFluxService) { + // Создаём sink (ранее - процессор) + // Это reactor-овская реализация reactive-stream интерфейса + // Обрабатывает данные как простой последовательный вызов методов :) + sink = Sinks.many().multicast().directBestEffort(); + // Здесь мы настраиваем flow + flow = sink.asFlux() + .map(nonFluxService::nonFluxSayHello) + .subscribe(this::printMessage); + // в идеале в коде выше должен быть doOnNext + // в map не предполагаются задержки + } + + /** + * Этот метод будет инициировать асинхронную обрабтку сообщения + * + * @param name это имя будет приходить из не-reactor окружения + */ + public void printHello(String name) { + sink.tryEmitNext(new Message(name)); + } + + /** + * А это терминальный шаг для сообщения + * + * @param message а это финальный шаг для сообщения, отсюда можно вернуть рзультат в не-реактив окружение + */ + private void printMessage(Message message) { + logger.info("Message received: {}", message.getValue()); + } + + /** + * Просто пример, как остановить процесс + */ + @PreDestroy + public void dispose() { + this.flow.dispose(); + } +} diff --git a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/NonFluxService.java b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/service/NonFluxService.java similarity index 90% rename from 2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/NonFluxService.java rename to 2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/service/NonFluxService.java index 1b9be28a..b635e785 100644 --- a/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/reactor/NonFluxService.java +++ b/2021-08/spring-19/spring-19-reactor/src/main/java/ru/otus/spring/service/NonFluxService.java @@ -1,8 +1,9 @@ -package ru.otus.spring.reactor; +package ru.otus.spring.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import ru.otus.spring.reactor.Message; @Service public class NonFluxService { diff --git a/2021-08/spring-19/spring-19-web-flux/pom.xml b/2021-08/spring-19/spring-19-web-flux/pom.xml index 705f3287..d1cb622f 100644 --- a/2021-08/spring-19/spring-19-web-flux/pom.xml +++ b/2021-08/spring-19/spring-19-web-flux/pom.xml @@ -25,7 +25,7 @@ spring-boot-starter-webflux - + io.reactivex.rxjava2 rxjava