网站首页 > 精选教程 正文
想象一下,你正在构建一个超级重要的电商系统,就像一个庞大而繁忙的商业帝国。在这个系统中,消息的传递就像是帝国的信使,负责在各个部门之间传递重要的指令和信息。但是,如果这些消息像调皮的小精灵一样,时不时地重复出现,那可就会引发一场大混乱啦!比如说,用户可能会收到重复的订单通知,商家可能会重复处理同一个订单,这不仅会让用户感到困惑和不满,还可能给系统带来严重的问题。所以呀,在 RabbitMQ 中,如何保证消息不被重复消费,就成了我们必须要攻克的一个难题!我将带你深入RabbitMQ的腹地,揭秘那些让消息消费稳如泰山的独门秘籍,让你的系统从此告别“重复消费”的噩梦!
在RabbitMQ的世界里,保证消息不被重复消费,其实是一场与时间和逻辑的精彩博弈。关键在于,我们要确保每个消息只被处理一次,即便在系统故障、重启或网络波动等情况下也能如此。那么,如何实现这一宏伟目标呢?
一、消息唯一标识
首先,我们要为每一条消息赋予一个独一无二的标识,就像给每个小精灵贴上专属的魔法标签。在发送消息的时候,把这个魔法标签一并发送出去。当消费者接收到消息后,先检查一下这个标签是否已经见过了。如果见过,那就说明这个小精灵已经完成任务啦,直接忽略它;如果没见过,那就让它开始工作,并把这个魔法标签记录下来,以便下次再见到时能够识别出来。
实现步骤:
- 在发送消息之前,生成一个唯一的消息标识,比如使用 UUID 算法。
- 将消息标识和消息内容一起发送到 RabbitMQ 中。
- 消费者接收到消息后,从消息中提取出消息标识,进行重复判断。
import java.util.UUID;
// 生成唯一消息标识的方法
public class MessageUtils {
public static String generateUniqueMessageId() {
// 使用 UUID 生成唯一标识
return UUID.randomUUID().toString();
}
}
举个例子,在我们的电商魔法王国中,当用户下单成功后,系统会这样发送消息:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 生成唯一消息标识
String messageId = MessageUtils.generateUniqueMessageId();
// 要发送的消息内容
String message = "订单信息:商品 A,数量 1";
// 将消息标识和消息内容一起发送到 RabbitMQ 中
channel.basicPublish("exchange_name", "routing_key", null, (messageId + "," + message).getBytes());
System.out.println("消息已发送:" + message);
// 关闭信道和连接
channel.close();
connection.close();
}
}
消费者接收到消息后,从消息中提取出消息标识,进行重复判断:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashSet;
import java.util.Set;
public class Consumer {
private static final Set<String> processedMessageIds = new HashSet<>();
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("queue_name", true, false, false, null);
// 设置消息接收回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 从消息中提取消息标识
String messageId = message.split(",")[0];
// 检查消息标识是否已处理过
if (!processedMessageIds.contains(messageId)) {
System.out.println("接收到新消息:" + message);
// 处理消息逻辑...
// 将消息标识添加到已处理集合中
processedMessageIds.add(messageId);
} else {
System.out.println("重复消息,已忽略:" + message);
}
};
// 开始接收消息
channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> { });
}
}
二、数据库去重表
另一种方法是利用数据库来实现消息的去重。我们可以在数据库中创建一个专门的去重表,用来记录已经处理过的消息标识。当消费者接收到消息后,先在去重表中查询这个消息标识是否存在。如果存在,说明这条消息已经处理过了,直接忽略;如果不存在,就正常处理消息,并在去重表中插入这条消息的标识。
比如说,我们可以在数据库中创建一个名为 “message_processed” 的表,包含两个字段:“message_id” 和 “processed_time”。当消费者接收到消息后,先查询 “message_processed” 表中是否存在相同的 “message_id”。如果存在,说明消息已经处理过了;如果不存在,就处理消息,并在 “message_processed” 表中插入一条新记录,记录消息标识和处理时间。
实现步骤:
- 创建一个数据库表,用于存储已处理消息的标识。
- 消费者在处理消息之前,先查询数据库表中是否存在该消息标识。
- 如果不存在,处理消息,并将消息标识插入到数据库表中。
三、Redis 去重
除了数据库,我们还可以借助 Redis 来实现消息的去重。Redis 是一个非常高效的内存数据库,它的操作速度非常快,可以满足高并发场景下的需求。我们可以使用 Redis 的集合(Set)数据结构来存储已经处理过的消息标识。当消费者接收到消息后,先在 Redis 中查询这个消息标识是否存在于集合中。如果存在,说明消息已经处理过了,直接忽略;如果不存在,就正常处理消息,并将消息标识添加到 Redis 集合中。
比如说,在我们的电商系统中,当用户支付成功后,系统会发送一条支付成功的消息到 RabbitMQ 中。消费者在接收到消息后,先从 Redis 中获取一个名为 “processed_messages” 的集合,然后检查集合中是否包含该消息的标识。如果包含,说明消息已经处理过了;如果不包含,就处理消息,并将消息标识添加到 “processed_messages” 集合中。
实现步骤:
- 连接到 Redis 服务器。
- 在 Redis 中创建一个集合,用于存储已处理消息的标识。
- 消费者在处理消息之前,先查询 Redis 集合中是否存在该消息标识。
- 如果不存在,处理消息,并将消息标识添加到 Redis 集合中。
四、死信队列与重试机制
在 RabbitMQ 的魔法世界里,还有一个非常强大的工具,那就是死信队列(Dead Letter Queue,简称 DLQ)和重试机制。当消息在处理过程中遇到问题,比如消费者处理消息时抛出了异常,或者消息在队列中等待了太长时间而没有被消费,这些消息就可以被转移到死信队列中。然后,我们可以针对死信队列中的消息进行重试或者其他特殊处理。
实现步骤:
- 创建普通队列和死信交换器、死信队列:首先,我们需要创建一个普通的队列,用于接收和处理消息。同时,还需要创建一个死信交换器和一个死信队列。死信交换器用于将死信路由到死信队列中。
channel.queueDeclare("normal_queue", true, false, false, null);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlq_queue", true, false, false, null);
channel.queueBind("dlq_queue", "dlx_exchange", "dlq_routing_key");
- 设置普通队列的死信参数:在创建普通队列时,我们需要设置一些死信参数,告诉 RabbitMQ 当消息满足什么条件时,将其转移到死信队列中。这些参数包括死信交换器的名称、死信路由键以及消息的过期时间等。
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "dlq_routing_key");
args.put("x-message-ttl", 60000); // 设置消息过期时间为 60 秒(可根据实际情况调整)
channel.queueDeclare("normal_queue", true, false, false, args);
- 消费者处理消息并进行重试:当消费者从普通队列中获取到消息并进行处理时,如果处理过程中出现异常,我们可以根据实际情况决定是否进行重试。如果需要重试,我们可以将消息重新发送回原来的队列,或者发送到一个专门用于重试的队列中。在重新发送消息之前,我们可以根据需要修改消息的一些属性,比如重试次数等。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理消息逻辑,可能会抛出异常
System.out.println("正在处理消息:" + message);
// 模拟处理消息时出现异常
throw new Exception("处理消息失败");
} catch (Exception e) {
System.out.println("处理消息失败,将进行重试");
// 获取消息的重试次数
int retryCount = getRetryCount(delivery);
if (retryCount < 3) { // 假设最大重试次数为 3 次
// 增加重试次数
retryCount++;
// 将重试次数设置到消息头部
AMQP.BasicProperties properties = delivery.getProperties();
properties = properties.builder().headers(putRetryCount(retryCount)).build();
// 将消息重新发送回队列
channel.basicPublish("", "normal_queue", properties, delivery.getBody());
} else {
// 超过最大重试次数,将消息发送到死信队列
System.out.println("超过最大重试次数,将消息发送到死信队列");
channel.basicPublish("dlx_exchange", "dlq_routing_key", null, delivery.getBody());
}
}
};
- 处理死信队列中的消息:我们可以创建一个专门的消费者来处理死信队列中的消息。这个消费者可以根据具体的业务需求,对死信队列中的消息进行相应的处理,比如记录日志、发送告警通知等。
DeliverCallback dlqDeliverCallback = (consumerTag, delivery) -> {
String deadLetterMessage = new String(delivery.getBody(), "UTF-8");
System.out.println("从死信队列中获取到消息:" + deadLetterMessage);
// 处理死信队列中的消息逻辑...
};
channel.basicConsume("dlq_queue", true, dlqDeliverCallback, consumerTag -> { });
五、ACK 机制
ACK(Acknowledgment)机制就像是消费者给 RabbitMQ 的一个反馈信号,告诉它消息是否已经被成功处理。当消费者成功处理完一条消息后,它会向 RabbitMQ 发送一个 ACK 确认消息。RabbitMQ 收到 ACK 后,就会知道这条消息已经被处理了,然后将其从队列中移除。如果消费者在处理消息的过程中出现了问题,或者没有在规定的时间内发送 ACK,RabbitMQ 会认为这条消息没有被成功处理,从而将其重新发送给其他消费者进行处理。
实现步骤:
- 消费者在订阅队列时,设置 autoAck 参数为 false,表示需要手动发送 ACK。
channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> { });
- 在消息处理逻辑中,当消息处理成功后,调用 channel.basicAck 方法发送 ACK 确认消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理消息逻辑
System.out.println("正在处理消息:" + message);
// 处理成功后发送 ACK 确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 处理消息失败时的逻辑
System.out.println("处理消息失败");
}
};
六、RabbitMQ 事务
此外,RabbitMQ 还提供了事务机制,就像给我们的魔法操作加上了一层坚固的护盾。通过事务,我们可以确保消息的发送和相关操作要么全部成功,要么全部失败,从而避免出现消息发送成功但后续处理失败的情况。
实现步骤:
- 开启事务:在生产者代码中,使用 channel.txSelect () 方法开启事务。
- 发送消息:像平常一样发送消息到 RabbitMQ 队列中。
- 提交事务:如果消息发送和相关操作都成功,使用 channel.txCommit () 方法提交事务,此时消息才会真正被发送到队列中。
- 回滚事务:如果在发送消息或执行相关操作过程中出现错误,可以使用 channel.txRollback () 方法回滚事务,消息将不会被发送到队列中。
好啦,今天关于 RabbitMQ 中如何保证消息不被重复消费、死信队列与重试机制以及事务的实现方法就介绍到这里啦!希望这些魔法秘籍能够帮助大家在实际开发中更好地驾驭 RabbitMQ,避免消息重复消费和丢失带来的混乱,让我们的系统更加稳定和可靠。
小伙伴们,你们在使用 RabbitMQ 的奇妙旅程中还遇到过哪些有趣的挑战和问题呢?快来评论区和大家分享吧!让我们一起在 Java 的魔法世界里不断探索,共同成长!
别忘了,点赞、分享、关注,让我们一起在技术的海洋里乘风破浪,共创辉煌!你的每一次互动,都是我前进的动力!
- 上一篇: 踩坑日记(四):记一次重复提交引发的线上事故
- 下一篇: Java实现Http多次请求复用同一连接
猜你喜欢
- 2024-12-05 Java11新特性-效能翻倍的HttpClient
- 2024-12-05 LeetCode每日一题,无重复字符的最长子串
- 2024-12-05 类型安全的http客户端retrofit介绍、使用、实现原理分析
- 2024-12-05 RabbitMQ消息重复消费问题如何解决
- 2024-12-05 高频面试题:kafka怎么避免重复消费?
- 2024-12-05 RabbitMQ消息丢失、积压、重复等解决方案
- 2024-12-05 程序员们一定要注意避免重复记录日志撑爆ELK而被辞退
- 2024-12-05 面试:如何保证接口的幂等性?常见的实现方案有哪些?
- 2024-12-05 每日分享- 如何保证 Java 语言接口的幂等性?
- 2024-12-05 Kafka如何防止消费速度过慢触发rebalance导致重复消费
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- nginx反向代理 (57)
- nginx日志 (56)
- nginx限制ip访问 (62)
- mac安装nginx (55)
- java和mysql (59)
- java中final (62)
- win10安装java (72)
- java启动参数 (64)
- java链表反转 (64)
- 字符串反转java (72)
- java逻辑运算符 (59)
- java 请求url (65)
- java信号量 (57)
- java定义枚举 (59)
- java字符串压缩 (56)
- java中的反射 (59)
- java 三维数组 (55)
- java插入排序 (68)
- java线程的状态 (62)
- java异步调用 (55)
- java中的异常处理 (62)
- java锁机制 (54)
- java静态内部类 (55)
- java怎么添加图片 (60)
- java 权限框架 (55)
本文暂时没有评论,来添加一个吧(●'◡'●)