RabbitMQ 4.1.1-Local random exchange体验

Local Random Exchange

一种 RabbitMQ 4.0+ 引入的新型交换机,主要是为 request-reply(RPC)场景 设计的。

  • 使用这种交换机时,消息只会被路由到本地节点上的队列,可以确保极低的消息发布延迟。
  • 如果有多个本地队列绑定到该交换机,它会随机选择一个队列接收消息。

关键点总结:

  • 本地传输:不会把消息发到其他节点的队列。
  • 随机选队列:多个本地队列中随机挑一个。
  • 发布快:避免了跨节点网络通信,延迟低。
  • 最适合用于 RPC 模式,即“请求-响应”

建议将 Local Random Exchange 和 exclusive(独占)队列搭配使用,这样可以为 RPC 场景提供更低延迟的组合。
注意

  • Exclusive 队列是 RabbitMQ 中只对某个连接开放的临时队列(通常用于响应)。
  • LRE + Exclusive Queue,可以避免消息在集群中转发,提高响应速度。

LRE 不转发消息到其他节点,所以如果当前节点没有合适的队列,消息会被直接丢弃!
所以使用时你必须确保每个节点上都至少有一个消费者绑定的队列

在这里插入图片描述

在 RabbitMQ 前面加负载均衡器(load balancer)会让这种交换机类型几乎无法正常工作。

原因分析

  • Local Random Exchange 依赖于消息被投递到“本地绑定队列(local queues)”
  • 如果用了负载均衡,客户端连接可能随机落在任何节点上,消息将发给该节点的本地队列
  • 如果该节点上没有消费者绑定本地队列,消息就会被丢弃

实操如下
application.properties

# JVM内存配置
# 设置较小的堆内存,避免占用过多系统资源
spring.jvm.memory=-Xmx256m -Xms128m -XX:MaxMetaspaceSize=128m# 设置较小的线程栈大小
spring.jvm.thread-stack-size=-Xss256k# 启用GC日志,帮助诊断内存问题
spring.jvm.gc-log=-Xlog:gc*:file=./logs/gc.log:time,uptime,level,tags:filecount=5,filesize=10m# 设置较小的代码缓存大小
spring.jvm.code-cache=-XX:ReservedCodeCacheSize=128m# 启用内存压缩指针基址设置,将Java堆放在4GB以上地址空间
spring.jvm.heap-base=-XX:HeapBaseMinAddress=4g# 启用G1垃圾收集器的更积极设置
spring.jvm.gc-tuning=-XX:G1ReservePercent=10 -XX:G1HeapRegionSize=4m -XX:InitiatingHeapOccupancyPercent=30# 禁用显式GC调用
spring.jvm.disable-explicit-gc=-XX:+DisableExplicitGC

application.yml

#定义要使用的交换机和队列名称
spring:application:name: local-random-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host# 连接超时时间connection-timeout: 10000# 日志配置
logging:level:org.springframework.amqp: INFO     # AMQP日志级别com.example: DEBUG                 # 应用日志级别
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ 配置类,用于创建 Local Random Exchange(本地随机交换机)和绑定的 RPC 队列。* 本配置主要用于实现基于 RabbitMQ 的 RPC 模式,使用 Local Random Exchange 类型降低延迟。*/
@Configuration
public class RabbitConfig {// Local Random Exchange 名称(自定义交换机)public static final String LRE_EXCHANGE = "lre.exchange";// RPC 使用的队列名称public static final String RPC_QUEUE_NAME = "rpc.queue";/*** 声明一个 Local Random Exchange(x-local-random 类型的交换机)。** - durable: true 表示交换机会持久化* - autoDelete: false 表示不会在没有绑定队列时自动删除* - arguments: 可选参数,此处为空*/@Beanpublic CustomExchange lreExchange() {Map<String, Object> args = new HashMap<>();return new CustomExchange(LRE_EXCHANGE, "x-local-random", true, false, args);}/*** 声明一个 RPC 队列。** - durable: false 表示不持久化(重启后丢失)* - exclusive: false 表示不是只被当前连接独占* - autoDelete: true 表示连接断开后自动删除队列** 如果你要模拟 RPC Client 的 exclusive 回调队列,建议用 `exclusive = true`。*/@Beanpublic Queue rpcQueue() {return new Queue(RPC_QUEUE_NAME, false, false, true);}/*** 将 RPC 队列绑定到 Local Random Exchange 上。** - routingKey 设置为 "",因为 Local Random Exchange 不关心路由键*/@Beanpublic Binding binding(Queue rpcQueue, CustomExchange lreExchange) {return BindingBuilder.bind(rpcQueue).to(lreExchange).with("").noargs();}
}

