消息队列之activeMQ
阅读原文时间:2023年07月09日阅读:2
  1. 实现高可用、高伸缩、高性能、易用和安全的企业级面向消息服务的系统
  2. 异步消息的消费和处理
  3. 控制消息的消费顺序
  4. 可以和Spring/springBoot整合简化编码
  5. 配置集群容错的MQ集群

下载地址:http://activemq.apache.org/components/classic/download/

这里笔者是下载的linux版的:

因为activeMQ底层是使用java编写的,所以需要安装jdk,这个请移步我之前的博客:

https://www.cnblogs.com/pluto-charon/p/11746636.html

安装activeMq:

# 安装apache
[root@localhost ~]# yum install ttpd
# 下载的apache-activemq并上传到linux的home下,解压
[root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz
# 进入到bin目录下
[root@localhost home]# cd /apache-activemq-5.16.0/bin
# 启动
[root@localhost bin]# ./activemq start
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')

# activemq的默认端口是61616,查看是否启动的三种方式
# 第一种
[root@localhost bin]# ps -ef |grep activemq
# 第二种
[root@localhost bin]# netstat -ano|grep 61616
tcp6       0      0 :::61616                :::*                    LISTEN      off (0.00/0/0)
# 第三种
[root@localhost bin]# lsof -i:61616
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    7517 root  132u  IPv6  39926      0t0  TCP *:61616 (LISTEN)

# 带日志的启动方式
[root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
[root@localhost bin]# cd ..
# 可以看到,启动日志都已经记录到日志里了
[root@localhost apache-activemq-5.16.0]# cat myrunmq.log
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
# 关闭activemq
[root@localhost bin]# ./activemq stop

前台访问的端口是8161,在查看前台时,要关闭linux和windows的防火墙:

# 关闭linux防火墙
[root@localhost apache-activemq-5.16.0]# systemctl stop firewalld

在访问之前,需要修改conf目录下的jetty.xml,将下面的host修改成自己的ip,以及修改用户名和密码。

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="127.0.0.1"/>
    <property name="port" value="8161"/>
</bean>

# 用户名和密码可修改可不修改,默认为admin/admin
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="user,admin" />
    <!-- set authenticate=false to disable login -->
    <property name="authenticate" value="true" />
</bean>

修改完成之后重启activemq

[root@localhost bin]# ./activemq restart

查看,地址为192.168.189.150:8161

到这里就说明activemq安装成功了。

JMS(java message service)是一个用于提供消息服务的技术规范,他制定了在整个消息服务提供过程中的所有数据结构和交互流程。当两个程序使用jms进行通信时,他们并不是直接相连的,而是通过一个共同的消息收发服务连接起来的,达到解耦的效果。jms为标准消息协议和消息服务提供了一组通用的接口,包括创建、发送、读取消息等。

1 JMS的优势:

异步:客户端不用发送请求,JMS自动将消息发送给客户端

可靠:JMS保证消息只传递一次

2.JMS的四大组件:

  • JMS provider:实现了jms接口和规范的消息中间件

  • JMS producer:消息生产者,创建和发送JMS消息的客户端应用

  • JMS consumer:消息消费者,接受和处理JMS消息的客户端应用

  • JMS message:由消息头、消息属性、消息体组成

    消息头(在send方法之前,通过setXXX()设置):

    JMSDestination:消息发送的目的地,主要是指Queue(点对点传送模型)和Topic(发布订阅模型)

    JMSDeliverMode:消息是否持久

    JMSExpiration:设置消息过期时间

    JMSPriority:消息优先级,0-4被称为普通消息,5-9是加急消息,默认为4

    JMSMessageID:唯一识别每个消息的标识,由MQ产者或者自己设定

    消息属性:除消息头以外的值,如识别,去重,重点标注等方法,如textMessage.setStringProperty("c1","VIP");

    消息体:

    TextMessage:普通字符串

    MapMessage:map类型,其中key为String类型,而值为java的基本类型

    BytesMessage:二进制数组消息

    StreamMessage:java数据流消息,用个标准流来顺序填充和读取

    ObjectMessage:对象消息,包含一个可序列化的java对象

3.JMS的传送模型:

  • 点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,处理消费掉的和已过期的消息

    点对点消息传送的特性:

    1.每个消息只有一个接收者

    2.消息发送者和接收者没有时间依赖性

    3.当消息发送者发送消息时,无论接收者程序在不在运行,都能发送消息

    4.当接收者收到消息时,会发送确认收到通知

  • 发布订阅消息传递模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和消息订阅。

    发布订阅消息传递的特性:

    1.一个消息可以传递给多个订阅者

    2.发布者和订阅者有时间依赖性

    3.为了缓和严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅

1.引入jar包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.0</version>
</dependency>

2.生产者代码

package activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @className: Jmsproducer
 * @description: activemq生产者
 * @author: charon
 * @create: 2020-12-27 22:36
 */
public class JmsProducer {

    /** 声明activemq的地址 */
    private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";

    /** 队列名 */
    private static final String QUEUE_NAME = "queue01";

    /**
     * @param args 参数
     */
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 获得连接
        Connection conn = activeMQConnectionFactory.createConnection();
        conn.start();
        // 创建会话
        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 创建消息
        for (int i = 0; i < 5; i++) {
            // 消息体
            TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】条消息");
            // 消息头
            // textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
            // 消息属性
            // textMessage.setStringProperty("c1","VIP");
            messageProducer.send(textMessage);
        }
        // 关闭资源
        messageProducer.close();
        session.close();
        conn.close();
    }
}

运行代码在浏览器上查看,可以看到queue01里面有5条消息:

  • Number Of Pending Messages:等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数

  • Number Of Consumers:消费者的数量

  • Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减

  • Messages Dequeued:出了队列的消息 可以理解为是消费这消费掉的数量

    package activemq;

    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import java.io.IOException;

    /**

    • @className: JmsConsumer

    • @description: activeMq的消费者

    • @author: charon

    • @create: 2020-12-28 08:10
      / public class JmsConsumer { /* 声明activemq的地址 */
      private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";

      /** 队列名 */
      private static final String QUEUE_NAME = "queue01";

      public static void main(String[] args) throws JMSException, IOException {
      // 创建连接工厂
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
      // 获得连接
      Connection conn = activeMQConnectionFactory.createConnection();
      conn.start();
      // 创建会话
      Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
      // 创建队列
      Queue queue = session.createQueue(QUEUE_NAME);
      // 创建消息的生产者
      MessageConsumer messageConsumer = session.createConsumer(queue);
      // 同步方式,生产环境并不适用,这种方式将阻塞知道获得并返回第一条消息
      // while (true){
      // TextMessage textMessage =(TextMessage) messageConsumer.receive();
      // if(null!=textMessage){
      // System.out.println("---消费者收到消息:"+textMessage.getText());
      // }else{
      // break;
      // }
      // }

      // 异步方式,创建监听,在又消息到达时,调用listener的onMessage方法,
      messageConsumer.setMessageListener(new MessageListener() {
          @Override
          public void onMessage(Message message) {
              if(message != null && message instanceof TextMessage){
                  TextMessage textMessage = (TextMessage) message;
                  System.out.println("--消费者接受到消息:"+textMessage);
              }
          }
      });
      
      System.in.read();
      // 关闭资源
      messageConsumer.close();
      session.close();
      conn.close();

      }
      }

