有些人为了让项目快速上线,服务器往往安装宝塔面板,然后再极速安装LNMP。尽管环境搭建的时间省了,但是宝塔上PHP中扩展包没有提供AMQP。这时候只是为了使用消息队列而对PHP大动干戈, 不如使用一个PHP AMQP的库,即用即装,不对环境造成影响。
php-amqplib 客户端库,通过composer安装,不需要在PHP中安装扩展,以下为两种不同的安装方式。
1. 项目中新建composer.json,添加如下代码,然后composer install
{ "require": { "php-amqplib/php-amqplib": " 2.6.*" } }
2. 命令进入到项目,然后 composer require php-amqplib/php-amqplib 2.6.*
1. 进入web管控台,添加新用户,角色管理员,任何IP上都可以登录,授权指定虚拟机。
2. 添加交换机
3. 添加队列并与交互机绑定。
1. 封装rabbitMQ类。
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Class RabbitMQ. */ class RabbitMQ { const READ_LINE_NUMBER = 0; const READ_LENGTH = 1; const READ_DATA = 2; public $config; public static $prefix = 'autoinc_key:'; protected $exchangeName = 'flow'; protected $queueName = 'flow_queue'; /** * @var \PhpAmqpLib\Connection\AMQPStreamConnection */ protected $connection; /** * @var \PhpAmqpLib\Channel\AMQPChannel */ protected $channel; protected $queue; //配置项 private $host; private $port; private $user; private $pass; private $vhost; public function __construct($config = []) { //$this->config = $config; //设置rabbitmq配置值 $this->host = '192.168.1.101'; $this->port = 5672; $this->user = 'beiqiaosu'; $this->pass = 'beiqiaosu'; $this->vhost = 'report'; $this->connect(); } public function __call($method, $args = []) { $reConnect = false; while (1) { try { $this->initChannel(); $result = call_user_func_array([$this->channel, $method], $args); } catch (\Exception $e) { //已重连过,仍然报错 if ($reConnect) { throw $e; } \Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1)); if ($this->connection) { $this->close(); } $this->connect(); $reConnect = true; continue; } return $result; } //不可能到这里 return false; } /** * 连接rabbitmq消息队列. * * @return bool */ public function connect() { try { if ($this->connection) { unset($this->connection); } $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost); } catch (\Exception $e) { echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage(); return false; } } /** * 关闭连接. */ public function close() { $this->channel->close(); $this->connection->close(); } /** * 设置交换机名称. * * @param string $exchangeName */ public function setExchangeName($exchangeName = '') { $exchangeName && $this->exchangeName = $exchangeName; } /** * 设置队列名称. * * @param string $queueName */ public function setQueueName($queueName = '') { $queueName && $this->queueName = $queueName; } /** * 设置频道. */ public function initChannel() { if (!$this->channel) { //通道 $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queueName, false, true, false, false); $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); $this->channel->queue_bind($this->queueName, $this->exchangeName); } } /** * 获取队列数据. * * @return mixed */ public function pop() { while (1) { try { $this->connect(); $this->initChannel(); $message = $this->channel->basic_get($this->queueName); if ($message) { $this->channel->basic_ack($message->delivery_info['delivery_tag']); $result = $message->body; } else { throw new \Exception('Empty Queue Data'); } } catch (\Exception $e) { //\Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")"); sleep(1); continue; } return $result; } //不可能到这里 return false; } /** * 插入队列数据. * * @param $data * * @return bool */ public function push($data) { while (1) { try { $this->connect(); $this->initChannel(); $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->channel->basic_publish($message, $this->exchangeName); } catch (\Exception $e) { echo "$e->getMessage()"; continue; } return true; } //不可能到这里 return false; } }
2. 操作mq,出队,入队。
<?php require_once "vendor/autoload.php"; require_once "component/RabbitMQ.php"; $mq = new RabbitMQ(); // 消息消费测试 /*try { $res = $mq->pop(); }catch(\Exception $e) { var_dump($e->getMessage());die; }*/ // 消息生产测试 try { $res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155'])); }catch(\Exception $e) { var_dump($e->getMessage());die; } var_dump($res);die;
1. 先通过生产消息(入队)方法运行一下,然后进入队列中get message查看消息总数。
2. 测试调用消费,再查看总数。
如果觉得博客文章对您有帮助,异或土豪有钱任性,可以通过以下扫码向我捐助。也可以动动手指,帮我分享和传播。您的肯定,是我不懈努力的动力!感谢各位亲~