diff --git a/2024-01/spring-21-webflux/.gitignore b/2024-01/spring-21-webflux/.gitignore
new file mode 100644
index 00000000..e62c33c2
--- /dev/null
+++ b/2024-01/spring-21-webflux/.gitignore
@@ -0,0 +1,4 @@
+.idea/
+*.iml
+
+target/
diff --git a/2024-01/spring-21-webflux/HttpRequests.http b/2024-01/spring-21-webflux/HttpRequests.http
new file mode 100644
index 00000000..962c235e
--- /dev/null
+++ b/2024-01/spring-21-webflux/HttpRequests.http
@@ -0,0 +1,47 @@
+###
+GET http://localhost:8080/flux/one
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET http://localhost:8080/flux/ten
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET http://localhost:8080/stream
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET http://localhost:8080/person
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET curl
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET http://localhost:8080/func/person?name=Lermontov
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET http://localhost:8080/func/person?age=22
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
+
+###
+GET http://localhost:8080/func/person/1
+Accept: */*
+Content-Type: application/json
+Cache-Control: no-cache
diff --git a/2024-01/spring-21-webflux/docker/runDb.src b/2024-01/spring-21-webflux/docker/runDb.src
new file mode 100755
index 00000000..aa76bbc6
--- /dev/null
+++ b/2024-01/spring-21-webflux/docker/runDb.src
@@ -0,0 +1,6 @@
+docker run --rm --name pg-docker \
+-e POSTGRES_PASSWORD=pwd \
+-e POSTGRES_USER=usr \
+-e POSTGRES_DB=demoDB \
+-p 5430:5432 \
+postgres:13
\ No newline at end of file
diff --git a/2024-01/spring-21-webflux/pom.xml b/2024-01/spring-21-webflux/pom.xml
new file mode 100644
index 00000000..db629d58
--- /dev/null
+++ b/2024-01/spring-21-webflux/pom.xml
@@ -0,0 +1,82 @@
+
+
+ 4.0.0
+
+ ru.otus
+ spring-21-webflux
+ 1.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 3.1.0
+
+
+
+ 17
+ 17
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jdbc
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-r2dbc
+
+
+
+ org.flywaydb
+ flyway-core
+
+
+
+ io.r2dbc
+ r2dbc-postgresql
+ 0.8.13.RELEASE
+
+
+
+ org.postgresql
+ postgresql
+ 42.6.0
+
+
+
+ org.jetbrains
+ annotations
+ 24.0.1
+
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/DataFiller.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/DataFiller.java
new file mode 100644
index 00000000..6fd45d59
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/DataFiller.java
@@ -0,0 +1,52 @@
+package ru.otus.spring;
+
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+import reactor.core.scheduler.Scheduler;
+import ru.otus.spring.domain.Notes;
+import ru.otus.spring.domain.Person;
+import ru.otus.spring.repository.NotesRepository;
+import ru.otus.spring.repository.PersonRepository;
+import ru.otus.spring.repository.PersonRepositoryCustom;
+
+@Component
+public class DataFiller implements ApplicationRunner {
+ private static final Logger logger = LoggerFactory.getLogger(DataFiller.class);
+
+ private final PersonRepository personRepository;
+ private final NotesRepository notesRepository;
+ private final PersonRepositoryCustom personRepositoryCustom;
+ private final Scheduler workerPool;
+
+ public DataFiller(PersonRepository personRepository, NotesRepository notesRepository, PersonRepositoryCustom personRepositoryCustom, Scheduler workerPool) {
+ this.personRepository = personRepository;
+ this.notesRepository = notesRepository;
+ this.workerPool = workerPool;
+ this.personRepositoryCustom = personRepositoryCustom;
+ }
+
+ @Override
+ public void run(ApplicationArguments args) {
+ personRepository.saveAll(Arrays.asList(
+ new Person("Pushkin", 22),
+ new Person("Lermontov", 22),
+ new Person("Tolstoy", 60)
+ )).publishOn(workerPool)
+ .subscribe(savedPerson -> {
+ logger.info("saved person:{}", savedPerson);
+ notesRepository.saveAll(Arrays.asList(
+ new Notes(null, "txt_1_" + savedPerson.getId(), savedPerson.getId()),
+ new Notes(null, "txt_2_" + savedPerson.getId(), savedPerson.getId())))
+ .publishOn(workerPool)
+ .subscribe(savedNotes -> logger.info("saved notes:{}", savedNotes));
+ });
+
+ personRepositoryCustom.findAll()
+ .publishOn(workerPool)
+ .subscribe(personDto -> logger.info("personDto:{}", personDto));
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/FunctionalEndpointsConfig.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/FunctionalEndpointsConfig.java
new file mode 100644
index 00000000..8ef166d9
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/FunctionalEndpointsConfig.java
@@ -0,0 +1,69 @@
+package ru.otus.spring;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+import ru.otus.spring.domain.Person;
+import ru.otus.spring.repository.PersonRepository;
+
+import static org.springframework.http.MediaType.APPLICATION_JSON;
+import static org.springframework.web.reactive.function.BodyInserters.fromValue;
+import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
+import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
+import static org.springframework.web.reactive.function.server.RequestPredicates.queryParam;
+import static org.springframework.web.reactive.function.server.RouterFunctions.route;
+import static org.springframework.web.reactive.function.server.ServerResponse.badRequest;
+import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
+import static org.springframework.web.reactive.function.server.ServerResponse.ok;
+
+@Configuration
+public class FunctionalEndpointsConfig {
+ @Bean
+ public RouterFunction composedRoutes(PersonRepository repository) {
+ return route()
+ // эта функция должна стоять раньше findAll - порядок следования роутов - важен
+ .GET("/func/person",
+ queryParam("name", StringUtils::isNotEmpty),
+ request -> request.queryParam("name")
+ .map(name -> ok().body(repository.findAllByLastName(name), Person.class))
+ .orElse(badRequest().build())
+ )
+ // пример другой реализации - начиная с запроса репозитория
+ .GET("/func/person", queryParam("age", StringUtils::isNotEmpty),
+ request ->
+ ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(repository.findAllByAge(request.queryParam("age")
+ .map(Integer::parseInt)
+ .orElseThrow(IllegalArgumentException::new)), Person.class)
+ )
+ // Обратите внимание на использование хэндлера
+ .GET("/func/person", accept(APPLICATION_JSON), new PersonHandler(repository)::list)
+ // Обратите внимание на использование pathVariable
+ .GET("/func/person/{id}", accept(APPLICATION_JSON),
+ request -> repository.findById(Long.parseLong(request.pathVariable("id")))
+ .flatMap(person -> ok().contentType(APPLICATION_JSON).body(fromValue(person)))
+ .switchIfEmpty(notFound().build())
+ ).build();
+ }
+
+ // Это пример хэндлера, который даже не бин
+ static class PersonHandler {
+
+ private final PersonRepository repository;
+
+ PersonHandler(PersonRepository repository) {
+ this.repository = repository;
+ }
+
+ Mono list(ServerRequest request) {
+ // Обратите внимание на пример другого порядка создания response от Flux
+ return ok().contentType(APPLICATION_JSON).body(repository.findAll(), Person.class);
+ }
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/WebfluxDemo.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/WebfluxDemo.java
new file mode 100644
index 00000000..5bd95469
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/WebfluxDemo.java
@@ -0,0 +1,16 @@
+package ru.otus.spring;
+
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class WebfluxDemo {
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(WebfluxDemo.class);
+ }
+}
+
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/config/ApplConfig.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/config/ApplConfig.java
new file mode 100644
index 00000000..9813876b
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/config/ApplConfig.java
@@ -0,0 +1,39 @@
+package ru.otus.spring.config;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
+import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.annotation.NonNull;
+
+@Configuration
+public class ApplConfig {
+ private static final int THREAD_POOL_SIZE = 2;
+
+ @Bean
+ public ReactiveWebServerFactory reactiveWebServerFactory() {
+ var eventLoopGroup = new NioEventLoopGroup(THREAD_POOL_SIZE,
+ new ThreadFactory() {
+ private final AtomicLong threadIdGenerator = new AtomicLong(0);
+ @Override
+ public Thread newThread(@NonNull Runnable task) {
+ return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet());
+ }
+ });
+
+ var factory = new NettyReactiveWebServerFactory();
+ factory.addServerCustomizers(builder -> builder.runOn(eventLoopGroup));
+
+ return factory;
+ }
+
+ @Bean
+ public Scheduler workerPool() {
+ return Schedulers.newParallel("worker-thread", THREAD_POOL_SIZE);
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/Notes.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/Notes.java
new file mode 100644
index 00000000..037cefa4
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/Notes.java
@@ -0,0 +1,49 @@
+package ru.otus.spring.domain;
+
+import org.jetbrains.annotations.NotNull;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.annotation.PersistenceCreator;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Table("notes")
+public class Notes {
+
+ @Id
+ private final Long id;
+
+ @NotNull
+ private final String noteText;
+
+ @NotNull
+ private final Long personId;
+
+ @PersistenceCreator
+ public Notes(Long id, @NotNull String noteText, @NotNull Long personId) {
+ this.id = id;
+ this.noteText = noteText;
+ this.personId = personId;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ @NotNull
+ public String getNoteText() {
+ return noteText;
+ }
+
+ @NotNull
+ public Long getPersonId() {
+ return personId;
+ }
+
+ @Override
+ public String toString() {
+ return "Notes{" +
+ "id=" + id +
+ ", noteText='" + noteText + '\'' +
+ ", personId=" + personId +
+ '}';
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/Person.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/Person.java
new file mode 100644
index 00000000..9721a730
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/Person.java
@@ -0,0 +1,53 @@
+package ru.otus.spring.domain;
+
+import org.jetbrains.annotations.NotNull;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.annotation.PersistenceCreator;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Table("person")
+public class Person {
+
+ @Id
+ private final Long id;
+
+ @NotNull
+ private final String lastName;
+
+ private final int age;
+
+
+ @PersistenceCreator
+ private Person(Long id, @NotNull String lastName, int age) {
+ this.id = id;
+ this.lastName = lastName;
+ this.age = age;
+ }
+
+ public Person(String lastName, int age) {
+ this(null, lastName, age);
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+
+ public @NotNull String getLastName() {
+ return lastName;
+ }
+
+
+ public int getAge() {
+ return age;
+ }
+
+ @Override
+ public String toString() {
+ return "Person{" +
+ "id=" + id +
+ ", lastName='" + lastName + '\'' +
+ ", age=" + age +
+ '}';
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/PersonDto.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/PersonDto.java
new file mode 100644
index 00000000..08e8383f
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/domain/PersonDto.java
@@ -0,0 +1,8 @@
+package ru.otus.spring.domain;
+
+
+import java.util.List;
+
+public record PersonDto(String id, String name, Integer age, List notes) {
+
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/NotesRepository.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/NotesRepository.java
new file mode 100644
index 00000000..38145043
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/NotesRepository.java
@@ -0,0 +1,12 @@
+package ru.otus.spring.repository;
+
+import org.jetbrains.annotations.NotNull;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import reactor.core.publisher.Flux;
+import ru.otus.spring.domain.Notes;
+
+
+public interface NotesRepository extends ReactiveCrudRepository {
+
+ Flux findByPersonId(@NotNull Long personId);
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/PersonRepository.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/PersonRepository.java
new file mode 100644
index 00000000..d420f092
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/PersonRepository.java
@@ -0,0 +1,18 @@
+package ru.otus.spring.repository;
+
+import org.jetbrains.annotations.NotNull;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import ru.otus.spring.domain.Person;
+
+public interface PersonRepository extends ReactiveCrudRepository {
+
+ @NotNull Mono findById(@NotNull Long id);
+
+ Mono save(Mono person);
+
+ Flux findAllByLastName(String lastName);
+
+ Flux findAllByAge(int age);
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/PersonRepositoryCustom.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/PersonRepositoryCustom.java
new file mode 100644
index 00000000..993eb822
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/repository/PersonRepositoryCustom.java
@@ -0,0 +1,55 @@
+package ru.otus.spring.repository;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.r2dbc.spi.Readable;
+import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
+import org.springframework.stereotype.Repository;
+import reactor.core.publisher.Flux;
+import ru.otus.spring.domain.PersonDto;
+
+import java.util.List;
+
+
+@Repository
+public class PersonRepositoryCustom {
+
+ private final R2dbcEntityTemplate template;
+ private final ObjectMapper objectMapper;
+
+ private static final String SQL_ALL = """
+ select json_agg(n.note_text) as notes, n.person_id,
+ p.last_name, p.age
+ from notes n
+ inner join person p
+ on n.person_id = p.id
+ group by n.person_id, p.last_name, p.age
+ """;
+
+ public PersonRepositoryCustom(R2dbcEntityTemplate template, ObjectMapper objectMapper) {
+ this.template = template;
+ this.objectMapper = objectMapper;
+ }
+
+ public Flux findAll() {
+ return template.getDatabaseClient().inConnectionMany(connection ->
+ Flux.from(connection.createStatement(SQL_ALL)
+ .execute())
+ .flatMap(result -> result.map(this::mapper)));
+ }
+
+ private PersonDto mapper(Readable selectedRecord) {
+ var notesAsText = selectedRecord.get("notes", String.class);
+ try {
+ List notes = objectMapper.readValue(notesAsText, new TypeReference<>() {
+ });
+ return new PersonDto(selectedRecord.get("person_id", String.class),
+ selectedRecord.get("last_name", String.class),
+ selectedRecord.get("age", Integer.class),
+ notes);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("notes:" + notesAsText + " parsing error:" + e);
+ }
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/rest/AnnotatedController.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/rest/AnnotatedController.java
new file mode 100644
index 00000000..b37dcd86
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/rest/AnnotatedController.java
@@ -0,0 +1,49 @@
+package ru.otus.spring.rest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import reactor.core.scheduler.Scheduler;
+
+
+@RestController
+public class AnnotatedController {
+ private static final Logger logger = LoggerFactory.getLogger(AnnotatedController.class);
+
+ private final Scheduler workerPool;
+
+ public AnnotatedController(Scheduler workerPool) {
+ this.workerPool = workerPool;
+ }
+
+ @GetMapping("/flux/one")
+ public Mono one() {
+ return Mono.just("one");
+ }
+
+ @GetMapping(path ="/flux/ten", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux list() {
+ logger.info("list request");
+ return Flux.range(1, 10)
+ .delayElements(Duration.ofSeconds(1), workerPool)
+ .doOnNext(val -> logger.info("value:{}", val));
+ }
+
+ @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux stream() {
+ logger.info("stream");
+ return Flux.generate(() -> 0, (state, emitter) -> {
+ emitter.next(state);
+ return state + 1;
+ })
+ .delayElements(Duration.ofSeconds(1L))
+ .map(Object::toString)
+ .map(val -> String.format("valStr:%s", val));
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/rest/PersonController.java b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/rest/PersonController.java
new file mode 100644
index 00000000..8518d2e9
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/java/ru/otus/spring/rest/PersonController.java
@@ -0,0 +1,57 @@
+package ru.otus.spring.rest;
+
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import ru.otus.spring.domain.Notes;
+import ru.otus.spring.domain.Person;
+import ru.otus.spring.domain.PersonDto;
+import ru.otus.spring.repository.NotesRepository;
+import ru.otus.spring.repository.PersonRepository;
+import ru.otus.spring.repository.PersonRepositoryCustom;
+
+import java.util.List;
+
+@RestController
+public class PersonController {
+
+ private final PersonRepository personRepository;
+ private final NotesRepository notesRepository;
+
+ private final PersonRepositoryCustom personRepositoryCustom;
+
+ public PersonController(PersonRepository personRepository, NotesRepository notesRepository, PersonRepositoryCustom personRepositoryCustom) {
+ this.personRepository = personRepository;
+ this.notesRepository = notesRepository;
+ this.personRepositoryCustom = personRepositoryCustom;
+ }
+
+ @GetMapping("/person")
+ public Flux all() {
+ return personRepositoryCustom.findAll();
+ }
+
+ @GetMapping("/person/{id}")
+ public Mono> byId(@PathVariable("id") Long id) {
+ return personRepository.findById(id)
+ .flatMap(person -> notesRepository.findByPersonId(person.getId()).map(Notes::getNoteText).collectList()
+ .map(notes -> toDto(person, notes)))
+ .map(ResponseEntity::ok)
+ .switchIfEmpty(Mono.fromCallable(() -> ResponseEntity.notFound().build()));
+ }
+
+ @PostMapping("/person")
+ public Mono save(@RequestBody Mono dto) {
+ return personRepository.save(dto);
+ }
+
+ @GetMapping("/person/find")
+ public Flux byName(@RequestParam("name") String name) {
+ return personRepository.findAllByLastName(name);
+ }
+
+ private PersonDto toDto(Person person, List notes) {
+ return new PersonDto(String.valueOf(person.getId()), person.getLastName(), person.getAge(), notes);
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/main/resources/application.yml b/2024-01/spring-21-webflux/src/main/resources/application.yml
new file mode 100644
index 00000000..5935fcc0
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/resources/application.yml
@@ -0,0 +1,17 @@
+server:
+ port: 8080
+
+spring:
+ r2dbc:
+ url: r2dbc:postgresql://localhost:5430/demoDB
+ username: usr
+ password: pwd
+ flyway:
+ url: jdbc:postgresql://localhost:5430/demoDB
+ user: usr
+ password: pwd
+
+
+logging:
+ level:
+ org.springframework.jdbc.core.JdbcTemplate: TRACE
diff --git a/2024-01/spring-21-webflux/src/main/resources/db/migration/V1__initial_schema.sql b/2024-01/spring-21-webflux/src/main/resources/db/migration/V1__initial_schema.sql
new file mode 100644
index 00000000..4f0a4eb0
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/resources/db/migration/V1__initial_schema.sql
@@ -0,0 +1,15 @@
+create table person
+(
+ id bigserial not null primary key,
+ last_name varchar(50) not null,
+ age int not null
+);
+
+create table notes
+(
+ id bigserial not null primary key,
+ note_text varchar(250) not null,
+ person_id bigint not null references person (id)
+);
+create index idx_notes_person_id on notes (person_id);
+
diff --git a/2024-01/spring-21-webflux/src/main/resources/logback.xml b/2024-01/spring-21-webflux/src/main/resources/logback.xml
new file mode 100644
index 00000000..b1f9bfe2
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/main/resources/logback.xml
@@ -0,0 +1,11 @@
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
diff --git a/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/repository/PersonRepositoryTest.java b/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/repository/PersonRepositoryTest.java
new file mode 100644
index 00000000..5ed22cc5
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/repository/PersonRepositoryTest.java
@@ -0,0 +1,29 @@
+package ru.otus.spring.repository;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
+import org.springframework.boot.test.context.SpringBootTest;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import ru.otus.spring.domain.Person;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@SpringBootTest
+class PersonRepositoryTest {
+
+ @Autowired
+ private PersonRepository repository;
+
+ @Test
+ void shouldSetIdOnSave() {
+ Mono personMono = repository.save(new Person("Bill", 12));
+
+ StepVerifier
+ .create(personMono)
+ .assertNext(person -> assertNotNull(person.getId()))
+ .expectComplete()
+ .verify();
+ }
+}
diff --git a/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/rest/AnnotatedControllerTest.java b/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/rest/AnnotatedControllerTest.java
new file mode 100644
index 00000000..95eb4d75
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/rest/AnnotatedControllerTest.java
@@ -0,0 +1,96 @@
+package ru.otus.spring.rest;
+
+import java.time.Duration;
+import java.util.List;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.test.StepVerifier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+class AnnotatedControllerTest {
+
+ @Autowired
+ private WebTestClient webTestClient;
+ @LocalServerPort
+ private int port;
+
+
+ @Test
+ void oneTest() {
+ //given
+ var client = WebClient.create(String.format("http://localhost:%d", port));
+
+ //when
+ var result = client
+ .get().uri("/flux/one")
+ .accept(MediaType.APPLICATION_JSON)
+ .retrieve()
+ .bodyToMono(String.class)
+ .timeout(Duration.ofSeconds(3))
+ .block();
+
+ //then
+ assertThat(result).isEqualTo("one");
+ }
+
+ @Test
+ void streamTest() {
+ //given
+ var client = WebClient.create(String.format("http://localhost:%d", port));
+ var expectedSize = 5;
+
+ //when
+ List result = client
+ .get().uri("/stream")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(String.class)
+ .take(expectedSize)
+ .timeout(Duration.ofSeconds(3))
+ .collectList()
+ .block();
+
+ //then
+ assertThat(result).hasSize(expectedSize)
+ .contains(String.format("valStr:%s", 0),
+ String.format("valStr:%s", 1),
+ String.format("valStr:%s", 2),
+ String.format("valStr:%s", 3),
+ String.format("valStr:%s", 4));
+ }
+
+ @Test
+ void dataTest() {
+ //given
+ var webTestClientForTest = webTestClient.mutate()
+ .responseTimeout(Duration.ofSeconds(20))
+ .build();
+
+ //when
+ var result = webTestClientForTest
+ .get().uri("/flux/ten")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .exchange()
+ .expectStatus().isOk()
+ .returnResult(Integer.class)
+ .getResponseBody();
+
+ //then
+ var step = StepVerifier.create(result);
+ StepVerifier.Step stepResult = null;
+ for (var idx = 1; idx <= 10; idx++) {
+ stepResult = step.expectNext(idx);
+ }
+ stepResult.verifyComplete();
+ }
+
+}
\ No newline at end of file
diff --git a/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/rest/PersonControllerTest.java b/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/rest/PersonControllerTest.java
new file mode 100644
index 00000000..71a31505
--- /dev/null
+++ b/2024-01/spring-21-webflux/src/test/java/ru/otus/spring/rest/PersonControllerTest.java
@@ -0,0 +1,28 @@
+package ru.otus.spring.rest;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@SpringBootTest
+class PersonControllerTest {
+
+ @Autowired
+ private RouterFunction route;
+
+ @Test
+ void testRoute() {
+ WebTestClient client = WebTestClient
+ .bindToRouterFunction(route)
+ .build();
+
+ client.get()
+ .uri("/func/person")
+ .exchange()
+ .expectStatus()
+ .isOk();
+ }
+}