diff --git a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java index 2ec8b58e..a0319eeb 100644 --- a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java +++ b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/App.java @@ -7,12 +7,14 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.channel.MessageChannels; +import org.springframework.integration.scheduling.PollerMetadata; import ru.otus.spring.integration.domain.Food; import ru.otus.spring.integration.domain.OrderItem; @@ -29,25 +31,29 @@ import java.util.stream.Collectors; public class App { private static final String[] MENU = { "coffee", "tea", "smoothie", "whiskey", "beer", "cola", "water" }; + @Bean - public DirectChannel itemsChannel() { - return MessageChannels.direct().datatype( OrderItem.class ).get(); + public QueueChannel itemsChannel() { + return MessageChannels.queue( 10 ).get(); } @Bean public PublishSubscribeChannel foodChannel() { - return MessageChannels.publishSubscribe().datatype( Food.class ).get(); + return MessageChannels.publishSubscribe().get(); } - // TODO: create default poller + @Bean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata poller() { + return Pollers.fixedRate( 100 ).maxMessagesPerPoll( 2 ).get(); + } @Bean public IntegrationFlow cafeFlow() { return IntegrationFlows.from( "itemsChannel" ) - // TODO: cook OrderItem in the kitchen - // TODO*: add router and subflows to process iced and usual items - // TODO*: add splitter and aggregator - // TODO: forward it to the publish subscriber channel + .split() + .handle( "kitchenService", "cook" ) + .aggregate() + .channel( "foodChannel" ) .get(); } @@ -60,14 +66,25 @@ public class App { while ( true ) { Thread.sleep( 1000 ); - OrderItem items = generateOrderItem(); + Collection items = generateOrderItems(); System.out.println( "New orderItems: " + - items.getItemName() ); - Food food = cafe.process( items ); - System.out.println( "Ready food: " + food.getName() ); + items.stream().map( OrderItem::getItemName ) + .collect( Collectors.joining( "," ) ) ); + Collection food = cafe.process( items ); + System.out.println( "Ready food: " + food.stream() + .map( Food::getName ) + .collect( Collectors.joining( "," ) ) ); } } + private static Collection generateOrderItems() { + List items = new ArrayList<>(); + for ( int i = 0; i < RandomUtils.nextInt( 1, 5 ); ++ i ) { + items.add( generateOrderItem() ); + } + return items; + } + private static OrderItem generateOrderItem() { return new OrderItem( MENU[ RandomUtils.nextInt( 0, MENU.length ) ] ); } diff --git a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java index 180de97b..cec8f49a 100644 --- a/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java +++ b/2019-08/spring-26/src/main/java/ru/otus/spring/integration/Cafe.java @@ -1,13 +1,18 @@ package ru.otus.spring.integration; + +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.MessagingGateway; import ru.otus.spring.integration.domain.Food; import ru.otus.spring.integration.domain.OrderItem; import java.util.Collection; // TODO: add messaging gateway annotation +@MessagingGateway public interface Cafe { // TODO: add gateway annotation with required channels - Food process( OrderItem orderItem); + @Gateway(requestChannel = "itemsChannel", replyChannel = "foodChannel") + Collection process( Collection orderItem); }