new_task.php
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $data = implode('', array_slice($argv, 1)); if(empty($data)){ $data = "Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, '', 'hello'); echo '[x] Sent', $data, "\n"; $channel->close(); $connection->close();
worker.php
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body,'.')); echo "[x] Done\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); }
$ php new_task.php “A very hard task which takes two seconds..”
[x] SentA very hard task which takes two seconds..
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World!
[x] Done
[x] Received Hello World!
[x] Done
[x] Received A very hard task which takes two seconds..
[x] Done
worker.phpのshellを複数立てると、分散して処理される
[vagrant@localhost mailer]$ php new_task.php First message.
[x] SentFirstmessage.
[vagrant@localhost mailer]$ php new_task.php Second message..
[x] SentSecondmessage..
[vagrant@localhost mailer]$ php new_task.php Third message…
[x] SentThirdmessage…
[vagrant@localhost mailer]$ php new_task.php Fourth message….
[x] SentFourthmessage….
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Secondmessage..
[x] Done
[x] Received Fourthmessage….
[x] Done
すげえ
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data = implode('', array_slice($argv, 1)); if(empty($data)){ $data = "Hello World!"; } $msg = new AMQPMessage( $data, array('delivery_mode'=> AMQPMessage::DELVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); echo '[x] Sent', $data, "\n"; $channel->close(); $connection->close();
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body,'.')); echo "[x] Done\n"; $msg->ack(); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
queueの振る舞いだが、わかったようでわからんな。。