工具库说明:https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md
代码示例
<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; define('HOST', getenv('TEST_RABBITMQ_HOST') ? getenv('TEST_RABBITMQ_HOST') : 'localhost'); define('PORT', getenv('TEST_RABBITMQ_PORT') ? getenv('TEST_RABBITMQ_PORT') : 5672); define('USER', getenv('TEST_RABBITMQ_USER') ? getenv('TEST_RABBITMQ_USER') : 'guest'); define('PASS', getenv('TEST_RABBITMQ_PASS') ? getenv('TEST_RABBITMQ_PASS') : 'guest'); define('VHOST', '/'); define('AMQP_DEBUG', getenv('TEST_AMQP_DEBUG') !== false ? (bool)getenv('TEST_AMQP_DEBUG') : false); class comsumer{ protected $connection = null; public function __construct() { $this->connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); } public function run(){ $exchange = 'router'; $queue = 'msgs'; $consumerTag = 'consumer'; $channel = $this->connection->channel(); $channel->queue_declare($queue, false, true, false, false); $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); $channel->queue_bind($queue, $exchange); /** * @param \PhpAmqpLib\Message\AMQPMessage $message */ function process_message($message) { echo "\n--------\n"; echo $message->body; echo "\n--------\n"; $message->ack(); if ($message->body === 'quit') { $message->getChannel()->basic_cancel($message->getConsumerTag()); } } $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); /** * @param \PhpAmqpLib\Channel\AMQPChannel $channel * @param \PhpAmqpLib\Connection\AbstractConnection $connection */ function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $this->connection); while ($channel ->is_consuming()) { $channel->wait(); } } } $comsumer = new comsumer(); $comsumer->run();