方式一、手动监听

package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** 模拟 RPC 客户端,用于通过 RabbitMQ 的 Local Random Exchange 发送请求并接收异步响应。*/
@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 向服务器发送请求,并设置回调队列接收响应。** @param message 请求消息内容* @return 返回一个确认字符串(实际响应在回调中处理)*/public String sendRequest(String message) throws Exception {// 生成唯一标识 correlationId,用于标识请求-响应配对String correlationId = UUID.randomUUID().toString();// 临时生成一个独占的匿名回调队列(例如 amq.gen-xxxxxx)String replyQueue = rabbitTemplate.execute(channel -> channel.queueDeclare().getQueue());// 设置 RabbitTemplate 的回调地址(其实不会生效于 send 模式,仅用于演示)rabbitTemplate.setReplyAddress(replyQueue);rabbitTemplate.setReplyTimeout(5000); // 设置超时时间(ms)rabbitTemplate.setCorrelationKey("correlation_id"); // 设置用于匹配的属性名(可选)// 设置监听器容器,监听回调队列中的响应消息SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());container.setQueueNames(replyQueue); // 指定监听的队列container.setMessageListener(new MessageListenerAdapter(new Object() {// 定义接收到消息后的处理方法(方法名必须与监听器默认匹配或显式指定)@SuppressWarnings("unused")public void handleMessage(byte[] reply) {String response = new String(reply, StandardCharsets.UTF_8);System.out.println("Got reply: " + response);// 实际中这里应唤醒等待线程或放入响应Map中(基于 correlationId)}}));container.start(); // 启动监听器容器// 构造请求消息,设置 reply_to 和 correlation_id 属性MessageProperties props = new MessageProperties();props.setReplyTo(replyQueue);             // 告诉服务端响应要发到这个队列props.setCorrelationId(correlationId);    // 服务端会原样返回,用于客户端识别对应响应Message request = new Message(message.getBytes(), props);// 通过 RabbitTemplate 发送消息到本地随机交换机(Local Random Exchange)rabbitTemplate.send(RabbitConfig.LRE_EXCHANGE, "", request);return "Request sent with correlationId: " + correlationId;}
}

方式二、推荐写法
也可以用使用 Spring AMQP 的官方推荐 RPC 模式(即 convertSendAndReceive())的实现方式。这种方式完全利用了 RabbitTemplate 的自动 reply-to、correlationId、超时机制 —— 更加简单可靠

package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public String sendRequest(String message) {// 设置超时时间(可选)rabbitTemplate.setReplyTimeout(5000);// 使用 convertSendAndReceive 会自动:// 1. 创建一个临时 reply queue(exclusive)// 2. 设置 reply_to 和 correlation_id// 3. 等待结果并返回Object response = rabbitTemplate.convertSendAndReceive(RabbitConfig.LRE_EXCHANGE, "", message);if (response != null) {return "Received response: " + response.toString();} else {return "No response received (timeout or error)";}}
}

两者优点总结

功能原来方式(手动监听)convertSendAndReceive()(推荐)
reply_to自动处理❌ 手动✅ 自动
correlation_id 匹配❌ 手动✅ 自动
超时控制❌ 复杂✅ 简单
代码复杂度
推荐程度✅✅✅

RPC服务端处理
方式一 手动

