RocketMQ 5.x 端口差异及代码解释
端口差异的原因
在 RocketMQ 5.x 版本中,端口使用与之前版本不同,主要原因如下:
- 架构变化:RocketMQ 5.x 引入了新的 Proxy 模块,作为客户端与 Broker 之间的中间层
- 默认端口:
- 4.x 版本:NameServer 默认端口 9876,Broker 默认端口 10911
- 5.x 版本:Proxy 默认端口 8080/8081
- 分离设计:5.x 将路由发现与消息传输分离,客户端直接连接 Proxy 而非 NameServer/Broker
代码详细解释
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ProducerExample {private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。String endpoint = "localhost:8081";// 消息发送的目标Topic名称,需要提前创建。String topic = "TestTopic";// 1. 获取客户端服务提供者实例ClientServiceProvider provider = ClientServiceProvider.loadService();// 2. 构建客户端配置ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 3. 初始化ProducerProducer producer = provider.newProducerBuilder().setTopics(topic) // 设置预绑定的Topic.setClientConfiguration(configuration) // 设置通信配置.build();// 4. 构建消息Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")// 消息体。.setBody("messageBody".getBytes()).build();try {// 5. 发送消息SendReceipt sendReceipt = producer.send(message);logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {logger.error("Failed to send message", e);}// producer.close();}
}
关键点说明
- Proxy 端口 (8080/8081):
- 这是 RocketMQ 5.x 新增的代理层端口
- 客户端不再直接连接 NameServer 或 Broker
- 支持 gRPC 协议,提供更好的跨语言支持和性能
- ClientServiceProvider:
- 新 API 的入口点,用于获取各种客户端构建器
- 替代了旧版的 DefaultMQProducer 等类
- 消息构建器模式:
- 采用 Builder 模式构建消息和客户端
- 更灵活的配置方式
- 更清晰的 API 设计
- SendReceipt:
- 发送结果对象,包含消息ID等信息
- 替代了旧版的 SendResult
- 资源管理:
- 示例中注释掉了 producer.close()
- 实际生产代码中应该确保关闭资源
与传统版本(4.x)对比
特性 | RocketMQ 4.x | RocketMQ 5.x |
连接方式 | 直连Broker/NameServer | 通过Proxy连接 |
默认端口 | NameServer:9876, Broker:10911 | Proxy:8080/8081 |
客户端API | DefaultMQProducer | ClientServiceProvider + ProducerBuilder |
协议 | Remoting | gRPC为主 |
消息构建 | Message对象构造 | Builder模式 |
这种变化使 RocketMQ 5.x 架构更清晰,更易于维护和扩展,同时提供了更好的跨语言支持。
修改 RocketMQ Proxy 默认端口为 9878
要修改 RocketMQ 5.x 中 Proxy 的默认端口从 8081 改为 9878,你需要进行以下配置调整:
1. 修改 Proxy 配置文件
找到 Proxy 的配置文件(通常是 conf/proxy.conf
),修改以下参数:
# 代理服务端口 (gRPC)
proxy.grpc.listenPort=9878# 如果需要 HTTP 代理端口也修改
proxy.http.listenPort=9879 # 可以保持默认或改为其他端口
2. 修改客户端连接配置
在你的 Java 代码中,将端点(endpoint)修改为新的端口:
// 修改前
String endpoint = "localhost:8081";// 修改后
String endpoint = "localhost:9878";
3. 启动 Proxy 时指定配置文件
启动 Proxy 时确保指定修改后的配置文件:
./bin/mqproxy -c conf/proxy.conf
4. 验证配置
检查 Proxy 是否在新端口上运行:
netstat -tulnp | grep 9878
5. 完整修改后的代码示例
public class ProducerExample {private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException {// 修改为新的 Proxy 端口 9878String endpoint = "localhost:9878";String topic = "TestTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints(endpoint).build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();// ... 其余代码保持不变 ...}
}
注意事项
- 防火墙设置:确保新端口(9878)在防火墙中是开放的
- 多节点部署:如果是集群环境,所有 Proxy 节点都需要相应修改
- 客户端一致性:确保所有生产者和消费者客户端都使用新的端口配置
- 端口冲突检查:确认 9878 端口确实可用,没有被其他服务占用
# 检查端口占用情况
lsof -i :9878
- 日志验证:启动 Proxy 后检查日志,确认它确实监听在新端口上
通过以上步骤,你就可以成功将 RocketMQ Proxy 的端口从默认的 8081 修改为 9878 了。