【面试题】如何保证MQ的消息不丢失、不重复

文章目录

  • 一、消息丢失问题的解决方案
    • (一)发送端丢失
    • (二)存储端丢失
      • 1. 同步刷盘
      • 2. Broker 集群
    • (三)消费端丢失
  • 二、消息重复问题的解决方案
    • (一)唯一键约束
    • (二)保存消费记录
  • 三、总结

在消息队列的使用过程中, 消息丢失和消息重复是两个常见且令开发人员困扰的问题。

因为从生产者发送消息,到 Broker 保存消息,再到消费者消费消息,每个环节都暗藏着消息丢失的风险;而消息重复的产生,往往源于生产者的重复发送或消费者的重复接收。

那么接下来,我们深入剖析一下这两个问题及其对应的策略。

一、消息丢失问题的解决方案

(一)发送端丢失

生产者发送消息时,处理不当极易造成消息丢失。目前,主流消息队列普遍支持同步发送和异步发送两种模式。

同步发送时,生产者发送消息后会同步等待 Broker 返回的 ACK 确认消息,只有收到 ACK 才认定消息发送成功;若长时间未收到,则判定发送失败并进行重试。这种方式虽能确保消息不丢失,但会带来性能瓶颈,因此在实际应用中,异步发送更为常用。

以 Kafka 为例,主流消息队列(如 Kafka 和 RocketMQ)通常采用回调函数来保障异步发送时消息不丢失,具体代码如下:

// 配置Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 确保所有副本都收到消息才确认
props.put("retries", 3); // 重试次数Producer<String, String> producer = new KafkaProducer<>(props);// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "message");// 异步发送消息并添加回调处理
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {// 处理发送失败的情况logger.error("消息发送失败,topic: {}, partition: {}, 异常信息: {}", metadata.topic(), metadata.partition(), exception.getMessage());// 可在此处添加重试逻辑或告警机制} else {// 处理发送成功的情况logger.info("消息发送成功,topic: {}, partition: {}, offset: {}",metadata.topic(), metadata.partition(), metadata.offset());}}
});// 关闭生产者
producer.close();

(二)存储端丢失

即便生产者成功发送消息,也无法保证消息绝对不丢失。因为若消息发送到 Broker 后,在消费者拉取之前,Broker 突然宕机且消息尚未落盘,同样会导致消息丢失。为避免存储阶段的消息丢失,可从以下方面着手:

1. 同步刷盘

异步刷盘存在消息未落盘 Broker 就宕机的风险,而同步刷盘则是在消息成功落盘后,才向 Sender 返回发送成功的确认,从而从消息发送环节保障消息不丢失。在 RocketMQ 中,只需将flushDiskType参数配置为SYNC_FLUSH,即可开启同步刷盘功能。

以下是两种刷盘机制的对比示意图:
在这里插入图片描述
在这里插入图片描述

2. Broker 集群

若 Broker 集群仅有一个节点,即便消息成功落盘,一旦 Broker 发生故障,在恢复前消费者将无法拉取消息;若出现磁盘故障且无法恢复,消息更是会永久丢失。

采用Broker 集群可有效解决该问题。在 Broker 集群环境下,可设置等待 2 个以上节点同步完消息后,再向 Producer 返回成功确认。如此一来,即便某个 Broker 节点挂掉,也能迅速找到替代节点,确保消息的可用性。

以下是 Broker 集群架构图:
在这里插入图片描述

(三)消费端丢失

消费者要确保消息不丢失,需在消费完成后再向 Broker 返回 ACK 确认。主流消息队列中,若 Broker 未收到 ACK,会重新向消费者发送消息。

有时为了解决消息积压问题,消费者会在拉取消息后直接返回 ACK,再异步执行消息处理逻辑。此时,为保证消息不丢失,需在返回 ACK 前将消息持久化到本地,例如保存至数据库,后续可从数据库读取消息进行处理。

以下是消费者消息处理流程图:
在这里插入图片描述

二、消息重复问题的解决方案

消息重复产生的原因主要有两点:

  • 一是生产者发送消息后未收到 ACK,进而进行重复发送;
  • 二是消费者消费完成后,Broker 未收到 ACK,导致消息被重复推送给消费者。

消息重复会对业务产生严重影响,比如电商场景中的重复支付、账务场景中的重复记账等。

以下是消息重复产生原因的分析图:
在这里插入图片描述

在这里插入图片描述

从当前主流消息队列来看,尚无一款能够直接解决消息重复的消费问题,所以通常需要在消费端进行幂等处理

以下是几种常见的幂等处理思路:

(一)唯一键约束

若消息会存储到本地数据库,可将消息 ID 设为唯一键;若消息不存入数据库,也可选取消息 ID 或消息中其他具有唯一性的属性,作为唯一键存储到业务数据表中,以此避免重复消费。

(二)保存消费记录

借助 Redis 保存消息 ID 也是一种有效方式。在消费消息前,先判断 Redis 中是否已存在该消息 ID,示例代码如下:

