activemq消息持久化方式(activemq和kafka区别)

队列模式(点对点模式,P2P)特点:

1、客户端包括生产者和消费者;

2、队列中的消息只能被一个消费者消费;

3、消费者可以随时消费队列中的消息;

activemq消息持久化方式(activemq和kafka区别)

Number Of Consumers:表示消费者数量;

Number Of Pending Messages:等待消费的消息,这个是当前未出队列的数量;

Messages Enqueued:进入队列的消息;( 这个数量只增不减,重启后会清零);

Messages Dequeued:出了队列的消息 可以理解为是消费者消费掉的数量 (重启后会清零);

持久化案例代码:

ActiveMQ持久化,生产者产生的数据,在没有被消费者消费时,先保存到数据库中,当数据被消费者消费后,再从数据库中删除。

生产者:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer {    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";    public static final String QUEUE_NAME = "queue02";    public static void main(String[] args) throws JMSException {        //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);        //通过连接工厂获取connection连接 并启动访问        Connection connection = activeMQConnectionFactory.createConnection();        connection.start();        //创建会话session  需要两个参数,第一个事务,第二个签收        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建目的地(选择是队列还是主题)        Queue queue = session.createQueue(QUEUE_NAME);        //创建消息的生产者        MessageProducer messageProducer = session.createProducer(queue);        // 消息持久化        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);        //通过使用消息生产者messageProducer生产3条消息发送到队列中        for (int i = 1; i <= 7; i++) {            //创建消息   一个字符串消息            TextMessage textMessage = session.createTextMessage("msg---->" + i);            //通过messageProducer 发布消息            messageProducer.send(textMessage);        }        //关闭资源        messageProducer.close();        session.close();        connection.close();        System.out.println("消息发送到MQ成功");    }}

代码:messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

消费者:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer {    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";    public static final String QUEUE_NAME="queue02";    public static void main(String[] args) throws JMSException {        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);        //通过连接工厂获取connection连接 并启动访问        Connection connection = activeMQConnectionFactory.createConnection();        connection.setClientID("client-queue02-01");        connection.start();        //创建会话session  需要两个参数,第一个事务,第二个签收        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建目的地(选择是队列还是主题)        Queue queue = session.createQueue(QUEUE_NAME);        //创建消息的消费者        MessageConsumer messageConsumer = session.createConsumer(queue);        while (true){            //从队列中获取消息  receive未设置最大时间 是阻塞的,            TextMessage textMessage = (TextMessage) messageConsumer.receive();            if (textMessage !=null){                System.out.println("消费者接受到消息---->"+textMessage.getText());            }else {                break;            }        }        messageConsumer.close();        session.close();        connection.close();    }}

测试:

1、先运行生产者,ActiveMQProducer

2、查看数据库:

activemq消息持久化方式(activemq和kafka区别)

3、在运行消费者,ActiveMQConsumer,输出:

 INFO | Successfully connected to tcp://192.168.1.17:61616消费者接受到消息---->msg---->1消费者接受到消息---->msg---->2消费者接受到消息---->msg---->3消费者接受到消息---->msg---->4消费者接受到消息---->msg---->5消费者接受到消息---->msg---->6消费者接受到消息---->msg---->7

4、再次查看数据库,消息已删除。

(0)
小多多的头像小多多创始人

相关推荐

发表回复

登录后才能评论