Introduction to VPS and Web Technology Development

rabbitmq消费者php版本

自由vps 队列

工具库说明:https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md

代码示例

<?php

require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

define('HOST', getenv('TEST_RABBITMQ_HOST') ? getenv('TEST_RABBITMQ_HOST') : 'localhost');
define('PORT', getenv('TEST_RABBITMQ_PORT') ? getenv('TEST_RABBITMQ_PORT') : 5672);
define('USER', getenv('TEST_RABBITMQ_USER') ? getenv('TEST_RABBITMQ_USER') : 'guest');
define('PASS', getenv('TEST_RABBITMQ_PASS') ? getenv('TEST_RABBITMQ_PASS') : 'guest');
define('VHOST', '/');
define('AMQP_DEBUG', getenv('TEST_AMQP_DEBUG') !== false ? (bool)getenv('TEST_AMQP_DEBUG') : false);

class comsumer{
    protected $connection = null;
    public function __construct()
    {
        $this->connection =  new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    }

    public function run(){
        $exchange = 'router';
        $queue = 'msgs';
        $consumerTag = 'consumer';
        $channel = $this->connection->channel();
        
        $channel->queue_declare($queue, false, true, false, false);

        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

        $channel->queue_bind($queue, $exchange);

        /**
         * @param \PhpAmqpLib\Message\AMQPMessage $message
         */
        function process_message($message)
        {
            echo "\n--------\n";
            echo $message->body;
            echo "\n--------\n";

            $message->ack();

            if ($message->body === 'quit') {
                $message->getChannel()->basic_cancel($message->getConsumerTag());
            }
        }


        $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

        /**
         * @param \PhpAmqpLib\Channel\AMQPChannel $channel
         * @param \PhpAmqpLib\Connection\AbstractConnection $connection
         */
        function shutdown($channel, $connection)
        {
            $channel->close();
            $connection->close();
        }

        register_shutdown_function('shutdown', $channel, $this->connection);

        while ($channel ->is_consuming()) {
            $channel->wait();
        }
    }
}


$comsumer = new comsumer();
$comsumer->run();
使用chatGPT寻求答案
标签: 暂无标签

免责声明:

本站提供的资源,都来自网络,版权争议与本站无关,所有内容及软件的文章仅限用于学习和研究目的。不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负,我们不保证内容的长久可用性,通过使用本站内容随之而来的风险与本站无关,您必须在下载后的24个小时之内,从您的电脑/手机中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。侵删请致信E-mail:master@freevpsweb.com

同类推荐
评论列表