Java搭建RabbitMq消息中間件過程詳解
這篇文章主要介紹了Java搭建RabbitMq消息中間件過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
前言
當(dāng)系統(tǒng)中出現(xiàn)“生產(chǎn)“和“消費“的速度或穩(wěn)定性等因素不一致的時候,就需要消息隊列。
名詞
- exchange: 交換機
- routingkey: 路由key
- queue:隊列
控制臺端口:15672
exchange和queue是需要綁定在一起的,然后消息發(fā)送到exchange再由exchange通過routingkey發(fā)送到對應(yīng)的隊列中。
使用場景
1.技能訂單3分鐘自動取消,改變狀態(tài)
2.直播開始前15分鐘提醒
3.直播狀態(tài)自動結(jié)束
流程
生產(chǎn)者發(fā)送消息 —> order_pre_exchange交換機 —> order_per_ttl_delay_queue隊列
—> 時間到期 —> order_delay_exchange交換機 —> order_delay_process_queue隊列 —> 消費者
第一步:在pom文件中添加
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:在application.properties文件中添加
spring.rabbitmq.host=172.xx.xx.xxx spring.rabbitmq.port=5672 spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
第三步:配置 OrderQueueConfig
package com.tuohang.platform.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitMQ的隊列設(shè)置(生產(chǎn)者發(fā)送的消息,永遠是先進入exchange,再通過路由,轉(zhuǎn)發(fā)到隊列)
*
*
* @author Administrator
* @version 1.0
* @Date 2018年9月18日
*/
@Configuration
public class OrderQueueConfig {
/**
* 訂單緩沖交換機名稱
*/
public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";
/**
* 發(fā)送到該隊列的message會在一段時間后過期進入到order_delay_process_queue 【隊列里所有的message都有統(tǒng)一的失效時間】
*/
public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";
/**
* 訂單的交換機DLX 名字
*/
final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";
/**
* 訂單message時間過期后進入的隊列,也就是訂單實際的消費隊列
*/
public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";
/**
* 訂單在緩沖隊列過期時間(毫秒)30分鐘
*/
public final static int ORDER_QUEUE_EXPIRATION = 1800000;
/**
* 訂單緩沖交換機
*
* @return
*/
@Bean
public DirectExchange preOrderExange() {
return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
}
/**
* 創(chuàng)建order_per_ttl_delay_queue隊列,訂單消息經(jīng)過緩沖交換機,會進入該隊列
*
* @return
*/
@Bean
public Queue delayQueuePerOrderTTLQueue() {
return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
.withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
.withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 設(shè)置訂單隊列的過期時間
.build();
}
/**
* 將order_pre_exchange綁定到order_pre_ttl_delay_queue隊列
*
* @param delayQueuePerOrderTTLQueue
* @param preOrderExange
* @return
*/
@Bean
public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
}
/**
* 創(chuàng)建訂單的DLX exchange
*
* @return
*/
@Bean
public DirectExchange delayOrderExchange() {
return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
}
/**
* 創(chuàng)建order_delay_process_queue隊列,也就是訂單實際消費隊列
*
* @return
*/
@Bean
public Queue delayProcessOrderQueue() {
return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
}
/**
* 將DLX綁定到實際消費隊列
*
* @param delayProcessOrderQueue
* @param delayExchange
* @return
*/
@Bean
public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
}
/**
* 監(jiān)聽訂單實際消費者隊列order_delay_process_queue
*
* @param connectionFactory
* @param processReceiver
* @return
*/
@Bean
public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
OrderProcessReceiver processReceiver) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽order_delay_process_queue
container.setMessageListener(new MessageListenerAdapter(processReceiver));
return container;
}
}
消費者 OrderProcessReceiver :
package com.tuohang.platform.config;
import java.util.Objects;
import org.apache.tools.ant.types.resources.selectors.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
/**
* 訂單延遲處理消費者
*
*
* @author Administrator
* @version 1.0
* @Date 2018年9月18日
*/
@Component
public class OrderProcessReceiver implements ChannelAwareMessageListener {
private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);
String msg = "The failed message will auto retry after a certain delay";
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
processMessage(message);
} catch (Exception e) {
// 如果發(fā)生了異常,則將該消息重定向到緩沖隊列,會在一定延遲之后自動重做
channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
msg.getBytes());
}
}
/**
* 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常)
*
* @param message
* @throws Exception
*/
public void processMessage(Message message) throws Exception {
String realMessage = new String(message.getBody());
logger.info("Received <" + realMessage + ">");
// 取消訂單
if(!Objects.equals(realMessage, msg)) {
// SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
System.out.println("測試111111-----------"+new Date());
System.out.println(message);
}
}
}
或者
/**
* 測試 rabbit 消費者
*
*
* @author Administrator
* @version 1.0
* @Date 2018年9月25日
*/
@Component
@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
public class TestProcessReceiver {
private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);
String msg = "The failed message will auto retry after a certain delay";
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
try {
processMessage(message);
//告訴服務(wù)器收到這條消息 已經(jīng)被我消費了 可以在隊列刪掉;否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會在發(fā)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 如果發(fā)生了異常,則將該消息重定向到緩沖隊列,會在一定延遲之后自動重做
channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
msg.getBytes());
}
}
/**
* 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常)
*
* @param message
* @throws Exception
*/
public void processMessage(Message message) throws Exception {
String realMessage = new String(message.getBody());
logger.info("Received < " + realMessage + " >");
// 取消訂單
if(!Objects.equals(realMessage, msg)) {
System.out.println("測試111111-----------"+new Date());
}else {
System.out.println("rabbit else...");
}
}
}
生產(chǎn)者
/**
* 測試rabbitmq
*
* @return
*/
@RequestMapping(value = "/testrab")
public String testraa() {
GenericResult gr = null;
try {
String name = "test_pre_ttl_delay_queue";
long expiration = 10000;//10s 過期時間
rabbitTemplate.convertAndSend(name,String.valueOf(123456));
// 在單個消息上設(shè)置過期時間
//rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));
} catch (ServiceException e) {
e.printStackTrace();
gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
}
return getWrite(gr);
}
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持我們。
您可能感興趣的文章
- 01-10Java實現(xiàn)動態(tài)模擬時鐘
- 01-10利用Java實現(xiàn)復(fù)制Excel工作表功能
- 01-10JavaWeb實現(xiàn)郵件發(fā)送功能
- 01-10java基于poi導(dǎo)出excel透視表代碼實例
- 01-10Java實現(xiàn)動態(tài)數(shù)字時鐘
- 01-10基于Java驗證jwt token代碼實例
- 01-10java實現(xiàn)液晶數(shù)字字體顯示當(dāng)前時間
- 01-10淺談Java中真的只有值傳遞么
- 01-10Java動態(tài)顯示當(dāng)前日期和時間
- 01-10如何解決線程太多導(dǎo)致java socket連接池出現(xiàn)的問題