package com.example.consumer;import com.example.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RpcServer {/*** RabbitMQ RPC 服务端处理方法* * 使用 @RabbitListener 监听指定队列,当接收到客户端请求时,手动获取 reply_to 和 correlation_id,* 并通过底层 channel 手动发送响应消息。** @param message        收到的消息正文* @param correlationId  唯一标识此次 RPC 请求的 ID(由客户端生成并设置)* @param replyTo        回调队列(客户端临时队列)* @param requestMessage 原始 AMQP 消息对象* @param channel        底层通信通道,用于手动发送响应* @return null(返回值不会被用来发送响应,因为我们是手动发送的)*/@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)public String handleRpc(String message,@Header(AmqpHeaders.CORRELATION_ID) String correlationId,@Header(AmqpHeaders.REPLY_TO) String replyTo,Message requestMessage,Channel channel) throws IOException {// 构造服务端响应内容String response = "Processed: " + message;// 打印收到的信息和即将回应的队列System.out.println("replyTo: " + replyTo + ", Server received: " + message + ", correlationId: " + correlationId);// 构造响应消息的属性,确保带上原始 correlationIdMessageProperties replyProps = new MessageProperties();replyProps.setCorrelationId(correlationId);// 构造响应消息对象Message reply = new Message(response.getBytes(), replyProps);// 手动发送响应消息到客户端指定的临时队列channel.basicPublish("", replyTo, null, reply.getBody());// 因为手动处理了响应,不需要 Spring 自动回传return null;}
}

方式二自动处理

@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)
public String handleRpc(String message) {System.out.println("Server received: " + message);return "Processed: " + message;
}

运行结果

Request sent with correlationId: 9cf6df25-3e02-47da-96ad-23a21791b391
replay:amq.gen-CcSRdsuLJtjtXOzFUE3Eug Server received: 0测试0 correlationId:9cf6df25-3e02-47da-96ad-23a21791b391
Got reply: Processed: 0测试0
Request sent with correlationId: d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
replay:amq.gen-jnFzJQallOE6QRkZEZyn-Q Server received: 3测试1 correlationId:d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
Got reply: Processed: 3测试1
Request sent with correlationId: 2009671b-ef8d-418c-ae9b-c58c8e0dac83
replay:amq.gen--tLpLz3xs9p_BEZmqJUjFg Server received: 6测试2 correlationId:2009671b-ef8d-418c-ae9b-c58c8e0dac83
Got reply: Processed: 6测试2
Request sent with correlationId: 6637a3dd-4e24-48e5-871f-cd671ea6d9b6
replay:amq.gen-CejNGqwNk6bWPkxrQLvH7Q Server received: 9测试3 correlationId:6637a3dd-4e24-48e5-871f-cd671ea6d9b6
Got reply: Processed: 9测试3
Request sent with correlationId: c994fab1-75c4-4618-8af8-b03f2fcdfa6f
replay:amq.gen-mdKE_hhHhj_ZEgT-fIm4nw Server received: 12测试4 correlationId:c994fab1-75c4-4618-8af8-b03f2fcdfa6f
Got reply: Processed: 12测试4
Request sent with correlationId: b27d1409-d595-47f8-b920-2d4ad23288d2
replay:amq.gen-ofZgztMXNh9MMEejK6DDGA Server received: 15测试5 correlationId:b27d1409-d595-47f8-b920-2d4ad23288d2
Got reply: Processed: 15测试5
Request sent with correlationId: adc98f0d-5270-4033-86c0-e863cd56ecee
replay:amq.gen-xKkf-7LcEhOzamv892nL8A Server received: 18测试6 correlationId:adc98f0d-5270-4033-86c0-e863cd56ecee
Got reply: Processed: 18测试6
Request sent with correlationId: 87f6722d-e974-474d-a79c-9aea69401fa7
replay:amq.gen-r5jjy4ypnSDso-HZ5PuNWA Server received: 21测试7 correlationId:87f6722d-e974-474d-a79c-9aea69401fa7
Got reply: Processed: 21测试7
Request sent with correlationId: de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
replay:amq.gen-7QDoBB5wqbjLC0MidVSkbA Server received: 24测试8 correlationId:de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
Got reply: Processed: 24测试8
Request sent with correlationId: 1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
replay:amq.gen-1rFRnN9vKCUt6HIrRLSoBw Server received: 27测试9 correlationId:1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
Got reply: Processed: 27测试9

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/pingmian/88184.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中山排气歧管批量自动化智能化3D尺寸测量及cav检测分析

