SpringBoot使用RabbitMQ延時隊列(小白必備)
1.什么是MQ
MQ,是一種跨進程的通信機制,用于上下游傳遞消息。
在互聯(lián)網(wǎng)架構(gòu)中,MQ是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。
使用了MQ之后,消息發(fā)送上游只需要依賴MQ,不用依賴其他服務(wù)。
為什么會產(chǎn)生消息列隊?
- 不同進程(process)之間傳遞消息時,兩個進程之間耦合程度過高,改動一個進程,引發(fā)必須修改另一個進程,為了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),所有兩進程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨修改某一個進程,不會影響另一個;
- 不同進程(process)之間傳遞消息時,為了實現(xiàn)標準化,將消息的格式規(guī)范化了,并且,某一個進程接受的消息太多,一下子無法處理完,并且也有先后順序,必須對收到的消息進行排隊,因此誕生了事實上的消息隊列;
延時列隊的使用場景?
- 訂單業(yè)務(wù):在淘寶或者京東購買東西,用戶下單后未付款則30分鐘后取消訂單。
- 短信通知:手機用戶交完話費后,幾分鐘之內(nèi)將會收到繳費信息
2.什么是RabbitMQ(這里就做了一下簡單介紹)
RabbitMQ是一種消息隊列 ,用于常見的進程通信。支持點對點,請求應(yīng)答和發(fā)布訂閱模式 并且提供多種語言的支持。常見的java,c#,php都支持。
常被用在異步處理,應(yīng)用解耦。流量消鋒等復(fù)雜的業(yè)務(wù)場景中。和java的kafka一樣都屬于消息中間件。
下載地址:
https://www.rabbitmq.com/download.html
進入RabbitMQ官網(wǎng)
1.第一步
第二步
下載好后不要著急安裝RabbitMQ,我們這里還需要安裝Erlang
下載地址:http://www.erlang.org/download/otp_win64_17.3.exe
安裝步驟
步驟一
步驟二
步驟三
步驟四
安裝完成
現(xiàn)在安裝RabbitMQ
步驟一
步驟二
步驟三
安裝完成
啟動RabbitMQ管理工具
開始菜單 — 最新添加 — 展開 — 選中雙擊
輸入命令:rabbitmq-plugins enable rabbitmq_management
效果如果圖
在瀏覽器中輸入地址查看:http://127.0.0.1:15672/
出現(xiàn)次頁面代表成功,默認用戶和密碼都是guest/ guest
若不出現(xiàn)此頁面,就是安裝失敗了,不要慌,多半問題在系統(tǒng)用戶名必須是中文(放心有解決辦法):
Windows下安裝RabbitMQ后,按正常RabbitMQ會自動注冊服務(wù)并自動啟動,但是如果有的道友不注意中英文目錄就會出現(xiàn)服務(wù)啟動后幾秒鐘自動停止,而且反反復(fù)復(fù)。
出現(xiàn)這種情況一般都是由我們的用戶名是中文,而導(dǎo)致默認的DB和log訪問出現(xiàn)問。所以我建議以后大家在使用windows操作系統(tǒng)的時候盡量用英文來命名文件或目錄,這樣會極大的減小以后安裝軟件出現(xiàn)莫名其妙的問題的bug。
接下來我們先卸載我們的RabbitMQ,然后在我們的系統(tǒng)變量里設(shè)置一個RABBITMQ_BASE 的變量路徑為一個不含英文的路徑 比如 E:\rabbit,最后我們重新安裝RabbitMQ即可,然后就會看到RabbitMQ服務(wù)自動注冊了,并且不會自動停止。
SpringBoot整合RabbitMQ
1.添加依賴
pom.xml中添加 spring-boot-starter-amqp的依賴
<!-- spring-boot-starter-amqp的依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
其他依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
application.yml文件中配置rabbitmq相關(guān)內(nèi)容
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
這里我們環(huán)境就搭建起來了
2.具體編碼實現(xiàn)
配置列隊
package com.example.spring_boot_rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author:zq
* @date: Greated in 2019/12/19 11:46
* 配置隊列
*/
@Configuration
@Slf4j
public class DelayRabbitConfig {
/**
* 延遲隊列 TTL 名稱
*/
private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
/**
* DLX,dead letter發(fā)送到的 exchange
* 延時消息就是發(fā)送到該交換機的
*/
public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
/**
* routing key 名稱
* 具體消息發(fā)送在該 routingKey 的
*/
public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
public static final String ORDER_QUEUE_NAME = "user.order.queue";
public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
public static final String ORDER_ROUTING_KEY = "order";
/**
* 延遲隊列配置
* <p>
* 1、params.put("x-message-ttl", 5 * 1000);
* 第一種方式是直接設(shè)置 Queue 延遲時間 但如果直接給隊列設(shè)置過期時間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認是時間小的優(yōu)先)
* 2、rabbitTemplate.convertAndSend(book, message -> {
* message.getMessageProperties().setExpiration(2 * 1000 + "");
* return message;
* });
* 第二種就是每次發(fā)送消息動態(tài)設(shè)置延遲時間,這樣我們可以靈活控制
**/
@Bean
public Queue delayOrderQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 聲明了隊列里的死信轉(zhuǎn)發(fā)到的DLX名稱,
params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
// x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時攜帶的 routing-key 名稱。
params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
}
/**
* 需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。
* 這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉(zhuǎn)發(fā),
* 不會轉(zhuǎn)發(fā)dog.puppy,也不會轉(zhuǎn)發(fā)dog.guard,只會轉(zhuǎn)發(fā)dog。
* @return DirectExchange
*/
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange(ORDER_DELAY_EXCHANGE);
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
}
@Bean
public Queue orderQueue() {
return new Queue(ORDER_QUEUE_NAME, true);
}
/**
* 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。
* 符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
**/
@Bean
public TopicExchange orderTopicExchange() {
return new TopicExchange(ORDER_EXCHANGE_NAME);
}
@Bean
public Binding orderBinding() {
// TODO 如果要讓延遲隊列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機很關(guān)鍵
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
}
}
創(chuàng)建一個Order實體類
package com.example.spring_boot_rabbitmq.pojo;
import lombok.Data;
import java.io.Serializable;
/**
* @author:zq
* @date: Greated in 2019/12/19 11:49
*/
@Data
public class Order implements Serializable {
private static final long serialVersionUID = -2221214252163879885L;
private String orderId; // 訂單id
private Integer orderStatus; // 訂單狀態(tài) 0:未支付,1:已支付,2:訂單已取消
private String orderName; // 訂單名字
}
接收者
package com.example.spring_boot_rabbitmq;
import com.example.spring_boot_rabbitmq.pojo.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author:zq
* @date: Greated in 2019/12/19 11:53
* 接收者
*/
@Component
@Slf4j
public class DelayReceiver {
@RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
public void orderDelayQueue(Order order, Message message, Channel channel) {
log.info("###########################################");
log.info("【orderDelayQueue 監(jiān)聽的消息】 - 【消費時間】 - [{}]- 【訂單內(nèi)容】 - [{}]", new Date(), order.toString());
if(order.getOrderStatus() == 0) {
order.setOrderStatus(2);
log.info("【該訂單未支付,取消訂單】" + order.toString());
} else if(order.getOrderStatus() == 1) {
log.info("【該訂單已完成支付】");
} else if(order.getOrderStatus() == 2) {
log.info("【該訂單已取消】");
}
log.info("###########################################");
}
}
發(fā)送者
package com.example.spring_boot_rabbitmq;
import com.example.spring_boot_rabbitmq.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author:zq
* @date: Greated in 2019/12/19 11:55
* 發(fā)送者
*/
@Component
@Slf4j
public class DelaySender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDelay(Order order) {
log.info("【訂單生成時間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString() );
this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
// 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時候就指定好延遲時間還是在發(fā)送自己控制時間
message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
return message;
});
}
}
測試,訪問http://localhost:8080/sendDelay查看日志輸出
package com.example.spring_boot_rabbitmq;
import com.example.spring_boot_rabbitmq.pojo.Order;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
/**
* @author:zq
* @date: Greated in 2019/12/19 11:57
* 測試
*/
@RestController
public class TestController {
@Autowired
private DelaySender delaySender;
@GetMapping("/sendDelay")
public Object sendDelay() {
Order order1 = new Order();
order1.setOrderStatus(0);
order1.setOrderId("123456");
order1.setOrderName("小米6");
Order order2 = new Order();
order2.setOrderStatus(1);
order2.setOrderId("456789");
order2.setOrderName("小米8");
delaySender.sendDelay(order1);
delaySender.sendDelay(order2);
return "ok";
}
}
輸出
到此已經(jīng)SpringBoot使用RabbitMQ延時隊列已經(jīng)完成,希望對你有所幫助,若有地方不理解或者有更好的辦法請留言,謝謝。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持我們。
您可能感興趣的文章
- 01-10Springboot中@Value的使用詳解
- 01-10使用Swing繪制動態(tài)時鐘
- 01-10springboot實現(xiàn)文件上傳步驟解析
- 01-10springboot jta atomikos實現(xiàn)分布式事物管理
- 01-10JAVA8獨有的map遍歷方式(非常好用)
- 01-10Java搭建RabbitMq消息中間件過程詳解
- 01-10如何基于SpringBoot部署外部Tomcat過程解析
- 01-10springboot集成fastDfs過程代碼實例
- 01-10Java使用Scanner類進行控制臺輸入實現(xiàn)方法
- 01-10SPRINGBOOT讀取PROPERTIES配置文件數(shù)據(jù)過程詳解


