diff --git a/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/bridge/BridgeApp.java b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/bridge/BridgeApp.java new file mode 100644 index 00000000..da5501fe --- /dev/null +++ b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/bridge/BridgeApp.java @@ -0,0 +1,52 @@ +package ru.otus.spring.test.bridge; + +import java.util.Collection; +import java.util.Map; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.MessageChannels; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import lombok.extern.slf4j.Slf4j; + +@SpringBootApplication +@IntegrationComponentScan +@Slf4j +public class BridgeApp { + public static void main(String[] args) { + ConfigurableApplicationContext ctx = SpringApplication.run(BridgeApp.class, args); + Map channels = ctx.getBeansOfType(MessageChannel.class); + log.warn("CHANNELS:"); + int i = 0; + for (Map.Entry entry : channels.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + log.warn("HANDLERS:"); + i = 0; + Map endpoints = ctx.getBeansOfType(MessageHandler.class); + for (Map.Entry entry : endpoints.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + + } + + @MessagingGateway + public interface Bridge { + @Gateway(requestChannel = "flow.input") + Collection send(Collection strings); + } + + @Bean + public IntegrationFlow flow() { + return f -> f + .channel(MessageChannels.queue("p2pChannel", 10)); + } +} diff --git a/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/gateway/GatewayApp.java b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/gateway/GatewayApp.java new file mode 100644 index 00000000..5b217c48 --- /dev/null +++ b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/gateway/GatewayApp.java @@ -0,0 +1,70 @@ +package ru.otus.spring.test.gateway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import lombok.extern.slf4j.Slf4j; + +@SpringBootApplication +@IntegrationComponentScan +@Slf4j +public class GatewayApp { + public static void main(String[] args) { + ConfigurableApplicationContext ctx = SpringApplication.run(GatewayApp.class, args); + Map channels = ctx.getBeansOfType(MessageChannel.class); + log.warn("CHANNELS:"); + int i = 0; + for (Map.Entry entry : channels.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + log.warn("HANDLERS:"); + i = 0; + Map endpoints = ctx.getBeansOfType(MessageHandler.class); + for (Map.Entry entry : endpoints.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + Upcase upcase = ctx.getBean(Upcase.class); + Collection result = upcase.upcase(Arrays.asList("test", "new", "last")); + log.warn("Upcase result: {}", result); + + } + + @MessagingGateway + public interface Upcase { + @Gateway(requestChannel = "upcase.input") + Collection upcase(Collection strings); + } + + @Bean + public IntegrationFlow upcase() { + return f -> f + .split() +// .split(list -> list.get().spliterator()) +// .split(getCustomSplitter(), "split") + .transform(String::toUpperCase) + .aggregate(); + } +// +// @Bean +// CustomSplitter getCustomSplitter() { +// return new CustomSplitter(); +// } +// public class CustomSplitter { +// public Collection split(Message> message) { +// return message.getPayload().stream().skip(1).collect(Collectors.toList()); +// } +// } + +} diff --git a/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/polling/PollingApp.java b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/polling/PollingApp.java new file mode 100644 index 00000000..45535c7a --- /dev/null +++ b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/polling/PollingApp.java @@ -0,0 +1,90 @@ +package ru.otus.spring.test.polling; + +import java.util.Map; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.MessageChannels; +import org.springframework.integration.endpoint.PollingConsumer; +import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.scheduling.support.PeriodicTrigger; + +import lombok.extern.slf4j.Slf4j; + +@SpringBootApplication +@IntegrationComponentScan +@Slf4j +public class PollingApp { + public static void main(String[] args) { + ConfigurableApplicationContext ctx = SpringApplication.run(PollingApp.class, args); + log.warn("POLLER:"); + Map pollers = ctx.getBeansOfType(PollingConsumer.class); + int i = 0; + for (Map.Entry entry : pollers.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + log.warn("CHANNELS:"); + Map channels = ctx.getBeansOfType(MessageChannel.class); + i = 0; + for (Map.Entry entry : channels.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + log.warn("HANDLERS:"); + i = 0; + Map endpoints = ctx.getBeansOfType(MessageHandler.class); + for (Map.Entry entry : endpoints.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + + Polling polling = ctx.getBean(Polling.class); + String result = polling.send("test"); + log.warn("Polling result: {}", result); + } + + @MessagingGateway + public interface Polling { + @Gateway(requestChannel = "flow.input", replyChannel = "pubSub") + String send(String value); + } + + @Bean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata defaultPoller() { +// return Pollers.fixedRate(10_000).get(); + PollerMetadata pollerMetadata = new PollerMetadata(); + pollerMetadata.setMaxMessagesPerPoll(5); + pollerMetadata.setTrigger(new PeriodicTrigger(3000)); + return pollerMetadata; + } + + @Bean + public IntegrationFlow flow() { + return f -> f + .channel("p2p") + .channel("pubSub"); + } + + @Bean + public PollableChannel p2p() { + return MessageChannels.queue("p2p", 10).get(); + } + + @Bean + public PollableChannel p2p2() { + return MessageChannels.priority("p2p2").capacity(10).get(); + } + + @Bean + public SubscribableChannel pubSub() { + return MessageChannels.publishSubscribe("pubSub").get(); + } +} diff --git a/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/transform/TransformApp.java b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/transform/TransformApp.java new file mode 100644 index 00000000..2525bb15 --- /dev/null +++ b/2022-11/spring-31/spring-31-solution/src/main/java/ru/otus/spring/test/transform/TransformApp.java @@ -0,0 +1,91 @@ +package ru.otus.spring.test.transform; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.Transformers; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import lombok.extern.slf4j.Slf4j; + +@SpringBootApplication +@IntegrationComponentScan +@Slf4j +public class TransformApp { + public static void main(String[] args) { + ConfigurableApplicationContext ctx = SpringApplication.run(TransformApp.class, args); + Map channels = ctx.getBeansOfType(MessageChannel.class); + log.warn("CHANNELS:"); + int i = 0; + for (Map.Entry entry : channels.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + log.warn("HANDLERS:"); + i = 0; + Map endpoints = ctx.getBeansOfType(MessageHandler.class); + for (Map.Entry entry : endpoints.entrySet()) { + log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue()); + } + Upcase upcase = ctx.getBean(Upcase.class); + Collection result = upcase.upcase(Arrays.asList(new Item("test"), new Item("new"), new Item("last"))); +// Collection result = upcase.upcase(Arrays.asList("test", "new", "last")); + log.warn("Upcase result: {}", result); + + } + + public static class Item { + public Item() { + } + + public Item(String value) { + this.value = value; + } + + String value; + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public String toString() { + return "Item{" + + "value='" + value + '\'' + + '}'; + } + } + + @MessagingGateway + public interface Upcase { + @Gateway(requestChannel = "upcase.input") + Collection upcase(Collection strings); + } + + @Bean + public IntegrationFlow upcase() { + return f -> f + .split() + .transform(Transformers.toMap()) + ., Map>transform(map -> { + map.replaceAll((k, v) -> v.toUpperCase()); + return map; + }) + .transform(Transformers.fromMap(Item.class)) + .aggregate(); + } + +}