From b3bd66a49d785f3c0d97953409ec3e9381cc3a48 Mon Sep 17 00:00:00 2001 From: sergey Date: Thu, 9 Feb 2023 23:14:13 +0300 Subject: [PATCH] L20 rxJava --- 2022-11/spring-20-rxjava/.gitignore | 7 ++ 2022-11/spring-20-rxjava/pom.xml | 39 ++++++++++ .../src/main/java/ru/otus/CreateExample.java | 75 +++++++++++++++++++ .../main/java/ru/otus/OperatorsExample.java | 44 +++++++++++ .../src/main/java/ru/otus/Person.java | 66 ++++++++++++++++ .../main/java/ru/otus/PublisherExample.java | 51 +++++++++++++ 6 files changed, 282 insertions(+) create mode 100644 2022-11/spring-20-rxjava/.gitignore create mode 100644 2022-11/spring-20-rxjava/pom.xml create mode 100644 2022-11/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java create mode 100644 2022-11/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java create mode 100644 2022-11/spring-20-rxjava/src/main/java/ru/otus/Person.java create mode 100644 2022-11/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java diff --git a/2022-11/spring-20-rxjava/.gitignore b/2022-11/spring-20-rxjava/.gitignore new file mode 100644 index 00000000..fbe7a1ed --- /dev/null +++ b/2022-11/spring-20-rxjava/.gitignore @@ -0,0 +1,7 @@ +.idea/ +*.iml + +target/ + +/node_modules +/output diff --git a/2022-11/spring-20-rxjava/pom.xml b/2022-11/spring-20-rxjava/pom.xml new file mode 100644 index 00000000..c43ac59b --- /dev/null +++ b/2022-11/spring-20-rxjava/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + ru.otus + spring-20-rxjava + 1.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 2.2.4.RELEASE + + + + 11 + 11 + + + + + io.reactivex.rxjava2 + rxjava + + + ch.qos.logback + logback-classic + 1.2.11 + + + + org.slf4j + slf4j-api + 1.7.36 + + + diff --git a/2022-11/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java b/2022-11/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java new file mode 100644 index 00000000..4fc2cb78 --- /dev/null +++ b/2022-11/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java @@ -0,0 +1,75 @@ +package ru.otus; + +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CreateExample { + private static final Logger logger = LoggerFactory.getLogger(CreateExample.class); + + public static void main(String[] args) { + //onEachNext(); + //lazyObservable(); + //creatorExample(); + } + + private static void creatorExample() { + Observable obs = createExample(); + + obs.onExceptionResumeNext(Observable.just("r1", "r2", "r3")) + .doOnNext(item -> logger.info("item__1:{}", item)) + .subscribe(); + + logger.info("---------------"); + + Disposable disposable = obs.doOnNext(item -> logger.info("item__2:{}", item)) + .subscribe(next -> logger.info("next:{}", next), + error -> logger.info("error:{}", error.getMessage()), + () -> logger.info("onComplete")); + + logger.info("isDisposed:{}", disposable.isDisposed()); + } + + private static void lazyObservable() { + Observable obs = deferExample(); + + obs.doOnNext(item -> logger.info("item_1:{}", item)) + .subscribe(); + obs.doOnNext(item -> logger.info("item_2:{}", item)) + .subscribe(); + } + + private static void onEachNext() { + Observable obs = justExample(); + obs.doOnEach(item -> logger.info("item_1:{}", item.getValue())) + .subscribe(); + obs.doOnNext(item -> logger.info("item_2:{}", item)) + .map(String::length) + .doOnNext(item -> logger.info("length_2:{}", item)) + .subscribe(); + } + + private static Observable justExample() { + return Observable.just("one", "two", "three"); + } + + private static Observable deferExample() { + return Observable.defer(() -> { + logger.info("creating new Observable"); + return Observable.just("one", "two", "three"); + }); + } + + private static Observable createExample() { + return Observable.create(emitter -> { + emitter.onNext("one"); + emitter.onNext("two"); + + emitter.onError(new RuntimeException("Error!")); + + emitter.onNext("three"); + emitter.onComplete(); + }); + } +} diff --git a/2022-11/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java b/2022-11/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java new file mode 100644 index 00000000..dab87855 --- /dev/null +++ b/2022-11/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java @@ -0,0 +1,44 @@ +package ru.otus; + +import io.reactivex.Emitter; +import io.reactivex.Observable; +import io.reactivex.ObservableTransformer; + +import java.time.LocalDate; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OperatorsExample { + private static final Logger logger = LoggerFactory.getLogger(OperatorsExample.class); + + public static void main(String[] args) { + var persons = List.of( + new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)), + new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23)), + new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)), + new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30))); + + var disposable = Observable.fromIterable(persons) + .filter(person -> person.getBirth().isAfter(LocalDate.of(1990, 1, 1))) + .map(p -> p.getFirstName() + " " + p.getLastName()) + .toList() + .subscribe(item -> logger.info("item: {}", item)); + + logger.info("disposable.isDisposed:{}", disposable.isDisposed()); + + var disposableTransform =Observable.fromIterable(persons) + .compose(filterAndUpperCase()) + .subscribe(val -> logger.info("value:{}", val)); + + logger.info("disposable.isDisposed:{}", disposableTransform.isDisposed()); + } + + private static ObservableTransformer filterAndUpperCase() { + return upstream -> upstream + .map(Person::getFirstName) + .filter(s -> s.length() >= 4) + .map(String::toUpperCase); + } +} diff --git a/2022-11/spring-20-rxjava/src/main/java/ru/otus/Person.java b/2022-11/spring-20-rxjava/src/main/java/ru/otus/Person.java new file mode 100644 index 00000000..72d3f520 --- /dev/null +++ b/2022-11/spring-20-rxjava/src/main/java/ru/otus/Person.java @@ -0,0 +1,66 @@ +package ru.otus; + +import java.time.LocalDate; +import java.util.Objects; + +public class Person { + private String firstName; + private String lastName; + private String gender; + private LocalDate birth; + + public Person(String firstName, String lastName, String gender, LocalDate birth) { + this.firstName = firstName; + this.lastName = lastName; + this.gender = gender; + this.birth = birth; + } + + public String getFirstName() { + return firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + public String getLastName() { + return lastName; + } + + public void setLastName(String lastName) { + this.lastName = lastName; + } + + public String getGender() { + return gender; + } + + public void setGender(String gender) { + this.gender = gender; + } + + public LocalDate getBirth() { + return birth; + } + + public void setBirth(LocalDate birth) { + this.birth = birth; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Person person = (Person) o; + return Objects.equals(firstName, person.firstName) && + Objects.equals(lastName, person.lastName) && + Objects.equals(gender, person.gender) && + Objects.equals(birth, person.birth); + } + + @Override + public int hashCode() { + return Objects.hash(firstName, lastName, gender, birth); + } +} diff --git a/2022-11/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java b/2022-11/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java new file mode 100644 index 00000000..7e535c9b --- /dev/null +++ b/2022-11/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java @@ -0,0 +1,51 @@ +package ru.otus; + +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PublisherExample { + private static final Logger logger = LoggerFactory.getLogger(PublisherExample.class); + + public static void main(String[] args) throws Exception { + publisherExample(); + } + + public static void publisherExample() throws InterruptedException { + Observable ob = magicPublisher(); + logger.info("First subscribed"); + var disposable1 = ob.subscribe(item -> logger.info("item: {}", item)); + + logger.info("disposable1.isDisposed:{}", disposable1.isDisposed()); + Thread.sleep(5000); + + logger.info("Second subscribed"); + var disposable2 = ob.subscribe(item -> logger.info("item: {}", item)); + + + logger.info("disposable2.isDisposed():{}", disposable2.isDisposed()); + + Thread.sleep(60_000); + } + + public static Observable magicPublisher() { + Scheduler scheduler = Schedulers.newThread(); + PublishSubject subject = PublishSubject.create(); + var obs = Observable + .generate(() -> 0, (prev, emitter) -> { + var nextVal = prev + 1; + emitter.onNext(nextVal); + return nextVal; + }) + .subscribeOn(scheduler) + .concatMap(i-> Observable.just(i).delay(2, TimeUnit.SECONDS)) + .map(String::valueOf); + + obs.subscribe(subject); + return subject; + } +}