消息隊(duì)列rabbitmq使用
1、docker啟動(dòng)
docker?search rabbitmq:management 搜索
docker pull rabbitmq:management? 拉取鏡像
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management 啟動(dòng)
docker logs rabbitmq? 打印日志
此時(shí),可以在瀏覽器登錄,地址http://localhost:15672,用戶名密碼都為guest
2、使用
2.1編寫(xiě)生產(chǎn)者類
public class Producer {
? ? private final static String QUEUE_NAME = "Test";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setUsername("guest");
? ? ? ? factory.setPassword("guest");
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? factory.setVirtualHost("/");
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? /**
? ? ? ? ?* 生成一個(gè)隊(duì)列
? ? ? ? ?* 1、隊(duì)列名稱 QUEUE_NAME
? ? ? ? ?* 2、隊(duì)列里面的消息是否持久化(默認(rèn)消息存儲(chǔ)在內(nèi)存中)
? ? ? ? ?* 3、該隊(duì)列是否只供一個(gè)Consumer消費(fèi) 是否共享 設(shè)置為true可以多個(gè)消費(fèi)者消費(fèi)
? ? ? ? ?* 4、是否自動(dòng)刪除 最后一個(gè)消費(fèi)者斷開(kāi)連接后 該隊(duì)列是否自動(dòng)刪除
? ? ? ? ?* 5、其他參數(shù)
? ? ? ? ?*/
? ? ? ? channel.queueDeclare(QUEUE_NAME,false,false,false,null);
? ? ? ? String message = "Hello world!";
? ? ? ? /**
? ? ? ? ?* 發(fā)送消息
? ? ? ? ?* 1、發(fā)送到哪個(gè)exchange交換機(jī),2、路由的key,3、其他的參數(shù)信息,4、消息體
? ? ? ? ?*/
? ? ? ? channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
? ? ? ? System.out.println("send?'"+message+"'");
? ? ? ? channel.close();
? ? ? ? connection.close();
? ? }
}
2.2編寫(xiě)消費(fèi)者類
public class Receiver {
? ? private final static String QUEUE_NAME = "Test";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setUsername("guest");
? ? ? ? factory.setPassword("guest");
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? factory.setVirtualHost("/");
? ? ? ? factory.setConnectionTimeout(600000);//milliseconds
? ? ? ? factory.setRequestedHeartbeat(60);//seconds
? ? ? ? factory.setHandshakeTimeout(6000);//milliseconds
? ? ? ? factory.setRequestedChannelMax(5);
? ? ? ? factory.setNetworkRecoveryInterval(500);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME,false,false,false,null);
? ? ? ? System.out.println("Waiting for messages. ");
? ? ? ? Consumer consumer = new DefaultConsumer(channel) {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
? ? ? ? ? ? ? ? String message = new String(body, "UTF-8");
? ? ? ? ? ? ? ? System.out.println(" received '" + message + "'");
? ? ? ? ? ? }
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME,true,consumer);
? ? }
}
2.3工具類
public class RabbitMqUtils {
? ? public static Channel getChannel() throws Exception{
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setUsername("guest");
? ? ? ? factory.setPassword("guest");
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? return channel;
? ? }
}
2.4?設(shè)置手動(dòng)應(yīng)答
rabbitmq將message發(fā)送給消費(fèi)者后,就會(huì)將該消息標(biāo)記為刪除。如果消費(fèi)者在處理message過(guò)程中宕機(jī),沒(méi)有來(lái)得及處理消息,則會(huì)導(dǎo)致消息的丟失。
因此,需要設(shè)置手動(dòng)應(yīng)答。代碼如下:
2.4.1 生產(chǎn)者
public class Task?{
? ? private static final String TASK_QUEUE_NAME = "ack_queue";
? ? public static void main(String[] args) throws Exception{
? ? ? ? try(Channel channel = RabbitMqUtils.getChannel()){
? ? ? ? ? ? channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
? ? ? ? ? ? Scanner scanner = new Scanner(System.in);
? ? ? ? ? ? System.out.println("請(qǐng)輸入信息");
? ? ? ? ? ? while(scanner.hasNext()){
? ? ? ? ? ? ? ? String message = scanner.nextLine();
? ? ? ? ? ? ? ? channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
? ? ? ? ? ? ? ? System.out.println("生產(chǎn)者發(fā)出消息"+ message);
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
2.4.1?消費(fèi)者
public class Work?{
? ? private static final String ACK_QUEUE_NAME = "ack_queue";
? ? public static void main(String[] args) throws Exception{
? ? ? ? Channel channel = RabbitMqUtils.getChannel();
? ? ? ? DeliverCallback deliverCallback = (consumerTag,delivery)->{
? ? ? ? ? ? String message = new String(delivery.getBody());
? ? ? ? ? ? SleepUtils.sleep(1);
? ? ? ? ? ? System.out.println("接收到消息:"+message);
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 1、消息的標(biāo)記tag
? ? ? ? ? ? ?* 2、是否批量應(yīng)答
? ? ? ? ? ? ?*/
? ? ? ? ? ? channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
? ? ? ? };
? ? ? ? CancelCallback cancelCallback = (consumerTag)->{
? ? ? ? ? ? System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
? ? ? ? };
? ? ? ? //采用手動(dòng)應(yīng)答
? ? ? ? boolean autoAck = false;
? ? ? ? channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
? ? }
}
工具類SleepUtils?:
public class SleepUtils {
? ? public static void sleep(int second){
? ? ? ? try{
? ? ? ? ? ? Thread.sleep(1000*second);
? ? ? ? }catch (InterruptedException _ignored){
? ? ? ? ? ? Thread.currentThread().interrupt();
? ? ? ? }
? ? }
}
2.5?不公平分發(fā)
rabbitmq默認(rèn)是輪詢分發(fā):生產(chǎn)者依次向消費(fèi)者按順序發(fā)送消息。但是當(dāng)消費(fèi)者A處理速度很快,而消費(fèi)者B處理速度很慢時(shí),這種分發(fā)策略顯然是不合理的。需要設(shè)置為不公平分發(fā):
int prefetchCount = 10;
channel.basicQos(prefetchCount);
通過(guò)此配置,rabbitmq會(huì)優(yōu)先將message分發(fā)給空閑消費(fèi)者