diff --git a/2022-11/spring-21-reactive/.gitignore b/2022-11/spring-21-reactive/.gitignore new file mode 100644 index 00000000..4ea52072 --- /dev/null +++ b/2022-11/spring-21-reactive/.gitignore @@ -0,0 +1,24 @@ +target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/build/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ diff --git a/2022-11/spring-21-reactive/pom.xml b/2022-11/spring-21-reactive/pom.xml new file mode 100644 index 00000000..79682fee --- /dev/null +++ b/2022-11/spring-21-reactive/pom.xml @@ -0,0 +1,18 @@ + + + 4.0.0 + + ru.otus + spring-21-reactive + 1.0 + + pom + + + spring-21-web-flux + spring-21-reactor + spring-21-reactive-spring-data + + diff --git a/2022-11/spring-21-reactive/spring-21-reactive-spring-data/pom.xml b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/pom.xml new file mode 100644 index 00000000..0598ef68 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + ru.otus + spring-21-reactive-spring-data + 1.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.7.5 + + + + + 11 + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-data-mongodb-reactive + + + org.mongodb + mongodb-driver-reactivestreams + + + de.flapdoodle.embed + de.flapdoodle.embed.mongo + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/ReactiveSpringDataDemo.java b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/ReactiveSpringDataDemo.java new file mode 100644 index 00000000..f8a49c43 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/ReactiveSpringDataDemo.java @@ -0,0 +1,42 @@ +package ru.otus.spring; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import reactor.core.publisher.Flux; +import ru.otus.spring.domain.Person; +import ru.otus.spring.repostory.PersonRepository; + +import java.util.List; + +@SpringBootApplication +public class ReactiveSpringDataDemo { + private static final Logger logger = LoggerFactory.getLogger(ReactiveSpringDataDemo.class); + + public static void main(String[] args) throws InterruptedException { + var context = SpringApplication.run(ReactiveSpringDataDemo.class); + + var repository = context.getBean(PersonRepository.class); + + var persons = List.of(new Person("Pushkin"), new Person("Lermontov")); + + logger.info("before save"); + repository.saveAll(persons) + .doOnNext(savedPers -> logger.info("savedPers:{}", savedPers)) + .subscribe(); + logger.info("after save"); + + // можно ничего и не найти + repository.findAll() + .map(Person::getName) + .subscribe(name -> logger.info("person name:{}", name)); + + // Пример объединения двух Flux + Flux.merge(repository.findAll(), repository.findAll()) + .map(Person::getName) + .subscribe(name -> logger.info("join name:{}", name)); + + Thread.sleep(60_000); + } +} diff --git a/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/domain/Person.java b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/domain/Person.java new file mode 100644 index 00000000..e3859657 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/domain/Person.java @@ -0,0 +1,35 @@ +package ru.otus.spring.domain; + +public class Person { + + private String id; + private String name; + + public Person(String name) { + this.name = name; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return "Person{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + '}'; + } +} diff --git a/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java new file mode 100644 index 00000000..87ad0c54 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/java/ru/otus/spring/repostory/PersonRepository.java @@ -0,0 +1,17 @@ +package ru.otus.spring.repostory; + +import org.springframework.data.mongodb.repository.Query; +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import ru.otus.spring.domain.Person; + +public interface PersonRepository extends ReactiveMongoRepository { + + // -ooo---ooo---ooo|--------- + // --------------------X----- + Flux findByName(String name); + + @Query("{ 'name': ?0 }") + Mono findFirstByName(String name); +} diff --git a/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/resources/application.yml b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/resources/application.yml new file mode 100644 index 00000000..6a372567 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactive-spring-data/src/main/resources/application.yml @@ -0,0 +1,4 @@ +spring: + mongodb: + embedded: + version: "3.5.5" \ No newline at end of file diff --git a/2022-11/spring-21-reactive/spring-21-reactor/pom.xml b/2022-11/spring-21-reactive/spring-21-reactor/pom.xml new file mode 100644 index 00000000..b583fdc7 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactor/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + ru.otus + spring-21-reactor + 1.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.6.3 + + + + + 11 + + + + + org.springframework.boot + spring-boot-starter + + + io.projectreactor + reactor-core + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/Main.java b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/Main.java new file mode 100644 index 00000000..ed893f7e --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/Main.java @@ -0,0 +1,21 @@ +package ru.otus.spring; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import ru.otus.spring.reactor.ReactiveProcessingService; + +@SpringBootApplication +public class Main { + + public static void main(String[] args) { + var context = SpringApplication.run(Main.class); + + var service = context.getBean(ReactiveProcessingService.class); + + for (int i = 0; i < 10; ++i) { + service.printHello(String.format("Ivan_%d", i)); + } + } +} + + diff --git a/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/reactor/Message.java b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/reactor/Message.java new file mode 100644 index 00000000..ac58a62c --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/reactor/Message.java @@ -0,0 +1,14 @@ +package ru.otus.spring.reactor; + +public class Message { + + private final String value; + + public Message(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java new file mode 100644 index 00000000..f2837011 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/reactor/ReactiveProcessingService.java @@ -0,0 +1,50 @@ +package ru.otus.spring.reactor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.Disposable; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; +import ru.otus.spring.service.NonFluxService; + +import javax.annotation.PreDestroy; + +@Service +public class ReactiveProcessingService { + + private final Logger logger = LoggerFactory.getLogger(ReactiveProcessingService.class); + + private final Sinks.Many sink; + private final Disposable flow; + + public ReactiveProcessingService(NonFluxService nonFluxService) { + var scheduler = Schedulers.newParallel("processing", 2); + sink = Sinks.many().unicast().onBackpressureBuffer(); + flow = sink.asFlux() + .publishOn(scheduler) + .map(nonFluxService::nonFluxSayHello) + .doOnError(error -> logger.error("error", error)) + .subscribe(this::printMessage); + } + + /** + * Этот метод будет инициировать асинхронную обрабтку сообщения + * + * @param name это имя будет приходить из не-reactor окружения + */ + public void printHello(String name) { + var emitResult = sink.tryEmitNext(new Message(name)); + logger.info("emitResult:{}", emitResult); + } + + + private void printMessage(Message message) { + logger.info("Message received: {}", message.getValue()); + } + + @PreDestroy + public void dispose() { + this.flow.dispose(); + } +} diff --git a/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/service/NonFluxService.java b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/service/NonFluxService.java new file mode 100644 index 00000000..1b54f625 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-reactor/src/main/java/ru/otus/spring/service/NonFluxService.java @@ -0,0 +1,26 @@ +package ru.otus.spring.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import ru.otus.spring.reactor.Message; + +@Service +public class NonFluxService { + + private final Logger logger = LoggerFactory.getLogger(NonFluxService.class); + + public Message nonFluxSayHello(Message message) { + logger.info("Message received in non-flux service: {}", message.getValue()); + + var name = message.getValue(); + var withHello = "Hello, " + name + "!"; + try { + Thread.sleep(1000); + return new Message(withHello); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return new Message(withHello); + } + } +} diff --git a/2022-11/spring-21-reactive/spring-21-web-flux/HttpRequests.http b/2022-11/spring-21-reactive/spring-21-web-flux/HttpRequests.http new file mode 100644 index 00000000..af0b1fc6 --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-web-flux/HttpRequests.http @@ -0,0 +1,41 @@ +### +GET http://localhost:8080/flux/one +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8080/flux/three +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8080/flux/ten +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8080/flux/stream +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8080/rx/one +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8080/rx/ten +Accept: */* +Content-Type: application/json +Cache-Control: no-cache + +### +GET http://localhost:8080/rx/five +Accept: */* +Content-Type: application/json +Cache-Control: no-cache \ No newline at end of file diff --git a/2022-11/spring-21-reactive/spring-21-web-flux/pom.xml b/2022-11/spring-21-reactive/spring-21-web-flux/pom.xml new file mode 100644 index 00000000..f359099b --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-web-flux/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + ru.otus + spring-21-web-flux + 1.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.6.3 + + + + + 11 + + + + + org.springframework.boot + spring-boot-starter-webflux + + + + + io.reactivex.rxjava2 + rxjava + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/ReactorController.java b/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/ReactorController.java new file mode 100644 index 00000000..e24d0c9f --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/ReactorController.java @@ -0,0 +1,39 @@ +package ru.otus.spring; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +@RestController +public class ReactorController { + + @GetMapping("/flux/one") + public Mono one() { + return Mono.just("one"); + } + + @GetMapping("/flux/three") + public Flux three() { + return Flux.just(300, 600, 900); + } + + @GetMapping("/flux/ten") + public Flux list() { + // 12345678910|------ + return Flux.range(1, 10); + } + + @GetMapping(path = "/flux/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux stream() { + return Flux.generate(() -> 0, (state, emitter) -> { + emitter.next(state); + return state + 1; + }) + .delayElements(Duration.ofSeconds(1L)) + .map(Object::toString); + } +} diff --git a/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/RxJava2Controller.java b/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/RxJava2Controller.java new file mode 100644 index 00000000..e513d02c --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/RxJava2Controller.java @@ -0,0 +1,38 @@ +package ru.otus.spring; + +import io.reactivex.Flowable; +import io.reactivex.Single; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.TimeUnit; + +@RestController +public class RxJava2Controller { + private static final Logger logger = LoggerFactory.getLogger(RxJava2Controller.class); + + @GetMapping("/rx/one") + public Single single() { + return Single.just("one") + .map(String::length); + } + + @GetMapping(value = "/rx/ten", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flowable list() { + // --0--1--2--3--4--... + return Flowable.interval(2, TimeUnit.SECONDS) + .map(i -> i + 1) + .doOnNext(item -> logger.info("item:{}", item)); + } + + @GetMapping(value = "/rx/five", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flowable five() { + // --0--1--2--3--4--... + return Flowable.interval(1, TimeUnit.SECONDS) + .doOnNext(item -> logger.info("item:{}", item)) + .limit(5); + } +} diff --git a/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/WebFluxSpringDemo.java b/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/WebFluxSpringDemo.java new file mode 100644 index 00000000..fc0e020c --- /dev/null +++ b/2022-11/spring-21-reactive/spring-21-web-flux/src/main/java/ru/otus/spring/WebFluxSpringDemo.java @@ -0,0 +1,14 @@ +package ru.otus.spring; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class WebFluxSpringDemo { + + public static void main(String[] args) { + SpringApplication.run(WebFluxSpringDemo.class); + } +} + +