博客
关于我
springboot+redis整合rabbitmq
阅读量:773 次
发布时间:2019-03-24

本文共 10503 字,大约阅读时间需要 35 分钟。

RabbitMQ入门及实战指南

什么是消息队列 messaging queue (MQ)

消息队列是一种应用程序间的通信方法,通过将消息发送到队列并由接收方读取,是一种脱耦(Decoupling)机制。其核心优势在于:

  • 异步通信:消息发送者与接收者不需要即时响应
  • 系统间通信:支持系统、服务、文件间信息交换
  • 可扩展性:系统性能可通过增加队列节点来提升
  • 灵活性:适用于计数、日志、订单、异步任务等多种场景

RabbitMQ概述

RabbitMQ是一款流行的开源消息中间件,由LIFY团队开发,广泛应用于互联网企业及中小型项目。其独特之处在于:

  • 交换机类型:提供Direct Exchange、Topic Exchange、Fanout Exchange
  • 消息确认机制:确保消息可靠传输
  • 高可用性:支持集群部署,消息持久化
  • 可伸缩性:通过增加队列节点处理更多请求

本文将基于RabbitMQ的Direct Exchange(直连交换机)进行实战演示,结合消息确认及Redis防重复消费优化。


RabbitMQ安装

采用Docker安装RabbitMQ,步骤如下:

# 拉取镜像docker pull rabbitmq:management# 启动RabbitMQdocker run -d -p 15672:15672 -p 5672:5672 rabbitmq:management

访问地址:http://localhost:15672,默认账户密码均为guest


Spring Boot项目配置

1.项目依赖

pom.xml中添加RabbitMQ支持:

org.springframework.boot
spring-boot-starter-amqp

2.配置类

创建RabbitMQConfig.java

import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {    public static final String EXCHANGE_A = "exchange-A";    public static final String QUEUE_A = "queue-a";    public static final String ROUTING_KEY_A = "routing-key-A";    @Bean    public DirectExchange exchangeA() {        return new DirectExchange(EXCHANGE_A);    }    @Bean    public Queue queueA() {        return new Queue(QUEUE_A, true);    }    @Bean    public Binding binding() {        return BindingBuilder.bind(queueA()).to(exchangeA()).with(ROUTING_KEY_A);    }    @Bean    @Scope("prototype") // 指ミ Cumhuriyetletonapis.org持有_RGisteredBean(GethetBeangenerates a new instance 每次静脉注射    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {        RabbitTemplate template = new RabbitTemplate(connectionFactory);        template.setMandatory(true);        template.setMessageConverter(new SerializerMessageConverter());        return template;    }}

3.全局配置

修改application.yml

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    publisher-confirm-type: correlated    publisher-returns: true

消息生产

1.服务接口

创建ConfirmCallbackService.java

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {    private static final Logger log = LoggerFactory.getLogger(ConfirmCallbackService.class);    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if (!ack) {            log.error("消息发送异常: cause={}", cause);        } else {            log.info("消息确认成功: correlationId={}, ack={}", correlationData.getId(), ack);        }    }}

2.消息回调

创建ReturnCallbackService.java

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {    private static final Logger log = LoggerFactory.getLogger(ReturnCallbackService.class);    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        log.info("消息已返回: replyCode={}, replyText={}, exchange={}, routingKey={}",               replyCode, replyText, exchange, routingKey);    }}

3.消息发送

修改ProducerService.java

import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class ProducerService {    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private ConfirmCallbackService confirmCallbackService;    @Autowired    private ReturnCallbackService returnCallbackService;    public void sendMessage(String exchange, String routingKey, Object msg) {        rabbitTemplate.setMandatory(true);        rabbitTemplate.setConfirmCallback(confirmCallbackService);        rabbitTemplate.setReturnCallback(returnCallbackService);        final rabbitTemplate.convertAndSend(            exchange, routingKey, msg,            message -> {                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);                return message;            },            new CorrelationData(UUID.randomUUID().toString())        );    }}

4.控制器配置

修改RabbitMQController.java

@RestControllerpublic class RabbitMQController {    private static final String SUCCESS = "success";    @Autowired    private ProducerService producerService;    @GetMapping("send")    public String send() {        producerService.sendMessage(            RabbitMQConfig.EXCHANGE_A,             RabbitMQConfig.ROUTING_KEY_A,             "你好!");        return SUCCESS;    }}

消息消费(Consumer)

1.队列监听

创建ReceiverMessage.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.stereotype.Component;@Componentpublic class ReceiverMessage {    @RabbitListener(queues = "queue-a")    public void process(String msg, Message message, Channel channel) {        MessageHeaders headers = message.getHeaders();        Long tag = (Long) headers.get(AmqpHeaders.DeliveryTag);        try {            // TODO: 处理业务逻辑            channel.basicAck(tag, false);        } catch (Exception e) {            log.error("消息处理失败:{}", e.getMessage());            boolean redelivered = (Boolean) headers.get(AmqpHeaders.Reredelivered);            if (redelivered) {                log.warn("消息已重复处理,拒绝处理!");                channel.basicAck(tag, false);            } else {                log.debug("消息将被重新发送队列……");                channel.basicNack(tag, false, true);            }        }    }}

延迟队列实现

1.队列配置

创建DelayRabbitConfig.java

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DelayRabbitConfig {    /**     * 延迟队列名称     */    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";    /**     * DLX交换机名称     */    private static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";    private static final String ORDER_DELAY_ROUTING_KEY = "order_delay";    private static final String ORDER_QUEUE_NAME = "user.order.queue";    private static final String ORDER_EXCHANGE_NAME = "user.order.exchange";    private static final String ORDER_ROUTING_KEY = "order";    @Bean    public Queue delayOrderQueue() {        Map
params = new HashMap<>(); params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME); params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY); return new Queue(ORDER_DELAY_QUEUE, true, false, false, params); } @Bean public DirectExchange orderDelayExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY); } @Bean public Queue orderQueue() { return new Queue(ORDER_QUEUE_NAME, true); } @Bean public TopicExchange orderTopicExchange() { return new TopicExchange(ORDER_EXCHANGE_NAME); } @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY); }}

