博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式、消费者(接收方)自行确认模式、生产者(发送方)确认模式)...
阅读量:4955 次
发布时间:2019-06-12

本文共 29422 字,大约阅读时间需要 98 分钟。

准备工作:

1)安装RabbitMQ,参考文章:

2.)分别新建名为OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程

在pom.xml文件里面引入如下依赖:

com.rabbitmq
amqp-client
5.0.0

说明:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本

一、消费者(接收方)自动确认模式

 前面有谈到消费者收到的每一条消息都必须进行确认,消息的确认机制分为自动确认和消费者自行确认。下面我们来看一下自动确认的示例:

示例1:交换器是direct

1. 在工程OriginalRabbitMQProducer新建一个一个direct的生产者

package study.demo.normal;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 交换器是direct的生产者 路由键完全匹配时,消息才投放到对应队列 * @author leeSmall * @date 2018年9月15日 * */public class DirectProducer {    //定义交换器的名字    private final static String EXCHANGE_NAME = "direct_logs";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();                //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //设置用户名 这里使用缺省的        //factory.setUsername(..);                //设置连接断开  这里使用缺省的        //factory.setPort();                //设置虚拟主机 这里使用缺省的        //factory.setVirtualHost();        //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();        //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();        //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //定义一组路由键        String[]routingKeys = {"error","info","warning"};        //发布消息的交换器上        for(int i=0;i<3;i++){            //路由键            String routingKey = routingKeys[i];                        //要发送的消息            String message = "Hello world_"+(i+1);            /**             * 发送消息到交换器上             * 参数1:交换器的名字             * 参数2:路由键             * 参数3:BasicProperties             * 参数4:要发送的消息             */            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());            System.out.println("Sent "+routingKey+":"+message);        }        channel.close();        connection.close();    }}

2. 在工程OriginalRabbitMQConsumer新建一个direct的只消费error日志的消费者

package study.demo.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 交换器是direct 只消费error日志的消费者 * @author leeSmall * @date 2018年9月15日 * */public class ConsumerError {    //定义交换器的名字    private static final String EXCHANGE_NAME = "direct_logs";    // private static final String EXCHANGE_NAME = "fanout_logs_1";    public static void main(String[] argv) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);                //5.声明随机队列        String queueName = channel.queueDeclare().getQueue();                //6.声明一个只消费错误日志的路由键error        String routingKey = "error";                //7.队列通过路由键绑定到交换器上        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);                System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);            }        };     //9.自动确认:autoAck参数为true        channel.basicConsume(queueName,true,consumerB);    }}

3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果

启动消费者ConsumerError:

启动生产者DirectProducer:

 

查看消费者ConsumerError现在的状况:

 

可以看到消费者只消费了error级别的消息,这是因为在direct模式下,消费者只定义了路由键routingKey = "error";

 4. 在工程OriginalRabbitMQConsumer新建一个direct的消费所有日志的消费者

package study.demo.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 交换器是direct 消费所有日志的消费者 * @author leeSmall * @date 2018年9月15日 * */public class ConsumerAll {    //定义交换器的名字    private static final String EXCHANGE_NAME = "direct_logs";    // private static final String EXCHANGE_NAME = "fanout_logs_1";    public static void main(String[] argv) throws IOException,            InterruptedException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);                        //5.声明随机队列        String queueName = channel.queueDeclare().getQueue();                //6.定义一组路由键消费所有日志        String[]routingKeys = {"error","info","warning"};                //7.队列通过路由键绑定到交换器上        for(String routingKey:routingKeys){            //队列和交换器的绑定            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);        }        System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerA = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);            }        };         //9.自动确认:autoAck参数为true         channel.basicConsume(queueName,true,consumerA);    }  }

5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果

启动消费者ConsumerAll:

启动生产者DirectProducer:

 查看消费者ConsumerAll的状态:

 可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在direct模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};

示例2:交换器是fanout

1. 在工程OriginalRabbitMQProducer新建一个交换器是fanout的生产者

package study.demo.normal;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 交换器是fanout的生产者 可以理解为广播,会把所有消息投放到绑定到这个交换器上的队列上 * @author leeSmall * @date 2018年9月15日 * */public class FanoutProducer {    private final static String EXCHANGE_NAME = "fanout_logs_1";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();                //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //设置用户名 这里使用缺省的        //factory.setUsername(..);                //设置连接断开  这里使用缺省的        //factory.setPort();                //设置虚拟主机 这里使用缺省的        //factory.setVirtualHost();        //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();        //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();        //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);        //定义一组路由键        String[]routingKeys = {"error","info","warning"};        //发布消息的交换器上        for(int i=0;i<3;i++){            //路由键            String routingKey = routingKeys[i];                        //要发送的消息            String message = "Hello world_"+(i+1);            /**             * 发送消息到交换器上             * 参数1:交换器的名字             * 参数2:路由键             * 参数3:BasicProperties             * 参数4:要发送的消息             */            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());            System.out.println("Sent "+routingKey+":"+message);        }        channel.close();        connection.close();    }}

2. 在工程OriginalRabbitMQConsumer新建一个fanout的只消费error日志的消费者

package study.demo.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 交换器是fanout,只消费error日志的消费者 * @author leeSmall * @date 2018年9月15日 * */public class ConsumerError {    //定义交换器的名字    //private static final String EXCHANGE_NAME = "direct_logs";     private static final String EXCHANGE_NAME = "fanout_logs_1";    public static void main(String[] argv) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);                //5.声明随机队列        String queueName = channel.queueDeclare().getQueue();                //6.声明一个只消费错误日志的路由键error        String routingKey = "error";                //7.队列通过路由键绑定到交换器上        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);                System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);            }        };      //9.自动确认:autoAck参数为true     channel.basicConsume(queueName,true,consumerA);    }}

3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果

启动消费者ConsumerError:

启动生产者FanoutProducer:

 

查看消费者ConsumerError现在的状况:

 

可以看到消费者消费了所有级别的消息,这是因为在fanout模式下,虽然消费者只定义了路由键routingKey = "error",但是因为fanut是广播模式,会把所有消息投放到绑定到这个交换器上的队列上

 4. 在工程OriginalRabbitMQConsumer新建一个fanout的消费所有日志的消费者

package study.demo.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 交换器是fanout 消费所有日志的消费者 * @author leeSmall * @date 2018年9月15日 * */public class ConsumerAll {    //定义交换器的名字    //private static final String EXCHANGE_NAME = "direct_logs";    private static final String EXCHANGE_NAME = "fanout_logs_1";    public static void main(String[] argv) throws IOException,            InterruptedException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);                        //5.声明随机队列        String queueName = channel.queueDeclare().getQueue();                //6.定义一组路由键消费所有日志        String[]routingKeys = {"error","info","warning"};                //7.队列通过路由键绑定到交换器上        for(String routingKey:routingKeys){            //队列和交换器的绑定            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);        }        System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerA = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);            }        };       //9.自动确认:autoAck参数为true       channel.basicConsume(queueName,true,consumerA);    }}

5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果

启动消费者ConsumerAll:

启动生产者FanoutProducer:

 查看消费者ConsumerAll的状态:

 可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在fanout模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};

三、消费者(接收方)自行确认模式

1. 在工程OriginalRabbitMQProducer新建一个消费者自行确认生产者

package study.demo.consumerconfirm;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 消费者自行确认生产者 * @author leeSmall * @date 2018年9月15日 * */public class ConsumerConfirmProducer {    //交换器    private final static String EXCHANGE_NAME = "direct_cc_confirm_1";        //路由键    private final static String ROUTE_KEY = "error";    public static void main(String[] args) throws IOException, TimeoutException,            InterruptedException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();                //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //设置用户名 这里使用缺省的        //factory.setUsername(..);                //设置连接断开  这里使用缺省的        //factory.setPort();                //设置虚拟主机 这里使用缺省的        //factory.setVirtualHost();        //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();        //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();        //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //发布消息的交换器上        for(int i=0;i<10;i++){            //要发送的消息            String message = "Hello world_"+(i+1);            /**             * 发送消息到交换器上             * 参数1:交换器的名字             * 参数2:路由键             * 参数3:BasicProperties             * 参数4:要发送的消息             */            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes());            System.out.println("Sent "+ROUTE_KEY+":"+message);        }        channel.close();        connection.close();    }}

2. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认消费者

package study.demo.consumerconfirm;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 消费者自行确认消费者 * @author leeSmall * @date 2018年9月15日 * */public class ClientConsumerAck {    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";    public static void main(String[] argv) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //5.声明队列        String queueName = "consumer_confirm";        channel.queueDeclare(queueName,false,false,                false,null);        //6.声明一个只消费错误日志的路由键error        String routingKey = "error";                //7.队列通过路由键绑定到交换器上        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);        System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);                //消费者自行确认                this.getChannel().basicAck(envelope.getDeliveryTag(),false);            }        };        //9.消费者自行确认:autoAck参数为false        channel.basicConsume(queueName,false,consumerB);    }} 

3. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认休眠不回复ack的消费者

package study.demo.consumerconfirm;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 消费者自行确认休眠不回复ack的消费者 * @author leeSmall * @date 2018年9月15日 * */public class ClientConsumerSlowAck {    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";    public static void main(String[] argv) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //5.声明队列        String queueName = "consumer_confirm";        channel.queueDeclare(queueName,false,false,                false,null);        //6.声明一个只消费错误日志的路由键error        String routingKey = "error";                //7.队列通过路由键绑定到交换器上        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);        System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                try {                    //消费者自行确认时不回复ack,一直休眠                    Thread.sleep(20000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);                //this.getChannel().basicAck(envelope.getDeliveryTag(),false);            }        };        //9.消费者自行确认:autoAck参数为false        channel.basicConsume(queueName,false,consumerB);    }} 

4. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态

启动消费者自行确认消费者ClientConsumerAck:

启动消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck

 

启动消费者自行确认生产者ConsumerConfirmProducer

 

查看消费者自行确认消费者ClientConsumerAck的状态:

查看消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck的状态:

查看RabbitMQ服务器上的队列情况:

可以看到队列consumer_confirm里面有5条消息未消费,这是因为消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck收到了这5条消息,但是没有向RabbitMQ服务器发送确认消息,RabbitMQMQ认为这5条消息还没有被消费就一直存在队列里面

下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服务器里面队列consumer_confirm的状态

 

可以看到停掉ClientConsumerSlowAck以后,之前的5条消息被ClientConsumerAck消费了

5. 在工程OriginalRabbitMQConsumer 新建一个消费者自行确认拒绝消息的消费者

package study.demo.consumerconfirm;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 消费者自行确认拒绝消息的消费者 * @author leeSmall * @date 2018年9月15日 * */public class ClientConsumerReject {    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";    public static void main(String[] argv) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //5.声明队列        String queueName = "consumer_confirm";        channel.queueDeclare(queueName,false,false,                false,null);        //6.声明一个只消费错误日志的路由键error        String routingKey = "error";                //7.队列通过路由键绑定到交换器上        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);        System.out.println("Waiting message.......");        //8.设置一个监听器监听消费消息        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                //消费者自行拒绝消息 参数requeue=true,让RabbitMQ服务器重新分发消息,requeue=false让RabbitMQ服务器移除消息                this.getChannel().basicReject(envelope.getDeliveryTag(),true);                System.out.println("Reject:"+envelope.getRoutingKey()                        +":"+new String(body,"UTF-8"));            }        };        //9.消费者自行确认:autoAck参数为false        channel.basicConsume(queueName,false,consumerB);    }} 

 6. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认拒绝消息的消费者ClientConsumerReject,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态

 启动消费者自行确认消费者ClientConsumerAck:

 

 启动消费者自行确认拒绝消息的消费者ClientConsumerReject:

 

 

启动消费者自行确认生产者ConsumerConfirmProducer

 

 

 查看消费者自行确认消费者ClientConsumerAck的状态:

查看消费者自行确认拒绝消息的消费者ClientConsumerReject的状态:

可以看到消息都被ClientConsumerAck消费了,这是因为消费者ClientConsumerReject拒绝了所有消息,这里要注意

this.getChannel().basicReject(envelope.getDeliveryTag(),true);

 

这段代码的basicReject的第二个参数requeue,参数requeue=true,让RabbitMQ服务器重新分发消息,requeue=false让RabbitMQ服务器移除消息

 requeue=false时,RabbitMQ服务器会删掉被ClientConsumerReject拒绝的消息,消费者ClientConsumerAck就不能消费所有消息了

四、生产者(发送方)确认模式

 为什么要有个发送方确认模式?

生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。

AMQP协议层面为我们提供的事务机制解决了这个问题,但是事务机制本身也会带来问题:
1)严重的性能问题
2)使生产者应用程序产生同步
RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。

1. 在OriginalRabbitMQProducer工程新建一个生产者(发送方)确认同步模式的类

package study.demo.producerconfirm;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** *  * @Description: 生产者(发送方)确认同步模式 * @author leeSmall * @date 2018年9月16日 * */public class ProducerConfirm {    private final static String EXCHANGE_NAME = "producer_confirm";    private final static String ROUTE_KEY = "error";    public static void main(String[] args) throws IOException, TimeoutException,            InterruptedException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();                //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //设置用户名 这里使用缺省的        //factory.setUsername(..);                //设置连接断开  这里使用缺省的        //factory.setPort();                //设置虚拟主机 这里使用缺省的        //factory.setVirtualHost();        //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();        //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();        //将信道设置为发送方确认        channel.confirmSelect();        //发布消息的交换器上        for(int i=0;i<2;i++){            String msg = "Hello "+(i+1);            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());            //等待RabbitMQ返回消息确认消息已送达RabbitMQ服务器            if (channel.waitForConfirms()){                System.out.println("发送方同步确认: "+ROUTE_KEY+":"+msg);            }        }        // 关闭频道和连接        channel.close();        connection.close();    }}

2. 在OriginalRabbitMQProducer工程新建一个生产者(发送方)确认异步模式的类

package study.demo.producerconfirm;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 生产者(发送方)确认异步模式 * @author leeSmall * @date 2018年9月16日 * */public class ProducerConfirmAsync {    private final static String EXCHANGE_NAME = "producer_confirm";    public static void main(String[] args) throws IOException, TimeoutException,            InterruptedException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();                //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //设置用户名 这里使用缺省的        //factory.setUsername(..);                //设置连接断开  这里使用缺省的        //factory.setPort();                //设置虚拟主机 这里使用缺省的        //factory.setVirtualHost();        //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();        //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();        //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //将信道设置为发送方确认        channel.confirmSelect();        //信道被关闭监听器 用于RabbitMQ服务器断线重连        //channel.addShutdownListener();        /**         * 生产者异步确认监听         * 参数deliveryTag代表了当前channel唯一的投递         * 参数multiple:false         *          */        channel.addConfirmListener(new ConfirmListener() {            //RabbitMQ服务器确认收到消息            public void handleAck(long deliveryTag, boolean multiple)                    throws IOException {                System.out.println("RabbitMQ服务器确认收到消息Ack deliveryTag="+deliveryTag                        +"multiple:"+multiple);            }            //RabbitMQ服务器由于自己内部出现故障没有收到消息            public void handleNack(long deliveryTag, boolean multiple)                    throws IOException {                System.out.println("RabbitMQ服务没有收到消息Ack deliveryTag="+deliveryTag                        +"multiple:"+multiple);            }        });        //生产者异步返回监听 这里和发布消息时的mandatory参数有关        //参数mandatory:mandatory=true,投递消息时无法找到一个合适的队列,把消息返回给生产者,mandatory=false 丢弃消息(缺省)        channel.addReturnListener(new ReturnListener() {            public void handleReturn(int replyCode, String replyText,                                     String exchange, String routingKey,                                     AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String msg = new String(body);                System.out.println("replyText:"+replyText);                System.out.println("exchange:"+exchange);                System.out.println("routingKey:"+routingKey);                System.out.println("msg:"+msg);            }        });        //声明一组路由键        String[] routingKeys={"error","info","warning"};        //发送消息到交换器上        for(int i=0;i<3;i++){            String routingKey = routingKeys[i%3];            // 发送的消息            String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());                        //通过路由键把消息发送到交换器上            //参数mandatory: mandatory=true,投递消息时无法找到一个合适的队列,把消息返回给生产者,            //              mandatory=false 丢弃消息(缺省)            channel.basicPublish(EXCHANGE_NAME, routingKey, false,                    null, message.getBytes());            System.out.println("----------------------------------------------------");            System.out.println(" Sent Message: [" + routingKey +"]:'"+ message + "'");            //sleep一下让程序不快速结束 可以看到RabbitMQ服务器的响应            Thread.sleep(1000);        }        // 关闭信道和连接        channel.close();        connection.close();    }}

3. 在OriginalRabbitMQConsumer工程新建一个发送方确认消费者

package study.demo.producerconfirm;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** *  * @Description: 发送方确认消费者 * @author leeSmall * @date 2018年9月16日 * */public class ProducerConfirmConsumer {    private static final String EXCHANGE_NAME = "producer_confirm";    public static void main(String[] argv) throws IOException, TimeoutException {        //1.创建一个连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置要连接的RabbitMQ服务器的地址        factory.setHost("127.0.0.1");                //2.通过连接工厂创建一个连接        Connection connection = factory.newConnection();                //3.通过连接创建一个信道 信道是用来传送数据的        Channel channel = connection.createChannel();                //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //5.声明队列        String queueName = "producer_confirm";        channel.queueDeclare(queueName,false,false,                false,null);        //6.声明一个只消费错误日志的路由键error        String routingKey = "error";                //7.队列通过路由键绑定到交换器上        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);        System.out.println("Waiting message.......");        // 8.创建队列消费者 设置一个监听器监听消费消息         final Consumer consumerB = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);            }        };        //9.消费者自动确认:autoAck参数为true        channel.basicConsume(queueName, true, consumerB);    }}

4. 启动发送方确认消费者ProducerConfirmConsumer,再分别启动生产者(发送方)确认同步模式的类ProducerConfirm和生产者(发送方)确认异步模式ProducerConfirmAsync

启动发送方确认消费者ProducerConfirmConsumer:

 启动生产者(发送方)确认同步模式的类ProducerConfirm:

 

查看发送方确认消费者ProducerConfirmConsumer的状态:

 

 启动生产者(发送方)确认异步模式ProducerConfirmAsync:

 

查看发送方确认消费者ProducerConfirmConsumer的状态:

 

 

转载于:https://www.cnblogs.com/leeSmall/p/9653219.html

你可能感兴趣的文章
ubuntu12.04 串口登录系统配置
查看>>
poj3061
查看>>
linux--多进程进行文件拷贝
查看>>
笔记:git基本操作
查看>>
HDU2255 奔小康赚大钱 (最大权完美匹配) 模板题【KM算法】
查看>>
CodeForces 510C Fox And Names (拓扑排序)
查看>>
1231作业
查看>>
【Docker】mesos如何修改hostport默认端口范围?
查看>>
子shell以及什么时候进入子shell
查看>>
Linux系统下Memcached的安装以及自启动
查看>>
P4实验问题 解决python模块导入
查看>>
函数基础
查看>>
linux安装配置阿里云的yum源和python3
查看>>
spring-boot2.x Application properties属性配置
查看>>
C/C++中const关键字 2014-03-20 15:30 401人阅读 评论(0) 收藏...
查看>>
grep的用法
查看>>
stm32-浅谈IIC
查看>>
程序输入幸运数
查看>>
整数展示分数和整形数的四则运算
查看>>
写入数据java将数据写入到csv文件
查看>>