2025-09 spring-37-rabbit

This commit is contained in:
Vladimir Ivanov
2026-02-19 19:07:41 +03:00
parent 8ff44e4c86
commit cebc83bf18
35 changed files with 1067 additions and 0 deletions
+41
View File
@@ -0,0 +1,41 @@
## Пример взаимодействия приложений через RabbitMQ
В примере демонстрируется:
* *использование `@Scheduled` таймера для имитации пользовательской активности*
* *оправка и прием сообщений между двумя приложениями через RabbitMQ*
* *создание и отправка email сообщений средствами SpringMail*
* *аггрегация данных с помощью SpringData/JPQL*
* *создание кастомного endpoint-а actuator-а для вывода статистики*
Описание примера:
user-activity-emitter-microservice:
* *по таймеру `UserActivityEmitterService` достает из БД случайный тип активности и пользователя*
* *после чего формирует из них объект активности (`UserActivity`) и отправляет в очередь сообщений RabbitMQ с
помощью `RabbitTemplate`
настроенный на работу с "main-exchange"*
* *большее число активностей имеют `routingKey` = "user.activity.message.simple" *
* *активности, у которых в названии типа есть вхождение "Вредн" имееют свой `routingKey` ("
user.activity.message.important") *
* *так же приложение по таймеру, с помощью `ActivityStatCalculationEmitterSerivce` инициирует подсчет статистики с
помощью отправки
сообщения в очередь RabbitMQ c `routingKey` = "user.activity.stat"*
user-activity-processor-microservice:
* *в приложении для обработки сообщений есть несколько очередей, куда попадают сообщения в зависимости от `routingKey`*
* *их прослушивает компонент `RabbitMqListener`, который содержит по методу на каждую очередь"*
* *в "all-activity-queue" попадают все активности. На выходе из данной очереди активности сохраняются в БД*
* *в "important-activity-queue" попадают важные активности. На выходе из данной очереди активности преобразуются в
письма и отправляются на
почту администратору*
* *в "stat-calc-commands-queue" попадают команды, которые инициируют расчет статистики. *
* *в методе-обработчике сообщений данной очереди происходит удаление старых статистических данных, а так же подсчет и
сохранение в БД новых*
* *за вывод статистических данных отвечает кастомный endpoint actuator-а `ActivityStatEndpoint`*
Для работы приложений требуется работающий RabbitMQ. За его запуск отвечает docker-compose.yml
Адрес консоли RabbitMQ: http://localhost:15672/
Логин и пароль: guest
@@ -0,0 +1,7 @@
version: '3'
services:
rabbitmq:
image: rabbitmq:management
ports:
- "5672:5672"
- "15672:15672"
+38
View File
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>ru.otus.example</groupId>
<artifactId>spring-mail-rabbitmq-demo</artifactId>
<version>1.0</version>
<packaging>pom</packaging>
<modules>
<module>user-activity-models</module>
<module>user-activity-emitter-microservice</module>
<module>user-activity-processor-microservice</module>
</modules>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus.example</groupId>
<artifactId>spring-mail-rabbitmq-demo</artifactId>
<version>1.0</version>
</parent>
<artifactId>user-activity-emitter-microservice</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>user-activity-emitter-microservice</name>
<description>User activity emitter microservice</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>ru.otus.example</groupId>
<artifactId>user-activity-models</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,17 @@
package ru.otus.example.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@EntityScan("ru.otus.example.useractivitymodels")
@SpringBootApplication
public class EmitterMicroServiceApplication {
public static void main(String[] args) {
SpringApplication.run(EmitterMicroServiceApplication.class, args);
}
}
@@ -0,0 +1,34 @@
package ru.otus.example.rabbitmq.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
private static final String MAIN_EXCHANGE_NAME = "main-exchange";
@Bean
public Jackson2JsonMessageConverter jsonConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter jsonConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(MAIN_EXCHANGE_NAME);
rabbitTemplate.setMessageConverter(jsonConverter);
return rabbitTemplate;
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MAIN_EXCHANGE_NAME);
}
}
@@ -0,0 +1,9 @@
package ru.otus.example.rabbitmq.repositories;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.useractivitymodels.UserActivity;
@Transactional
public interface ActivityRepository extends JpaRepository<UserActivity, Long> {
}
@@ -0,0 +1,7 @@
package ru.otus.example.rabbitmq.repositories;
import org.springframework.data.jpa.repository.JpaRepository;
import ru.otus.example.useractivitymodels.ActivityType;
public interface ActivityTypeRepository extends JpaRepository<ActivityType, Long> {
}
@@ -0,0 +1,9 @@
package ru.otus.example.rabbitmq.repositories;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.useractivitymodels.AppUser;
@Transactional
public interface AppUserRepository extends JpaRepository<AppUser, Long> {
}
@@ -0,0 +1,24 @@
package ru.otus.example.rabbitmq.services;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Service
@Slf4j
public class ActivityStatCalculationEmitterSerivce {
private static final String USER_ACTIVITY_STAT_ROUTING_KEY = "user.activity.stat";
private static final String CALC_STAT_COMMAND = "{\"command\": \"calc stat now\"}";
private final RabbitTemplate rabbitTemplate;
@Scheduled(initialDelay = 3000, fixedRate = 10000)
public void emitAppUserActivityStatCalculation() {
// log.warn("Stat send!!!");
rabbitTemplate.convertAndSend(USER_ACTIVITY_STAT_ROUTING_KEY, CALC_STAT_COMMAND);
log.warn("Stat calculated!!!");
}
}
@@ -0,0 +1,40 @@
package ru.otus.example.rabbitmq.services;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import ru.otus.example.rabbitmq.repositories.ActivityTypeRepository;
import ru.otus.example.rabbitmq.repositories.AppUserRepository;
import ru.otus.example.useractivitymodels.UserActivity;
import java.util.Random;
@RequiredArgsConstructor
@Service
@Slf4j
public class UserActivityEmitterService {
private final ActivityTypeRepository activityTypeRepository;
private final AppUserRepository appUserRepository;
private final RabbitTemplate rabbitTemplate;
@SuppressWarnings("unused")
@Scheduled(initialDelay = 2000, fixedRate = 3000)
public void emitAppUserActivity() {
val random = new Random();
val activityTypes = activityTypeRepository.findAll();
val appUsers = appUserRepository.findAll();
val activityType = activityTypes.get(random.nextInt(activityTypes.size()));
val appUser = appUsers.get(random.nextInt(appUsers.size()));
val appUserActivity = new UserActivity(activityType, appUser);
val isImportant = activityType.getName().contains("Вредн");
val routingKey = String.format("user.activity.message.%s", isImportant ? "important" : "simple");
rabbitTemplate.convertAndSend(routingKey, appUserActivity);
log.info("Send activity: {}", appUserActivity);
}
}
@@ -0,0 +1,14 @@
server:
port: 9090
spring:
jpa:
generate-ddl: false
hibernate:
ddl-auto: none
show-sql: false
rabbitmq:
addresses: "localhost"
sql:
init:
mode: always
@@ -0,0 +1,44 @@
insert into app_users(name, email)
values ('Рафаель Губерманович Тыгыдым', 'test@mail.ru');
insert into app_users(name, email)
values ('Артем Демосфенович Шмяк', 'test@mail.ru');
insert into app_users(name, email)
values ('Ифигения Бореславовна Фуфелшмерц', 'test@mail.ru');
insert into activity_types(name)
values ('Очень полезное дело №4');
insert into activity_types(name)
values ('Очень полезное дело №13');
insert into activity_types(name)
values ('Очень полезное дело №34');
insert into activity_types(name)
values ('Очень полезное дело №48');
insert into activity_types(name)
values ('Очень полезное дело №53');
insert into activity_types(name)
values ('Вредное дело №11');
insert into activity_types(name)
values ('Вредное дело №12');
insert into activity_types(name)
values ('Вредное дело №13');
insert into activity_types(name)
values ('Вредное дело №14');
insert into activity_types(name)
values ('Вредное дело №15');
insert into activity_types(name)
values ('Вредное дело №16');
@@ -0,0 +1,31 @@
create table activity_types
(
id bigint auto_increment,
name varchar(255),
primary key (id)
);
create table app_users
(
id bigint auto_increment,
email varchar(255),
name varchar(255),
primary key (id)
);
create table app_users_activity
(
id bigint auto_increment,
activity_time timestamp,
app_user_id bigint,
activity_type_id bigint,
primary key (id)
);
alter table app_users_activity
add constraint app_users_activity_user_id_fk foreign key (app_user_id) references app_users;
alter table app_users_activity
add constraint app_users_activity_activity_type_id_fk foreign key (activity_type_id) references activity_types;
@@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus.example</groupId>
<artifactId>spring-mail-rabbitmq-demo</artifactId>
<version>1.0</version>
</parent>
<artifactId>user-activity-models</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>user-activity-models</name>
<description>Models for spring mail rabbitmq demo project</description>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,37 @@
package ru.otus.example.useractivitymodels;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.persistence.*;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@Entity
@Table(name = "activity_stat")
public class ActivityStatElem {
@JsonIgnore
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
@JsonProperty("Имя пользователя")
@Column(name = "app_user_name")
private String appUserName;
@JsonProperty("Тип активности")
@Column(name = "activity_type")
private String activityType;
@JsonProperty("Количество")
@Column(name = "activities_count")
private long activitiesCount;
public ActivityStatElem(String appUserName, String activityType, long activitiesCount) {
this.appUserName = appUserName;
this.activityType = activityType;
this.activitiesCount = activitiesCount;
}
}
@@ -0,0 +1,21 @@
package ru.otus.example.useractivitymodels;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "activity_types")
public class ActivityType {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private long id;
@Column(name = "name")
private String name;
}
@@ -0,0 +1,24 @@
package ru.otus.example.useractivitymodels;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "app_users")
public class AppUser {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private long id;
@Column(name = "email")
private String email;
@Column(name = "name")
private String name;
}
@@ -0,0 +1,38 @@
package ru.otus.example.useractivitymodels;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "app_users_activity")
public class UserActivity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private long id;
@Column(name = "activity_time")
private LocalDateTime activityTime;
@ManyToOne(fetch = FetchType.EAGER)
@JoinColumn(name = "activity_type_id")
private ActivityType type;
@ManyToOne(fetch = FetchType.EAGER)
@JoinColumn(name = "app_user_id")
private AppUser appUser;
public UserActivity(ActivityType type, AppUser appUser) {
this.id = id;
this.type = type;
this.appUser = appUser;
this.activityTime = LocalDateTime.now();
}
}
@@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>ru.otus.example</groupId>
<artifactId>spring-mail-rabbitmq-demo</artifactId>
<version>1.0</version>
</parent>
<artifactId>user-activity-processor-microservice</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>user-activity-processor-microservice</name>
<description>User activity processor microservice</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>ru.otus.example</groupId>
<artifactId>user-activity-models</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,17 @@
package ru.otus.example.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@EntityScan("ru.otus.example.useractivitymodels")
@SpringBootApplication
public class ProcessorMicroServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ProcessorMicroServiceApplication.class, args);
}
}
@@ -0,0 +1,23 @@
package ru.otus.example.rabbitmq.actuator;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;
import ru.otus.example.rabbitmq.repositories.ActivityStatRepository;
import ru.otus.example.useractivitymodels.ActivityStatElem;
import java.util.List;
@RequiredArgsConstructor
@Component
@Endpoint(id = "activity-stat")
public class ActivityStatEndpoint {
private final ActivityStatRepository activityStatRepository;
@ReadOperation
public List<ActivityStatElem> getAppUsersActivityStat() {
return activityStatRepository.findAll();
}
}
@@ -0,0 +1,13 @@
package ru.otus.example.rabbitmq.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties("app")
public class AppProps {
private String serverEmail;
private String adminEmail;
}
@@ -0,0 +1,75 @@
package ru.otus.example.rabbitmq.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public Jackson2JsonMessageConverter jsonConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public Queue allActivityQueue() {
return new Queue("all-activity-queue");
}
@Bean
public Queue importantActivityQueue() {
return new Queue("important-activity-queue");
}
@Bean
public Queue statCalcCommandsQueue() {
return QueueBuilder.durable("stat-calc-commands-queue")
.maxLength(5)
.deadLetterExchange("dead-letter-exchange")
.build();
}
@Bean
public Queue deadLetterQueue() {
return new Queue("dead-letter-queue");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("main-exchange");
}
@Bean
public FanoutExchange deadLetterExchange() {
return new FanoutExchange("dead-letter-exchange");
}
@Bean
public Binding allActivityBinding() {
return BindingBuilder.bind(allActivityQueue())
.to(topicExchange())
.with("user.activity.message.*");
}
@Bean
public Binding importantActivityBinding() {
return BindingBuilder.bind(importantActivityQueue())
.to(topicExchange())
.with("user.activity.message.important");
}
@Bean
public Binding statCalcCommandsBinding() {
return BindingBuilder.bind(statCalcCommandsQueue())
.to(topicExchange())
.with("user.activity.stat");
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange());
}
}
@@ -0,0 +1,70 @@
package ru.otus.example.rabbitmq.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import ru.otus.example.rabbitmq.repositories.ActivityRepository;
import ru.otus.example.rabbitmq.repositories.ActivityStatRepository;
import ru.otus.example.rabbitmq.services.UserActivityToEmailTransformer;
import ru.otus.example.useractivitymodels.UserActivity;
@RequiredArgsConstructor
@Service
@Slf4j
public class RabbitMqListener {
private final ActivityRepository activityRepository;
private final ActivityStatRepository activityStatRepository;
private final UserActivityToEmailTransformer messageTransformer;
private final JavaMailSender mailSender;
@RabbitListener(queues = "important-activity-queue")
public void processImportantMessages(UserActivity message) {
log.info("RECEIVED FROM important-activity-queue: " + message);
try {
val mailMessage = messageTransformer.transform(message);
log.info("Как будто посылаем письмо: " + mailMessage);
//mailSender.send(mailMessage);
} catch (Exception e) {
throw new AmqpRejectAndDontRequeueException("Ooops");
}
}
@RabbitListener(queues = "all-activity-queue")
public void processAllMessages(UserActivity message) {
log.info("RECEIVED FROM all-activity-queue: " + message);
try {
activityRepository.save(message);
} catch (Exception e) {
throw new AmqpRejectAndDontRequeueException("Ooops");
}
}
@RabbitListener(queues = "stat-calc-commands-queue", ackMode = "MANUAL")
public void processStatCalcCommandsMessages(String message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.warn("RECEIVED FROM stat-calc-commands-queue: " + message);
activityStatRepository.deleteAll();
val activityStat = activityStatRepository.calcActivityStat();
activityStatRepository.saveAll(activityStat);
// sleep(5000);
channel.basicAck(tag, false);
// Для ackMode = "MANUAL" и перехода в dead letter exchange
//channel.basicNack(tag, false, false);
// Для ackMode = "AUTO" и перехода в dead letter exchange
//throw new AmqpRejectAndDontRequeueException("Ooops");
}
}
@@ -0,0 +1,9 @@
package ru.otus.example.rabbitmq.repositories;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.useractivitymodels.UserActivity;
@Transactional
public interface ActivityRepository extends JpaRepository<UserActivity, Long> {
}
@@ -0,0 +1,21 @@
package ru.otus.example.rabbitmq.repositories;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;
import ru.otus.example.useractivitymodels.ActivityStatElem;
import java.util.List;
@Transactional
public interface ActivityStatRepository extends JpaRepository<ActivityStatElem, Long> {
@Transactional(readOnly = true)
@Query("select new ru.otus.example.useractivitymodels.ActivityStatElem(u.name, t.name, count(a)) " +
"from UserActivity a left join a.appUser u left join a.type t " +
"group by u.name, t.name " +
"order by count(a) desc, u.name, t.name")
List<ActivityStatElem> calcActivityStat();
}
@@ -0,0 +1,8 @@
package ru.otus.example.rabbitmq.services;
import org.springframework.mail.SimpleMailMessage;
import ru.otus.example.useractivitymodels.UserActivity;
public interface UserActivityToEmailTransformer {
SimpleMailMessage transform(UserActivity activity);
}
@@ -0,0 +1,25 @@
package ru.otus.example.rabbitmq.services;
import lombok.RequiredArgsConstructor;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.stereotype.Service;
import ru.otus.example.rabbitmq.config.AppProps;
import ru.otus.example.useractivitymodels.UserActivity;
@RequiredArgsConstructor
@Service
public class UserActivityToEmailTransformerImpl implements UserActivityToEmailTransformer {
private final AppProps appProps;
@Override
public SimpleMailMessage transform(UserActivity activity) {
SimpleMailMessage mailMessage = new SimpleMailMessage();
mailMessage.setTo(appProps.getAdminEmail());
mailMessage.setFrom(appProps.getServerEmail());
mailMessage.setSubject("Обнаружена вредная активность");
mailMessage.setText(String.format("Внимание!!! Обнаружена вредная активность! Время: %s, пользователь: %s, тип активности: %s",
activity.getActivityTime(), activity.getAppUser().getName(), activity.getType().getName()));
return mailMessage;
}
}
@@ -0,0 +1,40 @@
app:
# адрес почты, через которую сервер отправляет письма
server-email: ${server.email}
# адрес почты администратора, на которую сервер отправляет письма
admin-email: ${admin.email}
spring:
jpa:
generate-ddl: false
hibernate:
ddl-auto: none
show-sql: true
rabbitmq:
addresses: "localhost"
mail:
host: smtp.mail.ru
port: 465
# логин и пароль для почты, через которую сервер отправляет письма
username: ${email.server.user}
password: ${email.server.password}
protocol: smtps
properties:
mail:
smtp:
auth: true
starttls.enable: true
sql:
init:
mode:
management:
endpoints:
web:
exposure:
include: health, info, activity-stat
endpoint:
health:
show-details: always
@@ -0,0 +1,44 @@
insert into app_users(name, email)
values ('Рафаель Губерманович Тыгыдым', 'test@mail.ru');
insert into app_users(name, email)
values ('Артем Демосфенович Шмяк', 'test@mail.ru');
insert into app_users(name, email)
values ('Ифигения Бореславовна Фуфелшмерц', 'test@mail.ru');
insert into activity_types(name)
values ('Очень полезное дело №4');
insert into activity_types(name)
values ('Очень полезное дело №13');
insert into activity_types(name)
values ('Очень полезное дело №34');
insert into activity_types(name)
values ('Очень полезное дело №48');
insert into activity_types(name)
values ('Очень полезное дело №53');
insert into activity_types(name)
values ('Вредное дело №11');
insert into activity_types(name)
values ('Вредное дело №12');
insert into activity_types(name)
values ('Вредное дело №13');
insert into activity_types(name)
values ('Вредное дело №14');
insert into activity_types(name)
values ('Вредное дело №15');
insert into activity_types(name)
values ('Вредное дело №16');
@@ -0,0 +1,41 @@
create table activity_types
(
id bigint auto_increment,
name varchar(255),
primary key (id)
);
create table app_users
(
id bigint auto_increment,
email varchar(255),
name varchar(255),
primary key (id)
);
create table app_users_activity
(
id bigint auto_increment,
activity_time timestamp,
app_user_id bigint,
activity_type_id bigint,
primary key (id)
);
alter table app_users_activity
add constraint app_users_activity_user_id_fk foreign key (app_user_id) references app_users;
alter table app_users_activity
add constraint app_users_activity_activity_type_id_fk foreign key (activity_type_id) references activity_types;
create table activity_stat
(
id bigint auto_increment,
app_user_name varchar(255),
activity_type varchar(255),
activities_count bigint,
primary key (id)
);