diff --git a/2022-08/spring-20-rxjava/.gitignore b/2022-08/spring-20-rxjava/.gitignore
new file mode 100644
index 00000000..fbe7a1ed
--- /dev/null
+++ b/2022-08/spring-20-rxjava/.gitignore
@@ -0,0 +1,7 @@
+.idea/
+*.iml
+
+target/
+
+/node_modules
+/output
diff --git a/2022-08/spring-20-rxjava/pom.xml b/2022-08/spring-20-rxjava/pom.xml
new file mode 100644
index 00000000..c43ac59b
--- /dev/null
+++ b/2022-08/spring-20-rxjava/pom.xml
@@ -0,0 +1,39 @@
+
+
+ 4.0.0
+
+ ru.otus
+ spring-20-rxjava
+ 1.0-SNAPSHOT
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.4.RELEASE
+
+
+
+ 11
+ 11
+
+
+
+
+ io.reactivex.rxjava2
+ rxjava
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.11
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.36
+
+
+
diff --git a/2022-08/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java b/2022-08/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java
new file mode 100644
index 00000000..4fc2cb78
--- /dev/null
+++ b/2022-08/spring-20-rxjava/src/main/java/ru/otus/CreateExample.java
@@ -0,0 +1,75 @@
+package ru.otus;
+
+import io.reactivex.Observable;
+import io.reactivex.disposables.Disposable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CreateExample {
+ private static final Logger logger = LoggerFactory.getLogger(CreateExample.class);
+
+ public static void main(String[] args) {
+ //onEachNext();
+ //lazyObservable();
+ //creatorExample();
+ }
+
+ private static void creatorExample() {
+ Observable obs = createExample();
+
+ obs.onExceptionResumeNext(Observable.just("r1", "r2", "r3"))
+ .doOnNext(item -> logger.info("item__1:{}", item))
+ .subscribe();
+
+ logger.info("---------------");
+
+ Disposable disposable = obs.doOnNext(item -> logger.info("item__2:{}", item))
+ .subscribe(next -> logger.info("next:{}", next),
+ error -> logger.info("error:{}", error.getMessage()),
+ () -> logger.info("onComplete"));
+
+ logger.info("isDisposed:{}", disposable.isDisposed());
+ }
+
+ private static void lazyObservable() {
+ Observable obs = deferExample();
+
+ obs.doOnNext(item -> logger.info("item_1:{}", item))
+ .subscribe();
+ obs.doOnNext(item -> logger.info("item_2:{}", item))
+ .subscribe();
+ }
+
+ private static void onEachNext() {
+ Observable obs = justExample();
+ obs.doOnEach(item -> logger.info("item_1:{}", item.getValue()))
+ .subscribe();
+ obs.doOnNext(item -> logger.info("item_2:{}", item))
+ .map(String::length)
+ .doOnNext(item -> logger.info("length_2:{}", item))
+ .subscribe();
+ }
+
+ private static Observable justExample() {
+ return Observable.just("one", "two", "three");
+ }
+
+ private static Observable deferExample() {
+ return Observable.defer(() -> {
+ logger.info("creating new Observable");
+ return Observable.just("one", "two", "three");
+ });
+ }
+
+ private static Observable createExample() {
+ return Observable.create(emitter -> {
+ emitter.onNext("one");
+ emitter.onNext("two");
+
+ emitter.onError(new RuntimeException("Error!"));
+
+ emitter.onNext("three");
+ emitter.onComplete();
+ });
+ }
+}
diff --git a/2022-08/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java b/2022-08/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java
new file mode 100644
index 00000000..dab87855
--- /dev/null
+++ b/2022-08/spring-20-rxjava/src/main/java/ru/otus/OperatorsExample.java
@@ -0,0 +1,44 @@
+package ru.otus;
+
+import io.reactivex.Emitter;
+import io.reactivex.Observable;
+import io.reactivex.ObservableTransformer;
+
+import java.time.LocalDate;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OperatorsExample {
+ private static final Logger logger = LoggerFactory.getLogger(OperatorsExample.class);
+
+ public static void main(String[] args) {
+ var persons = List.of(
+ new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)),
+ new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23)),
+ new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)),
+ new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30)));
+
+ var disposable = Observable.fromIterable(persons)
+ .filter(person -> person.getBirth().isAfter(LocalDate.of(1990, 1, 1)))
+ .map(p -> p.getFirstName() + " " + p.getLastName())
+ .toList()
+ .subscribe(item -> logger.info("item: {}", item));
+
+ logger.info("disposable.isDisposed:{}", disposable.isDisposed());
+
+ var disposableTransform =Observable.fromIterable(persons)
+ .compose(filterAndUpperCase())
+ .subscribe(val -> logger.info("value:{}", val));
+
+ logger.info("disposable.isDisposed:{}", disposableTransform.isDisposed());
+ }
+
+ private static ObservableTransformer filterAndUpperCase() {
+ return upstream -> upstream
+ .map(Person::getFirstName)
+ .filter(s -> s.length() >= 4)
+ .map(String::toUpperCase);
+ }
+}
diff --git a/2022-08/spring-20-rxjava/src/main/java/ru/otus/Person.java b/2022-08/spring-20-rxjava/src/main/java/ru/otus/Person.java
new file mode 100644
index 00000000..72d3f520
--- /dev/null
+++ b/2022-08/spring-20-rxjava/src/main/java/ru/otus/Person.java
@@ -0,0 +1,66 @@
+package ru.otus;
+
+import java.time.LocalDate;
+import java.util.Objects;
+
+public class Person {
+ private String firstName;
+ private String lastName;
+ private String gender;
+ private LocalDate birth;
+
+ public Person(String firstName, String lastName, String gender, LocalDate birth) {
+ this.firstName = firstName;
+ this.lastName = lastName;
+ this.gender = gender;
+ this.birth = birth;
+ }
+
+ public String getFirstName() {
+ return firstName;
+ }
+
+ public void setFirstName(String firstName) {
+ this.firstName = firstName;
+ }
+
+ public String getLastName() {
+ return lastName;
+ }
+
+ public void setLastName(String lastName) {
+ this.lastName = lastName;
+ }
+
+ public String getGender() {
+ return gender;
+ }
+
+ public void setGender(String gender) {
+ this.gender = gender;
+ }
+
+ public LocalDate getBirth() {
+ return birth;
+ }
+
+ public void setBirth(LocalDate birth) {
+ this.birth = birth;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Person person = (Person) o;
+ return Objects.equals(firstName, person.firstName) &&
+ Objects.equals(lastName, person.lastName) &&
+ Objects.equals(gender, person.gender) &&
+ Objects.equals(birth, person.birth);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(firstName, lastName, gender, birth);
+ }
+}
diff --git a/2022-08/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java b/2022-08/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java
new file mode 100644
index 00000000..e7b99d11
--- /dev/null
+++ b/2022-08/spring-20-rxjava/src/main/java/ru/otus/PublisherExample.java
@@ -0,0 +1,69 @@
+package ru.otus;
+
+import io.reactivex.Observable;
+import io.reactivex.Scheduler;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subjects.PublishSubject;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PublisherExample {
+ private static final Logger logger = LoggerFactory.getLogger(PublisherExample.class);
+
+ public static void main(String[] args) throws Exception {
+ publisherExample();
+ }
+
+ public static void publisherExample() throws InterruptedException {
+ Observable ob = magicPublisher();
+ logger.info("First subscribed");
+ var disposable1 = ob.subscribe(item -> logger.info("item: {}", item));
+
+ logger.info("disposable1.isDisposed:{}", disposable1.isDisposed());
+ Thread.sleep(5000);
+
+ logger.info("Second subscribed");
+ var disposable2 = ob.subscribe(item -> logger.info("item: {}", item));
+
+
+ logger.info("disposable2.isDisposed():{}", disposable2.isDisposed());
+
+ Thread.sleep(60_000);
+ }
+
+ public static Observable magicPublisher() {
+// Random r = new Random(1);
+// AtomicInteger i = new AtomicInteger();
+ Scheduler scheduler = Schedulers.newThread();
+ PublishSubject subject = PublishSubject.create();
+ var obs = Observable
+ .generate(() -> 0, (prev, emitter) -> {
+ var nextVal = prev + 1;
+ emitter.onNext(nextVal);
+ return nextVal;
+ })
+ .subscribeOn(scheduler)
+ .concatMap(i-> Observable.just(i).delay(2, TimeUnit.SECONDS))
+ .map(String::valueOf);
+
+ obs.subscribe(subject);
+ return subject;
+
+// BehaviorSubject subject = BehaviorSubject.create();
+
+// AsyncSubject subject = AsyncSubject.create();
+// CompletableFuture.runAsync(() -> {
+// try {
+// Thread.sleep(7000);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+// subject.onComplete();
+// });
+
+// ReplaySubject subject = ReplaySubject.create();
+
+ }
+
+}