Merge remote-tracking branch 'origin/master'

This commit is contained in:
vitalykutsenko
2024-05-15 19:59:43 +03:00
189 changed files with 6778 additions and 45 deletions
@@ -0,0 +1,39 @@
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
!gradle-wrapper.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
# Ignore Gradle project-specific cache directory
.gradle
/buildSrc/.gradle/
# Ignore Gradle build output directory
build
out
#Idea
*.iml
*.iws
*.ipr
*.idea
@@ -0,0 +1,11 @@
###
GET http://localhost:8082/data-mono/16
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8082/data/5
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
+5
View File
@@ -0,0 +1,5 @@
date
for run in {1..100000}; do
curl -s "http://localhost:8082/data-mono/$run" > /dev/null
done
date
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus</groupId>
<artifactId>spring-41-kafka-webflux</artifactId>
<version>1.0</version>
</parent>
<artifactId>client</artifactId>
<version>1.0</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>ru.otus</groupId>
<artifactId>common</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,12 @@
package com.datasrc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ClientData {
public static void main(String[] args) {
SpringApplication.run(ClientData.class, args);
}
}
@@ -0,0 +1,58 @@
package com.datasrc;
import com.datasrc.config.ReactiveSender;
import com.datasrc.model.Request;
import com.datasrc.model.RequestId;
import com.datasrc.model.StreamData;
import com.datasrc.model.StringValue;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// http://localhost:8082/data/5
@RestController
public class ClientDataController {
private static final Logger log = LoggerFactory.getLogger(ClientDataController.class);
private final AtomicLong idGenerator = new AtomicLong(0);
private final WebClient webClient;
private final ReactiveSender<Long, Request> requestSender;
private final StringValueStorage stringValueStorage;
public ClientDataController(WebClient webClient, ReactiveSender<Long, Request> requestSender, StringValueStorage stringValueStorage) {
this.webClient = webClient;
this.requestSender = requestSender;
this.stringValueStorage = stringValueStorage;
}
@GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<StreamData> data(@PathVariable("seed") long seed) {
log.info("request for data, seed:{}", seed);
return webClient.get().uri(String.format("/data/%d", seed))
.accept(MediaType.APPLICATION_NDJSON)
.retrieve()
.bodyToFlux(StreamData.class)
.doOnNext(val -> log.info("val:{}", val));
}
@GetMapping(value = "/data-mono/{seed}", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<StringValue> dataMono(@PathVariable("seed") long seed) {
log.info("request for string data-mono, seed:{}", seed);
var request = new Request(new RequestId(idGenerator.incrementAndGet()), seed);
return requestSender.send(request, requestSend -> log.info("send ok: {}", requestSend))
.flatMap(v -> stringValueStorage.get(new RequestId(v.correlationMetadata().id())))
.doOnNext(stringValue -> log.info("value for client:{}", stringValue));
}
}
@@ -0,0 +1,47 @@
package com.datasrc;
import com.datasrc.model.RequestId;
import com.datasrc.model.StringValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.NonNull;
public class StringValueStorage implements Sinks.EmitFailureHandler {
private final Sinks.Many<ResponseData> sink;
private final ConnectableFlux<ResponseData> sinkConnectable;
private static final Logger log = LoggerFactory.getLogger(StringValueStorage.class);
public StringValueStorage() {
sink = Sinks.many().multicast().onBackpressureBuffer();
sinkConnectable = sink.asFlux().publish();
sinkConnectable.connect();
}
public void put(RequestId requestId, StringValue value) {
log.info("put. requestId:{}, value:{}", requestId, value);
sink.emitNext(new ResponseData(requestId, value), this);
}
public Mono<StringValue> get(RequestId requestId) {
return Mono.from(sinkConnectable
.filter(responseData -> {
log.info("waiting:{}, fact:{}", requestId, responseData.requestId);
return responseData.requestId.id() == requestId.id();
})
.map(responseData -> responseData.stringValue));
}
@Override
public boolean onEmitFailure(@NonNull SignalType signalType, @NonNull Sinks.EmitResult emitResult) {
return false;
}
private record ResponseData(RequestId requestId, StringValue stringValue) {
}
}
@@ -0,0 +1,117 @@
package com.datasrc.config;
import com.datasrc.StringValueStorage;
import com.datasrc.model.Request;
import com.datasrc.model.Response;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
@Configuration
@SuppressWarnings("java:S2095")
public class ApplConfig {
private static final int THREAD_POOL_SIZE = 4;
private static final int RESPONSE_RECEIVER_POOL_SIZE = 1;
private static final int KAFKA_POOL_SIZE = 1;
@Bean(name ="serverThreadEventLoop", destroyMethod = "close")
public NioEventLoopGroup serverThreadEventLoop() {
return new NioEventLoopGroup(THREAD_POOL_SIZE,
new ThreadFactory() {
private final AtomicLong threadIdGenerator = new AtomicLong(0);
@Override
public Thread newThread(@NonNull Runnable task) {
return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet());
}
});
}
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory(@Qualifier("serverThreadEventLoop") NioEventLoopGroup serverThreadEventLoop) {
var factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(builder -> builder.runOn(serverThreadEventLoop));
return factory;
}
@Bean(name ="clientThreadEventLoop", destroyMethod = "close")
public NioEventLoopGroup clientThreadEventLoop() {
return new NioEventLoopGroup(THREAD_POOL_SIZE,
new ThreadFactory() {
private final AtomicLong threadIdGenerator = new AtomicLong(0);
@Override
public Thread newThread(@NonNull Runnable task) {
return new Thread(task, "client-thread-" + threadIdGenerator.incrementAndGet());
}
});
}
@Bean
public ReactorResourceFactory reactorResourceFactory(@Qualifier("clientThreadEventLoop") NioEventLoopGroup clientThreadEventLoop) {
var resourceFactory = new ReactorResourceFactory();
resourceFactory.setLoopResources(b -> clientThreadEventLoop);
resourceFactory.setUseGlobalResources(false);
return resourceFactory;
}
@Bean
public ReactorClientHttpConnector reactorClientHttpConnector(ReactorResourceFactory resourceFactory) {
return new ReactorClientHttpConnector(resourceFactory, mapper -> mapper);
}
@Bean("responseReceiverScheduler")
public Scheduler responseReceiverScheduler() {
return Schedulers.newParallel("response-receiver", RESPONSE_RECEIVER_POOL_SIZE);
}
@Bean("kafkaScheduler")
public Scheduler kafkaScheduler() {
return Schedulers.newParallel("kafka-scheduler", KAFKA_POOL_SIZE);
}
@Bean
public WebClient webClient(WebClient.Builder builder,
@Value("${application.processor.url}") String url) {
return builder
.baseUrl(url)
.build();
}
@Bean(destroyMethod = "close")
public ReactiveSender<Long, Request> requestSender(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers,
@Value("${application.topic-request}") String topicRequest,
@Qualifier("kafkaScheduler") Scheduler kafkaScheduler
) {
return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicRequest);
}
@Bean
public StringValueStorage stringValueStorage() {
return new StringValueStorage();
}
@Bean(destroyMethod = "close")
public ReactiveReceiver<Response> responseReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers,
@Value("${application.topic-response}") String topicResponse,
@Value("${application.kafka-group-id}") String groupId,
@Qualifier("responseReceiverScheduler") Scheduler responseReceiverScheduler,
StringValueStorage stringValueStorage) {
return new ReactiveReceiver<>(bootstrapServers, Response.class, topicResponse, responseReceiverScheduler, groupId,
response -> stringValueStorage.put(response.data().requestId(), response.data()));
}
}
@@ -0,0 +1,10 @@
server:
port: 8082
application:
processor:
url: http://localhost:8081
kafka-bootstrap-servers: localhost:9092
kafka-group-id: clientConsumerGroup
topic-request: request
topic-response: response
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,16 @@
<!DOCTYPE html>
<html>
<head>
<script src="https://unpkg.com/can-ndjson-stream@0.1.6/dist/global/can-ndjson-stream.js"></script>
<script src="./webclient.js"></script>
<meta charset="UTF-8">
<title>Stream Demo</title>
</head>
<body>
<h1>Stream Demo</h1>
<div id="dataBlock"/>
</body>
</html>
@@ -0,0 +1,24 @@
const streamErr = e => {
console.warn("error");
console.warn(e);
}
fetch("http://localhost:8082/data/5").then((response) => {
return can.ndjsonStream(response.body);
}).then(dataStream => {
const reader = dataStream.getReader();
const read = result => {
if (result.done) {
return;
}
render(result.value);
reader.read().then(read, streamErr);
}
reader.read().then(read, streamErr);
});
const render = value => {
const div = document.createElement('div');
div.append(new Date() + ' stringValue:', JSON.stringify(value));
document.getElementById('dataBlock').append(div);
};
@@ -0,0 +1,11 @@
###
GET http://localhost:8082/data-mono/13
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8082/data/5
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
+5
View File
@@ -0,0 +1,5 @@
date
for run in {1..100000}; do
curl -s "http://localhost:8082/data-mono/$run" > /dev/null
done
date
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus</groupId>
<artifactId>spring-41-kafka-webflux</artifactId>
<version>1.0</version>
</parent>
<artifactId>common</artifactId>
<version>1.0</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,7 @@
package com.datasrc.config;
public class ConsumerException extends RuntimeException {
public ConsumerException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,42 @@
package com.datasrc.config;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
public class JsonDeserializer<T> implements Deserializer<T> {
public static final String OBJECT_MAPPER = "objectMapper";
public static final String TYPE_REFERENCE = "typeReference";
private final String encoding = StandardCharsets.UTF_8.name();
private ObjectMapper mapper;
private JavaType javaType;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
mapper = (ObjectMapper) configs.get(OBJECT_MAPPER);
if (mapper == null) {
throw new IllegalArgumentException("config property OBJECT_MAPPER was not set");
}
javaType = (JavaType) configs.get(TYPE_REFERENCE);
if (javaType == null) {
throw new IllegalArgumentException("config property TYPE_REFERENCE was not set");
}
}
@Override
public T deserialize(String topic, byte[] data) {
try {
if (data == null) {
return null;
} else {
var valueAsString = new String(data, encoding);
return mapper.readValue(valueAsString, javaType);
}
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to StringValue", e);
}
}
}
@@ -0,0 +1,34 @@
package com.datasrc.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerializer<T> implements Serializer<T> {
public static final String OBJECT_MAPPER = "objectMapper";
private final String encoding = StandardCharsets.UTF_8.name();
private ObjectMapper mapper;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
mapper = (ObjectMapper) configs.get(OBJECT_MAPPER);
if (mapper == null) {
throw new IllegalArgumentException("config property OBJECT_MAPPER was not set");
}
}
@Override
public byte[] serialize(String topic, T data) {
try {
if (data == null) {
return new byte[]{};
} else {
return mapper.writeValueAsString(data).getBytes(encoding);
}
} catch (Exception e) {
throw new SerializationException("Error when serializing StringValue to byte[] ", e);
}
}
}
@@ -0,0 +1,127 @@
package com.datasrc.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.InetAddress;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.util.retry.Retry;
import static com.datasrc.config.JsonDeserializer.OBJECT_MAPPER;
import static com.datasrc.config.JsonDeserializer.TYPE_REFERENCE;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
public class ReactiveReceiver<T> {
private static final Logger log = LoggerFactory.getLogger(ReactiveReceiver.class);
private final Random random = new Random();
private final Disposable kafkaSubscriber;
private final Disposable kafkaConnection;
public static final int MAX_POLL_INTERVAL_MS = 1_000;
public ReactiveReceiver(
String bootstrapServers, Class<?> valueClass, String topicName, Scheduler schedulerValueReceiver, String groupId, Consumer<T> valueConsumer
) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(GROUP_ID_CONFIG, groupId);
props.put(GROUP_INSTANCE_ID_CONFIG, makeGroupInstanceIdConfig(groupId));
props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
var objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
props.put(OBJECT_MAPPER, objectMapper);
props.put(TYPE_REFERENCE, objectMapper.getTypeFactory().constructType(valueClass));
props.put(MAX_POLL_RECORDS_CONFIG, 3);
props.put(MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL_MS);
ReceiverOptions<Long, T> receiverOptions =
ReceiverOptions.<Long, T>create(props)
.pollTimeout(Duration.ofSeconds(500))
.schedulerSupplier(() -> schedulerValueReceiver)
.subscription(Collections.singleton(topicName));
Flux<ConsumerRecord<Long, T>> inboundFlux = KafkaReceiver.create(receiverOptions)
.receiveAutoAck()
.concatMap(
consumerRecordFlux -> {
log.info("consumerRecordFlux done, commit");
return consumerRecordFlux;
})
.retryWhen(Retry.backoff(3, Duration.of(5, ChronoUnit.SECONDS)));
Hooks.onErrorDropped(
error -> {
if (error instanceof CancellationException) {
log.info("Cancellation event:", error);
} else {
log.error("error:", error);
}
});
var kafkaFlow = inboundFlux.doOnCancel(() -> log.info("connection canceled"))
.doOnError(error -> log.error("Consuming error", error))
.publish();
log.info("start consuming");
kafkaConnection = kafkaFlow.connect();
kafkaSubscriber =
kafkaFlow.subscribe(
receiverRecord -> {
var key = receiverRecord.key();
var value = receiverRecord.value();
log.info("key:{}, value:{}, record:{}", key, value, receiverRecord);
valueConsumer.accept(value);
});
}
public void close() {
log.info("stop consuming");
kafkaSubscriber.dispose();
kafkaConnection.dispose();
}
private String makeGroupInstanceIdConfig(String groupId) {
try {
var hostName = InetAddress.getLocalHost().getHostName();
return String.join(
"-",
groupId,
hostName,
String.valueOf(random.nextInt(100_999_999)));
} catch (Exception ex) {
throw new ConsumerException("can't make GroupInstanceIdConfig", ex);
}
}
}
@@ -0,0 +1,92 @@
package com.datasrc.config;
import static com.datasrc.config.JsonSerializer.OBJECT_MAPPER;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import com.datasrc.model.DataForSending;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
public class ReactiveSender<D, T extends DataForSending<D>> {
private static final Logger log = LoggerFactory.getLogger(ReactiveSender.class);
private final KafkaSender<Long, T> sender;
private final String topicName;
public ReactiveSender(String bootstrapServers, Scheduler schedulerKafka, String topicName) {
this.topicName = topicName;
var props = new Properties();
props.put(CLIENT_ID_CONFIG, "KafkaReactiveSender");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ACKS_CONFIG, "1");
props.put(RETRIES_CONFIG, 1);
props.put(BATCH_SIZE_CONFIG, 16384);
props.put(LINGER_MS_CONFIG, 10);
props.put(BUFFER_MEMORY_CONFIG, 26384); // bytes
props.put(MAX_BLOCK_MS_CONFIG, 1_000); // ms
props.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
var objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
props.put(OBJECT_MAPPER, objectMapper);
SenderOptions<Long, T> senderOptions =
SenderOptions.<Long, T>create(props)
.maxInFlight(10)
.scheduler(schedulerKafka);
sender = KafkaSender.create(senderOptions);
var shutdownHook =
new Thread(
() -> {
log.info("closing kafka sender");
sender.close();
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
public Mono<SenderResult<T>> send(T data, Consumer<T> sendAsk) {
return sender.send(Mono.just(SenderRecord.create(
topicName,
null,
null,
data.id(),
data,
data)))
.doOnError(error -> log.error("Send failed", error))
.doOnNext(
senderResult -> {
log.info(
"message id:{} was sent, offset:{}",
senderResult.correlationMetadata().id(),
senderResult.recordMetadata().offset());
sendAsk.accept(senderResult.correlationMetadata());
}).next();
}
public void close() {
sender.close();
}
}
@@ -0,0 +1,7 @@
package com.datasrc.model;
public interface DataForSending<T> {
long id();
T data();
}
@@ -0,0 +1,34 @@
package com.datasrc.model;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Request implements DataForSending<Long> {
private final RequestId id;
private final long seed;
@JsonCreator
public Request(@JsonProperty("id") RequestId id, @JsonProperty("seed") long seed) {
this.id = id;
this.seed = seed;
}
@Override
public long id() {
return id.id();
}
@Override
public Long data() {
return seed;
}
@Override
public String toString() {
return "Request{" +
"id=" + id +
", seed=" + seed +
'}';
}
}
@@ -0,0 +1,4 @@
package com.datasrc.model;
public record RequestId(long id) {
}
@@ -0,0 +1,33 @@
package com.datasrc.model;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Response implements DataForSending<StringValue> {
private final ResponseId id;
private final StringValue stringValue;
@JsonCreator
public Response(@JsonProperty("id") ResponseId id, @JsonProperty("stringValue") StringValue stringValue) {
this.id = id;
this.stringValue = stringValue;
}
@Override
public long id() {
return id.id();
}
@Override
public StringValue data() {
return stringValue;
}
@Override
public String toString() {
return "Response{" +
"id=" + id +
", stringValue=" + stringValue +
'}';
}
}
@@ -0,0 +1,4 @@
package com.datasrc.model;
public record ResponseId(long id) {
}
@@ -0,0 +1,4 @@
package com.datasrc.model;
public record StreamData(String value) {
}
@@ -0,0 +1,4 @@
package com.datasrc.model;
public record StringValue(RequestId requestId, String value) {
}
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,24 @@
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
container_name: broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+105
View File
@@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>spring-41-kafka-webflux</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.1</version>
</parent>
<packaging>pom</packaging>
<modules>
<module>client</module>
<module>processor</module>
<module>source</module>
<module>common</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
<maven-assembly-plugin.version>3.1.1</maven-assembly-plugin.version>
<minimal.maven.version>3.3.9</minimal.maven.version>
<dependencyManagement.version>1.0.12.RELEASE</dependencyManagement.version>
<springframeworkBoot.version>3.0.5</springframeworkBoot.version>
<jsr305.version>3.0.2</jsr305.version>
<wiremock.version>3.0.0-beta-10</wiremock.version>
<reactorKafka.version>1.3.18</reactorKafka.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${springframeworkBoot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>${reactorKafka.version}</version>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>${wiremock.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>${maven-enforcer-plugin.version}</version>
<executions>
<execution>
<id>enforce-maven</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<dependencyConvergence/>
<requireMavenVersion>
<version>${minimal.maven.version}</version>
</requireMavenVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus</groupId>
<artifactId>spring-41-kafka-webflux</artifactId>
<version>1.0</version>
</parent>
<artifactId>processor</artifactId>
<version>1.0</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>ru.otus</groupId>
<artifactId>common</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,12 @@
package com.datasrc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProcessorData {
public static void main(String[] args) {
SpringApplication.run(ProcessorData.class, args);
}
}
@@ -0,0 +1,40 @@
package com.datasrc;
import com.datasrc.model.StreamData;
import com.datasrc.processor.DataProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
@RestController
public class ProcessorDataController {
private static final Logger log = LoggerFactory.getLogger(ProcessorDataController.class);
private final DataProcessor<Flux<StreamData>> dataProcessorStringReactorFlux;
private final WebClient client;
public ProcessorDataController(WebClient client,
@Qualifier("dataProcessorFlux") DataProcessor<Flux<StreamData>> dataProcessorFlux) {
this.dataProcessorStringReactorFlux = dataProcessorFlux;
this.client = client;
}
@GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<StreamData> data(@PathVariable("seed") long seed) {
log.info("request for data, seed:{}", seed);
var srcRequest = client.get().uri(String.format("/data/%d", seed))
.accept(MediaType.APPLICATION_NDJSON)
.retrieve()
.bodyToFlux(StreamData.class);
return dataProcessorStringReactorFlux.process(srcRequest);
}
}
@@ -0,0 +1,133 @@
package com.datasrc.config;
import com.datasrc.model.Request;
import com.datasrc.model.RequestId;
import com.datasrc.model.Response;
import com.datasrc.model.ResponseId;
import com.datasrc.model.StringValue;
import com.datasrc.processor.DataProcessor;
import io.netty.channel.nio.NioEventLoopGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
@Configuration
@SuppressWarnings("java:S2095")
public class ApplConfig {
private static final Logger log = LoggerFactory.getLogger(ApplConfig.class);
private static final int THREAD_POOL_SIZE = 2;
private static final int REQUEST_RECEIVER_POOL_SIZE = 1;
private static final int KAFKA_POOL_SIZE = 1;
private final AtomicLong responseIdGenerator = new AtomicLong(0);
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
var eventLoopGroup = new NioEventLoopGroup(THREAD_POOL_SIZE,
new ThreadFactory() {
private final AtomicLong threadIdGenerator = new AtomicLong(0);
@Override
public Thread newThread(@NonNull Runnable task) {
return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet());
}
});
var factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(builder -> builder.runOn(eventLoopGroup));
return factory;
}
@Bean
public ReactorResourceFactory reactorResourceFactory() {
var eventLoopGroup = new NioEventLoopGroup(THREAD_POOL_SIZE,
new ThreadFactory() {
private final AtomicLong threadIdGenerator = new AtomicLong(0);
@Override
public Thread newThread(@Nullable Runnable task) {
return new Thread(task, "client-thread-" + threadIdGenerator.incrementAndGet());
}
});
var resourceFactory = new ReactorResourceFactory();
resourceFactory.setLoopResources(b -> eventLoopGroup);
resourceFactory.setUseGlobalResources(false);
return resourceFactory;
}
@Bean
public ReactorClientHttpConnector reactorClientHttpConnector(ReactorResourceFactory resourceFactory) {
return new ReactorClientHttpConnector(resourceFactory, mapper -> mapper);
}
@Bean
public Scheduler timer() {
return Schedulers.newParallel("processor-thread", 2);
}
@Bean("requestReceiverScheduler")
public Scheduler requestReceiverScheduler() {
return Schedulers.newParallel("request-receiver", REQUEST_RECEIVER_POOL_SIZE);
}
@Bean("kafkaScheduler")
public Scheduler kafkaScheduler() {
return Schedulers.newParallel("kafka-scheduler", KAFKA_POOL_SIZE);
}
@Bean
public WebClient webClient(WebClient.Builder builder,
@Value("${application.source.url}") String url) {
return builder
.baseUrl(url)
.build();
}
@Bean(destroyMethod = "close")
public ReactiveSender<StringValue, Response> responseSender(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers,
@Value("${application.topic-response}") String topicResponse,
@Qualifier("kafkaScheduler") Scheduler kafkaScheduler
) {
return new ReactiveSender<>(bootstrapServers, kafkaScheduler, topicResponse);
}
@Bean(destroyMethod = "close")
public ReactiveReceiver<Request> requestReceiver(@Value("${application.kafka-bootstrap-servers}") String bootstrapServers,
@Value("${application.topic-request}") String topicRequest,
@Value("${application.kafka-group-id}") String groupId,
@Qualifier("requestReceiverScheduler") Scheduler responseReceiverScheduler,
ReactiveSender<StringValue, Response> responseSender,
@Qualifier("dataProcessorMono") DataProcessor<String> dataProcessor,
WebClient webClient) {
return new ReactiveReceiver<>(bootstrapServers, Request.class, topicRequest, responseReceiverScheduler, groupId,
request -> webClient.get().uri(String.format("/data-mono/%d", request.data()))
.retrieve()
.bodyToMono(String.class)
.map(dataProcessor::process)
.flatMap(stringValue ->
responseSender.send(new Response(new ResponseId(responseIdGenerator.incrementAndGet()),
new StringValue(new RequestId(request.id()), stringValue)),
stringValueDataForSending -> log.info("response send:{}", stringValueDataForSending)))
.subscribe());
}
}
@@ -0,0 +1,6 @@
package com.datasrc.processor;
public interface DataProcessor<T> {
T process(T data);
}
@@ -0,0 +1,33 @@
package com.datasrc.processor;
import com.datasrc.model.StreamData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import java.time.Duration;
@Service("dataProcessorFlux")
public class DataProcessorStringReactorFlux implements DataProcessor<Flux<StreamData>> {
private static final Logger log = LoggerFactory.getLogger(DataProcessorStringReactorFlux.class);
private final Scheduler timer;
public DataProcessorStringReactorFlux(Scheduler timer) {
this.timer = timer;
}
@Override
public Flux<StreamData> process(Flux<StreamData> dataflow) {
log.info("processor");
var dataSeq = dataflow
.doOnNext(val -> log.info("in val:{}", val))
.delayElements(Duration.ofSeconds(5), timer)
.map(data -> new StreamData(data.value().toUpperCase()))
.doOnNext(val -> log.info("out val:{}", val));
log.info("processor method finished");
return dataSeq;
}
}
@@ -0,0 +1,16 @@
package com.datasrc.processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service("dataProcessorMono")
public class DataProcessorStringReactorMono implements DataProcessor<String> {
private static final Logger log = LoggerFactory.getLogger(DataProcessorStringReactorMono.class);
@Override
public String process(String value) {
log.info("processor");
return value.toUpperCase();
}
}
@@ -0,0 +1,10 @@
server:
port: 8081
application:
source:
url: http://localhost:8080
kafka-bootstrap-servers: localhost:9092
kafka-group-id: processorConsumerGroup
topic-request: request
topic-response: response
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,12 @@
###
GET http://localhost:8080/data-mono/13
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
###
GET http://localhost:8080/data/5
Accept: */*
Content-Type: application/json
Cache-Control: no-cache
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus</groupId>
<artifactId>spring-41-kafka-webflux</artifactId>
<version>1.0</version>
</parent>
<artifactId>source</artifactId>
<version>1.0</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>ru.otus</groupId>
<artifactId>common</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,12 @@
package com.datasrc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SourceData {
public static void main(String[] args) {
SpringApplication.run(SourceData.class, args);
}
}
@@ -0,0 +1,54 @@
package com.datasrc;
import com.datasrc.model.StreamData;
import com.datasrc.producer.DataProducer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class SourceDataController {
private static final Logger log = LoggerFactory.getLogger(SourceDataController.class);
private final DataProducer<Flux<StreamData>> dataProducerFlux;
private final DataProducer<String> dataProducerStringBlocked;
private final Executor blockingExecutor;
public SourceDataController(@Qualifier("dataProducerFlux") DataProducer<Flux<StreamData>> dataProducerFlux,
@Qualifier("dataProducerStringBlocked") DataProducer<String> dataProducerStringBlocked,
@Qualifier("blockingExecutor") Executor blockingExecutor) {
this.dataProducerFlux = dataProducerFlux;
this.dataProducerStringBlocked = dataProducerStringBlocked;
this.blockingExecutor = blockingExecutor;
}
@GetMapping(value = "/data/{seed}", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<StreamData> data(@PathVariable("seed") long seed) {
log.info("request for string data, seed:{}", seed);
var stringData = dataProducerFlux.produce(seed);
log.info("Method request for string data done");
return stringData;
}
@GetMapping(value = "/data-mono/{seed}", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<String> dataMono(@PathVariable("seed") long seed) {
log.info("request for string data-mono, seed:{}", seed);
var future = CompletableFuture
.supplyAsync(() -> dataProducerStringBlocked.produce(seed), blockingExecutor);
var mono = Mono.fromFuture(future);
log.info("Method request for string data done");
return mono;
}
}
@@ -0,0 +1,53 @@
package com.datasrc.config;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
@Configuration
@SuppressWarnings("java:S2095")
public class ApplConfig {
private static final int THREAD_POOL_SIZE = 2;
private static final int BLOCKING_THREAD_POOL_SIZE = 2;
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
var eventLoopGroup = new NioEventLoopGroup(THREAD_POOL_SIZE,
new ThreadFactory() {
private final AtomicLong threadIdGenerator = new AtomicLong(0);
@Override
public Thread newThread(@NonNull Runnable task) {
return new Thread(task, "server-thread-" + threadIdGenerator.incrementAndGet());
}
});
var factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(builder -> builder.runOn(eventLoopGroup));
return factory;
}
@Bean("blockingExecutor")
public Executor blockingExecutor() {
var id = new AtomicLong(0);
return Executors.newFixedThreadPool(BLOCKING_THREAD_POOL_SIZE,
task -> new Thread(task, String.format("blocking-thread-%d", id.incrementAndGet())));
}
@Bean
public Scheduler timer() {
return Schedulers.newParallel("processor-thread", 2);
}
}
@@ -0,0 +1,6 @@
package com.datasrc.producer;
public interface DataProducer<T> {
T produce(long seed);
}
@@ -0,0 +1,26 @@
package com.datasrc.producer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service("dataProducerStringBlocked")
public class DataProducerStringBlocked implements DataProducer<String> {
private static final Logger log = LoggerFactory.getLogger(DataProducerStringBlocked.class);
@Override
public String produce(long seed) {
log.info("produce using seed:{}", seed);
sleep();
return String.format("someDataStr:%s", seed);
}
private void sleep() {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@@ -0,0 +1,39 @@
package com.datasrc.producer;
import com.datasrc.model.StreamData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.time.Duration;
import java.util.function.BiFunction;
import reactor.core.scheduler.Scheduler;
@Service("dataProducerFlux")
public class DataProducerStringFlux implements DataProducer<Flux<StreamData>> {
private static final Logger log = LoggerFactory.getLogger(DataProducerStringFlux.class);
private final Scheduler timer;
public DataProducerStringFlux(Scheduler timer) {
this.timer = timer;
}
@Override
public Flux<StreamData> produce(long seed) {
log.info("produce using seed:{}", seed);
var stringSeed = "someDataStr:";
var dataSeq = Flux.generate(() -> seed,
(BiFunction<Long, SynchronousSink<Long>, Long>) (prev, sink) -> {
var newValue = prev + 1;
sink.next(newValue);
return newValue;
})
.delayElements(Duration.ofSeconds(3), timer)
.map(val -> new StreamData(stringSeed + val))
.doOnNext(val -> log.info("val:{}", val));
log.info("produce method finished");
return dataSeq;
}
}
@@ -0,0 +1,2 @@
server:
port: 8080
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,4 @@
.idea/
*.iml
target/
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>jpql-exercise</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,13 @@
package ru.otus.example.ormdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrmDemoApplication {
public static void main(String[] args) {
SpringApplication.run(OrmDemoApplication.class, args);
}
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "avatars")
public class Avatar {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "photo_url", nullable = false, unique = true)
private String photoUrl;
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "courses")
public class Course {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "name", nullable = false, unique = true)
private String name;
}
@@ -0,0 +1,26 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "emails")
public class EMail {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "email", nullable = false, unique = true)
private String email;
}
@@ -0,0 +1,53 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.CascadeType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.JoinTable;
import jakarta.persistence.ManyToMany;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity // Указывает, что данный класс является сущностью
@Table(name = "otus_students") // Задает имя таблицы, на которую будет отображаться сущность
public class OtusStudent {
@Id // Позволяет указать какое поле является идентификатором
@GeneratedValue(strategy = GenerationType.IDENTITY) // Стратегия генерации идентификаторов
private long id;
// Задает имя и некоторые свойства поля таблицы, на которое будет отображаться поле сущности
@Column(name = "name", nullable = false, unique = true)
private String name;
// Указывает на связь между таблицами "один к одному"
@OneToOne(targetEntity = Avatar.class, cascade = CascadeType.ALL)
// Задает поле, по которому происходит объединение с таблицей для хранения связанной сущности
@JoinColumn(name = "avatar_id")
private Avatar avatar;
// Указывает на связь между таблицами "один ко многим"
@OneToMany(targetEntity = EMail.class, cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JoinColumn(name = "student_id")
private List<EMail> emails;
// Указывает на связь между таблицами "многие ко многим"
@ManyToMany(targetEntity = Course.class, fetch = FetchType.LAZY, cascade = CascadeType.PERSIST)
// Задает таблицу связей между таблицами для хранения родительской и связанной сущностью
@JoinTable(name = "student_courses", joinColumns = @JoinColumn(name = "student_id"),
inverseJoinColumns = @JoinColumn(name = "course_id"))
private List<Course> courses;
}
@@ -0,0 +1,18 @@
package ru.otus.example.ormdemo.repositories;
import ru.otus.example.ormdemo.models.OtusStudent;
import java.util.List;
import java.util.Optional;
public interface OtusStudentRepository {
OtusStudent save(OtusStudent student);
Optional<OtusStudent> findById(long id);
List<OtusStudent> findAll();
List<OtusStudent> findByName(String name);
void updateNameById(long id, String name);
void deleteById(long id);
}
@@ -0,0 +1,58 @@
package ru.otus.example.ormdemo.repositories;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.ormdemo.models.OtusStudent;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
// @Transactional должна стоять на методе сервиса.
// Причем, если метод не подразумевает изменения данных в БД то категорически желательно
// выставить у аннотации параметр readOnly в true.
// Но это только упражнение и транзакции мы пока не проходили.
// Поэтому, для упрощения, пока вешаем над классом репозитория
@Transactional
@Repository
public class OtusStudentRepositoryJpa implements OtusStudentRepository {
@PersistenceContext
private final EntityManager em;
public OtusStudentRepositoryJpa(EntityManager em) {
this.em = em;
}
@Override
public OtusStudent save(OtusStudent student) {
return null;
}
@Override
public Optional<OtusStudent> findById(long id) {
return Optional.empty();
}
@Override
public List<OtusStudent> findAll() {
return Collections.emptyList();
}
@Override
public List<OtusStudent> findByName(String name) {
return Collections.emptyList();
}
@Override
public void updateNameById(long id, String name) {
}
@Override
public void deleteById(long id) {
}
}
@@ -0,0 +1,18 @@
spring:
datasource:
url: jdbc:h2:mem:testdb
sql:
init:
mode: always
jpa:
generate-ddl: false
hibernate:
ddl-auto: none
show-sql: true
logging:
level:
ROOT: ERROR
@@ -0,0 +1,31 @@
create table avatars(
id bigserial,
photo_url varchar(8000),
primary key (id)
);
create table courses(
id bigserial,
name varchar(255),
primary key (id)
);
create table otus_students(
id bigserial,
name varchar(255),
avatar_id bigint references avatars (id),
primary key (id)
);
create table emails(
id bigserial,
student_id bigint references otus_students(id) on delete cascade,
email varchar(255),
primary key (id)
);
create table student_courses(
student_id bigint references otus_students(id) on delete cascade,
course_id bigint references courses(id),
primary key (student_id, course_id)
);
@@ -0,0 +1,126 @@
package ru.otus.example.ormdemo.repositories;
import lombok.val;
import org.hibernate.SessionFactory;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
import org.springframework.context.annotation.Import;
import ru.otus.example.ormdemo.models.OtusStudent;
import ru.otus.example.ormdemo.models.Avatar;
import ru.otus.example.ormdemo.models.Course;
import ru.otus.example.ormdemo.models.EMail;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@DisplayName("Репозиторий на основе Jpa для работы со студентами ")
@DataJpaTest
@Import(OtusStudentRepositoryJpa.class)
class OtusStudentRepositoryJpaTest {
private static final int EXPECTED_NUMBER_OF_STUDENTS = 10;
private static final long FIRST_STUDENT_ID = 1L;
private static final String FIRST_STUDENT_NAME = "student_01";
private static final int EXPECTED_QUERIES_COUNT = 31;
private static final String STUDENT_AVATAR_URL = "где-то там";
private static final String STUDENT_EMAIL = "any@mail.com";
private static final String COURSE_NAME = "Spring";
private static final String STUDENT_NAME = "Вася";
@Autowired
private OtusStudentRepositoryJpa repositoryJpa;
@Autowired
private TestEntityManager em;
@DisplayName(" должен загружать информацию о нужном студенте по его id")
@Test
void shouldFindExpectedStudentById() {
val optionalActualStudent = repositoryJpa.findById(FIRST_STUDENT_ID);
val expectedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(optionalActualStudent).isPresent().get()
.usingRecursiveComparison().isEqualTo(expectedStudent);
}
@DisplayName("должен загружать список всех студентов с полной информацией о них")
@Test
void shouldReturnCorrectStudentsListWithAllInfo() {
SessionFactory sessionFactory = em.getEntityManager().getEntityManagerFactory()
.unwrap(SessionFactory.class);
sessionFactory.getStatistics().setStatisticsEnabled(true);
System.out.println("\n\n\n\n----------------------------------------------------------------------------------------------------------");
val students = repositoryJpa.findAll();
assertThat(students).isNotNull().hasSize(EXPECTED_NUMBER_OF_STUDENTS)
.allMatch(s -> !s.getName().equals(""))
.allMatch(s -> s.getCourses() != null && s.getCourses().size() > 0)
.allMatch(s -> s.getAvatar().getPhotoUrl() != null)
.allMatch(s -> s.getEmails() != null && s.getEmails().size() > 0);
System.out.println("----------------------------------------------------------------------------------------------------------\n\n\n\n");
assertThat(sessionFactory.getStatistics().getPrepareStatementCount()).isEqualTo(EXPECTED_QUERIES_COUNT);
}
@DisplayName(" должен корректно сохранять всю информацию о студенте")
@Test
void shouldSaveAllStudentInfo() {
val avatar = new Avatar(0, STUDENT_AVATAR_URL);
val email = new EMail(0, STUDENT_EMAIL);
val emails = Collections.singletonList(email);
val course = new Course(0, COURSE_NAME);
val courses = Collections.singletonList(course);
val vasya = new OtusStudent(0, STUDENT_NAME, avatar, emails, courses);
repositoryJpa.save(vasya);
assertThat(vasya.getId()).isGreaterThan(0);
val actualStudent = em.find(OtusStudent.class, vasya.getId());
assertThat(actualStudent).isNotNull().matches(s -> !s.getName().equals(""))
.matches(s -> s.getCourses() != null && s.getCourses().size() > 0 && s.getCourses().get(0).getId() > 0)
.matches(s -> s.getAvatar() != null)
.matches(s -> s.getEmails() != null && s.getEmails().size() > 0);
}
@DisplayName(" должен загружать информацию о нужном студенте по его имени")
@Test
void shouldFindExpectedStudentByName() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
List<OtusStudent> students = repositoryJpa.findByName(FIRST_STUDENT_NAME);
assertThat(students).containsOnlyOnce(firstStudent);
}
@DisplayName(" должен изменять имя заданного студента по его id")
@Test
void shouldUpdateStudentNameById() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
String oldName = firstStudent.getName();
em.detach(firstStudent);
repositoryJpa.updateNameById(FIRST_STUDENT_ID, STUDENT_NAME);
val updatedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(updatedStudent.getName()).isNotEqualTo(oldName).isEqualTo(STUDENT_NAME);
}
@DisplayName(" должен удалять заданного студента по его id")
@Test
void shouldDeleteStudentNameById() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(firstStudent).isNotNull();
em.detach(firstStudent);
repositoryJpa.deleteById(FIRST_STUDENT_ID);
val deletedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(deletedStudent).isNull();
}
}
@@ -0,0 +1,20 @@
spring:
datasource:
url: jdbc:h2:mem:testdb
sql:
init:
mode: always
jpa:
generate-ddl: false
#generate-ddl: true
hibernate:
ddl-auto: none
#ddl-auto: create-drop
#show-sql: true
logging:
level:
ROOT: ERROR
@@ -0,0 +1,29 @@
insert into avatars(photo_url)
values ('photoUrl_01'), ('photoUrl_02'), ('photoUrl_03'), ('photoUrl_04'), ('photoUrl_05'),
('photoUrl_06'), ('photoUrl_07'), ('photoUrl_08'), ('photoUrl_09'), ('photoUrl_10');
insert into courses(name)
values ('course_name_01'), ('course_name_02'), ('course_name_03'), ('course_name_04'), ('course_name_05'),
('course_name_06'), ('course_name_07'), ('course_name_08'), ('course_name_09'), ('course_name_10'), ('not_used_11');
insert into otus_students(name, avatar_id)
values ('student_01', 1), ('student_02', 2), ('student_03', 3), ('student_04', 4), ('student_05', 5),
('student_06', 6), ('student_07', 7), ('student_08', 8), ('student_09', 9), ('student_10', 10);
insert into emails(email, student_id)
values ('email_01', 1), ('email_02', 1), ('email_03', 2), ('email_04', 2), ('email_05', 3), ('email_06', 4),
('email_07', 5), ('email_08', 6), ('email_09', 7), ('email_10', 8), ('email_11', 9), ('email_12', 10);
insert into student_courses(student_id, course_id)
values (1, 1), (1, 2), (1, 3),
(2, 2), (2, 4), (2, 5),
(3, 3), (3, 6), (3, 7),
(4, 4), (4, 8), (4, 9),
(5, 5), (5, 10), (5, 1),
(6, 6), (6, 2), (6, 3),
(7, 7), (7, 4), (7, 5),
(8, 8), (8, 6), (8, 7),
(9, 9), (9, 8), (9, 10),
(10, 10), (10, 1), (10, 2);
@@ -0,0 +1,4 @@
.idea/
*.iml
target/
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>jpql-solution-01</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,13 @@
package ru.otus.example.ormdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrmDemoApplication {
public static void main(String[] args) {
SpringApplication.run(OrmDemoApplication.class, args);
}
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "avatars")
public class Avatar {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "photo_url", nullable = false, unique = true)
private String photoUrl;
}
@@ -0,0 +1,26 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "courses")
public class Course {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "name", nullable = false, unique = true)
private String name;
}
@@ -0,0 +1,26 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "emails")
public class EMail {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "email", nullable = false, unique = true)
private String email;
}
@@ -0,0 +1,53 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.CascadeType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.JoinTable;
import jakarta.persistence.ManyToMany;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity // Указывает, что данный класс является сущностью
@Table(name = "otus_students") // Задает имя таблицы, на которую будет отображаться сущность
public class OtusStudent {
@Id // Позволяет указать какое поле является идентификатором
@GeneratedValue(strategy = GenerationType.IDENTITY) // Стратегия генерации идентификаторов
private long id;
// Задает имя и некоторые свойства поля таблицы, на которое будет отображаться поле сущности
@Column(name = "name", nullable = false, unique = true)
private String name;
// Указывает на связь между таблицами "один к одному"
@OneToOne(targetEntity = Avatar.class, cascade = CascadeType.ALL)
// Задает поле, по которому происходит объединение с таблицей для хранения связанной сущности
@JoinColumn(name = "avatar_id")
private Avatar avatar;
// Указывает на связь между таблицами "один ко многим"
@OneToMany(targetEntity = EMail.class, cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JoinColumn(name = "student_id")
private List<EMail> emails;
// Указывает на связь между таблицами "многие ко многим"
@ManyToMany(targetEntity = Course.class, fetch = FetchType.LAZY, cascade = CascadeType.PERSIST)
// Задает таблицу связей между таблицами для хранения родительской и связанной сущностью
@JoinTable(name = "student_courses", joinColumns = @JoinColumn(name = "student_id"),
inverseJoinColumns = @JoinColumn(name = "course_id"))
private List<Course> courses;
}
@@ -0,0 +1,18 @@
package ru.otus.example.ormdemo.repositories;
import ru.otus.example.ormdemo.models.OtusStudent;
import java.util.List;
import java.util.Optional;
public interface OtusStudentRepository {
OtusStudent save(OtusStudent student);
Optional<OtusStudent> findById(long id);
List<OtusStudent> findAll();
List<OtusStudent> findByName(String name);
void updateNameById(long id, String name);
void deleteById(long id);
}
@@ -0,0 +1,64 @@
package ru.otus.example.ormdemo.repositories;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.ormdemo.models.OtusStudent;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
// @Transactional должна стоять на методе сервиса.
// Причем, если метод не подразумевает изменения данных в БД то категорически желательно
// выставить у аннотации параметр readOnly в true.
// Но это только упражнение и транзакции мы пока не проходили.
// Поэтому, для упрощения, пока вешаем над классом репозитория
@Transactional
@Repository
public class OtusStudentRepositoryJpa implements OtusStudentRepository {
@PersistenceContext
private final EntityManager em;
public OtusStudentRepositoryJpa(EntityManager em) {
this.em = em;
}
@Override
public OtusStudent save(OtusStudent student) {
if (student.getId() == 0) {
em.persist(student);
return student;
}
return em.merge(student);
}
@Override
public Optional<OtusStudent> findById(long id) {
return Optional.ofNullable(em.find(OtusStudent.class, id));
}
@Override
public List<OtusStudent> findAll() {
return Collections.emptyList();
}
@Override
public List<OtusStudent> findByName(String name) {
return Collections.emptyList();
}
@Override
public void updateNameById(long id, String name) {
}
@Override
public void deleteById(long id) {
}
}
@@ -0,0 +1,18 @@
spring:
datasource:
url: jdbc:h2:mem:testdb
sql:
init:
mode: always
jpa:
generate-ddl: false
hibernate:
ddl-auto: none
show-sql: true
logging:
level:
ROOT: ERROR
@@ -0,0 +1,31 @@
create table avatars(
id bigserial,
photo_url varchar(8000),
primary key (id)
);
create table courses(
id bigserial,
name varchar(255),
primary key (id)
);
create table otus_students(
id bigserial,
name varchar(255),
avatar_id bigint references avatars (id),
primary key (id)
);
create table emails(
id bigserial,
student_id bigint references otus_students(id) on delete cascade,
email varchar(255),
primary key (id)
);
create table student_courses(
student_id bigint references otus_students(id) on delete cascade,
course_id bigint references courses(id),
primary key (student_id, course_id)
);
@@ -0,0 +1,126 @@
package ru.otus.example.ormdemo.repositories;
import lombok.val;
import org.hibernate.SessionFactory;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
import org.springframework.context.annotation.Import;
import ru.otus.example.ormdemo.models.OtusStudent;
import ru.otus.example.ormdemo.models.Avatar;
import ru.otus.example.ormdemo.models.Course;
import ru.otus.example.ormdemo.models.EMail;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@DisplayName("Репозиторий на основе Jpa для работы со студентами ")
@DataJpaTest
@Import(OtusStudentRepositoryJpa.class)
class OtusStudentRepositoryJpaTest {
private static final int EXPECTED_NUMBER_OF_STUDENTS = 10;
private static final long FIRST_STUDENT_ID = 1L;
private static final String FIRST_STUDENT_NAME = "student_01";
private static final int EXPECTED_QUERIES_COUNT = 31;
private static final String STUDENT_AVATAR_URL = "где-то там";
private static final String STUDENT_EMAIL = "any@mail.com";
private static final String COURSE_NAME = "Spring";
private static final String STUDENT_NAME = "Вася";
@Autowired
private OtusStudentRepositoryJpa repositoryJpa;
@Autowired
private TestEntityManager em;
@DisplayName(" должен загружать информацию о нужном студенте по его id")
@Test
void shouldFindExpectedStudentById() {
val optionalActualStudent = repositoryJpa.findById(FIRST_STUDENT_ID);
val expectedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(optionalActualStudent).isPresent().get()
.usingRecursiveComparison().isEqualTo(expectedStudent);
}
@DisplayName("должен загружать список всех студентов с полной информацией о них")
@Test
void shouldReturnCorrectStudentsListWithAllInfo() {
SessionFactory sessionFactory = em.getEntityManager().getEntityManagerFactory()
.unwrap(SessionFactory.class);
sessionFactory.getStatistics().setStatisticsEnabled(true);
System.out.println("\n\n\n\n----------------------------------------------------------------------------------------------------------");
val students = repositoryJpa.findAll();
assertThat(students).isNotNull().hasSize(EXPECTED_NUMBER_OF_STUDENTS)
.allMatch(s -> !s.getName().equals(""))
.allMatch(s -> s.getCourses() != null && s.getCourses().size() > 0)
.allMatch(s -> s.getAvatar().getPhotoUrl() != null)
.allMatch(s -> s.getEmails() != null && s.getEmails().size() > 0);
System.out.println("----------------------------------------------------------------------------------------------------------\n\n\n\n");
assertThat(sessionFactory.getStatistics().getPrepareStatementCount()).isEqualTo(EXPECTED_QUERIES_COUNT);
}
@DisplayName(" должен корректно сохранять всю информацию о студенте")
@Test
void shouldSaveAllStudentInfo() {
val avatar = new Avatar(0, STUDENT_AVATAR_URL);
val email = new EMail(0, STUDENT_EMAIL);
val emails = Collections.singletonList(email);
val course = new Course(0, COURSE_NAME);
val courses = Collections.singletonList(course);
val vasya = new OtusStudent(0, STUDENT_NAME, avatar, emails, courses);
repositoryJpa.save(vasya);
assertThat(vasya.getId()).isGreaterThan(0);
val actualStudent = em.find(OtusStudent.class, vasya.getId());
assertThat(actualStudent).isNotNull().matches(s -> !s.getName().equals(""))
.matches(s -> s.getCourses() != null && s.getCourses().size() > 0 && s.getCourses().get(0).getId() > 0)
.matches(s -> s.getAvatar() != null)
.matches(s -> s.getEmails() != null && s.getEmails().size() > 0);
}
@DisplayName(" должен загружать информацию о нужном студенте по его имени")
@Test
void shouldFindExpectedStudentByName() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
List<OtusStudent> students = repositoryJpa.findByName(FIRST_STUDENT_NAME);
assertThat(students).containsOnlyOnce(firstStudent);
}
@DisplayName(" должен изменять имя заданного студента по его id")
@Test
void shouldUpdateStudentNameById() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
String oldName = firstStudent.getName();
em.detach(firstStudent);
repositoryJpa.updateNameById(FIRST_STUDENT_ID, STUDENT_NAME);
val updatedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(updatedStudent.getName()).isNotEqualTo(oldName).isEqualTo(STUDENT_NAME);
}
@DisplayName(" должен удалять заданного студента по его id")
@Test
void shouldDeleteStudentNameById() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(firstStudent).isNotNull();
em.detach(firstStudent);
repositoryJpa.deleteById(FIRST_STUDENT_ID);
val deletedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(deletedStudent).isNull();
}
}
@@ -0,0 +1,20 @@
spring:
datasource:
url: jdbc:h2:mem:testdb
sql:
init:
mode: always
jpa:
generate-ddl: false
#generate-ddl: true
hibernate:
ddl-auto: none
#ddl-auto: create-drop
#show-sql: true
logging:
level:
ROOT: ERROR
@@ -0,0 +1,29 @@
insert into avatars(photo_url)
values ('photoUrl_01'), ('photoUrl_02'), ('photoUrl_03'), ('photoUrl_04'), ('photoUrl_05'),
('photoUrl_06'), ('photoUrl_07'), ('photoUrl_08'), ('photoUrl_09'), ('photoUrl_10');
insert into courses(name)
values ('course_name_01'), ('course_name_02'), ('course_name_03'), ('course_name_04'), ('course_name_05'),
('course_name_06'), ('course_name_07'), ('course_name_08'), ('course_name_09'), ('course_name_10'), ('not_used_11');
insert into otus_students(name, avatar_id)
values ('student_01', 1), ('student_02', 2), ('student_03', 3), ('student_04', 4), ('student_05', 5),
('student_06', 6), ('student_07', 7), ('student_08', 8), ('student_09', 9), ('student_10', 10);
insert into emails(email, student_id)
values ('email_01', 1), ('email_02', 1), ('email_03', 2), ('email_04', 2), ('email_05', 3), ('email_06', 4),
('email_07', 5), ('email_08', 6), ('email_09', 7), ('email_10', 8), ('email_11', 9), ('email_12', 10);
insert into student_courses(student_id, course_id)
values (1, 1), (1, 2), (1, 3),
(2, 2), (2, 4), (2, 5),
(3, 3), (3, 6), (3, 7),
(4, 4), (4, 8), (4, 9),
(5, 5), (5, 10), (5, 1),
(6, 6), (6, 2), (6, 3),
(7, 7), (7, 4), (7, 5),
(8, 8), (8, 6), (8, 7),
(9, 9), (9, 8), (9, 10),
(10, 10), (10, 1), (10, 2);
@@ -0,0 +1,4 @@
.idea/
*.iml
target/
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>jpql-solution-02</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,13 @@
package ru.otus.example.ormdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrmDemoApplication {
public static void main(String[] args) {
SpringApplication.run(OrmDemoApplication.class, args);
}
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "avatars")
public class Avatar {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "photo_url", nullable = false, unique = true)
private String photoUrl;
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "courses")
public class Course {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "name", nullable = false, unique = true)
private String name;
}
@@ -0,0 +1,26 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "emails")
public class EMail {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "email", nullable = false, unique = true)
private String email;
}
@@ -0,0 +1,53 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.CascadeType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.JoinTable;
import jakarta.persistence.ManyToMany;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity // Указывает, что данный класс является сущностью
@Table(name = "otus_students") // Задает имя таблицы, на которую будет отображаться сущность
public class OtusStudent {
@Id // Позволяет указать какое поле является идентификатором
@GeneratedValue(strategy = GenerationType.IDENTITY) // Стратегия генерации идентификаторов
private long id;
// Задает имя и некоторые свойства поля таблицы, на которое будет отображаться поле сущности
@Column(name = "name", nullable = false, unique = true)
private String name;
// Указывает на связь между таблицами "один к одному"
@OneToOne(targetEntity = Avatar.class, cascade = CascadeType.ALL)
// Задает поле, по которому происходит объединение с таблицей для хранения связанной сущности
@JoinColumn(name = "avatar_id")
private Avatar avatar;
// Указывает на связь между таблицами "один ко многим"
@OneToMany(targetEntity = EMail.class, cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JoinColumn(name = "student_id")
private List<EMail> emails;
// Указывает на связь между таблицами "многие ко многим"
@ManyToMany(targetEntity = Course.class, fetch = FetchType.LAZY, cascade = CascadeType.PERSIST)
// Задает таблицу связей между таблицами для хранения родительской и связанной сущностью
@JoinTable(name = "student_courses", joinColumns = @JoinColumn(name = "student_id"),
inverseJoinColumns = @JoinColumn(name = "course_id"))
private List<Course> courses;
}
@@ -0,0 +1,18 @@
package ru.otus.example.ormdemo.repositories;
import ru.otus.example.ormdemo.models.OtusStudent;
import java.util.List;
import java.util.Optional;
public interface OtusStudentRepository {
OtusStudent save(OtusStudent student);
Optional<OtusStudent> findById(long id);
List<OtusStudent> findAll();
List<OtusStudent> findByName(String name);
void updateNameById(long id, String name);
void deleteById(long id);
}
@@ -0,0 +1,69 @@
package ru.otus.example.ormdemo.repositories;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.ormdemo.models.OtusStudent;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.TypedQuery;
import java.util.List;
import java.util.Optional;
// @Transactional должна стоять на методе сервиса.
// Причем, если метод не подразумевает изменения данных в БД то категорически желательно
// выставить у аннотации параметр readOnly в true.
// Но это только упражнение и транзакции мы пока не проходили.
// Поэтому, для упрощения, пока вешаем над классом репозитория
@Transactional
@Repository
public class OtusStudentRepositoryJpa implements OtusStudentRepository {
@PersistenceContext
private final EntityManager em;
public OtusStudentRepositoryJpa(EntityManager em) {
this.em = em;
}
@Override
public OtusStudent save(OtusStudent student) {
if (student.getId() == 0) {
em.persist(student);
return student;
}
return em.merge(student);
}
@Override
public Optional<OtusStudent> findById(long id) {
return Optional.ofNullable(em.find(OtusStudent.class, id));
}
@Override
public List<OtusStudent> findAll() {
return em.createQuery("select s from OtusStudent s", OtusStudent.class)
.getResultList();
}
@Override
public List<OtusStudent> findByName(String name) {
TypedQuery<OtusStudent> query = em.createQuery("select s " +
"from OtusStudent s " +
"where s.name = :name",
OtusStudent.class);
query.setParameter("name", name);
return query.getResultList();
}
@Override
public void updateNameById(long id, String name) {
}
@Override
public void deleteById(long id) {
}
}
@@ -0,0 +1,18 @@
spring:
datasource:
url: jdbc:h2:mem:testdb
sql:
init:
mode: always
jpa:
generate-ddl: false
hibernate:
ddl-auto: none
show-sql: true
logging:
level:
ROOT: ERROR
@@ -0,0 +1,31 @@
create table avatars(
id bigserial,
photo_url varchar(8000),
primary key (id)
);
create table courses(
id bigserial,
name varchar(255),
primary key (id)
);
create table otus_students(
id bigserial,
name varchar(255),
avatar_id bigint references avatars (id),
primary key (id)
);
create table emails(
id bigserial,
student_id bigint references otus_students(id) on delete cascade,
email varchar(255),
primary key (id)
);
create table student_courses(
student_id bigint references otus_students(id) on delete cascade,
course_id bigint references courses(id),
primary key (student_id, course_id)
);
@@ -0,0 +1,126 @@
package ru.otus.example.ormdemo.repositories;
import lombok.val;
import org.hibernate.SessionFactory;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
import org.springframework.context.annotation.Import;
import ru.otus.example.ormdemo.models.OtusStudent;
import ru.otus.example.ormdemo.models.Avatar;
import ru.otus.example.ormdemo.models.Course;
import ru.otus.example.ormdemo.models.EMail;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@DisplayName("Репозиторий на основе Jpa для работы со студентами ")
@DataJpaTest
@Import(OtusStudentRepositoryJpa.class)
class OtusStudentRepositoryJpaTest {
private static final int EXPECTED_NUMBER_OF_STUDENTS = 10;
private static final long FIRST_STUDENT_ID = 1L;
private static final String FIRST_STUDENT_NAME = "student_01";
private static final int EXPECTED_QUERIES_COUNT = 31;
private static final String STUDENT_AVATAR_URL = "где-то там";
private static final String STUDENT_EMAIL = "any@mail.com";
private static final String COURSE_NAME = "Spring";
private static final String STUDENT_NAME = "Вася";
@Autowired
private OtusStudentRepositoryJpa repositoryJpa;
@Autowired
private TestEntityManager em;
@DisplayName(" должен загружать информацию о нужном студенте по его id")
@Test
void shouldFindExpectedStudentById() {
val optionalActualStudent = repositoryJpa.findById(FIRST_STUDENT_ID);
val expectedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(optionalActualStudent).isPresent().get()
.usingRecursiveComparison().isEqualTo(expectedStudent);
}
@DisplayName("должен загружать список всех студентов с полной информацией о них")
@Test
void shouldReturnCorrectStudentsListWithAllInfo() {
SessionFactory sessionFactory = em.getEntityManager().getEntityManagerFactory()
.unwrap(SessionFactory.class);
sessionFactory.getStatistics().setStatisticsEnabled(true);
System.out.println("\n\n\n\n----------------------------------------------------------------------------------------------------------");
val students = repositoryJpa.findAll();
assertThat(students).isNotNull().hasSize(EXPECTED_NUMBER_OF_STUDENTS)
.allMatch(s -> !s.getName().equals(""))
.allMatch(s -> s.getCourses() != null && s.getCourses().size() > 0)
.allMatch(s -> s.getAvatar().getPhotoUrl() != null)
.allMatch(s -> s.getEmails() != null && s.getEmails().size() > 0);
System.out.println("----------------------------------------------------------------------------------------------------------\n\n\n\n");
assertThat(sessionFactory.getStatistics().getPrepareStatementCount()).isEqualTo(EXPECTED_QUERIES_COUNT);
}
@DisplayName(" должен корректно сохранять всю информацию о студенте")
@Test
void shouldSaveAllStudentInfo() {
val avatar = new Avatar(0, STUDENT_AVATAR_URL);
val email = new EMail(0, STUDENT_EMAIL);
val emails = Collections.singletonList(email);
val course = new Course(0, COURSE_NAME);
val courses = Collections.singletonList(course);
val vasya = new OtusStudent(0, STUDENT_NAME, avatar, emails, courses);
repositoryJpa.save(vasya);
assertThat(vasya.getId()).isGreaterThan(0);
val actualStudent = em.find(OtusStudent.class, vasya.getId());
assertThat(actualStudent).isNotNull().matches(s -> !s.getName().equals(""))
.matches(s -> s.getCourses() != null && s.getCourses().size() > 0 && s.getCourses().get(0).getId() > 0)
.matches(s -> s.getAvatar() != null)
.matches(s -> s.getEmails() != null && s.getEmails().size() > 0);
}
@DisplayName(" должен загружать информацию о нужном студенте по его имени")
@Test
void shouldFindExpectedStudentByName() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
List<OtusStudent> students = repositoryJpa.findByName(FIRST_STUDENT_NAME);
assertThat(students).containsOnlyOnce(firstStudent);
}
@DisplayName(" должен изменять имя заданного студента по его id")
@Test
void shouldUpdateStudentNameById() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
String oldName = firstStudent.getName();
em.detach(firstStudent);
repositoryJpa.updateNameById(FIRST_STUDENT_ID, STUDENT_NAME);
val updatedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(updatedStudent.getName()).isNotEqualTo(oldName).isEqualTo(STUDENT_NAME);
}
@DisplayName(" должен удалять заданного студента по его id")
@Test
void shouldDeleteStudentNameById() {
val firstStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(firstStudent).isNotNull();
em.detach(firstStudent);
repositoryJpa.deleteById(FIRST_STUDENT_ID);
val deletedStudent = em.find(OtusStudent.class, FIRST_STUDENT_ID);
assertThat(deletedStudent).isNull();
}
}
@@ -0,0 +1,20 @@
spring:
datasource:
url: jdbc:h2:mem:testdb
sql:
init:
mode: always
jpa:
generate-ddl: false
#generate-ddl: true
hibernate:
ddl-auto: none
#ddl-auto: create-drop
#show-sql: true
logging:
level:
ROOT: ERROR
@@ -0,0 +1,29 @@
insert into avatars(photo_url)
values ('photoUrl_01'), ('photoUrl_02'), ('photoUrl_03'), ('photoUrl_04'), ('photoUrl_05'),
('photoUrl_06'), ('photoUrl_07'), ('photoUrl_08'), ('photoUrl_09'), ('photoUrl_10');
insert into courses(name)
values ('course_name_01'), ('course_name_02'), ('course_name_03'), ('course_name_04'), ('course_name_05'),
('course_name_06'), ('course_name_07'), ('course_name_08'), ('course_name_09'), ('course_name_10'), ('not_used_11');
insert into otus_students(name, avatar_id)
values ('student_01', 1), ('student_02', 2), ('student_03', 3), ('student_04', 4), ('student_05', 5),
('student_06', 6), ('student_07', 7), ('student_08', 8), ('student_09', 9), ('student_10', 10);
insert into emails(email, student_id)
values ('email_01', 1), ('email_02', 1), ('email_03', 2), ('email_04', 2), ('email_05', 3), ('email_06', 4),
('email_07', 5), ('email_08', 6), ('email_09', 7), ('email_10', 8), ('email_11', 9), ('email_12', 10);
insert into student_courses(student_id, course_id)
values (1, 1), (1, 2), (1, 3),
(2, 2), (2, 4), (2, 5),
(3, 3), (3, 6), (3, 7),
(4, 4), (4, 8), (4, 9),
(5, 5), (5, 10), (5, 1),
(6, 6), (6, 2), (6, 3),
(7, 7), (7, 4), (7, 5),
(8, 8), (8, 6), (8, 7),
(9, 9), (9, 8), (9, 10),
(10, 10), (10, 1), (10, 2);
@@ -0,0 +1,4 @@
.idea/
*.iml
target/
@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.otus</groupId>
<artifactId>jpql-solution-03</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,13 @@
package ru.otus.example.ormdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrmDemoApplication {
public static void main(String[] args) {
SpringApplication.run(OrmDemoApplication.class, args);
}
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "avatars")
public class Avatar {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "photo_url", nullable = false, unique = true)
private String photoUrl;
}
@@ -0,0 +1,25 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "courses")
public class Course {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "name", nullable = false, unique = true)
private String name;
}
@@ -0,0 +1,26 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "emails")
public class EMail {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@Column(name = "email", nullable = false, unique = true)
private String email;
}
@@ -0,0 +1,53 @@
package ru.otus.example.ormdemo.models;
import jakarta.persistence.CascadeType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.JoinTable;
import jakarta.persistence.ManyToMany;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity // Указывает, что данный класс является сущностью
@Table(name = "otus_students") // Задает имя таблицы, на которую будет отображаться сущность
public class OtusStudent {
@Id // Позволяет указать какое поле является идентификатором
@GeneratedValue(strategy = GenerationType.IDENTITY) // Стратегия генерации идентификаторов
private long id;
// Задает имя и некоторые свойства поля таблицы, на которое будет отображаться поле сущности
@Column(name = "name", nullable = false, unique = true)
private String name;
// Указывает на связь между таблицами "один к одному"
@OneToOne(targetEntity = Avatar.class, cascade = CascadeType.ALL)
// Задает поле, по которому происходит объединение с таблицей для хранения связанной сущности
@JoinColumn(name = "avatar_id")
private Avatar avatar;
// Указывает на связь между таблицами "один ко многим"
@OneToMany(targetEntity = EMail.class, cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JoinColumn(name = "student_id")
private List<EMail> emails;
// Указывает на связь между таблицами "многие ко многим"
@ManyToMany(targetEntity = Course.class, fetch = FetchType.LAZY, cascade = CascadeType.PERSIST)
// Задает таблицу связей между таблицами для хранения родительской и связанной сущностью
@JoinTable(name = "student_courses", joinColumns = @JoinColumn(name = "student_id"),
inverseJoinColumns = @JoinColumn(name = "course_id"))
private List<Course> courses;
}
@@ -0,0 +1,18 @@
package ru.otus.example.ormdemo.repositories;
import ru.otus.example.ormdemo.models.OtusStudent;
import java.util.List;
import java.util.Optional;
public interface OtusStudentRepository {
OtusStudent save(OtusStudent student);
Optional<OtusStudent> findById(long id);
List<OtusStudent> findAll();
List<OtusStudent> findByName(String name);
void updateNameById(long id, String name);
void deleteById(long id);
}
@@ -0,0 +1,77 @@
package ru.otus.example.ormdemo.repositories;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.ormdemo.models.OtusStudent;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.Query;
import jakarta.persistence.TypedQuery;
import java.util.List;
import java.util.Optional;
// @Transactional должна стоять на методе сервиса.
// Причем, если метод не подразумевает изменения данных в БД то категорически желательно
// выставить у аннотации параметр readOnly в true.
// Но это только упражнение и транзакции мы пока не проходили.
// Поэтому, для упрощения, пока вешаем над классом репозитория
@Transactional
@Repository
public class OtusStudentRepositoryJpa implements OtusStudentRepository {
@PersistenceContext
private EntityManager em;
@Override
public OtusStudent save(OtusStudent student) {
if (student.getId() == 0) {
em.persist(student);
return student;
}
return em.merge(student);
}
@Override
public Optional<OtusStudent> findById(long id) {
return Optional.ofNullable(em.find(OtusStudent.class, id));
}
@Override
public List<OtusStudent> findAll() {
return em.createQuery("select s from OtusStudent s", OtusStudent.class)
.getResultList();
}
@Override
public List<OtusStudent> findByName(String name) {
TypedQuery<OtusStudent> query = em.createQuery("select s " +
"from OtusStudent s " +
"where s.name = :name",
OtusStudent.class);
query.setParameter("name", name);
return query.getResultList();
}
// Только для примера, в реальности JPQL лучше использовать только для массовых операций
@Override
public void updateNameById(long id, String name) {
Query query = em.createQuery("update OtusStudent s " +
"set s.name = :name " +
"where s.id = :id");
query.setParameter("name", name);
query.setParameter("id", id);
query.executeUpdate();
}
// Только для примера, в реальности JPQL лучше использовать только для массовых операций
@Override
public void deleteById(long id) {
Query query = em.createQuery("delete " +
"from OtusStudent s " +
"where s.id = :id");
query.setParameter("id", id);
query.executeUpdate();
}
}

Some files were not shown because too many files have changed in this diff Show More