diff --git a/2024-05/spring-27-integrations-channels/.gitignore b/2024-05/spring-27-integrations-channels/.gitignore new file mode 100644 index 00000000..549e00a2 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/pom.xml b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/pom.xml new file mode 100644 index 00000000..f3095768 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + ru.otus + integrations-channels-exercise + 1.0-SNAPSHOT + + + ru.otus + integrations-channels + 1.0 + + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework + spring-messaging + + + + org.springframework.boot + spring-boot-starter-test + + + + org.projectlombok + lombok + provided + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/App.java b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/App.java new file mode 100644 index 00000000..f5a7725d --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/App.java @@ -0,0 +1,17 @@ +package ru.otus.spring.integration; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.integration.annotation.IntegrationComponentScan; + +import lombok.extern.slf4j.Slf4j; + +@SpringBootApplication +@IntegrationComponentScan +@Slf4j +public class App { + + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/AppRunner.java b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/AppRunner.java new file mode 100644 index 00000000..9841e1ca --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/AppRunner.java @@ -0,0 +1,63 @@ +package ru.otus.spring.integration; + +import static java.util.Objects.isNull; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class AppRunner implements CommandLineRunner { + final PollableChannel queueChannel; + final SubscribableChannel subscribableDirectChannel; + + @Override + public void run(String... args) throws Exception { + log.warn("INIT"); + for (int i = 0; i < 10; i++) { + queueChannel.send(MessageBuilder.withPayload("Start " + i).build()); + } + log.warn("INIT FINISH"); + Thread.sleep(5000); + + log.warn("START"); + + subscribableDirectChannel.subscribe((msg) -> log.warn("Receive msg: {}", msg)); + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleWithFixedDelay(() -> { + log.warn("I am here!!!"); + Message receivedMessage = queueChannel.receive(5000); + if (isNull(receivedMessage)) { + return; + } + subscribableDirectChannel.send(receivedMessage); + }, 100, 300, TimeUnit.MILLISECONDS); + log.warn("START FINISH"); + + log.warn(""); + queueChannel.send(MessageBuilder.withPayload("Hello").build()); + log.warn(""); + queueChannel.send(MessageBuilder.withPayload("Hello2").build()); + + Thread.sleep(2_000); + + log.warn(""); + queueChannel.send(MessageBuilder.withPayload("Hello3").build()); + + Thread.sleep(3_000); + executor.shutdown(); + } +} diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/IntegrationConfig.java b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/IntegrationConfig.java new file mode 100644 index 00000000..aeffcc34 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/java/ru/otus/spring/integration/IntegrationConfig.java @@ -0,0 +1,23 @@ +package ru.otus.spring.integration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.dsl.DirectChannelSpec; +import org.springframework.integration.dsl.MessageChannels; +import org.springframework.messaging.PollableChannel; + +@Configuration +public class IntegrationConfig { + + @Bean + public PollableChannel queueChannel() { + return new QueueChannel(100); + } + + @Bean + public DirectChannelSpec subscribableDirectChannel() { + return MessageChannels.direct("subscribableDirectChannel"); + } + +} diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/resources/application.yml b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/resources/application.yml new file mode 100644 index 00000000..1a9aeb48 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/main/resources/application.yml @@ -0,0 +1,5 @@ +logging: + level: + root: info + + org.springframework.integration: debug \ No newline at end of file diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/test/java/ru/otus/spring/integration/MessagesTest.java b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/test/java/ru/otus/spring/integration/MessagesTest.java new file mode 100644 index 00000000..fd9a0571 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/test/java/ru/otus/spring/integration/MessagesTest.java @@ -0,0 +1,98 @@ +package ru.otus.spring.integration; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.MessageBuilder; + + +@SuppressWarnings("all") +public class MessagesTest { + + @Test + public void testCreateSimpleGenericMessage() { + // TODO: Создайте сообщение с payload-ом "Hello" с помощью конструктора + Message message = null; + + assertNotNull(message); + assertEquals(GenericMessage.class, message.getClass()); + assertNotNull(message.getPayload()); + assertEquals("Hello", message.getPayload()); + } + + @Test + public void testCreateGenericMessage() { + // TODO: Создайте сообщение с пользователем с помощью конструктора + Message message = null; + + assertNotNull(message); + assertEquals(GenericMessage.class, message.getClass()); + assertNotNull(message.getPayload()); + assertEquals(new User("John", 23), message.getPayload()); + } + + @Test + public void testGenericMessageWithHeaders() { + // TODO: Создайте сообщение с payload-ом "Hello" и header-ом "to":"World" + Map headers = null; + Message message = null; + + assertNotNull(message); + assertEquals("Hello", message.getPayload()); + assertEquals("World", message.getHeaders().get("to", String.class)); + } + + @Test + public void testGenericMessageWithMessageHeaders() { + // TODO: Создайте сообщение с payload-ом "Hello" и header-ом "to":"World" + MessageHeaders headers = null; + Message message = null; + + assertNotNull(message); + assertEquals("Hello", message.getPayload()); + assertEquals("World", message.getHeaders().get("to", String.class)); + } + + @Test + public void testErrorMessage() { + // TODO: Создайте сообщение об ошибки с объектом NullPointerException внутри + Message errorMessage = null; + + assertNotNull(errorMessage); + assertEquals(ErrorMessage.class, errorMessage.getClass()); + assertEquals(NullPointerException.class, errorMessage.getPayload().getClass()); + } + + @Test + public void testMessageBuilder() { + // TODO: Создайте сообщение с payload-ом "Hello" и header-ом "to":"World" с помощью MessageBuilder + Message message = null; + + assertNotNull(message); + assertEquals("Hello", message.getPayload()); + assertEquals("World", message.getHeaders().get("to", String.class)); + } + + @Test + public void testBuildFromMessage() { + Message original = MessageBuilder + .withPayload(new User("Kate", 30)) + .setHeader("processor", "userService") + .build(); + + // TODO: Создайте новое сообщение с теми же payload и header-ами c помощью MessageBuilder + Message newMessage = null; + + assertNotNull(newMessage); + assertEquals(original.getPayload(), newMessage.getPayload()); + assertEquals(original.getHeaders().get("processor"), newMessage.getHeaders().get("processor")); + } +} diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/test/java/ru/otus/spring/integration/User.java b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/test/java/ru/otus/spring/integration/User.java new file mode 100644 index 00000000..a423dd62 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-exercise/src/test/java/ru/otus/spring/integration/User.java @@ -0,0 +1,35 @@ +package ru.otus.spring.integration; + +import java.util.Objects; + +public class User { + + private final String name; + private final int age; + + public User(String name, int age) { + this.name = name; + this.age = age; + } + + public String getName() { + return name; + } + + public int getAge() { + return age; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof User user)) return false; + return age == user.age && + Objects.equals(name, user.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, age); + } +} diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-solution/pom.xml b/2024-05/spring-27-integrations-channels/integrations-channels-solution/pom.xml new file mode 100644 index 00000000..90d25b5e --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-solution/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + + ru.otus + integrations-channels-solution + 1.0-SNAPSHOT + + + ru.otus + integrations-channels + 1.0 + + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework + spring-messaging + + + + org.springframework.boot + spring-boot-starter-test + + + org.projectlombok + lombok + provided + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/main/java/ru/otus/spring/integration/App.java b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/main/java/ru/otus/spring/integration/App.java new file mode 100644 index 00000000..4a3c1647 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/main/java/ru/otus/spring/integration/App.java @@ -0,0 +1,80 @@ +package ru.otus.spring.integration; + +import static java.util.Objects.isNull; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.dsl.DirectChannelSpec; +import org.springframework.integration.dsl.MessageChannels; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.SubscribableChannel; + +import lombok.extern.slf4j.Slf4j; + +@SpringBootApplication +@IntegrationComponentScan +@Slf4j +public class App { + + public static void main(String[] args) throws InterruptedException { + ConfigurableApplicationContext ctx = SpringApplication.run(App.class, args); + + PollableChannel queueChannel = ctx.getBean("queueChannel", PollableChannel.class); + SubscribableChannel subscribableDirectChannel = ctx.getBean("subscribableDirectChannel", SubscribableChannel.class); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleWithFixedDelay(() -> { + log.warn("I am here!!!"); + Message receivedMessage = queueChannel.receive(5000); + if (isNull(receivedMessage)) { + return; + } + subscribableDirectChannel.send(receivedMessage); + }, 100, 300, TimeUnit.MILLISECONDS); + subscribableDirectChannel.subscribe((msg) -> log.warn("Receive msg: {}", msg)); + + + log.warn("INIT"); + for (int i = 0; i < 10; i++) { + queueChannel.send(MessageBuilder.withPayload("Start " + i).build()); + } + log.warn("INIT FINISH"); + Thread.sleep(5000); + + log.warn("START"); + + log.warn("START FINISH"); + + log.warn(""); + queueChannel.send(MessageBuilder.withPayload("Hello").build()); + log.warn(""); + queueChannel.send(MessageBuilder.withPayload("Hello2").build()); + + Thread.sleep(2_000); + + log.warn(""); + queueChannel.send(MessageBuilder.withPayload("Hello3").build()); + + Thread.sleep(3_000); + executor.shutdown(); + } + + @Bean + public PollableChannel queueChannel() { + return new QueueChannel(9); + } + + @Bean + public DirectChannelSpec subscribableDirectChannel() { + return MessageChannels.direct("subscribableDirectChannel"); + } +} \ No newline at end of file diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/main/resources/application.yml b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/main/resources/application.yml new file mode 100644 index 00000000..1a9aeb48 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/main/resources/application.yml @@ -0,0 +1,5 @@ +logging: + level: + root: info + + org.springframework.integration: debug \ No newline at end of file diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/test/java/ru/otus/spring/integration/MessagesTest.java b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/test/java/ru/otus/spring/integration/MessagesTest.java new file mode 100644 index 00000000..1fcaf28b --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/test/java/ru/otus/spring/integration/MessagesTest.java @@ -0,0 +1,101 @@ +package ru.otus.spring.integration; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Collections; +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.MessageBuilder; + + +@SuppressWarnings("all") +public class MessagesTest { + + @Test + public void testCreateSimpleGenericMessage() { + // TODO: Создайте сообщение с payload-ом "Hello" с помощью конструктора + Message message = new GenericMessage<>("Hello"); + + assertNotNull(message); + assertEquals(GenericMessage.class, message.getClass()); + assertNotNull(message.getPayload()); + assertEquals("Hello", message.getPayload()); + } + + @Test + public void testCreateGenericMessage() { + // TODO: Создайте сообщение с пользователем с помощью конструктора + Message message = new GenericMessage<>(new User("John", 23)); + + assertNotNull(message); + assertEquals(GenericMessage.class, message.getClass()); + assertNotNull(message.getPayload()); + assertEquals(new User("John", 23), message.getPayload()); + } + + @Test + public void testGenericMessageWithHeaders() { + // TODO: Создайте сообщение с payload-ом "Hello" и header-ом "to":"World" + Map headers = Collections.singletonMap("to", "World"); + Message message = new GenericMessage<>("Hello", headers); + + assertNotNull(message); + assertEquals("Hello", message.getPayload()); + assertEquals("World", message.getHeaders().get("to", String.class)); + } + + @Test + public void testGenericMessageWithMessageHeaders() { + // TODO: Создайте сообщение с payload-ом "Hello" и header-ом "to":"World" + MessageHeaders headers = new MessageHeaders(Collections.singletonMap("to", "World")); + Message message = new GenericMessage<>("Hello", headers); + + assertNotNull(message); + assertEquals("Hello", message.getPayload()); + assertEquals("World", message.getHeaders().get("to", String.class)); + } + + @Test + public void testErrorMessage() { + // TODO: Создайте сообщение об ошибки с объектом NullPointerException внутри + Message errorMessage = new ErrorMessage(new NullPointerException()); + + assertNotNull(errorMessage); + assertEquals(ErrorMessage.class, errorMessage.getClass()); + assertEquals(NullPointerException.class, errorMessage.getPayload().getClass()); + } + + @Test + public void testMessageBuilder() { + // TODO: Создайте сообщение с payload-ом "Hello" и header-ом "to":"World" с помощью MessageBuilder + Message message = MessageBuilder.withPayload("Hello") + .setHeader("to", "World") + .build(); + + assertNotNull(message); + assertEquals("Hello", message.getPayload()); + assertEquals("World", message.getHeaders().get("to", String.class)); + } + + @Test + public void testBuildFromMessage() { + Message original = MessageBuilder + .withPayload(new User("Kate", 30)) + .setHeader("processor", "userService") + .build(); + + // TODO: Создайте новое сообщение с теми же payload и header-ами c помощью MessageBuilder + Message newMessage = MessageBuilder.fromMessage(original).build(); + + assertNotNull(newMessage); + assertEquals(original.getPayload(), newMessage.getPayload()); + assertEquals(original.getHeaders().get("processor"), newMessage.getHeaders().get("processor")); + } +} diff --git a/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/test/java/ru/otus/spring/integration/User.java b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/test/java/ru/otus/spring/integration/User.java new file mode 100644 index 00000000..a423dd62 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/integrations-channels-solution/src/test/java/ru/otus/spring/integration/User.java @@ -0,0 +1,35 @@ +package ru.otus.spring.integration; + +import java.util.Objects; + +public class User { + + private final String name; + private final int age; + + public User(String name, int age) { + this.name = name; + this.age = age; + } + + public String getName() { + return name; + } + + public int getAge() { + return age; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof User user)) return false; + return age == user.age && + Objects.equals(name, user.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, age); + } +} diff --git a/2024-05/spring-27-integrations-channels/pom.xml b/2024-05/spring-27-integrations-channels/pom.xml new file mode 100644 index 00000000..3e8dfde7 --- /dev/null +++ b/2024-05/spring-27-integrations-channels/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.3.3 + + + ru.otus + integrations-channels + 1.0 + + pom + + + integrations-channels-exercise + integrations-channels-solution + + + + 17 + 17 + UTF-8 + +