@Service
public class MessageConsumerService {private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate BusinessService businessService; // 业务处理服务// 消费消息的方法public void consumeMessage(String messageId, String messageBody) {try {// 1. 检查消息是否已消费(利用Redis的原子性操作)Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent("message:consumed:" + messageId, // Redis键名,格式为 message:consumed:{消息ID}"1",                             // 值设为1表示已消费30, TimeUnit.DAYS);             // 设置过期时间,防止内存泄漏if (isConsumed != null && isConsumed) {// 2. 消息未被消费,执行具体业务逻辑try {businessService.processMessage(messageBody);logger.info("消息处理成功,messageId: {}", messageId);} catch (Exception e) {// 业务处理失败,删除Redis标记以便重新消费redisTemplate.delete("message:consumed:" + messageId);logger.error("消息处理失败,已删除消费标记,messageId: {}", messageId, e);throw e; // 向上抛出异常,触发重试机制}} else {// 3. 消息已被消费,直接跳过logger.info("消息已被消费,跳过处理,messageId: {}", messageId);}} catch (Exception e) {// 处理异常情况,可根据业务需求添加告警或补偿逻辑logger.error("消息消费过程中发生异常,messageId: {}", messageId, e);// 可添加额外的重试逻辑或告警通知}}
}

需要注意的是,若消费失败,需及时删除 Redis 中保存的消息 ID,防止后续消息无法正常消费。

在这里插入图片描述

三、总结

最后我们用一张图总结一下这篇文章:
在这里插入图片描述

消息不丢失、不重复是消息队列的核心需求,但在实际应用中,满足这一要求并非易事。

对于消息丢失问题,主流消息队列可通过消息重试和消息持久化等手段有效解决;然而,消息重试机制又不可避免地带来了消息重复的风险。

目前,主流消息队列在处理消息重复问题上缺乏现成解决方案,对于不允许重复消费的业务场景,开发人员需在 消费端实现幂等处理逻辑,以保障业务的准确性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/diannao/86330.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ArcGIS Maps SDK for JavaScript:使用图层过滤器只显示FeatureLayer的部分要素

文章目录 引言1 需求场景分析2精确过滤实现方案2.1 基础过滤语法2.2 动态过滤实现 3 模糊查询进阶技巧3.1 LIKE操作符使用3.2 特殊字段处理 4. 性能优化与注意事项4.1 服务端vs客户端过滤4.2 最佳实践建议 5 常见问题解答 引言 在地图应用开发中&#xff0c;图层过滤是常见的需…

day25-计算机网络-3

1. DNS解析流程 windows host文件是否配置域名对应的ip查询本地DNS缓存是否有这个域名对应的ip询问本地DNS&#xff08;网卡配置的&#xff09;是否知晓域名对应的ip本地DNS访问根域名解析服务器&#xff0c;但是根DNS只有顶级域名的记录&#xff0c;根告诉我们.cn顶级域名的D…

中达瑞和SHIS高光谱相机在黑色水彩笔墨迹鉴定中的应用

在文件检验与物证溯源领域&#xff0c;对书写材料&#xff08;如墨水&#xff09;进行快速、准确、无损的鉴别至关重要。由陈维娜等人撰写的《高光谱技术结合化学计量法鉴别黑色水彩笔墨迹》&#xff08;发表于《光谱学与光谱分析》2023年第7期&#xff09;利用中达瑞和SHIS凝采…

华为OD机考 - 水仙花数 Ⅰ(2025B卷 100分)