2.消息生产

修改ProducerService.java

// 在sendMessage方法中添加:message.getMessageProperties().setExpiration(1000 * 30 + "");

3.消费者处理

修改ReceiverMessage.java

@Overridepublic void process(Order order, Message message, Channel channel) {    MessageHeaders headers = message.getHeaders();    Long tag = (Long) headers.get(AmqpHeaders.DeliveryTag);    try {        // 处理业务逻辑        channel.basicAck(tag, false);    } catch (Exception e) {        log.error("消息处理失败:{}", e.getMessage());        boolean redelivered = (Boolean) headers.get(AmqpHeaders.Reredelivered);        if (redelivered) {            log.warn("消息已重复处理,拒绝处理!");            channel.basicAck(tag, false);        } else {            log.debug("消息将被重新发送队列……");            channel.basicNack(tag, false, true);        }    }}

Redis防重复消费

1.依赖配置

在消费项目的pom.xml中添加Redis依赖:

org.springframework.boot
spring-boot-starter-data-redis

2.配置文件

修改application.yml

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    listener:      simple:        concurrency: 5        max-concurrency: 10        prefetch: 1        acknowledge-mode: manual        default-requeue-rejected: true  redis:    database: 0    host: 192.168.0.150    port: 6379    password:    timeout: 3000

3.消息处理

修改ReceiverMessage.java

@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void process(String msg, Message message, Channel channel) {    MessageHeaders headers = message.getHeaders();    Long tag = (Long) headers.get(AmqpHeaders.DeliveryTag);    String msgId = (String) headers.get("spring_returned_message_correlation");    try {        if (redisTemplate.opsForHash().hasKey("test", msgId)) {            log.info("消息已被处理: msgId={}", msgId);            channel.basicAck(tag, false);            return;        }        redisTemplate.opsForHash().put("test", msgId, "处理中...");        int i = 1 / 0; // 确保代码报错,触发回调        channel.basicAck(tag, false);    } catch (Exception e) {        log.error("消息处理失败:{}", e.getMessage());        boolean redelivered = (Boolean) headers.get(AmqpHeaders.Reredelivered);        if (redelivered) {            log.warn("消息已重复处理,拒绝处理!");            channel.basicAck(tag, false);        } else {            log.debug("消息将被重新发送队列……");            channel.basicNack(tag, false, true);        }    }}

通过上述配置,可以确保消息在队列失败时不会重复消费,同时记录到Redis,避免死循环。


测试与验证

1.启动生产者项目

mvn spring-boot:run -Dabc=123

访问http://localhost:8080/send,可以看到success返回,说明消息已发送。

2.查看RabbitMQ管理台

访问http://localhost:15672,可以查看消息发送情况及状态。

3.启动消费者项目

mvn spring-boot:run -Dabc=123

控制台可以看到消息接收日志,说明消费成功。

4.测试延迟队列

访问http://localhost:8080/sendDelay,等待1分钟,控制台可以看到消费日志。

5.验证Redis防重复消费

在队列失败时,Redis记录将阻止重复处理,确保消息安全性。


通过以上步骤,可以完成RabbitMQ的快速入门及实战演示,包括消息生产、消费、延迟队列处理及Redis防重复消费配置。这系列文章适合学习 RabbitMQ 的开发者,帮助您快速掌握 RabbitMQ 的核心功能与应用场景。

转载地址:http://ldvkk.baihongyu.com/

你可能感兴趣的文章
Multicast1
查看>>
mysql client library_MySQL数据库之zabbix3.x安装出现“configure: error: Not found mysqlclient library”的解决办法...
查看>>
MySQL Cluster 7.0.36 发布
查看>>
Multimodal Unsupervised Image-to-Image Translation多通道无监督图像翻译
查看>>
MySQL Cluster与MGR集群实战
查看>>
multipart/form-data与application/octet-stream的区别、application/x-www-form-urlencoded
查看>>
mysql cmake 报错,MySQL云服务器应用及cmake报错解决办法
查看>>
Multiple websites on single instance of IIS
查看>>
mysql CONCAT()函数拼接有NULL
查看>>
multiprocessing.Manager 嵌套共享对象不适用于队列
查看>>
multiprocessing.pool.map 和带有两个参数的函数
查看>>
MYSQL CONCAT函数
查看>>
multiprocessing.Pool:map_async 和 imap 有什么区别?
查看>>
MySQL Connector/Net 句柄泄露
查看>>
multiprocessor(中)
查看>>
mysql CPU使用率过高的一次处理经历
查看>>
Multisim中555定时器使用技巧
查看>>
MySQL CRUD 数据表基础操作实战
查看>>
multisim变压器反馈式_穿过隔离栅供电:认识隔离式直流/ 直流偏置电源
查看>>
mysql csv import meets charset
查看>>