- 前言
最近需求涉及到了异步以及应用解耦,原先系统都是采用redis去实现这两种功能,但是这种方法还是容易出现问题,由于没有ack回应机制,一旦redis挂了很容易造成消息丢失,从而影响业务的处理。所以这里我们就需要引入mq(消息队列)来实现,mq其实就是个消息中间件,用来对消息的存储和转发,目前业内较流行的消息中间件有:RabbitMQ、RocketMQ、Kafka。由于这边需求并不需要对mq进行集群和顺序消息,所以就选择了较稳定较简单的RabbitMQ来实现。 - RabbitMQ的安装
这里我采用docker-compose来安装,方便启动和关闭,也不影响原来系统环境,docker-compose.yaml文件如下:version: '3' services: rabbitmq: container_name: local-rabbitmq image: rabbitmq:management restart: always hostname: duoliangRabbitMq ports: - "5672:5672" - "15672:15672" volumes: - $PWD/data:/var/lib/rabbitmq - $PWD/log:/var/log/rabbitmq/log - $PWD/hosts:/etc/hosts environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: rooot RABBITMQ_DEFAULT_PASS: root
从上面文件我们可以看出我们映射了两个端口,以及挂载了两个文件目录,所以这里要确认好端口不冲突,以及目录已创建。
接着执行
即可启动rabbitmq服务了。docker-compose up -d
启动后我们可以在浏览器输入 ip:15672 来访问mq后台,用户密码为上面环境配置变量配置的,登录后就可以看到此界面了
注意想要访问后台需要把上面端口加入安全组,不然应该访问不了被拦截。
这样我们就可以在该后台看到我们新建队列,交换机等等,以及还能看到发布的消息以及消费者消费监听的连接。
至此我们的rabbitmq服务就算成功安装了。 -
rabbitmq的连接和使用
我这边是使用php语言来开发的,所以以php为例子,首先我们需要引入一个包compose require php-amqplib/php-amqplib -vvv
拉下包之后,我们就可以创建连接connection,发布消息publish,消费消息consume,这里我们可以写个抽象类将这些操作封装起来,下面我将写个小demo来更展示:
1.先写个类AbstractRabbitMQ,如下:abstract class AbstractRabbitMQ { /** * @var AMQPStreamConnection */ protected $connection; /** * @var \PhpAmqpLib\Channel\AMQPChannel */ protected $channel; /** * @var bool */ protected $autoAck = true; /** * @var array */ protected $arguments = []; /** * @var */ protected $prefetchSize = null; /** * @var */ protected $prefetchCount = 1; /** * 创建连接 * AbstractRabbitMQ constructor. * @param array $configs */ public function __construct($configs = []) { $config = []; if (empty($configs)) { $configs = ['host'=>'ip','port' => 5672, 'user' => 'root', 'pass' => 'root', 'vhost' => '/']; } foreach ($configs as $key => $value) { $config[$key] = $value; } try { if (!($this->connection instanceof AMQPStreamConnection)) { // 创建链接 $this->connection = new AMQPStreamConnection( $config['host'], $config['port'], $config['user'], $config['password'], $config['vhost'] ); $this->channel = $this->connection->channel(); } } catch (\Exception $e) { throw new RabbitMQException($e->getMessage()); } } /** 绑定队列和交换机 * @param $exchangeName * @param $queueName * @param $routeKey * @param string $exchangeType * @return $this */ public function createRelation($exchangeName, $queueName, $routeKey, $exchangeType) { $this->channel->exchange_declare( $exchangeName, $exchangeType, false, true, false, false, false, empty($this->arguments['exchange']) ? [] : $this->arguments['exchange'] ); $this->channel->queue_declare( $queueName, false, true, false, false, false, empty($this->arguments['queue']) ? [] : $this->arguments['queue'] ); $this->channel->queue_bind($queueName, $exchangeName, $routeKey); } /**发布消息 * @param $exchangeName 交换机名称 * @param $queueName 队列名称 * @param $routeKey 路由键 * @param string $exchangeType 交换机传播类型 * @param $data 数据 * @throws \Exception */ public function publish($exchangeName, $queueName, $routeKey, $exchangeType = AMQPExchangeType::DIRECT, $data) { $this->createRelation($exchangeName, $queueName, $routeKey, $exchangeType); $properties = [ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; $msg = new AMQPMessage($data, $properties); $this->channel->basic_publish($msg, $exchangeName, $routeKey); $this->closeConnection(); } /**消费消息 * @param $exchangeName * @param $queueName * @param $routeKey * @param string $exchangeType * @param bool $flag */ public function consume($exchangeName, $queueName, $routeKey, $exchangeType = AMQPExchangeType::DIRECT, $flag = false) { $this->createRelation($exchangeName, $queueName, $routeKey, $exchangeType); $callback = function ($msg) { $this->get($msg); }; $this->autoAck = $flag; $this->channel->basic_qos($this->prefetchSize, $this->prefetchCount, null); $this->channel->basic_consume( $queueName, '', false, $this->autoAck, false, false, $callback ); } /** * @throws \ErrorException */ public function monitor() { register_shutdown_function(function () { $this->closeConnection(); }); // 监听消息 while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ protected function closeConnection() { $this->channel->close(); $this->connection->close(); } /**获取消息数据 * @param $msg */ protected function get($msg) { $param = $msg->body; try { $this->doProcess($param); if (!$this->autoAck) { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } } catch (\Exception $exception) { throw new RabbitMQException($exception->getMessage()); } } // 消息数据处理钩子 abstract public function doProcess($param); }
上面代码已帮我们封装好了操作,重点需要关注的是get()函数,以及doProcess抽象函数,我们需要写个类去继承AbstractRabbitMQ类,然后去根据你的业务需求去编写,如doProcess函数一般就是对消息的消费处理,get()函数顾名思义可以get到消息,拿到消息内容再进行doProcess,这个函数你可以重写,也可以直接用。
2.根据自己的业务需求,我们可以写个类去继承上面的抽象类,编写doProess方法,对队列消息数据进行处理class Amqp extends AbstractRabbitMQ { public function doProcess($param) { 日志记录($param) } }
3.接着我们可以开始发布消息
$amqp = new Amqp(); // 没有传配置信息,则用默认的 // 发布消息 $amqp->publish('exchage_name','quene_name','route_key', 'direct', 'hello world');
4.上面已经在交换机exchage_name上创建了一个队列,像队列里面插入了一个消息(hello world),我们也可以登录mq后台去看看是否创建成功,接着我们需要写个后台进程去监听队列,对消息进行消费
$amqp = amqp(); $amqp->consume('exchage_name','quene_name','route_key', 'direct',false); $amqp->monitor(); // 监听消息
执行上面代码后,我们会发现后台的的队列消息已经被消费,且日志记录了hello world,证明消息已被消费。