运行消费者的代码,应该我上面生产者的代码运行了两次,所以消息有10条。

在这里,笔者使用的基于Zookeeper+levelDb搭建的activeMq集群,为了避免单点故障,使用一主两从的架构。使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它被视为master,也就是说如果master因为故障而不能提供服务,Zookeeper会从SLave中选举出一个Broker充当master。

我这边的zookeeper集群已经搭建好了,150和151是follower,152是leader。

# 每台服务器上安装activeMq,同时在集群环境下,activemq的jetty.xml文件重的host要改成0.0.0.0
# 修改activeMq.xml,注释掉kahadb这个配置,actviemq默认的是kahadb,并且添加leveldb
[root@localhost conf]# vi activemq.xml
<!--        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter> -->
<persistenceAdapter>
   <replicatedLevelDB
      directory="${activemq.data}/leveldb"
      replicas="3"
      <!--实例间的通信地址-->
      bind="tcp://0.0.0.0:62222"
      <!--zookeeper的地址-->
      zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
      <!--修改为每个服务器的节点的ip-->
      hostname="192.168.189.152"
      sync="local_disk"
      zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
# 启动三个节点的activemq
[root@localhost bin]# ./activemq restart

# 查看 连接zookeeper客户端
[root@localhost bin]# zkCli.sh
[zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
[00000000022, 00000000020, 00000000021]
# 访问
[zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
{"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
[zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
[zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}

从上面可以看到,只有00000000020这个几点的elected里面有值,表明它被选举为master节点了。

在浏览器上依次访问:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

只有192.168.189.150:8161可以访问成功,因为只有master节点可以对外提供访问,所以只有一个节点能访问到,那么它就是master节点。

第二种查看的方式:

查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即为master节点,SlaveLevelDBStore即为slave节点。

第三种查看的方式为使用zookeeper的可视化工具。

由于activeMq集群是基于zookeeper集群实现的,所以要注意一下三点:

  1. activeMQ的客户端只能访问master的Broker,其它处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议
  2. 当一个activeMQ节点挂掉或者一个Zookeeper节点挂掉,activeMQ服务正常运转,但是如果仅剩一个activeMQ节点,由于不能选举Master,所以activeMQ不能正常运行;(一个就不成集群了)
  3. 同理,如果Zookeeper仅剩一个节点是活动的,不管activeMQ是都存活或者说不管activeMQ个节点是否存活,activeMQ不能正常提供服务,必须依赖于Zookeeper集群服务。

集群的代码和上面单机的代码大致是一直的,就只需要修改一个activemq的地址。

 /** 声明集群中activemq的地址,使用failover协议,随机 */
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";

1.消息发送方式

默认情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升,所以在发送持久化消息时,请开启事务模式。

2.储存机制

在通常情况下,非持久化的消息时存储在内存中的,持久化消息时存储在文件中的,他们的最大限制在配置文件中的节点配置的,但是在非持久化消息堆积到一定程度(内存告急)时,actviemq会将内存中的非持久化消息写入临时文件中,以腾出内存。但是它和持久化消息的区别在于,重启后持久化消息会从文件中恢复,非持久化消息的临时文件会删除。

所以尽量不要用非持久化文件,如果非要用的化,可以将临时文件的限制调大。同时,非持久化的消息要及时处理,不要堆积,或者启动事务。启动事务后,commit()会等待服务器的消息返回,也不会导致消息丢失了。

3.死信队列

一条消息在被重发多次后(默认是6次),将会被ActiveMQ移入死信队列;说白了就是异常消息的归并处理的集合,主要是处理失败的消息。可以在activeMQ.DLQ这个队列中查看。

4.重复消息,幂等性调用

在网络延迟的情况洗啊,可能会造成MQ重试,可能会造成重复消费。如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,因为唯一主键,会造成主键冲突,避免数据库出现脏数据。如果是第三方消费,可以在每条数据里面加一个全局唯一的id,如果消息消费了,就将消息存在redis中,在消费消息之前将id到redis中查询一下,判断是否消费过,如果没有消费过,就处理,如果消费过了,就不处理了。

参考网址:

https://blog.csdn.net/weixin_34122548/article/details/91929810?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242