閱讀排行
本欄相關(guān)
- 01-10Java實現(xiàn)動態(tài)模擬時鐘
- 01-10Springboot中@Value的使用詳解
- 01-10JavaWeb實現(xiàn)郵件發(fā)送功能
- 01-10利用Java實現(xiàn)復(fù)制Excel工作表功能
- 01-10Java實現(xiàn)動態(tài)數(shù)字時鐘
- 01-10java基于poi導(dǎo)出excel透視表代碼實例
- 01-10java實現(xiàn)液晶數(shù)字字體顯示當(dāng)前時間
- 01-10基于Java驗證jwt token代碼實例
- 01-10Java動態(tài)顯示當(dāng)前日期和時間
- 01-10淺談Java中真的只有值傳遞么
隨機閱讀
- 01-10使用C語言求解撲克牌的順子及n個骰子
- 01-11ajax實現(xiàn)頁面的局部加載
- 08-05DEDE織夢data目錄下的sessions文件夾有什
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 01-10delphi制作wav文件的方法
- 04-02jquery與jsp,用jquery
- 01-10SublimeText編譯C開發(fā)環(huán)境設(shè)置
- 08-05織夢dedecms什么時候用欄目交叉功能?
- 08-05dedecms(織夢)副欄目數(shù)量限制代碼修改
- 01-10C#中split用法實例總結(jié)