import java.util.*; public static Integer get(int count,int c){if(count<3||count>7){return -1;}//存储每位数的最高位……最低位int[] arr new int[count];List<Integer> res new ArrayList<>();for(int i(int) Math.pow(10,count-1);i<(int) Math…

Go 标准库 encoding/gob 快速上手

文章目录 1.简介2.基础3.类型和值4.编码细节5.安全6.主要函数6.1 注册1. 接口的底层类型在运行时才能确定2.类型标识的唯一性3.安全性与显式意图4.与结构体的自动处理对比5.示例分析为什么不能像 JSON 那样自动处理&#xff1f;总结 6.2 编码6.3 解码 7.示例7.1 编解码结构体7.…

Ubuntu ifconfig 查不到ens33网卡

BUG&#xff1a;ifconfig查看网络配置信息&#xff1a; 终端输入以下命令&#xff1a; sudo service network-manager stop sudo rm /var/lib/NetworkManager/NetworkManager.state sudo service network-manager start - service network - manager stop &#xff1a;停止…

算法-数论

C-小红的数组查询&#xff08;二&#xff09;_牛客周赛 Round 95 思路&#xff1a;不难看出a数组是有循环的 d3,p4时&#xff0c;a数组&#xff1a;1、0、3、2、1、0、3、2....... 最小循环节为4&#xff0c;即最多4种不同的数 d4,p6时&#xff0c;a数组&#xff1a;1、5、3、…

CSS中text-align: justify文本两端对齐

text-align: justify; 是 CSS 中用于控制文本对齐方式的属性值&#xff0c;它的核心作用是让文本两端对齐&#xff08;分散对齐&#xff09;&#xff0c;使段落左右边缘整齐排列。以下是详细解析&#xff1a; 作用效果 均匀分布间距 浏览器会自动调整单词/字符之间的间距&#…

WebFuture:启动数据库提示: error while loading shared libraries: libaio.so.1问题处理

问题分析 当出现./mysqld: error while loading shared libraries: libaio.so.1: cannot open shared object file: No such file or directory这个错误时&#xff0c;这意味着 MySQL 服务器&#xff08;mysqld&#xff09;在启动过程中无法找到libaio.so.1这个共享库文件。li…

74常用控件_QSpacerItem的使用

目录 代码⽰例: 创建⼀组左右排列的按钮. Spacer 使⽤布局管理器的时候, 可能需要在控件之间, 添加⼀段空⽩. 就可以使⽤ QSpacerItem 来表⽰. 核⼼属性 属性说明width宽度height高度hData水平方向的 sizePolicy - QSizePolicy::Ignored&#xff1a;忽略控件的尺寸&#xf…

vmware 设置 dns

vmware 设置 dns 常用的 DNS&#xff08;Domain Name System&#xff09;服务器地址可以帮助你更快、更安全地解析域名。以下是一些国内外常用的公共 DNS 服务&#xff1a; 国内常用 DNS 阿里云 DNS IPv4: 223.5.5.5、223.6.6.6IPv6: 2400:3200::1、2400:3200:baba::1特点&am…

从一次日期格式踩坑经历,谈谈接口设计中的“约定大于配置“

从一次日期格式踩坑经历&#xff0c;谈谈接口设计中的"约定大于配置" 背景 最近在对接一个第三方接口时&#xff0c;遇到了一个有趣的"坑"。接口文档中要求传入一个符合 RFC3339 格式的日期时间字符串&#xff0c;格式示例为&#xff1a;2019-10-01T08:1…

高考数学易错考点01 | 临阵磨枪

文章目录 前言集合与函数不等式数列三角函数 前言 本篇内容下载于网络&#xff0c;网络上的都是以 WORD 版本呈现&#xff0c;缺字缺图很不完整&#xff0c;没法使用&#xff0c;我只是做了补充和完善。有空准备进行第二次完善&#xff0c;添加问题解释的链接。 集合与函数 …

YOLO12 改进|融入 Mamba 架构:插入视觉状态空间模块 VSS Block 的硬核升级

在医学图像分割领域&#xff0c;传统卷积神经网络&#xff08;CNNs&#xff09;受限于局部感受野&#xff0c;难以捕捉长距离依赖关系&#xff0c;而基于 Transformer 的模型因自注意力机制的二次计算复杂度&#xff0c;在处理高分辨率图像时效率低下。近年来&#xff0c;状态空…

MATLAB遍历生成20到1000个节点的无线通信网络拓扑推理数据

功能&#xff1a; 遍历生成20到1000个节点的无线通信网络拓扑推理数据&#xff0c;包括网络拓扑和每个节点发射的电磁信号&#xff0c;采样率1MHz/3000&#xff0c;信号时长5.7s&#xff0c;单帧数据波形为实采 数据生成效果&#xff1a; 拓扑及空间位置&#xff1a; 节点电磁…

oss:上传图片到阿里云403 Forbidden

访问图片出现403Forbidden问题&#xff0c;我们可以直接登录oss账号&#xff0c;查看对应权限是否开通&#xff0c;是否存在跨域问题

香橙派3B学习笔记8:snap安装管理软件包_打包俩个有调用的python文件

现在尝试一下打包多个有互相调用的 py程序&#xff1a; ssh &#xff1a; orangepi本地ip 密码 &#xff1a; orangepi 操作系统发行版&#xff1a; 基于 Ubuntu 20.04.6 LTS&#xff08;Focal Fossa&#xff09;的定制版本&#xff0c;专门为 Orange Pi 设备优化。PRETTY_NAM…

Spring Boot 中实现 HTTPS 加密通信及常见问题排查指南

Spring Boot 中实现 HTTPS 加密通信及常见问题排查指南 在金融行业安全审计中&#xff0c;未启用HTTPS的Web应用被列为高危漏洞。通过正确配置HTTPS&#xff0c;可将中间人攻击风险降低98%——本文将全面解析Spring Boot中HTTPS的实现方案与实战避坑指南。 一、HTTPS 核心原理与…

前端对WebSocket进行封装,并建立心跳监测

WebSocket的介绍&#xff1a; WebSocket 是一种在客户端和服务器之间进行全双工、双向通信的协议。它是基于 HTTP 协议&#xff0c;但通过升级&#xff08;HTTP 升级请求&#xff09;将连接转换为 WebSocket 协议&#xff0c;从而提供更高效的实时数据交换。 WebSocket 的特点…

【AI】智驾地图在不同自动驾驶等级中的作用演变

一、功能价值动态模型&#xff1a;基于自动驾驶等级的权重迁移 功能演变四阶段&#xff1a; █ 辅助阶段&#xff08;L2&#xff09;&#xff1a;单功能补足 → █ 拓展阶段&#xff08;L2 NOA&#xff09;&#xff1a;多模态增强 → █ 融合阶段&#xff08;L3&#xff09;…