Appearance
RocketMQ 入门
简介
RocketMQ 是阿里巴巴开源的一款高性能、高可靠的消息队列中间件,它具有以下特点:
- 高性能:RocketMQ 支持高并发、高吞吐量的消息处理能力,能够满足大规模分布式系统的需求。
- 高可靠性:RocketMQ 提供了多种消息存储和消费机制,确保消息的可靠性和持久性。
- 高可用性:RocketMQ 支持集群部署,能够自动进行故障转移和负载均衡,提高系统的可用性。
官网: https://rocketmq.apache.org/zh/
文档:https://rocketmq.apache.org/zh/docs/quickStart/02quickstartWithDocker
Docker 部署RocketMQ
1. 拉取镜像
bash
docker pull apache/rocketmq:5.3.0
2. 创建网络
bash
docker network create rocketmq
2. 启动Name Server容器
bash
docker run -d --name rmqnamesrv -p 9876:9876 --network rocketmq apache/rocketmq:5.3.0 sh rmqnamesrv
查看日志
bash
docker logs -f rmqnamesrv
3. 启动Broker容器
bash
mkdir -p /data/mq/broker
docker cp rmqnamesrv:/home/rocketmq/rocketmq-5.3.0/conf /data/mq/broker
说明
先把name server中的配置文件拷贝出来一份,因为broker启动时需要指定配置文件路径 修改broker.conf,新增brokerIP1=xxx,xxx为宿主机的ip地址
broker.conf文件内容
properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 192.168.118.128
bash
docker run -d \
--name rmqbroker \
--network rocketmq \
-p 10912:10912 -p 10911:10911 -p 10909:10909 \
-p 8080:8080 -p 8081:8081 \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
-v /data/mq/broker/conf:/home/rocketmq/rocketmq-5.3.0/conf \
apache/rocketmq:5.3.0 sh rmqbroker --enable-proxy \
-c /home/rocketmq/rocketmq-5.3.0/conf/broker.conf
4. 启动Console容器
bash
docker pull apacherocketmq/rocketmq-dashboard:latest
bash
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.118.128:9876" -p 9999:8080 -t apacherocketmq/rocketmq-dashboard:latest
使用 RocketMQ
说明
RocketMQ 提供了多种语言的客户端,包括 Java、Python、C++ 等。本文将介绍如何使用 Java 客户端发送和接收消息。
说明
消费者待补充
java项目
创建一个简单的maven项目
引入依赖
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
</dependency>
其中日志不是必须的,可以替换成别的
发送消息
这里需要一个RocketMQ主题
进入之前创建的Broker容器,创建主题
bash
$ docker exec -it rmqbroker bash
$ sh rmqadmin updatetopic -t MyTopic -c DefaultCluster
或者通过控制台创建
java
package com.example;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
// 此处为示例,实际使用时请替换为真实的 Proxy 地址和端口
String endpoint = "192.168.118.128:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "MyTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
try {
producer.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
点击运行,查看结果
接收消息
java
package com.example;
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
// 此处为示例,实际使用时请替换为真实的 Proxy 地址和端口
String endpoints = "192.168.118.128:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "MyTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
pushConsumer.close();
}
}
运行,保持接收接口,再运行发送消息接口,查看结果
Spring Cloud 项目
说明
Spring Cloud 项目
引入依赖
xml
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
<spring-cloud-alibaba.version>2021.0.5.0</spring-cloud-alibaba.version>
<spring-cloud.version>2021.0.5</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
这里使用的Spring Cloud Alibaba 2021.0.5.0
,Spring Cloud 2021.0.5
版本,Spring Boot 2.6.13
版本
配置文件
yaml
spring:
cloud:
stream:
rocketmq:
binder:
enabled: true
name-server: 192.168.118.128:9876
bindings:
topic1-out-0:
producer:
sync: true
创建一个Controller
java
package com.example.stream.controller;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/stream/mq")
public class MQController {
@Resource
private StreamBridge streamBridge;
@PostMapping("/rocket/send")
public String sendRocketMQ() {
String bindingName = "topic1-out-0";
streamBridge.send(bindingName, "rocketmq", "Hello RocketMQ");
return "success";
}
}
假设nacos
、网关
已经配置好,启动项目,调用接口http://localhost:8080/stream/mq/rocketmq/send
,查看控制台输出
说明
Spring Cloud 版本不需要创建Topic, 项目会自动把topic1-out-0
作为Topic创建。
Spring Cloud Stream RocketMQ
待补充