RabbitMQ 入门 DEMO - 基于 PHP AMQPlib
通过 composer 安装 PHP AMQPlib
composer require php-amqplib/php-amqplib
目录结构:
├─rabbitmq
└─vendor
    ├─composer
    ├─php-amqplib
    │  └─php-amqplib
    │      └─PhpAmqpLib
    │          ├─Channel
    │          ├─Connection
    │          │  └─Heartbeat
    │          ├─Exception
    │          ├─Exchange
    │          ├─Helper
    │          │  └─Protocol
    │          ├─Message
    │          └─Wire
    │              └─IO
    └─phpseclib
        └─phpseclib
            └─phpseclib
                ├─Crypt
                ├─File
                │  └─ASN1
                ├─Math
                ├─Net
                │  └─SFTP
                └─System
                    └─SSH
                        └─Agent
生产者和消费者demo
生产者
<?php
require "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Producer: publishes the first command-line argument as a single persistent
// text message to the exchange "myExchange" with routing key "my".

if ($argc < 2) {
    // Report usage errors on stderr with a non-zero status so that shells and
    // calling scripts can tell the run failed (exit("...") would exit with 0).
    fwrite(STDERR, "please input argument, like `php -f directP.php mystring`" . PHP_EOL);
    exit(1);
}

$conn = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
if (!$conn->isConnected()) {
    fwrite(STDERR, "connect err..." . PHP_EOL);
    exit(1);
}

try {
    $channel = $conn->channel();

    try {
        $properties = [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];

        // The 4th argument is the AMQP "mandatory" flag.
        // NOTE(review): no return listener is registered on this channel, so a
        // message the broker cannot route is presumably returned and dropped
        // without any notice here -- confirm whether that is acceptable.
        $channel->basic_publish(new AMQPMessage($argv[1], $properties), "myExchange", "my", true);
    } finally {
        // Close the channel even when publishing throws.
        $channel->close();
    }
} finally {
    // Always release the connection, whatever happened above.
    $conn->close();
}
消费者
<?php
require "../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Consumer: reads messages from the queue "myQueue", prints each body, stores
// the most recent message in ./test.txt and stops when the body is "quit".

$conn = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
if (!$conn->isConnected()) {
    // Report on stderr with a non-zero status (exit("...") would exit with 0).
    fwrite(STDERR, "connect err..." . PHP_EOL);
    exit(1);
}

/**
 * Handles one delivered message.
 *
 * The message is acknowledged manually (basic_consume below is called with
 * no_ack = false) and only after it has been handled, so it is not removed
 * from the queue before this script is done with it.
 */
$callback = function ($msg) {
    $channel = $msg->delivery_info['channel'];
    $deliveryTag = $msg->delivery_info['delivery_tag'];

    echo $msg->body . PHP_EOL;

    if ($msg->body === 'quit') {
        // Ack the control message first so it is not redelivered on the next run.
        $channel->basic_ack($deliveryTag, false);
        echo 'quit now...';
        // Cancel this consumer instead of calling exit(): once the callback is
        // unregistered the wait loop below ends and the channel/connection are
        // closed normally.
        $channel->basic_cancel($msg->delivery_info['consumer_tag']);
        return true;
    }

    // Overwrites the file on every message, so it only holds the latest one.
    // NOTE(review): this JSON-encodes the whole message object, not just the
    // body -- confirm that is the intended file content.
    file_put_contents('./test.txt', json_encode($msg));

    $channel->basic_ack($deliveryTag, false);
    return true;
};

try {
    $channel = $conn->channel();

    try {
        // Arguments: queue, consumer tag ('' as in the original), no_local,
        // no_ack, exclusive, nowait, callback.
        $channel->basic_consume('myQueue', '', false, false, false, false, $callback);

        // Keep dispatching deliveries while at least one consumer callback is registered.
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    } finally {
        // Close the channel even when consuming throws.
        $channel->close();
    }
} finally {
    // Always release the connection, whatever happened above.
    $conn->close();
}
测试
开启消费者(示例代码不会自动声明交换机和队列,需先在 RabbitMQ 中创建交换机 myExchange 和队列 myQueue,并用路由键 my 将二者绑定):
php -f directC.php
生产消息:
php -f directP.php "hello world"
此时可以在消费者终端看到消息。