mirror of
https://github.com/OtusTeam/Spring.git
synced 2026-05-30 10:50:42 +00:00
2025-11 spring-37-rabbit
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
## Пример взаимодействия приложений через RabbitMQ
|
||||
|
||||
В примере демонстрируется:
|
||||
|
||||
* *использование `@Scheduled` таймера для имитации пользовательской активности*
|
||||
* *оправка и прием сообщений между двумя приложениями через RabbitMQ*
|
||||
* *создание и отправка email сообщений средствами SpringMail*
|
||||
* *аггрегация данных с помощью SpringData/JPQL*
|
||||
* *создание кастомного endpoint-а actuator-а для вывода статистики*
|
||||
|
||||
Описание примера:
|
||||
|
||||
user-activity-emitter-microservice:
|
||||
|
||||
* *по таймеру `UserActivityEmitterService` достает из БД случайный тип активности и пользователя*
|
||||
* *после чего формирует из них объект активности (`UserActivity`) и отправляет в очередь сообщений RabbitMQ с
|
||||
помощью `RabbitTemplate`
|
||||
настроенный на работу с "main-exchange"*
|
||||
* *большее число активностей имеют `routingKey` = "user.activity.message.simple" *
|
||||
* *активности, у которых в названии типа есть вхождение "Вредн" имееют свой `routingKey` ("
|
||||
user.activity.message.important") *
|
||||
* *так же приложение по таймеру, с помощью `ActivityStatCalculationEmitterSerivce` инициирует подсчет статистики с
|
||||
помощью отправки
|
||||
сообщения в очередь RabbitMQ c `routingKey` = "user.activity.stat"*
|
||||
|
||||
user-activity-processor-microservice:
|
||||
|
||||
* *в приложении для обработки сообщений есть несколько очередей, куда попадают сообщения в зависимости от `routingKey`*
|
||||
* *их прослушивает компонент `RabbitMqListener`, который содержит по методу на каждую очередь"*
|
||||
* *в "all-activity-queue" попадают все активности. На выходе из данной очереди активности сохраняются в БД*
|
||||
* *в "important-activity-queue" попадают важные активности. На выходе из данной очереди активности преобразуются в
|
||||
письма и отправляются на
|
||||
почту администратору*
|
||||
* *в "stat-calc-commands-queue" попадают команды, которые инициируют расчет статистики. *
|
||||
* *в методе-обработчике сообщений данной очереди происходит удаление старых статистических данных, а так же подсчет и
|
||||
сохранение в БД новых*
|
||||
* *за вывод статистических данных отвечает кастомный endpoint actuator-а `ActivityStatEndpoint`*
|
||||
|
||||
Для работы приложений требуется работающий RabbitMQ. За его запуск отвечает docker-compose.yml
|
||||
Адрес консоли RabbitMQ: http://localhost:15672/
|
||||
Логин и пароль: guest
|
||||
@@ -0,0 +1,7 @@
|
||||
version: '3'
|
||||
services:
|
||||
rabbitmq:
|
||||
image: rabbitmq:management
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
@@ -0,0 +1,38 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>3.5.2</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
|
||||
<groupId>ru.otus.example</groupId>
|
||||
<artifactId>spring-mail-rabbitmq-demo</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
<module>user-activity-models</module>
|
||||
<module>user-activity-emitter-microservice</module>
|
||||
<module>user-activity-processor-microservice</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,31 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**
|
||||
!**/src/test/**
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
@@ -0,0 +1,55 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>ru.otus.example</groupId>
|
||||
<artifactId>spring-mail-rabbitmq-demo</artifactId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>user-activity-emitter-microservice</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>user-activity-emitter-microservice</name>
|
||||
<description>User activity emitter microservice</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>ru.otus.example</groupId>
|
||||
<artifactId>user-activity-models</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
package ru.otus.example.rabbitmq;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@EnableScheduling
|
||||
@EntityScan("ru.otus.example.useractivitymodels")
|
||||
@SpringBootApplication
|
||||
public class EmitterMicroServiceApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(EmitterMicroServiceApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
+34
@@ -0,0 +1,34 @@
|
||||
package ru.otus.example.rabbitmq.rabbitmq;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class RabbitMqConfig {
|
||||
|
||||
private static final String MAIN_EXCHANGE_NAME = "main-exchange";
|
||||
|
||||
@Bean
|
||||
public Jackson2JsonMessageConverter jsonConverter(ObjectMapper objectMapper) {
|
||||
return new Jackson2JsonMessageConverter(objectMapper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
|
||||
Jackson2JsonMessageConverter jsonConverter) {
|
||||
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
||||
rabbitTemplate.setExchange(MAIN_EXCHANGE_NAME);
|
||||
rabbitTemplate.setMessageConverter(jsonConverter);
|
||||
return rabbitTemplate;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TopicExchange topicExchange() {
|
||||
return new TopicExchange(MAIN_EXCHANGE_NAME);
|
||||
}
|
||||
}
|
||||
+9
@@ -0,0 +1,9 @@
|
||||
package ru.otus.example.rabbitmq.repositories;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import ru.otus.example.useractivitymodels.UserActivity;
|
||||
|
||||
@Transactional
|
||||
public interface ActivityRepository extends JpaRepository<UserActivity, Long> {
|
||||
}
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package ru.otus.example.rabbitmq.repositories;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import ru.otus.example.useractivitymodels.ActivityType;
|
||||
|
||||
public interface ActivityTypeRepository extends JpaRepository<ActivityType, Long> {
|
||||
}
|
||||
+9
@@ -0,0 +1,9 @@
|
||||
package ru.otus.example.rabbitmq.repositories;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import ru.otus.example.useractivitymodels.AppUser;
|
||||
|
||||
@Transactional
|
||||
public interface AppUserRepository extends JpaRepository<AppUser, Long> {
|
||||
}
|
||||
+24
@@ -0,0 +1,24 @@
|
||||
package ru.otus.example.rabbitmq.services;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ActivityStatCalculationEmitterSerivce {
|
||||
private static final String USER_ACTIVITY_STAT_ROUTING_KEY = "user.activity.stat";
|
||||
private static final String CALC_STAT_COMMAND = "{\"command\": \"calc stat now\"}";
|
||||
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Scheduled(initialDelay = 3000, fixedRate = 10000)
|
||||
public void emitAppUserActivityStatCalculation() {
|
||||
// log.warn("Stat send!!!");
|
||||
rabbitTemplate.convertAndSend(USER_ACTIVITY_STAT_ROUTING_KEY, CALC_STAT_COMMAND);
|
||||
log.warn("Stat calculated!!!");
|
||||
}
|
||||
}
|
||||
+40
@@ -0,0 +1,40 @@
|
||||
package ru.otus.example.rabbitmq.services;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import lombok.val;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import ru.otus.example.rabbitmq.repositories.ActivityTypeRepository;
|
||||
import ru.otus.example.rabbitmq.repositories.AppUserRepository;
|
||||
import ru.otus.example.useractivitymodels.UserActivity;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
@Slf4j
|
||||
public class UserActivityEmitterService {
|
||||
private final ActivityTypeRepository activityTypeRepository;
|
||||
private final AppUserRepository appUserRepository;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Scheduled(initialDelay = 2000, fixedRate = 3000)
|
||||
public void emitAppUserActivity() {
|
||||
val random = new Random();
|
||||
val activityTypes = activityTypeRepository.findAll();
|
||||
val appUsers = appUserRepository.findAll();
|
||||
|
||||
val activityType = activityTypes.get(random.nextInt(activityTypes.size()));
|
||||
val appUser = appUsers.get(random.nextInt(appUsers.size()));
|
||||
val appUserActivity = new UserActivity(activityType, appUser);
|
||||
val isImportant = activityType.getName().contains("Вредн");
|
||||
|
||||
val routingKey = String.format("user.activity.message.%s", isImportant ? "important" : "simple");
|
||||
rabbitTemplate.convertAndSend(routingKey, appUserActivity);
|
||||
log.info("Send activity: {}", appUserActivity);
|
||||
|
||||
}
|
||||
}
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
server:
|
||||
port: 9090
|
||||
spring:
|
||||
jpa:
|
||||
generate-ddl: false
|
||||
hibernate:
|
||||
ddl-auto: none
|
||||
show-sql: false
|
||||
|
||||
rabbitmq:
|
||||
addresses: "localhost"
|
||||
sql:
|
||||
init:
|
||||
mode: always
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
insert into app_users(name, email)
|
||||
values ('Рафаель Губерманович Тыгыдым', 'test@mail.ru');
|
||||
|
||||
insert into app_users(name, email)
|
||||
values ('Артем Демосфенович Шмяк', 'test@mail.ru');
|
||||
|
||||
insert into app_users(name, email)
|
||||
values ('Ифигения Бореславовна Фуфелшмерц', 'test@mail.ru');
|
||||
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №4');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №13');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №34');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №48');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №53');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №11');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №12');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №13');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №14');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №15');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №16');
|
||||
|
||||
|
||||
+31
@@ -0,0 +1,31 @@
|
||||
create table activity_types
|
||||
(
|
||||
id bigint auto_increment,
|
||||
name varchar(255),
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
create table app_users
|
||||
(
|
||||
id bigint auto_increment,
|
||||
email varchar(255),
|
||||
name varchar(255),
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
|
||||
create table app_users_activity
|
||||
(
|
||||
id bigint auto_increment,
|
||||
activity_time timestamp,
|
||||
app_user_id bigint,
|
||||
activity_type_id bigint,
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
alter table app_users_activity
|
||||
add constraint app_users_activity_user_id_fk foreign key (app_user_id) references app_users;
|
||||
|
||||
alter table app_users_activity
|
||||
add constraint app_users_activity_activity_type_id_fk foreign key (activity_type_id) references activity_types;
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**
|
||||
!**/src/test/**
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
@@ -0,0 +1,32 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>ru.otus.example</groupId>
|
||||
<artifactId>spring-mail-rabbitmq-demo</artifactId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>user-activity-models</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>user-activity-models</name>
|
||||
<description>Models for spring mail rabbitmq demo project</description>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>jakarta.persistence</groupId>
|
||||
<artifactId>jakarta.persistence-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
+37
@@ -0,0 +1,37 @@
|
||||
package ru.otus.example.useractivitymodels;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import jakarta.persistence.*;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@Entity
|
||||
@Table(name = "activity_stat")
|
||||
public class ActivityStatElem {
|
||||
|
||||
@JsonIgnore
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
private long id;
|
||||
|
||||
@JsonProperty("Имя пользователя")
|
||||
@Column(name = "app_user_name")
|
||||
private String appUserName;
|
||||
|
||||
@JsonProperty("Тип активности")
|
||||
@Column(name = "activity_type")
|
||||
private String activityType;
|
||||
|
||||
@JsonProperty("Количество")
|
||||
@Column(name = "activities_count")
|
||||
private long activitiesCount;
|
||||
|
||||
public ActivityStatElem(String appUserName, String activityType, long activitiesCount) {
|
||||
this.appUserName = appUserName;
|
||||
this.activityType = activityType;
|
||||
this.activitiesCount = activitiesCount;
|
||||
}
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package ru.otus.example.useractivitymodels;
|
||||
|
||||
import jakarta.persistence.*;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Entity
|
||||
@Table(name = "activity_types")
|
||||
public class ActivityType {
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
@Column(name = "id")
|
||||
private long id;
|
||||
|
||||
@Column(name = "name")
|
||||
private String name;
|
||||
}
|
||||
+24
@@ -0,0 +1,24 @@
|
||||
package ru.otus.example.useractivitymodels;
|
||||
|
||||
import jakarta.persistence.*;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Entity
|
||||
@Table(name = "app_users")
|
||||
public class AppUser {
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
@Column(name = "id")
|
||||
private long id;
|
||||
|
||||
@Column(name = "email")
|
||||
private String email;
|
||||
|
||||
@Column(name = "name")
|
||||
private String name;
|
||||
}
|
||||
+38
@@ -0,0 +1,38 @@
|
||||
package ru.otus.example.useractivitymodels;
|
||||
|
||||
import jakarta.persistence.*;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Entity
|
||||
@Table(name = "app_users_activity")
|
||||
public class UserActivity {
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
@Column(name = "id")
|
||||
private long id;
|
||||
|
||||
@Column(name = "activity_time")
|
||||
private LocalDateTime activityTime;
|
||||
|
||||
@ManyToOne(fetch = FetchType.EAGER)
|
||||
@JoinColumn(name = "activity_type_id")
|
||||
private ActivityType type;
|
||||
|
||||
@ManyToOne(fetch = FetchType.EAGER)
|
||||
@JoinColumn(name = "app_user_id")
|
||||
private AppUser appUser;
|
||||
|
||||
public UserActivity(ActivityType type, AppUser appUser) {
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.appUser = appUser;
|
||||
this.activityTime = LocalDateTime.now();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**
|
||||
!**/src/test/**
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
@@ -0,0 +1,66 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>ru.otus.example</groupId>
|
||||
<artifactId>spring-mail-rabbitmq-demo</artifactId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>user-activity-processor-microservice</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>user-activity-processor-microservice</name>
|
||||
<description>User activity processor microservice</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-mail</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>ru.otus.example</groupId>
|
||||
<artifactId>user-activity-models</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
package ru.otus.example.rabbitmq;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@EnableScheduling
|
||||
@EntityScan("ru.otus.example.useractivitymodels")
|
||||
@SpringBootApplication
|
||||
public class ProcessorMicroServiceApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ProcessorMicroServiceApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
+23
@@ -0,0 +1,23 @@
|
||||
package ru.otus.example.rabbitmq.actuator;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
|
||||
import org.springframework.stereotype.Component;
|
||||
import ru.otus.example.rabbitmq.repositories.ActivityStatRepository;
|
||||
import ru.otus.example.useractivitymodels.ActivityStatElem;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Component
|
||||
@Endpoint(id = "activity-stat")
|
||||
public class ActivityStatEndpoint {
|
||||
|
||||
private final ActivityStatRepository activityStatRepository;
|
||||
|
||||
@ReadOperation
|
||||
public List<ActivityStatElem> getAppUsersActivityStat() {
|
||||
return activityStatRepository.findAll();
|
||||
}
|
||||
}
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package ru.otus.example.rabbitmq.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties("app")
|
||||
public class AppProps {
|
||||
private String serverEmail;
|
||||
private String adminEmail;
|
||||
}
|
||||
+75
@@ -0,0 +1,75 @@
|
||||
package ru.otus.example.rabbitmq.rabbitmq;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class RabbitMqConfig {
|
||||
@Bean
|
||||
public Jackson2JsonMessageConverter jsonConverter(ObjectMapper objectMapper) {
|
||||
return new Jackson2JsonMessageConverter(objectMapper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue allActivityQueue() {
|
||||
return new Queue("all-activity-queue");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue importantActivityQueue() {
|
||||
return new Queue("important-activity-queue");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue statCalcCommandsQueue() {
|
||||
return QueueBuilder.durable("stat-calc-commands-queue")
|
||||
.maxLength(5)
|
||||
.deadLetterExchange("dead-letter-exchange")
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue deadLetterQueue() {
|
||||
return new Queue("dead-letter-queue");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TopicExchange topicExchange() {
|
||||
return new TopicExchange("main-exchange");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public FanoutExchange deadLetterExchange() {
|
||||
return new FanoutExchange("dead-letter-exchange");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding allActivityBinding() {
|
||||
return BindingBuilder.bind(allActivityQueue())
|
||||
.to(topicExchange())
|
||||
.with("user.activity.message.*");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding importantActivityBinding() {
|
||||
return BindingBuilder.bind(importantActivityQueue())
|
||||
.to(topicExchange())
|
||||
.with("user.activity.message.important");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding statCalcCommandsBinding() {
|
||||
return BindingBuilder.bind(statCalcCommandsQueue())
|
||||
.to(topicExchange())
|
||||
.with("user.activity.stat");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding deadLetterBinding() {
|
||||
return BindingBuilder.bind(deadLetterQueue())
|
||||
.to(deadLetterExchange());
|
||||
}
|
||||
}
|
||||
+70
@@ -0,0 +1,70 @@
|
||||
package ru.otus.example.rabbitmq.rabbitmq;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import lombok.val;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.mail.javamail.JavaMailSender;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.stereotype.Service;
|
||||
import ru.otus.example.rabbitmq.repositories.ActivityRepository;
|
||||
import ru.otus.example.rabbitmq.repositories.ActivityStatRepository;
|
||||
import ru.otus.example.rabbitmq.services.UserActivityToEmailTransformer;
|
||||
import ru.otus.example.useractivitymodels.UserActivity;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
@Slf4j
|
||||
public class RabbitMqListener {
|
||||
|
||||
private final ActivityRepository activityRepository;
|
||||
private final ActivityStatRepository activityStatRepository;
|
||||
private final UserActivityToEmailTransformer messageTransformer;
|
||||
private final JavaMailSender mailSender;
|
||||
|
||||
@RabbitListener(queues = "important-activity-queue")
|
||||
public void processImportantMessages(UserActivity message) {
|
||||
log.info("RECEIVED FROM important-activity-queue: " + message);
|
||||
|
||||
try {
|
||||
val mailMessage = messageTransformer.transform(message);
|
||||
log.info("Как будто посылаем письмо: " + mailMessage);
|
||||
//mailSender.send(mailMessage);
|
||||
} catch (Exception e) {
|
||||
throw new AmqpRejectAndDontRequeueException("Ooops");
|
||||
}
|
||||
}
|
||||
|
||||
@RabbitListener(queues = "all-activity-queue")
|
||||
public void processAllMessages(UserActivity message) {
|
||||
log.info("RECEIVED FROM all-activity-queue: " + message);
|
||||
try {
|
||||
activityRepository.save(message);
|
||||
} catch (Exception e) {
|
||||
throw new AmqpRejectAndDontRequeueException("Ooops");
|
||||
}
|
||||
}
|
||||
|
||||
@RabbitListener(queues = "stat-calc-commands-queue", ackMode = "MANUAL")
|
||||
public void processStatCalcCommandsMessages(String message,
|
||||
Channel channel,
|
||||
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
|
||||
log.warn("RECEIVED FROM stat-calc-commands-queue: " + message);
|
||||
|
||||
activityStatRepository.deleteAll();
|
||||
val activityStat = activityStatRepository.calcActivityStat();
|
||||
activityStatRepository.saveAll(activityStat);
|
||||
// sleep(5000);
|
||||
channel.basicAck(tag, false);
|
||||
|
||||
// Для ackMode = "MANUAL" и перехода в dead letter exchange
|
||||
//channel.basicNack(tag, false, false);
|
||||
|
||||
// Для ackMode = "AUTO" и перехода в dead letter exchange
|
||||
//throw new AmqpRejectAndDontRequeueException("Ooops");
|
||||
|
||||
}
|
||||
}
|
||||
+9
@@ -0,0 +1,9 @@
|
||||
package ru.otus.example.rabbitmq.repositories;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import ru.otus.example.useractivitymodels.UserActivity;
|
||||
|
||||
@Transactional
|
||||
public interface ActivityRepository extends JpaRepository<UserActivity, Long> {
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package ru.otus.example.rabbitmq.repositories;
|
||||
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import ru.otus.example.useractivitymodels.ActivityStatElem;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Transactional
|
||||
public interface ActivityStatRepository extends JpaRepository<ActivityStatElem, Long> {
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
@Query("select new ru.otus.example.useractivitymodels.ActivityStatElem(u.name, t.name, count(a)) " +
|
||||
"from UserActivity a left join a.appUser u left join a.type t " +
|
||||
"group by u.name, t.name " +
|
||||
"order by count(a) desc, u.name, t.name")
|
||||
List<ActivityStatElem> calcActivityStat();
|
||||
|
||||
}
|
||||
+8
@@ -0,0 +1,8 @@
|
||||
package ru.otus.example.rabbitmq.services;
|
||||
|
||||
import org.springframework.mail.SimpleMailMessage;
|
||||
import ru.otus.example.useractivitymodels.UserActivity;
|
||||
|
||||
public interface UserActivityToEmailTransformer {
|
||||
SimpleMailMessage transform(UserActivity activity);
|
||||
}
|
||||
+25
@@ -0,0 +1,25 @@
|
||||
package ru.otus.example.rabbitmq.services;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.mail.SimpleMailMessage;
|
||||
import org.springframework.stereotype.Service;
|
||||
import ru.otus.example.rabbitmq.config.AppProps;
|
||||
import ru.otus.example.useractivitymodels.UserActivity;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
public class UserActivityToEmailTransformerImpl implements UserActivityToEmailTransformer {
|
||||
|
||||
private final AppProps appProps;
|
||||
|
||||
@Override
|
||||
public SimpleMailMessage transform(UserActivity activity) {
|
||||
SimpleMailMessage mailMessage = new SimpleMailMessage();
|
||||
mailMessage.setTo(appProps.getAdminEmail());
|
||||
mailMessage.setFrom(appProps.getServerEmail());
|
||||
mailMessage.setSubject("Обнаружена вредная активность");
|
||||
mailMessage.setText(String.format("Внимание!!! Обнаружена вредная активность! Время: %s, пользователь: %s, тип активности: %s",
|
||||
activity.getActivityTime(), activity.getAppUser().getName(), activity.getType().getName()));
|
||||
return mailMessage;
|
||||
}
|
||||
}
|
||||
+40
@@ -0,0 +1,40 @@
|
||||
app:
|
||||
# адрес почты, через которую сервер отправляет письма
|
||||
server-email: ${server.email}
|
||||
# адрес почты администратора, на которую сервер отправляет письма
|
||||
admin-email: ${admin.email}
|
||||
|
||||
spring:
|
||||
jpa:
|
||||
generate-ddl: false
|
||||
hibernate:
|
||||
ddl-auto: none
|
||||
show-sql: true
|
||||
|
||||
rabbitmq:
|
||||
addresses: "localhost"
|
||||
|
||||
mail:
|
||||
host: smtp.mail.ru
|
||||
port: 465
|
||||
# логин и пароль для почты, через которую сервер отправляет письма
|
||||
username: ${email.server.user}
|
||||
password: ${email.server.password}
|
||||
protocol: smtps
|
||||
properties:
|
||||
mail:
|
||||
smtp:
|
||||
auth: true
|
||||
starttls.enable: true
|
||||
sql:
|
||||
init:
|
||||
mode:
|
||||
|
||||
management:
|
||||
endpoints:
|
||||
web:
|
||||
exposure:
|
||||
include: health, info, activity-stat
|
||||
endpoint:
|
||||
health:
|
||||
show-details: always
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
insert into app_users(name, email)
|
||||
values ('Рафаель Губерманович Тыгыдым', 'test@mail.ru');
|
||||
|
||||
insert into app_users(name, email)
|
||||
values ('Артем Демосфенович Шмяк', 'test@mail.ru');
|
||||
|
||||
insert into app_users(name, email)
|
||||
values ('Ифигения Бореславовна Фуфелшмерц', 'test@mail.ru');
|
||||
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №4');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №13');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №34');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №48');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Очень полезное дело №53');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №11');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №12');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №13');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №14');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №15');
|
||||
|
||||
insert into activity_types(name)
|
||||
values ('Вредное дело №16');
|
||||
|
||||
|
||||
+41
@@ -0,0 +1,41 @@
|
||||
create table activity_types
|
||||
(
|
||||
id bigint auto_increment,
|
||||
name varchar(255),
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
create table app_users
|
||||
(
|
||||
id bigint auto_increment,
|
||||
email varchar(255),
|
||||
name varchar(255),
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
|
||||
create table app_users_activity
|
||||
(
|
||||
id bigint auto_increment,
|
||||
activity_time timestamp,
|
||||
app_user_id bigint,
|
||||
activity_type_id bigint,
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
alter table app_users_activity
|
||||
add constraint app_users_activity_user_id_fk foreign key (app_user_id) references app_users;
|
||||
|
||||
alter table app_users_activity
|
||||
add constraint app_users_activity_activity_type_id_fk foreign key (activity_type_id) references activity_types;
|
||||
|
||||
|
||||
create table activity_stat
|
||||
(
|
||||
id bigint auto_increment,
|
||||
app_user_name varchar(255),
|
||||
activity_type varchar(255),
|
||||
activities_count bigint,
|
||||
primary key (id)
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user