diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/App.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/App.java index bd7a48f2..4aaa6f39 100644 --- a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/App.java +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/App.java @@ -1,96 +1,19 @@ package ru.otus.spring.integration; -import org.apache.commons.lang3.RandomUtils; +import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; import org.springframework.context.support.AbstractApplicationContext; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.channel.PublishSubscribeChannel; -import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; -import org.springframework.integration.dsl.MessageChannels; -import org.springframework.integration.dsl.Pollers; -import org.springframework.integration.scheduling.PollerMetadata; -import ru.otus.spring.integration.domain.Food; -import ru.otus.spring.integration.domain.OrderItem; +import ru.otus.spring.integration.service.OrderService; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ForkJoinPool; -import java.util.stream.Collectors; - - -@IntegrationComponentScan -@SuppressWarnings({ "resource", "Duplicates", "InfiniteLoopStatement" }) -@ComponentScan -@Configuration -@EnableIntegration +@SpringBootApplication public class App { - private static final String[] MENU = { "coffee", "tea", "smoothie", "whiskey", "beer", "cola", "water" }; - - @Bean - public QueueChannel itemsChannel() { - return MessageChannels.queue( 10 ).get(); - } - - @Bean - public PublishSubscribeChannel foodChannel() { - return MessageChannels.publishSubscribe().get(); - } - - @Bean(name = PollerMetadata.DEFAULT_POLLER) - public PollerMetadata poller() { - return Pollers.fixedRate( 100 ).maxMessagesPerPoll( 2 ).get(); - } - - @Bean - public IntegrationFlow cafeFlow() { - return IntegrationFlows.from( "itemsChannel" ) - .split() - .handle( "kitchenService", "cook" ) - .aggregate() - .channel( "foodChannel" ) - .get(); - } public static void main( String[] args ) throws Exception { AbstractApplicationContext ctx = new AnnotationConfigApplicationContext( App.class ); - // here we works with cafe using interface - Cafe cafe = ctx.getBean( Cafe.class ); + OrderService orderService = ctx.getBean(OrderService.class); + orderService.startOrdersLoop(); - ForkJoinPool pool = ForkJoinPool.commonPool(); - - while ( true ) { - Thread.sleep( 7000 ); - - pool.execute( () -> { - Collection items = generateOrderItems(); - System.out.println( "New orderItems: " + - items.stream().map( OrderItem::getItemName ) - .collect( Collectors.joining( "," ) ) ); - Collection food = cafe.process( items ); - System.out.println( "Ready food: " + food.stream() - .map( Food::getName ) - .collect( Collectors.joining( "," ) ) ); - } ); - } } - private static OrderItem generateOrderItem() { - return new OrderItem( MENU[ RandomUtils.nextInt( 0, MENU.length ) ] ); - } - - private static Collection generateOrderItems() { - List items = new ArrayList<>(); - for ( int i = 0; i < RandomUtils.nextInt( 1, 5 ); ++ i ) { - items.add( generateOrderItem() ); - } - return items; - } } diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/Cafe.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/integration/Cafe.java similarity index 90% rename from 2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/Cafe.java rename to 2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/integration/Cafe.java index 6b31f40a..d3983bf9 100644 --- a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/Cafe.java +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/integration/Cafe.java @@ -1,4 +1,4 @@ -package ru.otus.spring.integration; +package ru.otus.spring.integration.integration; import org.springframework.integration.annotation.Gateway; diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/integration/IntegrationConfig.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/integration/IntegrationConfig.java new file mode 100644 index 00000000..d685b61b --- /dev/null +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/integration/IntegrationConfig.java @@ -0,0 +1,45 @@ +package ru.otus.spring.integration.integration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.MessageChannels; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.scheduling.PollerMetadata; +import ru.otus.spring.integration.service.KitchenService; + +@Configuration +public class IntegrationConfig { + + private static final int QUEUE_CAPACITY = 10; + private static final String COOK_METHOD_NAME = "cook"; + + @Bean + public QueueChannel itemsChannel() { + return MessageChannels.queue(QUEUE_CAPACITY).get(); + } + + @Bean + public PublishSubscribeChannel foodChannel() { + return MessageChannels.publishSubscribe().get(); + } + + @Bean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata poller() { + return Pollers.fixedRate(100).maxMessagesPerPoll(2).get(); + } + + @Bean + public IntegrationFlow cafeFlow(KitchenService kitchenService) { + return IntegrationFlows.from(itemsChannel()) + .split() + .handle(kitchenService, COOK_METHOD_NAME) + .aggregate() + .channel(foodChannel()) + .get(); + } + +} diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/KitchenService.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/KitchenService.java new file mode 100644 index 00000000..c0d9d8ad --- /dev/null +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/KitchenService.java @@ -0,0 +1,8 @@ +package ru.otus.spring.integration.service; + +import ru.otus.spring.integration.domain.Food; +import ru.otus.spring.integration.domain.OrderItem; + +public interface KitchenService { + Food cook(OrderItem orderItem) throws Exception; +} diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/kitchen/KitchenService.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/KitchenServiceImpl.java similarity index 79% rename from 2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/kitchen/KitchenService.java rename to 2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/KitchenServiceImpl.java index 8e40715f..2f7fcc08 100644 --- a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/kitchen/KitchenService.java +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/KitchenServiceImpl.java @@ -1,12 +1,13 @@ -package ru.otus.spring.integration.kitchen; +package ru.otus.spring.integration.service; import org.springframework.stereotype.Service; import ru.otus.spring.integration.domain.Food; import ru.otus.spring.integration.domain.OrderItem; @Service -public class KitchenService { +public class KitchenServiceImpl implements KitchenService { + @Override public Food cook(OrderItem orderItem) throws Exception { System.out.println("Cooking " + orderItem.getItemName()); Thread.sleep(3000); diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/OrderService.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/OrderService.java new file mode 100644 index 00000000..143bfd89 --- /dev/null +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/OrderService.java @@ -0,0 +1,5 @@ +package ru.otus.spring.integration.service; + +public interface OrderService { + void startOrdersLoop() throws Exception; +} diff --git a/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/OrderServiceImpl.java b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/OrderServiceImpl.java new file mode 100644 index 00000000..7ff6acf4 --- /dev/null +++ b/2021-05/spring-27-integration-example/src/main/java/ru/otus/spring/integration/service/OrderServiceImpl.java @@ -0,0 +1,60 @@ +package ru.otus.spring.integration.service; + +import org.apache.commons.lang3.RandomUtils; +import org.springframework.stereotype.Service; +import ru.otus.spring.integration.domain.Food; +import ru.otus.spring.integration.domain.OrderItem; +import ru.otus.spring.integration.integration.Cafe; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; + +@Service +public class OrderServiceImpl implements OrderService { + + private static final int ORDERS_DELAY_MILLS = 7000; + private static final String[] MENU = {"coffee", "tea", "smoothie", "whiskey", "beer", "cola", "water"}; + + private final Cafe cafe; + + public OrderServiceImpl(Cafe cafe) { + this.cafe = cafe; + } + + @SuppressWarnings({"resource", "Duplicates", "InfiniteLoopStatement", "BusyWait"}) + @Override + public void startOrdersLoop() throws Exception { + ForkJoinPool pool = ForkJoinPool.commonPool(); + + while (true) { + Thread.sleep(ORDERS_DELAY_MILLS); + + pool.execute(() -> { + Collection items = generateOrderItems(); + System.out.println("New orderItems: " + + items.stream().map(OrderItem::getItemName) + .collect(Collectors.joining(","))); + Collection food = cafe.process(items); + System.out.println("Ready food: " + food.stream() + .map(Food::getName) + .collect(Collectors.joining(","))); + }); + } + } + + + private OrderItem generateOrderItem() { + return new OrderItem(MENU[RandomUtils.nextInt(0, MENU.length)]); + } + + private Collection generateOrderItems() { + List items = new ArrayList<>(); + for (int i = 0; i < RandomUtils.nextInt(1, 5); ++i) { + items.add(generateOrderItem()); + } + return items; + } +}