閱讀排行
本欄相關(guān)
- 01-10Java實現(xiàn)動態(tài)模擬時鐘
- 01-10Springboot中@Value的使用詳解
- 01-10JavaWeb實現(xiàn)郵件發(fā)送功能
- 01-10利用Java實現(xiàn)復(fù)制Excel工作表功能
- 01-10Java實現(xiàn)動態(tài)數(shù)字時鐘
- 01-10java基于poi導(dǎo)出excel透視表代碼實例
- 01-10java實現(xiàn)液晶數(shù)字字體顯示當(dāng)前時間
- 01-10基于Java驗證jwt token代碼實例
- 01-10Java動態(tài)顯示當(dāng)前日期和時間
- 01-10淺談Java中真的只有值傳遞么
隨機閱讀
- 04-02jquery與jsp,用jquery
- 01-10delphi制作wav文件的方法
- 01-10使用C語言求解撲克牌的順子及n個骰子
- 08-05織夢dedecms什么時候用欄目交叉功能?
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 01-10SublimeText編譯C開發(fā)環(huán)境設(shè)置
- 08-05DEDE織夢data目錄下的sessions文件夾有什
- 01-11ajax實現(xiàn)頁面的局部加載
- 01-10C#中split用法實例總結(jié)
- 08-05dedecms(織夢)副欄目數(shù)量限制代碼修改


