基础:
工作队列主要是为了避免资源密集型任务的立即执行,然后一直等待它执行结束。相反,我们可以安排好任务,然后在执行。我们可以将一个任务封装成一个消息,发送到队列中。由工作者在后台取出任务然后执行。当有多个工作者时,他们共同处理这些任务。
demo:
package rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者从消息队列取消息 * @author thrillerzw * */ public class Reqv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws InterruptedException, IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.200"); // factory.setPort(15672); factory.setUsername("thrillerzw"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //parameters for queue 'hello4' in vhost '/' not equivalent boolean durable=true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); System.out.println("Waiting for messages."); QueueingConsumer consumer = new QueueingConsumer(channel); //事务方式: boolean autoAck :false,需要手工应答,mq才会认为消费成功。 boolean autoAck=false; String consumerTag = channel.basicConsume(QUEUE_NAME, autoAck, consumer); System.out.println("consumerTag=" + consumerTag); int i=0; while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); try { //业务 System.out.println("Received '" + message + "'"); Thread.sleep(100); if(i==2){ throw new Exception(); } //autoAck :false时候应答确认消息处理完成 long deliveryTag=delivery.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); System.out.println(); } catch (Throwable e) { e.printStackTrace(); //测试看是回滚到队列的末尾. //不断回滚可能出现死循环? channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); } i++; } } } package rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * 生产者向消息队列发送消息 * 防止宕机重启内存丢消息, 持久化队列。 * channel.queueDeclare设置 boolean durable:true,channel.basicPublish消息也必须标记为MessageProperties.PERSISTENT_TEXT_PLAIN * @author thrillerzw * */ public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.200"); factory.setUsername("thrillerzw"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //程序声明一个队列,如果没有会添加。持久化队列:durable:true channel.queueDeclare(QUEUE_NAME, true, false, false, null); for(int i=0;i<10;i++){ String message = "Hello World!"+i; //持久化消息:MessageProperties.PERSISTENT_TEXT_PLAIN 非持久化: MessageProperties.TEXT_PLAIN channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("Sent '" + message + "'"); } channel.close(); connection.close(); } } package rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author thrillerzw * */ public class SendLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.200"); factory.setUsername("thrillerzw"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明Exchange for (int i = 0; i <= 2; i++) { String message = "hello word!" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("Sent '" + message + "'"); } channel.close(); connection.close(); } } package rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.200"); factory.setUsername("thrillerzw"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); /* String queueName = "log-fb1"; //通过channel.queueDeclare()来创建一个非持久的队列名 channel.queueDeclare(queueName, false, false, false, null);*/ //rabbitmq创建一个随机的auto-delete的队列,并返回名字。amq.gen-iieJpst2mQghdrXF7nKYCQ / amq.gen-iieJpst2mQghdrXF7nKYCQ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");//把Queue、Exchange绑定 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Received '" + message + "'"); } } }
相关推荐
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
采用python编写的批量删除rabbitmq的队列或交换机。 1.修改rabbitmq_delete.py中rabbitmq的配置; 2.执行以下命令: 删除队列: python3 rabbitmq_delete.py -k ‘udata.climb’ -d 1 删除交换机: python3 rabbitmq_...
rabbitmq
rabbitmq配置文件,用于rabbitmq管理
RabbitMQClientUtil是MQ的测试工具类,他封装了fanout、direct、topic三种exchange模式,并包括发送数据和接收数据。...rabbitmq.properties配置文件根据自己需要自行放在,放置完毕后,请自己修改util中的文件地址
RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...
【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题...
SpringBoot整合RabbitMQ的详细过程 **1.该篇博文首先讲述了交换机和队列之间的绑定关系** ①direct、②fanout、③topic **2.然后讲消息的回调** 四种情况下,确认触发哪个回调函数: ①消息推送到server,但是在...
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在...
windows版本rabbitmq安装包 里面内含rabbitmq-server-3.9.13.exe、otp_win64_24 内含rabbitmq_delayed_message_exchange-3.9.0.ez 插件
ARM版本的 rabbitmq 镜像资源 版本 3.8.9 使用拷贝到主机 执行:docker load < rabbitmq_arm3.8.9.tar 生成docker镜像
rabbitmq 3.9.3 配置文件
tp6使用rabbitmq
RabbitMQ命令行手动创建队列rabbitmqadmin用法 手动创建队列方法:登录http://ip:15672/cli下载 将下载的rabbitmqadmin放到/usr/sbin目录下,并赋予权限:chmod 755 rabbitmqadmin 查看命令帮助信息: python ...
flink-sql集成rabbitmq
理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,...import com.rabbitmq.client.Queuein
rabbitMq window10安装包
以rabbitmq3.6版本为例
RabbitMQ下载安装配置使用指南官方手册
RabbitMQ源码和客户端工具RabbitMQ源码和客户端工具