当前制造业快速发展&#xff0c;传统测量方法正面临严峻挑战。生产规模的持续扩张使得现有测量手段逐渐暴露出效率不足的问题&#xff0c;这种技术滞后性正在直接影响企业的整体生产效率。具体表现为测量速度跟不上生产节拍&#xff0c;精度要求难以达标&#xff0c;最终导致生…

Debian 11 Bullseye 在线安装docker

首先移除所有错误的 Docker 软件源&#xff1a;sudo rm -f /etc/apt/sources.list.d/docker*安装必要依赖sudo apt update sudo apt install -y ca-certificates curl gnupg添加 Docker 官方 GPG 密钥&#xff08;使用国内镜像&#xff09;&#xff1a;curl -fsSL https://mirr…

Spring Boot 项目中多数据源配置使用场景

在 Spring Boot 中配置多数据源是一个非常常见的需求&#xff0c;主要用于以下场景&#xff1a; 读写分离&#xff1a;一个主数据库&#xff08;Master&#xff09;负责写操作&#xff0c;一个或多个从数据库&#xff08;Slave&#xff09;负责读操作&#xff0c;以提高性能和可…

FAAC 在海思平台使用得到aac实时音频流

FAAC 在海思平台使用得到aac实时音频流 使用 FAAC将音频 pcm转为 aac 主要参见这篇博客 FAAC 在君正平台使用得到aac实时音频流_君正 x2600 音频-CSDN博客

javascript函数参数类似python函数参数星号*解耦数组

序言通常情况下&#xff0c;我们很可能不清楚参数有多少&#xff0c;这个时候用的都是数组。但是使用数组和单个元素&#xff0c;从内心情感来说&#xff0c;它们是两种维度&#xff0c;为了让参数成为一个数组&#xff0c;把单个输入的参数强加一个数组的外壳&#xff0c;并不…

C语言基础(1)

1.编译器的选择 我们的c语言是一门&#xff0c;我们写的c语言代码是文本文件(存放在.c为后缀的文件中)&#xff0c;文本文件本身无法被执行&#xff0c;必须通过编译器的编译和链接器的链接&#xff0c;生成可执行的二进制文件&#xff0c;才能够被执行注意&#xff1a; 每个源…

Rust赋能美团云原生DevOps实践

Rust 云原生 DevOps 实践 在云原生环境中,Rust 的高性能与安全性使其成为构建微服务和基础设施工具的理想选择。Docker 作为容器化标准工具,结合 Rust 的跨平台特性,可高效实现持续集成与部署(CI/CD)。 构建优化的 Rust Docker 镜像 多阶段构建是 Rust 项目容器化的关键…

计算机网络实验——配置ACL

ACL基础一、实验目的1. 配置H3C路由器基本ACL。二、实验要求1. 熟练掌握网络配置能力。2. 熟练掌握ACL基本配置。三、实验步骤&#xff08;1&#xff09;使用reset saved-configuration命令和reboot命令&#xff0c;重置路由器原有配置&#xff0c;如图1所示。图 1&#xff08;…

在本地部署mcp服务器实现自然语言操作mysql数据库,轻松实现数据表的增~ 删~ 改~ 查~

1.将写好的mcp_server代码放在本地任意盘&#xff01; import asyncio import logging import os import sys from mysql.connector import connect, Error from mcp.server import Server from mcp.types import Resource, Tool, TextContent from pydantic import AnyUrl# Co…

2025快手创作者中心发布视频python实现

难度还行&#xff0c;只有一个__NS_sig3加密&#xff0c;流程麻烦点cookies_list cookie.split("; ")cookie_dict {}# 遍历每个 Cookie&#xff0c;根据等号将键值对拆分并添加到字典中for cookie in cookies_list:key_value cookie.split("")if len(ke…

