spring-38,39 reactor

This commit is contained in:
petrelevich
2024-11-01 15:09:18 +03:00
parent 6e6b0fdce6
commit 8947e4b1a5
28 changed files with 1078 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
.idea/
*.iml
target/
/node_modules
/output
+70
View File
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>spring-38-reactor</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2022.0.7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>3.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,74 @@
package ru.otus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
public class CreateExample {
private static final Logger logger = LoggerFactory.getLogger(CreateExample.class);
public static void main(String[] args) {
onEachNext();
lazyObservable();
creatorExample();
}
private static void onEachNext() {
Flux<String> obs = Flux.just("one", "two", "three");
obs.doFirst(() -> logger.info("Starting:"))
.doOnComplete(() -> logger.info("The end!"))
.doOnEach(item -> logger.info("item_1:{}", item.get()))
.subscribe();
logger.info("-----");
obs.doOnNext(item -> logger.info("item_2:{}", item))
.map(String::length)
.doOnNext(item -> logger.info("length_2:{}", item))
.subscribe();
}
private static void lazyObservable() {
Flux<String> obs = Flux.defer(() -> {
logger.info("creating new Observable");
return Flux.just("one", "two", "three");
});
obs.doOnNext(item -> logger.info("item_1:{}", item))
.subscribe();
logger.info("---------------");
obs.doOnNext(item -> logger.info("item_2:{}", item))
.subscribe();
}
private static void creatorExample() {
Flux<String> obs = Flux.create(emitter -> {
emitter.next("one");
emitter.next("two");
emitter.error(new RuntimeException("Error!"));
emitter.next("three");
emitter.complete();
});
obs.onErrorResume(e -> {
logger.error("error:{}", e.getMessage(), e);
return Flux.just("r1", "r2", "r3");
})
.doOnNext(item -> logger.info("item__1:{}", item))
.subscribe();
logger.info("---------------");
Disposable disposable = obs.doOnNext(item -> logger.info("item__2:{}", item))
.subscribe(next -> logger.info("next:{}", next),
error -> logger.info("error:{}", error.getMessage()),
() -> logger.info("onComplete"));
logger.info("isDisposed:{}", disposable.isDisposed());
}
}
@@ -0,0 +1,51 @@
package ru.otus;
import java.time.LocalDate;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class OperatorsExample {
private static final Logger logger = LoggerFactory.getLogger(OperatorsExample.class);
public static void main(String[] args) {
merge();
fromList();
}
private static void merge() {
var listFlux1 = Flux.fromIterable(List.of(
new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)),
new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23))));
var listFlux2 = Flux.fromIterable(List.of(
new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)),
new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30))));
var listFlux3 = Flux.fromIterable(List.of(
new Person("Ivan", "Petrov", "male", LocalDate.of(1890, 2, 10)),
new Person("Joanne", "Stuard", "female", LocalDate.of(1965, 1, 3))));
Flux.merge(listFlux1, listFlux2, listFlux3)
.subscribe(person -> logger.info("person:{}", person));
}
private static void fromList() {
var persons = List.of(
new Person("John", "Dow", "male", LocalDate.of(1992, 3, 12)),
new Person("Jane", "Dow", "female", LocalDate.of(2001, 6, 23)),
new Person("Howard", "Lovecraft", "male", LocalDate.of(1890, 8, 20)),
new Person("Joanne", "Rowling", "female", LocalDate.of(1965, 6, 30)));
var disposable = Flux.fromIterable(persons)
.filter(person -> person.birth().isAfter(LocalDate.of(1990, 1, 1)))
.map(p -> p.firstName() + " " + p.lastName())
.collectList()
.subscribe(item -> logger.info("item: {}", item));
logger.info("disposable.isDisposed:{}", disposable.isDisposed());
}
}
@@ -0,0 +1,6 @@
package ru.otus;
import java.time.LocalDate;
public record Person(String firstName, String lastName, String gender, LocalDate birth) {
}
@@ -0,0 +1,56 @@
package ru.otus;
import java.time.Duration;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;
public class PublisherExample {
private static final Logger logger = LoggerFactory.getLogger(PublisherExample.class);
public static void main(String[] args) throws Exception {
publisherExample();
}
public static void publisherExample() throws InterruptedException {
Flux<String> ob = magicPublisher();
Thread.sleep(5000);
logger.info("First subscribed");
var disposable1 = ob.subscribe(item -> logger.info("item: {}", item));
logger.info("disposable1.isDisposed:{}", disposable1.isDisposed());
Thread.sleep(5000);
logger.info("Second subscribed");
var disposable2 = ob.subscribe(item -> logger.info("item second: {}", item));
logger.info("disposable2.isDisposed():{}", disposable2.isDisposed());
Thread.sleep(60_000);
}
public static Flux<String> magicPublisher() {
var schedulerGenerator = Schedulers.newParallel("generator", 1);
var generator = Flux.generate(
() -> 0L,
(BiFunction<Long, SynchronousSink<Long>, Long>)
(prev, sink) -> {
var newValue = prev + 1;
sink.next(newValue);
logger.info("newValue:{}", newValue);
return newValue;
})
.delayElements(Duration.ofSeconds(5), schedulerGenerator)
.map(id -> "new id:" + id)
.doOnNext(val -> logger.info("val:{}", val));
ConnectableFlux<String> generatorConnectable = generator.publish();
return generatorConnectable.autoConnect(0);
}
}
+4
View File
@@ -0,0 +1,4 @@
.idea/
*.iml
target/
@@ -0,0 +1,47 @@
###
GET http://localhost:8080/flux/one
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/flux/ten
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/stream
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/person
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET curl
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/func/person?name=Lermontov
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/func/person?age=22
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/func/person/1
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
+6
View File
@@ -0,0 +1,6 @@
docker run --rm --name pg-docker \
-e POSTGRES_PASSWORD=pwd \
-e POSTGRES_USER=usr \
-e POSTGRES_DB=demoDB \
-p 5430:5432 \
postgres:16
+82
View File
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>spring-40-webflux</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>0.8.13.RELEASE</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>24.0.1</version>
</dependency>
<!-- Тестирование -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,52 @@
package ru.otus.spring;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.scheduler.Scheduler;
import ru.otus.spring.domain.Notes;
import ru.otus.spring.domain.Person;
import ru.otus.spring.repository.NotesRepository;
import ru.otus.spring.repository.PersonRepository;
import ru.otus.spring.repository.PersonRepositoryCustom;
@Component
public class DataFiller implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(DataFiller.class);
private final PersonRepository personRepository;
private final NotesRepository notesRepository;
private final PersonRepositoryCustom personRepositoryCustom;
private final Scheduler workerPool;
public DataFiller(PersonRepository personRepository, NotesRepository notesRepository, PersonRepositoryCustom personRepositoryCustom, Scheduler workerPool) {
this.personRepository = personRepository;
this.notesRepository = notesRepository;
this.workerPool = workerPool;
this.personRepositoryCustom = personRepositoryCustom;
}
@Override
public void run(ApplicationArguments args) {
personRepository.saveAll(Arrays.asList(
new Person("Pushkin", 22),
new Person("Lermontov", 22),
new Person("Tolstoy", 60)
)).publishOn(workerPool)
.subscribe(savedPerson -> {
logger.info("saved person:{}", savedPerson);
notesRepository.saveAll(Arrays.asList(
new Notes(null, "txt_1_" + savedPerson.getId(), savedPerson.getId()),
new Notes(null, "txt_2_" + savedPerson.getId(), savedPerson.getId())))
.publishOn(workerPool)
.subscribe(savedNotes -> logger.info("saved notes:{}", savedNotes));
});
personRepositoryCustom.findAll()
.publishOn(workerPool)
.subscribe(personDto -> logger.info("personDto:{}", personDto));
}
}
@@ -0,0 +1,69 @@
package ru.otus.spring;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import ru.otus.spring.domain.Person;
import ru.otus.spring.repository.PersonRepository;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromValue;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.queryParam;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.badRequest;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Configuration
public class FunctionalEndpointsConfig {
@Bean
public RouterFunction<ServerResponse> composedRoutes(PersonRepository repository) {
return route()
// эта функция должна стоять раньше findAll - порядок следования роутов - важен
.GET("/func/person",
queryParam("name", StringUtils::isNotEmpty),
request -> request.queryParam("name")
.map(name -> ok().body(repository.findAllByLastName(name), Person.class))
.orElse(badRequest().build())
)
// пример другой реализации - начиная с запроса репозитория
.GET("/func/person", queryParam("age", StringUtils::isNotEmpty),
request ->
ok()
.contentType(MediaType.APPLICATION_JSON)
.body(repository.findAllByAge(request.queryParam("age")
.map(Integer::parseInt)
.orElseThrow(IllegalArgumentException::new)), Person.class)
)
// Обратите внимание на использование хэндлера
.GET("/func/person", accept(APPLICATION_JSON), new PersonHandler(repository)::list)
// Обратите внимание на использование pathVariable
.GET("/func/person/{id}", accept(APPLICATION_JSON),
request -> repository.findById(Long.parseLong(request.pathVariable("id")))
.flatMap(person -> ok().contentType(APPLICATION_JSON).body(fromValue(person)))
.switchIfEmpty(notFound().build())
).build();
}
// Это пример хэндлера, который даже не бин
static class PersonHandler {
private final PersonRepository repository;
PersonHandler(PersonRepository repository) {
this.repository = repository;
}
Mono<ServerResponse> list(ServerRequest request) {
// Обратите внимание на пример другого порядка создания response от Flux
return ok().contentType(APPLICATION_JSON).body(repository.findAll(), Person.class);
}
}
}
@@ -0,0 +1,16 @@
package ru.otus.spring;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class WebfluxDemo {
public static void main(String[] args) {
SpringApplication.run(WebfluxDemo.class);
}
}
@@ -0,0 +1,41 @@
package ru.otus.spring.config;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
@Configuration
public class ApplConfig {
private static final int THREAD_POOL_SIZE = 2;
@Bean(destroyMethod = "close")
public NioEventLoopGroup eventLoopGroup() {
return new NioEventLoopGroup(THREAD_POOL_SIZE,
new ThreadFactory() {
private final AtomicLong threadIdGenerator = new AtomicLong(0);
@Override
public Thread newThread(@NonNull Runnable task) {
return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet());
}
});
}
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory(NioEventLoopGroup eventLoopGroup) {
var factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(builder -> builder.runOn(eventLoopGroup));
return factory;
}
@Bean
public Scheduler workerPool() {
return Schedulers.newParallel("worker-thread", THREAD_POOL_SIZE);
}
}
@@ -0,0 +1,49 @@
package ru.otus.spring.domain;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.PersistenceCreator;
import org.springframework.data.relational.core.mapping.Table;
@Table("notes")
public class Notes {
@Id
private final Long id;
@NotNull
private final String noteText;
@NotNull
private final Long personId;
@PersistenceCreator
public Notes(Long id, @NotNull String noteText, @NotNull Long personId) {
this.id = id;
this.noteText = noteText;
this.personId = personId;
}
public Long getId() {
return id;
}
@NotNull
public String getNoteText() {
return noteText;
}
@NotNull
public Long getPersonId() {
return personId;
}
@Override
public String toString() {
return "Notes{" +
"id=" + id +
", noteText='" + noteText + '\'' +
", personId=" + personId +
'}';
}
}
@@ -0,0 +1,53 @@
package ru.otus.spring.domain;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.PersistenceCreator;
import org.springframework.data.relational.core.mapping.Table;
@Table("person")
public class Person {
@Id
private final Long id;
@NotNull
private final String lastName;
private final int age;
@PersistenceCreator
private Person(Long id, @NotNull String lastName, int age) {
this.id = id;
this.lastName = lastName;
this.age = age;
}
public Person(String lastName, int age) {
this(null, lastName, age);
}
public Long getId() {
return id;
}
public @NotNull String getLastName() {
return lastName;
}
public int getAge() {
return age;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", lastName='" + lastName + '\'' +
", age=" + age +
'}';
}
}
@@ -0,0 +1,8 @@
package ru.otus.spring.domain;
import java.util.List;
public record PersonDto(String id, String name, Integer age, List<String> notes) {
}
@@ -0,0 +1,12 @@
package ru.otus.spring.repository;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import ru.otus.spring.domain.Notes;
public interface NotesRepository extends ReactiveCrudRepository<Notes, Long> {
Flux<Notes> findByPersonId(@NotNull Long personId);
}
@@ -0,0 +1,18 @@
package ru.otus.spring.repository;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import ru.otus.spring.domain.Person;
public interface PersonRepository extends ReactiveCrudRepository<Person, Long> {
@NotNull Mono<Person> findById(@NotNull Long id);
Mono<Person> save(Mono<Person> person);
Flux<Person> findAllByLastName(String lastName);
Flux<Person> findAllByAge(int age);
}
@@ -0,0 +1,55 @@
package ru.otus.spring.repository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.r2dbc.spi.Readable;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import ru.otus.spring.domain.PersonDto;
import java.util.List;
@Repository
public class PersonRepositoryCustom {
private final R2dbcEntityTemplate template;
private final ObjectMapper objectMapper;
private static final String SQL_ALL = """
select json_agg(n.note_text) as notes, n.person_id,
p.last_name, p.age
from notes n
inner join person p
on n.person_id = p.id
group by n.person_id, p.last_name, p.age
""";
public PersonRepositoryCustom(R2dbcEntityTemplate template, ObjectMapper objectMapper) {
this.template = template;
this.objectMapper = objectMapper;
}
public Flux<PersonDto> findAll() {
return template.getDatabaseClient().inConnectionMany(connection ->
Flux.from(connection.createStatement(SQL_ALL)
.execute())
.flatMap(result -> result.map(this::mapper)));
}
private PersonDto mapper(Readable selectedRecord) {
var notesAsText = selectedRecord.get("notes", String.class);
try {
List<String> notes = objectMapper.readValue(notesAsText, new TypeReference<>() {
});
return new PersonDto(selectedRecord.get("person_id", String.class),
selectedRecord.get("last_name", String.class),
selectedRecord.get("age", Integer.class),
notes);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("notes:" + notesAsText + " parsing error:" + e);
}
}
}
@@ -0,0 +1,49 @@
package ru.otus.spring.rest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import reactor.core.scheduler.Scheduler;
@RestController
public class AnnotatedController {
private static final Logger logger = LoggerFactory.getLogger(AnnotatedController.class);
private final Scheduler workerPool;
public AnnotatedController(Scheduler workerPool) {
this.workerPool = workerPool;
}
@GetMapping("/flux/one")
public Mono<String> one() {
return Mono.just("one");
}
@GetMapping(path ="/flux/ten", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> list() {
logger.info("list request");
return Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1), workerPool)
.doOnNext(val -> logger.info("value:{}", val));
}
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream() {
logger.info("stream");
return Flux.generate(() -> 0, (state, emitter) -> {
emitter.next(state);
return state + 1;
})
.delayElements(Duration.ofSeconds(1L))
.map(Object::toString)
.map(val -> String.format("valStr:%s", val));
}
}
@@ -0,0 +1,57 @@
package ru.otus.spring.rest;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import ru.otus.spring.domain.Notes;
import ru.otus.spring.domain.Person;
import ru.otus.spring.domain.PersonDto;
import ru.otus.spring.repository.NotesRepository;
import ru.otus.spring.repository.PersonRepository;
import ru.otus.spring.repository.PersonRepositoryCustom;
import java.util.List;
@RestController
public class PersonController {
private final PersonRepository personRepository;
private final NotesRepository notesRepository;
private final PersonRepositoryCustom personRepositoryCustom;
public PersonController(PersonRepository personRepository, NotesRepository notesRepository, PersonRepositoryCustom personRepositoryCustom) {
this.personRepository = personRepository;
this.notesRepository = notesRepository;
this.personRepositoryCustom = personRepositoryCustom;
}
@GetMapping("/person")
public Flux<PersonDto> all() {
return personRepositoryCustom.findAll();
}
@GetMapping("/person/{id}")
public Mono<ResponseEntity<PersonDto>> byId(@PathVariable("id") Long id) {
return personRepository.findById(id)
.flatMap(person -> notesRepository.findByPersonId(person.getId()).map(Notes::getNoteText).collectList()
.map(notes -> toDto(person, notes)))
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.fromCallable(() -> ResponseEntity.notFound().build()));
}
@PostMapping("/person")
public Mono<Person> save(@RequestBody Mono<Person> dto) {
return personRepository.save(dto);
}
@GetMapping("/person/find")
public Flux<Person> byName(@RequestParam("name") String name) {
return personRepository.findAllByLastName(name);
}
private PersonDto toDto(Person person, List<String> notes) {
return new PersonDto(String.valueOf(person.getId()), person.getLastName(), person.getAge(), notes);
}
}
@@ -0,0 +1,17 @@
server:
port: 8080
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5430/demoDB
username: usr
password: pwd
flyway:
url: jdbc:postgresql://localhost:5430/demoDB
user: usr
password: pwd
logging:
level:
org.springframework.jdbc.core.JdbcTemplate: TRACE
@@ -0,0 +1,15 @@
create table person
(
id bigserial not null primary key,
last_name varchar(50) not null,
age int not null
);
create table notes
(
id bigserial not null primary key,
note_text varchar(250) not null,
person_id bigint not null references person (id)
);
create index idx_notes_person_id on notes (person_id);
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,29 @@
package ru.otus.spring.repository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import ru.otus.spring.domain.Person;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@SpringBootTest
class PersonRepositoryTest {
@Autowired
private PersonRepository repository;
@Test
void shouldSetIdOnSave() {
Mono<Person> personMono = repository.save(new Person("Bill", 12));
StepVerifier
.create(personMono)
.assertNext(person -> assertNotNull(person.getId()))
.expectComplete()
.verify();
}
}
@@ -0,0 +1,96 @@
package ru.otus.spring.rest;
import java.time.Duration;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class AnnotatedControllerTest {
@Autowired
private WebTestClient webTestClient;
@LocalServerPort
private int port;
@Test
void oneTest() {
//given
var client = WebClient.create(String.format("http://localhost:%d", port));
//when
var result = client
.get().uri("/flux/one")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(3))
.block();
//then
assertThat(result).isEqualTo("one");
}
@Test
void streamTest() {
//given
var client = WebClient.create(String.format("http://localhost:%d", port));
var expectedSize = 5;
//when
List<String> result = client
.get().uri("/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.take(expectedSize)
.timeout(Duration.ofSeconds(3))
.collectList()
.block();
//then
assertThat(result).hasSize(expectedSize)
.contains(String.format("valStr:%s", 0),
String.format("valStr:%s", 1),
String.format("valStr:%s", 2),
String.format("valStr:%s", 3),
String.format("valStr:%s", 4));
}
@Test
void dataTest() {
//given
var webTestClientForTest = webTestClient.mutate()
.responseTimeout(Duration.ofSeconds(20))
.build();
//when
var result = webTestClientForTest
.get().uri("/flux/ten")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.returnResult(Integer.class)
.getResponseBody();
//then
var step = StepVerifier.create(result);
StepVerifier.Step<Integer> stepResult = null;
for (var idx = 1; idx <= 10; idx++) {
stepResult = step.expectNext(idx);
}
stepResult.verifyComplete();
}
}
@@ -0,0 +1,28 @@
package ru.otus.spring.rest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
@SpringBootTest
class PersonControllerTest {
@Autowired
private RouterFunction<ServerResponse> route;
@Test
void testRoute() {
WebTestClient client = WebTestClient
.bindToRouterFunction(route)
.build();
client.get()
.uri("/func/person")
.exchange()
.expectStatus()
.isOk();
}
}