diff --git a/2023-01/spring-20-rxjava/.gitignore b/2023-01/spring-20-rxjava/.gitignore new file mode 100644 index 00000000..fbe7a1ed --- /dev/null +++ b/2023-01/spring-20-rxjava/.gitignore @@ -0,0 +1,7 @@ +.idea/ +*.iml + +target/ + +/node_modules +/output diff --git a/2023-01/spring-20-rxjava/pom.xml b/2023-01/spring-20-rxjava/pom.xml new file mode 100644 index 00000000..39500fe7 --- /dev/null +++ b/2023-01/spring-20-rxjava/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + ru.otus + spring-20-rxjava + 1.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 3.0.2 + + + + 17 + 17 + + + + + io.reactivex.rxjava2 + rxjava + 2.2.21 + + + ch.qos.logback + logback-classic + 1.4.6 + + + backport-util-concurrent + backport-util-concurrent + 3.1 + compile + + + diff --git a/2023-01/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java b/2023-01/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java new file mode 100644 index 00000000..61b00293 --- /dev/null +++ b/2023-01/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java @@ -0,0 +1,67 @@ +package ru.otus; + +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CreateExample { + private static final Logger logger = LoggerFactory.getLogger(CreateExample.class); + + public static void main(String[] args) { + //onEachNext(); + //lazyObservable(); + //creatorExample(); + } + + private static void onEachNext() { + Observable obs = Observable.just("one", "two", "three"); + obs.doOnEach(item -> logger.info("item_1:{}", item.getValue())) + .subscribe(); + logger.info("-----"); + obs.doOnNext(item -> logger.info("item_2:{}", item)) + .map(String::length) + .doOnNext(item -> logger.info("length_2:{}", item)) + .subscribe(); + } + + private static void lazyObservable() { + Observable obs = Observable.defer(() -> { + logger.info("creating new Observable"); + return Observable.just("one", "two", "three"); + }); + + obs.doOnNext(item -> logger.info("item_1:{}", item)) + .subscribe(); + + logger.info("---------------"); + + obs.doOnNext(item -> logger.info("item_2:{}", item)) + .subscribe(); + } + + private static void creatorExample() { + Observable obs = Observable.create(emitter -> { + emitter.onNext("one"); + emitter.onNext("two"); + + emitter.onError(new RuntimeException("Error!")); + + emitter.onNext("three"); + emitter.onComplete(); + }); + + obs.onExceptionResumeNext(Observable.just("r1", "r2", "r3")) + .doOnNext(item -> logger.info("item__1:{}", item)) + .subscribe(); + + logger.info("---------------"); + + Disposable disposable = obs.doOnNext(item -> logger.info("item__2:{}", item)) + .subscribe(next -> logger.info("next:{}", next), + error -> logger.info("error:{}", error.getMessage()), + () -> logger.info("onComplete")); + + logger.info("isDisposed:{}", disposable.isDisposed()); + } +} diff --git a/2023-01/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java b/2023-01/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java new file mode 100644 index 00000000..967e9da6 --- /dev/null +++ b/2023-01/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java @@ -0,0 +1,43 @@ +package ru.otus; + +import io.reactivex.Observable; +import io.reactivex.ObservableTransformer; + +import java.time.LocalDate; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OperatorsExample { + private static final Logger logger = LoggerFactory.getLogger(OperatorsExample.class); + + public static void main(String[] args) { + var persons = List.of( + new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)), + new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23)), + new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)), + new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30))); + + var disposable = Observable.fromIterable(persons) + .filter(person -> person.birth().isAfter(LocalDate.of(1990, 1, 1))) + .map(p -> p.firstName() + " " + p.lastName()) + .toList() + .subscribe(item -> logger.info("item: {}", item)); + + logger.info("disposable.isDisposed:{}", disposable.isDisposed()); + + var disposableTransform = Observable.fromIterable(persons) + .compose(filterAndUpperCase()) + .subscribe(val -> logger.info("value:{}", val)); + + logger.info("disposable.isDisposed:{}", disposableTransform.isDisposed()); + } + + private static ObservableTransformer filterAndUpperCase() { + return upstream -> upstream + .map(Person::firstName) + .filter(s -> s.length() >= 4) + .map(String::toUpperCase); + } +} diff --git a/2023-01/spring-20-rxjava/src/main/java/ru/otus/Person.java b/2023-01/spring-20-rxjava/src/main/java/ru/otus/Person.java new file mode 100644 index 00000000..ddbe5422 --- /dev/null +++ b/2023-01/spring-20-rxjava/src/main/java/ru/otus/Person.java @@ -0,0 +1,6 @@ +package ru.otus; + +import java.time.LocalDate; + +public record Person(String firstName, String lastName, String gender, LocalDate birth) { +} diff --git a/2023-01/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java b/2023-01/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java new file mode 100644 index 00000000..5f31318e --- /dev/null +++ b/2023-01/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java @@ -0,0 +1,53 @@ +package ru.otus; + +import io.reactivex.Observable; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.subjects.PublishSubject; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PublisherExample { + private static final Logger logger = LoggerFactory.getLogger(PublisherExample.class); + + public static void main(String[] args) throws Exception { + publisherExample(); + } + + public static void publisherExample() throws InterruptedException { + Observable ob = magicPublisher(); + logger.info("First subscribed"); + Thread.sleep(5000); + var disposable1 = ob.subscribe(item -> logger.info("item: {}", item)); + + logger.info("disposable1.isDisposed:{}", disposable1.isDisposed()); + Thread.sleep(5000); + + logger.info("Second subscribed"); + var disposable2 = ob.subscribe(item -> logger.info("item second: {}", item)); + + + logger.info("disposable2.isDisposed():{}", disposable2.isDisposed()); + + Thread.sleep(60_000); + } + + public static Observable magicPublisher() { + var executor = Executors.newSingleThreadScheduledExecutor(); + + ObservableOnSubscribe handler = emitter -> { + var currentValue = new AtomicLong(0); + executor.scheduleWithFixedDelay(() -> + emitter.onNext(currentValue.incrementAndGet()), 0, 2, TimeUnit.SECONDS); + }; + + Observable obs = Observable.create(handler) + .map(String::valueOf); + + PublishSubject subject = PublishSubject.create(); + obs.subscribe(subject); + return subject; + } +}