Android 组件内核

文章目录什么是binder1. 什么是Binder&#xff1f;2. Binder架构组成3. 工作原理与通信流程1&#xff09;服务注册2&#xff09;服务查询3&#xff09;通信过程4&#xff09;核心数据结构4. 关键技术点5. 常见面试考点1&#xff09;Binder与传统IPC&#xff08;Socket、管道、共…

java类加载机制:Tomcat的类加载机制

Tomcat类加载机制深度解析&#xff1a;打破双亲委派的Web容器实现 Tomcat作为Java Web容器&#xff0c;其类加载机制为满足Web应用的隔离性、热部署和兼容性需求&#xff0c;对标准Java类加载机制进行了定制化扩展&#xff0c;核心是打破双亲委派模型并引入多层级类加载器。以下…

【PTA数据结构 | C语言版】从顺序表 list 中删除第 i 个元素

本专栏持续输出数据结构题目集&#xff0c;欢迎订阅。 文章目录题目代码题目 请编写程序&#xff0c;将 n 个整数存入顺序表&#xff0c;对任一指定的第 i 个位置&#xff0c;将这个位置上的元素从顺序表中删除。注意&#xff1a;i 代表位序&#xff0c;从 1 开始&#xff0c;…

VS2022 C++ EasyX库 扫雷游戏项目开发:打造经典游戏的详细之旅

老样子&#xff0c;先上效果 视频演示 C经典扫雷-介绍一、引言 在这篇博客中&#xff0c;我将详细介绍扫雷游戏项目的开发过程。扫雷作为一款经典的游戏&#xff0c;其规则简单但富有挑战性。通过开发这个项目&#xff0c;我不仅加深了对 C 编程的理解&#xff0c;还提升了自己…

Go语言网络游戏服务器模块化编程

本文以使用origin框架&#xff08;一款使用Go语言写的开源游戏服务器框架&#xff09;为例进行说明&#xff0c;当然也可以使用其它的框架或者自己写。 在框架中PBProcessor用来处理Protobuf消息&#xff0c;在使用之前&#xff0c;需要使用Register函数注册网络消息&#xff…

【机器人】Aether 多任务世界模型 | 4D动态重建 | 视频预测 | 视觉规划

Aether 是一个的世界模型&#xff0c;整合几何重建与生成建模的统一框架&#xff0c;实现类人空间推理能力。 来自ICCV 2025&#xff0c;该框架具有三大核心功能&#xff1a; (1) 4D动态重建&#xff0c;(2) 动作条件视频预测&#xff0c; (3) 目标条件视觉规划。 代码地址&…

MiniMind:3小时训练26MB微型语言模型,开源项目助力AI初学者快速入门

开发&#xff5c;界面&#xff5c;引擎&#xff5c;交付&#xff5c;副驾——重写全栈法则&#xff1a;AI原生的倍速造应用流来自全栈程序员 nine 的探索与实践&#xff0c;持续迭代中。 欢迎关注评论私信交流~ 在大型语言模型(LLaMA、GPT等)日益流行的今天&#xff0c;一个名为…

相机Camera日志实例分析之五:相机Camx【萌拍闪光灯后置拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…

[2-02-02].第03节:环境搭建 - Win10搭建ES集群环境

ElasticSearch学习大纲 基于ElasticSearch7.8版本 一、ElasticStack下载&#xff1a; 1.Elasticsearch 的官方地址 2.Elasticsearch 下载地址&#xff1a; 二、集群搭建: 第1步&#xff1a;创建es目录&#xff1a; 1.创建 elasticsearch-cluster 文件夹&#xff0c;在内部…

操作系统核心技术剖析:从Android驱动模型到鸿蒙微内核的国产化实践

目录 一、移动端操作系统技术细节 1. Android 内核版本 核心模块 驱动架构 国内定制案例 2. iOS XNU内核关键模块 安全机制 3. HarmonyOS 多内核架构 驱动隔离 二、PC端操作系统技术细节 1. Windows NT内核 模块分层 驱动模型 国内适配 2. macOS&#xff08;X…