diff --git a/2024-03/spring-20-reactor/.gitignore b/2024-03/spring-20-reactor/.gitignore new file mode 100644 index 00000000..fbe7a1ed --- /dev/null +++ b/2024-03/spring-20-reactor/.gitignore @@ -0,0 +1,7 @@ +.idea/ +*.iml + +target/ + +/node_modules +/output diff --git a/2024-03/spring-20-reactor/pom.xml b/2024-03/spring-20-reactor/pom.xml new file mode 100644 index 00000000..ee8b414c --- /dev/null +++ b/2024-03/spring-20-reactor/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + ru.otus + spring-20-reactor + 1.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 3.1.0 + + + + 17 + 17 + + + + + + io.projectreactor + reactor-bom + 2022.0.7 + pom + import + + + org.springframework.boot + spring-boot-dependencies + 3.1.0 + pom + import + + + + + + + + io.projectreactor + reactor-core + + + + ch.qos.logback + logback-classic + + + + io.projectreactor + reactor-test + test + + + org.junit.jupiter + junit-jupiter + test + + + org.assertj + assertj-core + test + + + + diff --git a/2024-03/spring-20-reactor/src/main/java/ru/otus/CreateExample.java b/2024-03/spring-20-reactor/src/main/java/ru/otus/CreateExample.java new file mode 100644 index 00000000..e279b6fe --- /dev/null +++ b/2024-03/spring-20-reactor/src/main/java/ru/otus/CreateExample.java @@ -0,0 +1,74 @@ +package ru.otus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +public class CreateExample { + private static final Logger logger = LoggerFactory.getLogger(CreateExample.class); + + public static void main(String[] args) { +// onEachNext(); +// lazyObservable(); +// creatorExample(); + } + + private static void onEachNext() { + Flux obs = Flux.just("one", "two", "three"); + obs.doFirst(() -> logger.info("Starting:")) + .doOnComplete(() -> logger.info("The end!")) + .doOnEach(item -> logger.info("item_1:{}", item.get())) + .subscribe(); + + logger.info("-----"); + + obs.doOnNext(item -> logger.info("item_2:{}", item)) + .map(String::length) + .doOnNext(item -> logger.info("length_2:{}", item)) + .subscribe(); + } + + private static void lazyObservable() { + Flux obs = Flux.defer(() -> { + logger.info("creating new Observable"); + return Flux.just("one", "two", "three"); + }); + + obs.doOnNext(item -> logger.info("item_1:{}", item)) + .subscribe(); + + logger.info("---------------"); + + obs.doOnNext(item -> logger.info("item_2:{}", item)) + .subscribe(); + } + + private static void creatorExample() { + Flux obs = Flux.create(emitter -> { + emitter.next("one"); + emitter.next("two"); + + emitter.error(new RuntimeException("Error!")); + + emitter.next("three"); + emitter.complete(); + }); + + obs.onErrorResume(e -> { + logger.error("error:{}", e.getMessage(), e); + return Flux.just("r1", "r2", "r3"); + }) + .doOnNext(item -> logger.info("item__1:{}", item)) + .subscribe(); + + logger.info("---------------"); + + Disposable disposable = obs.doOnNext(item -> logger.info("item__2:{}", item)) + .subscribe(next -> logger.info("next:{}", next), + error -> logger.info("error:{}", error.getMessage()), + () -> logger.info("onComplete")); + + logger.info("isDisposed:{}", disposable.isDisposed()); + } +} diff --git a/2024-03/spring-20-reactor/src/main/java/ru/otus/OperatorsExample.java b/2024-03/spring-20-reactor/src/main/java/ru/otus/OperatorsExample.java new file mode 100644 index 00000000..97ff6e9e --- /dev/null +++ b/2024-03/spring-20-reactor/src/main/java/ru/otus/OperatorsExample.java @@ -0,0 +1,51 @@ +package ru.otus; + +import java.time.LocalDate; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + + +public class OperatorsExample { + private static final Logger logger = LoggerFactory.getLogger(OperatorsExample.class); + + public static void main(String[] args) { +// merge(); +// fromList(); + } + + private static void merge() { + var listFlux1 = Flux.fromIterable(List.of( + new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)), + new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23)))); + + var listFlux2 = Flux.fromIterable(List.of( + new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)), + new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30)))); + + var listFlux3 = Flux.fromIterable(List.of( + new Person("Ivan", "Petrov", "male", LocalDate.of(1890, 2, 10)), + new Person("Joanne", "Stuard", "female", LocalDate.of(1965, 1, 3)))); + + Flux.merge(listFlux1, listFlux2, listFlux3) + .subscribe(person -> logger.info("person:{}", person)); + + } + + private static void fromList() { + var persons = List.of( + new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)), + new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23)), + new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)), + new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30))); + + var disposable = Flux.fromIterable(persons) + .filter(person -> person.birth().isAfter(LocalDate.of(1990, 1, 1))) + .map(p -> p.firstName() + " " + p.lastName()) + .collectList() + .subscribe(item -> logger.info("item: {}", item)); + + logger.info("disposable.isDisposed:{}", disposable.isDisposed()); + } +} diff --git a/2024-03/spring-20-reactor/src/main/java/ru/otus/Person.java b/2024-03/spring-20-reactor/src/main/java/ru/otus/Person.java new file mode 100644 index 00000000..ddbe5422 --- /dev/null +++ b/2024-03/spring-20-reactor/src/main/java/ru/otus/Person.java @@ -0,0 +1,6 @@ +package ru.otus; + +import java.time.LocalDate; + +public record Person(String firstName, String lastName, String gender, LocalDate birth) { +} diff --git a/2024-03/spring-20-reactor/src/main/java/ru/otus/PublisherExample.java b/2024-03/spring-20-reactor/src/main/java/ru/otus/PublisherExample.java new file mode 100644 index 00000000..aaab7853 --- /dev/null +++ b/2024-03/spring-20-reactor/src/main/java/ru/otus/PublisherExample.java @@ -0,0 +1,56 @@ +package ru.otus; + +import java.time.Duration; +import java.util.function.BiFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SynchronousSink; +import reactor.core.scheduler.Schedulers; + +public class PublisherExample { + private static final Logger logger = LoggerFactory.getLogger(PublisherExample.class); + + public static void main(String[] args) throws Exception { + publisherExample(); + } + + public static void publisherExample() throws InterruptedException { + Flux ob = magicPublisher(); + Thread.sleep(5000); + logger.info("First subscribed"); + var disposable1 = ob.subscribe(item -> logger.info("item: {}", item)); + + logger.info("disposable1.isDisposed:{}", disposable1.isDisposed()); + Thread.sleep(5000); + + logger.info("Second subscribed"); + var disposable2 = ob.subscribe(item -> logger.info("item second: {}", item)); + + + logger.info("disposable2.isDisposed():{}", disposable2.isDisposed()); + + Thread.sleep(60_000); + } + + public static Flux magicPublisher() { + var schedulerGenerator = Schedulers.newParallel("generator", 1); + var generator = Flux.generate( + () -> 0L, + (BiFunction, Long>) + (prev, sink) -> { + var newValue = prev + 1; + sink.next(newValue); + logger.info("newValue:{}", newValue); + return newValue; + }) + .delayElements(Duration.ofSeconds(5), schedulerGenerator) + .map(id -> "new id:" + id) + .doOnNext(val -> logger.info("val:{}", val)); + + ConnectableFlux generatorConnectable = generator.publish(); + + return generatorConnectable.autoConnect(0); + } +}