最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

消息隊(duì)列rabbitmq使用

2022-05-13 14:45 作者:wulizhao1  | 我要投稿

1、docker啟動(dòng)

docker?search rabbitmq:management 搜索

docker pull rabbitmq:management? 拉取鏡像

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management 啟動(dòng)

docker logs rabbitmq? 打印日志

此時(shí),可以在瀏覽器登錄,地址http://localhost:15672,用戶名密碼都為guest


2、使用

2.1編寫(xiě)生產(chǎn)者類


public class Producer {

? ? private final static String QUEUE_NAME = "Test";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setUsername("guest");

? ? ? ? factory.setPassword("guest");

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? factory.setVirtualHost("/");


? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? /**

? ? ? ? ?* 生成一個(gè)隊(duì)列

? ? ? ? ?* 1、隊(duì)列名稱 QUEUE_NAME

? ? ? ? ?* 2、隊(duì)列里面的消息是否持久化(默認(rèn)消息存儲(chǔ)在內(nèi)存中)

? ? ? ? ?* 3、該隊(duì)列是否只供一個(gè)Consumer消費(fèi) 是否共享 設(shè)置為true可以多個(gè)消費(fèi)者消費(fèi)

? ? ? ? ?* 4、是否自動(dòng)刪除 最后一個(gè)消費(fèi)者斷開(kāi)連接后 該隊(duì)列是否自動(dòng)刪除

? ? ? ? ?* 5、其他參數(shù)

? ? ? ? ?*/

? ? ? ? channel.queueDeclare(QUEUE_NAME,false,false,false,null);

? ? ? ? String message = "Hello world!";

? ? ? ? /**

? ? ? ? ?* 發(fā)送消息

? ? ? ? ?* 1、發(fā)送到哪個(gè)exchange交換機(jī),2、路由的key,3、其他的參數(shù)信息,4、消息體

? ? ? ? ?*/

? ? ? ? channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

? ? ? ? System.out.println("send?'"+message+"'");


? ? ? ? channel.close();

? ? ? ? connection.close();

? ? }

}


2.2編寫(xiě)消費(fèi)者類

public class Receiver {

? ? private final static String QUEUE_NAME = "Test";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setUsername("guest");

? ? ? ? factory.setPassword("guest");

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? factory.setVirtualHost("/");

? ? ? ? factory.setConnectionTimeout(600000);//milliseconds

? ? ? ? factory.setRequestedHeartbeat(60);//seconds

? ? ? ? factory.setHandshakeTimeout(6000);//milliseconds

? ? ? ? factory.setRequestedChannelMax(5);

? ? ? ? factory.setNetworkRecoveryInterval(500);


? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();


? ? ? ? channel.queueDeclare(QUEUE_NAME,false,false,false,null);

? ? ? ? System.out.println("Waiting for messages. ");


? ? ? ? Consumer consumer = new DefaultConsumer(channel) {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {

? ? ? ? ? ? ? ? String message = new String(body, "UTF-8");

? ? ? ? ? ? ? ? System.out.println(" received '" + message + "'");

? ? ? ? ? ? }

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME,true,consumer);

? ? }

}

2.3工具類

public class RabbitMqUtils {

? ? public static Channel getChannel() throws Exception{

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setUsername("guest");

? ? ? ? factory.setPassword("guest");

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? return channel;

? ? }

}

2.4?設(shè)置手動(dòng)應(yīng)答

rabbitmq將message發(fā)送給消費(fèi)者后,就會(huì)將該消息標(biāo)記為刪除。如果消費(fèi)者在處理message過(guò)程中宕機(jī),沒(méi)有來(lái)得及處理消息,則會(huì)導(dǎo)致消息的丟失。
因此,需要設(shè)置手動(dòng)應(yīng)答。代碼如下:


2.4.1 生產(chǎn)者

public class Task?{

? ? private static final String TASK_QUEUE_NAME = "ack_queue";

? ? public static void main(String[] args) throws Exception{

? ? ? ? try(Channel channel = RabbitMqUtils.getChannel()){

? ? ? ? ? ? channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);

? ? ? ? ? ? Scanner scanner = new Scanner(System.in);

? ? ? ? ? ? System.out.println("請(qǐng)輸入信息");

? ? ? ? ? ? while(scanner.hasNext()){

? ? ? ? ? ? ? ? String message = scanner.nextLine();

? ? ? ? ? ? ? ? channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());

? ? ? ? ? ? ? ? System.out.println("生產(chǎn)者發(fā)出消息"+ message);

? ? ? ? ? ? }

? ? ? ? }

? ? }

}


2.4.1?消費(fèi)者

public class Work?{

? ? private static final String ACK_QUEUE_NAME = "ack_queue";

? ? public static void main(String[] args) throws Exception{

? ? ? ? Channel channel = RabbitMqUtils.getChannel();

? ? ? ? DeliverCallback deliverCallback = (consumerTag,delivery)->{

? ? ? ? ? ? String message = new String(delivery.getBody());

? ? ? ? ? ? SleepUtils.sleep(1);

? ? ? ? ? ? System.out.println("接收到消息:"+message);

? ? ? ? ? ? /**

? ? ? ? ? ? ?* 1、消息的標(biāo)記tag

? ? ? ? ? ? ?* 2、是否批量應(yīng)答

? ? ? ? ? ? ?*/

? ? ? ? ? ? channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

? ? ? ? };

? ? ? ? CancelCallback cancelCallback = (consumerTag)->{

? ? ? ? ? ? System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

? ? ? ? };

? ? ? ? //采用手動(dòng)應(yīng)答

? ? ? ? boolean autoAck = false;

? ? ? ? channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);

? ? }

}

工具類SleepUtils?:

public class SleepUtils {

? ? public static void sleep(int second){

? ? ? ? try{

? ? ? ? ? ? Thread.sleep(1000*second);

? ? ? ? }catch (InterruptedException _ignored){

? ? ? ? ? ? Thread.currentThread().interrupt();

? ? ? ? }

? ? }

}

2.5?不公平分發(fā)

rabbitmq默認(rèn)是輪詢分發(fā):生產(chǎn)者依次向消費(fèi)者按順序發(fā)送消息。但是當(dāng)消費(fèi)者A處理速度很快,而消費(fèi)者B處理速度很慢時(shí),這種分發(fā)策略顯然是不合理的。需要設(shè)置為不公平分發(fā):

int prefetchCount = 10;

channel.basicQos(prefetchCount);

通過(guò)此配置,rabbitmq會(huì)優(yōu)先將message分發(fā)給空閑消費(fèi)者

消息隊(duì)列rabbitmq使用的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
嘉定区| 如东县| 墨竹工卡县| 区。| 邮箱| 普定县| 新巴尔虎左旗| 东城区| 新宁县| 汝阳县| 五峰| 根河市| 马尔康县| 深水埗区| 运城市| 蒙阴县| 湘阴县| 互助| 农安县| 安阳市| 德阳市| 宜川县| 古交市| 溧阳市| 成都市| 华阴市| 和田市| 罗城| 平陆县| 铜陵市| 靖安县| 修水县| 昌邑市| 合肥市| 穆棱市| 霍州市| 太谷县| 长阳| 盐池县| 米易县| 彭山县|