2022-11 spring-31 examples

This commit is contained in:
Vladimir Ivanov
2023-04-07 10:33:03 +03:00
parent 21a05ed909
commit 39e7d442f2
4 changed files with 303 additions and 0 deletions
@@ -0,0 +1,52 @@
package ru.otus.spring.test.bridge;
import java.util.Collection;
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import lombok.extern.slf4j.Slf4j;
@SpringBootApplication
@IntegrationComponentScan
@Slf4j
public class BridgeApp {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = SpringApplication.run(BridgeApp.class, args);
Map<String, MessageChannel> channels = ctx.getBeansOfType(MessageChannel.class);
log.warn("CHANNELS:");
int i = 0;
for (Map.Entry<String, MessageChannel> entry : channels.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
log.warn("HANDLERS:");
i = 0;
Map<String, MessageHandler> endpoints = ctx.getBeansOfType(MessageHandler.class);
for (Map.Entry<String, MessageHandler> entry : endpoints.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
}
@MessagingGateway
public interface Bridge {
@Gateway(requestChannel = "flow.input")
Collection<String> send(Collection<String> strings);
}
@Bean
public IntegrationFlow flow() {
return f -> f
.channel(MessageChannels.queue("p2pChannel", 10));
}
}
@@ -0,0 +1,70 @@
package ru.otus.spring.test.gateway;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import lombok.extern.slf4j.Slf4j;
@SpringBootApplication
@IntegrationComponentScan
@Slf4j
public class GatewayApp {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = SpringApplication.run(GatewayApp.class, args);
Map<String, MessageChannel> channels = ctx.getBeansOfType(MessageChannel.class);
log.warn("CHANNELS:");
int i = 0;
for (Map.Entry<String, MessageChannel> entry : channels.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
log.warn("HANDLERS:");
i = 0;
Map<String, MessageHandler> endpoints = ctx.getBeansOfType(MessageHandler.class);
for (Map.Entry<String, MessageHandler> entry : endpoints.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
Upcase upcase = ctx.getBean(Upcase.class);
Collection<String> result = upcase.upcase(Arrays.asList("test", "new", "last"));
log.warn("Upcase result: {}", result);
}
@MessagingGateway
public interface Upcase {
@Gateway(requestChannel = "upcase.input")
Collection<String> upcase(Collection<String> strings);
}
@Bean
public IntegrationFlow upcase() {
return f -> f
.split()
// .split(list -> list.get().spliterator())
// .split(getCustomSplitter(), "split")
.<String, String>transform(String::toUpperCase)
.aggregate();
}
//
// @Bean
// CustomSplitter getCustomSplitter() {
// return new CustomSplitter();
// }
// public class CustomSplitter {
// public Collection<String> split(Message<Collection<String>> message) {
// return message.getPayload().stream().skip(1).collect(Collectors.toList());
// }
// }
}
@@ -0,0 +1,90 @@
package ru.otus.spring.test.polling;
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.support.PeriodicTrigger;
import lombok.extern.slf4j.Slf4j;
@SpringBootApplication
@IntegrationComponentScan
@Slf4j
public class PollingApp {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = SpringApplication.run(PollingApp.class, args);
log.warn("POLLER:");
Map<String, PollingConsumer> pollers = ctx.getBeansOfType(PollingConsumer.class);
int i = 0;
for (Map.Entry<String, PollingConsumer> entry : pollers.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
log.warn("CHANNELS:");
Map<String, MessageChannel> channels = ctx.getBeansOfType(MessageChannel.class);
i = 0;
for (Map.Entry<String, MessageChannel> entry : channels.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
log.warn("HANDLERS:");
i = 0;
Map<String, MessageHandler> endpoints = ctx.getBeansOfType(MessageHandler.class);
for (Map.Entry<String, MessageHandler> entry : endpoints.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
Polling polling = ctx.getBean(Polling.class);
String result = polling.send("test");
log.warn("Polling result: {}", result);
}
@MessagingGateway
public interface Polling {
@Gateway(requestChannel = "flow.input", replyChannel = "pubSub")
String send(String value);
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
// return Pollers.fixedRate(10_000).get();
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public IntegrationFlow flow() {
return f -> f
.channel("p2p")
.channel("pubSub");
}
@Bean
public PollableChannel p2p() {
return MessageChannels.queue("p2p", 10).get();
}
@Bean
public PollableChannel p2p2() {
return MessageChannels.priority("p2p2").capacity(10).get();
}
@Bean
public SubscribableChannel pubSub() {
return MessageChannels.publishSubscribe("pubSub").get();
}
}
@@ -0,0 +1,91 @@
package ru.otus.spring.test.transform;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Transformers;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import lombok.extern.slf4j.Slf4j;
@SpringBootApplication
@IntegrationComponentScan
@Slf4j
public class TransformApp {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = SpringApplication.run(TransformApp.class, args);
Map<String, MessageChannel> channels = ctx.getBeansOfType(MessageChannel.class);
log.warn("CHANNELS:");
int i = 0;
for (Map.Entry<String, MessageChannel> entry : channels.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
log.warn("HANDLERS:");
i = 0;
Map<String, MessageHandler> endpoints = ctx.getBeansOfType(MessageHandler.class);
for (Map.Entry<String, MessageHandler> entry : endpoints.entrySet()) {
log.warn("{}. {} -> {}", ++i, entry.getKey(), entry.getValue());
}
Upcase upcase = ctx.getBean(Upcase.class);
Collection<Item> result = upcase.upcase(Arrays.asList(new Item("test"), new Item("new"), new Item("last")));
// Collection<String> result = upcase.upcase(Arrays.asList("test", "new", "last"));
log.warn("Upcase result: {}", result);
}
public static class Item {
public Item() {
}
public Item(String value) {
this.value = value;
}
String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "Item{" +
"value='" + value + '\'' +
'}';
}
}
@MessagingGateway
public interface Upcase {
@Gateway(requestChannel = "upcase.input")
Collection<Item> upcase(Collection<Item> strings);
}
@Bean
public IntegrationFlow upcase() {
return f -> f
.split()
.transform(Transformers.toMap())
.<Map<String, String>, Map<String, String>>transform(map -> {
map.replaceAll((k, v) -> v.toUpperCase());
return map;
})
.transform(Transformers.fromMap(Item.class))
.aggregate();
}
}