RabbitMq的安装与使用

  • 前言
    最近需求涉及到了异步以及应用解耦,原先系统都是采用redis去实现这两种功能,但是这种方法还是容易出现问题,由于没有ack回应机制,一旦redis挂了很容易造成消息丢失,从而影响业务的处理。所以这里我们就需要引入mq(消息队列)来实现,mq其实就是个消息中间件,用来对消息的存储和转发,目前业内较流行的消息中间件有:RabbitMQ、RocketMQ、Kafka。由于这边需求并不需要对mq进行集群和顺序消息,所以就选择了较稳定较简单的RabbitMQ来实现。
  • RabbitMQ的安装
    这里我采用docker-compose来安装,方便启动和关闭,也不影响原来系统环境,docker-compose.yaml文件如下:

    version: '3'
    services:
    rabbitmq:
    container_name: local-rabbitmq
    image: rabbitmq:management
    restart: always
    hostname: duoliangRabbitMq
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - $PWD/data:/var/lib/rabbitmq
      - $PWD/log:/var/log/rabbitmq/log
      - $PWD/hosts:/etc/hosts
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: rooot
      RABBITMQ_DEFAULT_PASS: root

    从上面文件我们可以看出我们映射了两个端口,以及挂载了两个文件目录,所以这里要确认好端口不冲突,以及目录已创建。
    接着执行docker-compose up -d 即可启动rabbitmq服务了。
    启动后我们可以在浏览器输入 ip:15672 来访问mq后台,用户密码为上面环境配置变量配置的,登录后就可以看到此界面了

    注意想要访问后台需要把上面端口加入安全组,不然应该访问不了被拦截。
    这样我们就可以在该后台看到我们新建队列,交换机等等,以及还能看到发布的消息以及消费者消费监听的连接。
    至此我们的rabbitmq服务就算成功安装了。

  • rabbitmq的连接和使用
    我这边是使用php语言来开发的,所以以php为例子,首先我们需要引入一个包

    compose require php-amqplib/php-amqplib -vvv

    拉下包之后,我们就可以创建连接connection,发布消息publish,消费消息consume,这里我们可以写个抽象类将这些操作封装起来,下面我将写个小demo来更展示:
    1.先写个类AbstractRabbitMQ,如下:

    abstract class AbstractRabbitMQ
    {
    /**
     * @var AMQPStreamConnection
     */
    protected $connection;
    
    /**
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    protected $channel;
    
    /**
     * @var bool
     */
    protected $autoAck = true;
    
    /**
     * @var array
     */
    protected $arguments = [];
    
    /**
     * @var
     */
    protected $prefetchSize = null;
    
    /**
     * @var
     */
    protected $prefetchCount = 1;
    /**
     * 创建连接
     * AbstractRabbitMQ constructor.
     * @param array $configs
     */
    public function __construct($configs = [])
    {
        $config = [];
        if (empty($configs)) {
            $configs = ['host'=>'ip','port' => 5672, 'user' => 'root', 'pass' => 'root', 'vhost' => '/'];
        }
    
        foreach ($configs as $key => $value) {
            $config[$key] = $value;
        }
        try {
            if (!($this->connection instanceof AMQPStreamConnection)) {
                // 创建链接
                $this->connection = new AMQPStreamConnection(
                    $config['host'],
                    $config['port'],
                    $config['user'],
                    $config['password'],
                    $config['vhost']
                );
                $this->channel    = $this->connection->channel();
            }
        } catch (\Exception $e) {
            throw new RabbitMQException($e->getMessage());
        }
    
    }
    
        /** 绑定队列和交换机
     * @param $exchangeName
     * @param $queueName
     * @param $routeKey
     * @param string $exchangeType
     * @return $this
     */
    public function createRelation($exchangeName, $queueName, $routeKey, $exchangeType)
    {
        $this->channel->exchange_declare(
            $exchangeName,
            $exchangeType,
            false,
            true,
            false,
            false,
            false,
            empty($this->arguments['exchange']) ? [] : $this->arguments['exchange']
        );
    
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            empty($this->arguments['queue']) ? [] : $this->arguments['queue']
        );
    
        $this->channel->queue_bind($queueName, $exchangeName, $routeKey);
    }
    
     /**发布消息
     * @param $exchangeName 交换机名称
     * @param $queueName 队列名称
     * @param $routeKey 路由键
     * @param string $exchangeType 交换机传播类型
     * @param $data 数据
     * @throws \Exception
     */
    public function publish($exchangeName, $queueName, $routeKey, $exchangeType = AMQPExchangeType::DIRECT, $data)
    {
        $this->createRelation($exchangeName, $queueName, $routeKey, $exchangeType);
    
        $properties = [
            'content_type'  => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];
    
        $msg = new AMQPMessage($data, $properties);
    
        $this->channel->basic_publish($msg, $exchangeName, $routeKey);
    
        $this->closeConnection();
    }
    
    /**消费消息
     * @param $exchangeName
     * @param $queueName
     * @param $routeKey
     * @param string $exchangeType
     * @param bool $flag
     */
    public function consume($exchangeName, $queueName, $routeKey, $exchangeType = AMQPExchangeType::DIRECT, $flag = false)
    {
        $this->createRelation($exchangeName, $queueName, $routeKey, $exchangeType);
    
        $callback = function ($msg) {
            $this->get($msg);
        };
    
        $this->autoAck = $flag;
    
        $this->channel->basic_qos($this->prefetchSize, $this->prefetchCount, null);
    
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            $this->autoAck,
            false,
            false,
            $callback
        );
    }
    
    /**
     * @throws \ErrorException
     */
    public function monitor()
    {
        register_shutdown_function(function () {
            $this->closeConnection();
        });
    
        // 监听消息
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    /**
     * @throws \Exception
     */
    protected function closeConnection()
    {
        $this->channel->close();
        $this->connection->close();
    }
        /**获取消息数据
     * @param $msg
     */
    protected function get($msg)
    {
        $param = $msg->body;
        try {
            $this->doProcess($param);
    
            if (!$this->autoAck) {
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        } catch (\Exception $exception) {
            throw new RabbitMQException($exception->getMessage());
        }
    }
    // 消息数据处理钩子
    abstract public function doProcess($param);
    }

    上面代码已帮我们封装好了操作,重点需要关注的是get()函数,以及doProcess抽象函数,我们需要写个类去继承AbstractRabbitMQ类,然后去根据你的业务需求去编写,如doProcess函数一般就是对消息的消费处理,get()函数顾名思义可以get到消息,拿到消息内容再进行doProcess,这个函数你可以重写,也可以直接用。
    2.根据自己的业务需求,我们可以写个类去继承上面的抽象类,编写doProess方法,对队列消息数据进行处理

    class Amqp extends AbstractRabbitMQ
    {
    public function doProcess($param)
    {
        日志记录($param)
    }
    }

    3.接着我们可以开始发布消息

    $amqp = new Amqp(); // 没有传配置信息,则用默认的
    // 发布消息
    $amqp->publish('exchage_name','quene_name','route_key', 'direct', 'hello world');

    4.上面已经在交换机exchage_name上创建了一个队列,像队列里面插入了一个消息(hello world),我们也可以登录mq后台去看看是否创建成功,接着我们需要写个后台进程去监听队列,对消息进行消费

    $amqp = amqp();
    $amqp->consume('exchage_name','quene_name','route_key', 'direct',false);
    $amqp->monitor(); // 监听消息

    执行上面代码后,我们会发现后台的的队列消息已经被消费,且日志记录了hello world,证明消息已被消费。

总结:到这里,整个消息的生产消费流程就结束了,相信大家到这里应该对rabbitmq的安装和使用有了一定的了解,最先开始接触时,觉得这个东西有点难理解也不知道怎么用,但是我们要动起手来,在实践中,我们更容易接受和学习新的东西,希望大家也动起手来学习新东西,这次的分享就到这里,下次有相关新的东西再回来补充下~ 。

发表评论

电子邮件地址不会被公开。 必填项已用*标注