Appearance
Spring Cloud Stream 集成 RocketMQ
创建项目,引入依赖
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demo-stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-stream</name>
<description>demo-stream</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.7.18</spring-boot.version>
<spring-cloud-alibaba.version>2021.0.6.1</spring-cloud-alibaba.version>
<spring-cloud.version>2021.0.9</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>${project.artifactId}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.example.stream.DemoStreamApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>local</id>
<properties>
<profiles.active>local</profiles.active>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<profile>
<id>dev</id>
<properties>
<profiles.active>dev</profiles.active>
</properties>
</profile>
<profile>
<id>test</id>
<properties>
<profiles.active>test</profiles.active>
</properties>
</profile>
</profiles>
</project>
配置
yaml
spring:
cloud:
stream:
rocketmq:
binder:
enabled: true
name-server: 192.168.118.128:9876
binders:
rocketmq:
type: rocketmq
bindings:
topic1-out-0:
binder: rocketmq
destination: topic1
topic2-in-0:
binder: rocketmq
destination: topic1
group: test-group
nacos:
server-addr: ${NACOS_HOST:127.0.0.1}:8848
config:
namespace: local
group: DEFAULT_GROUP
file-extension: yml
server-addr: ${spring.cloud.nacos.server-addr}
discovery:
namespace: ${spring.cloud.nacos.config.namespace}
group: ${spring.cloud.nacos.config.group}
server-addr: ${spring.cloud.nacos.server-addr}
metadata:
Version: ${service.version}
ip: ${STREAM_IP:127.0.0.1}
config:
import:
- optional:nacos:${spring.application.name}-@profiles.active@.${spring.cloud.nacos.config.file-extension}
生产者发送消息
java
package com.example.stream.controller;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/stream/mq")
public class MQController {
@Resource
private StreamBridge streamBridge;
@PostMapping("/rocket/send")
public String sendRocketMQ() {
String bindingName = "topic1-out-0";
streamBridge.send(bindingName, "rocketmq", "Hello RocketMQ");
return "success";
}
}
消费者接收消息
java
package com.example.stream.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import java.util.function.Consumer;
@Configuration
public class RocketConfig {
@Bean
public Consumer<Message<String>> topic2() {
return message -> {
System.out.println("从topic2收到消息Payload: " + message.getPayload());
System.out.println("从topic2收到消息Header: " + message.getHeaders());
};
}
}