From 91021e6c1e5f75ca94c0ba102ad905daa8b538c4 Mon Sep 17 00:00:00 2001 From: kataus Date: Wed, 4 Dec 2019 18:28:48 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A4=D0=B8=D0=BD=D0=B0=D0=BB=D1=8C=D0=BD?= =?UTF-8?q?=D0=B0=D1=8F=20=D0=B2=D0=B5=D1=80=D1=81=D0=B8=D1=8F=20=D0=BF?= =?UTF-8?q?=D1=80=D0=B8=D0=BC=D0=B5=D1=80=D0=BE=D0=B2=20=D0=BD=D0=B0=20?= =?UTF-8?q?=D0=B7=D0=B0=D0=BD=D1=8F=D1=82=D0=B8=D0=B5=2026=20(SI:=20Endpoi?= =?UTF-8?q?nts=20=D0=B8=20Flow=20Components)=20=D0=B4=D0=BB=D1=8F=20=D0=B3?= =?UTF-8?q?=D1=80=D1=83=D0=BF=D0=BF=D1=8B=202019-08?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/ru/otus/spring/integration/App.java | 43 +++++++++++++------ .../java/ru/otus/spring/integration/Cafe.java | 7 ++- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java index 2ec8b58e..a0319eeb 100644 --- a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java +++ b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java @@ -7,12 +7,14 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.channel.MessageChannels; +import org.springframework.integration.scheduling.PollerMetadata; import ru.otus.spring.integration.domain.Food; import ru.otus.spring.integration.domain.OrderItem; @@ -29,25 +31,29 @@ import java.util.stream.Collectors; public class App { private static final String[] MENU = { "coffee", "tea", "smoothie", "whiskey", "beer", "cola", "water" }; + @Bean - public DirectChannel itemsChannel() { - return MessageChannels.direct().datatype( OrderItem.class ).get(); + public QueueChannel itemsChannel() { + return MessageChannels.queue( 10 ).get(); } @Bean public PublishSubscribeChannel foodChannel() { - return MessageChannels.publishSubscribe().datatype( Food.class ).get(); + return MessageChannels.publishSubscribe().get(); } - // TODO: create default poller + @Bean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata poller() { + return Pollers.fixedRate( 100 ).maxMessagesPerPoll( 2 ).get(); + } @Bean public IntegrationFlow cafeFlow() { return IntegrationFlows.from( "itemsChannel" ) - // TODO: cook OrderItem in the kitchen - // TODO*: add router and subflows to process iced and usual items - // TODO*: add splitter and aggregator - // TODO: forward it to the publish subscriber channel + .split() + .handle( "kitchenService", "cook" ) + .aggregate() + .channel( "foodChannel" ) .get(); } @@ -60,14 +66,25 @@ public class App { while ( true ) { Thread.sleep( 1000 ); - OrderItem items = generateOrderItem(); + Collection items = generateOrderItems(); System.out.println( "New orderItems: " + - items.getItemName() ); - Food food = cafe.process( items ); - System.out.println( "Ready food: " + food.getName() ); + items.stream().map( OrderItem::getItemName ) + .collect( Collectors.joining( "," ) ) ); + Collection food = cafe.process( items ); + System.out.println( "Ready food: " + food.stream() + .map( Food::getName ) + .collect( Collectors.joining( "," ) ) ); } } + private static Collection generateOrderItems() { + List items = new ArrayList<>(); + for ( int i = 0; i < RandomUtils.nextInt( 1, 5 ); ++ i ) { + items.add( generateOrderItem() ); + } + return items; + } + private static OrderItem generateOrderItem() { return new OrderItem( MENU[ RandomUtils.nextInt( 0, MENU.length ) ] ); } diff --git a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java index 180de97b..cec8f49a 100644 --- a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java +++ b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java @@ -1,13 +1,18 @@ package ru.otus.spring.integration; + +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.MessagingGateway; import ru.otus.spring.integration.domain.Food; import ru.otus.spring.integration.domain.OrderItem; import java.util.Collection; // TODO: add messaging gateway annotation +@MessagingGateway public interface Cafe { // TODO: add gateway annotation with required channels - Food process( OrderItem orderItem); + @Gateway(requestChannel = "itemsChannel", replyChannel = "foodChannel") + Collection process( Collection orderItem); }