new_task.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Producer: publishes one task message, built from the command-line arguments,
// to the 'hello' queue and then disconnects.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Flags after the queue name are passive, durable, exclusive, auto_delete (all false here).
$channel->queue_declare('hello', false, false, false, false);
// Join the arguments with a space so an unquoted multi-word task keeps its word boundaries.
$data = implode(' ', array_slice($argv, 1));
// Compare against '' rather than empty(): empty('0') is true and would silently
// replace a literal "0" payload with the default text.
if ($data === '') {
    $data = "Hello World!";
}
$msg = new AMQPMessage($data);
// Publish with an empty exchange name and 'hello' as the routing key.
$channel->basic_publish($msg, '', 'hello');
echo '[x] Sent ', $data, "\n";
$channel->close();
$connection->close();
worker.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Worker: consumes messages from the 'hello' queue and simulates work by
// sleeping one second for every '.' found in the message body.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Flags after the queue name are passive, durable, exclusive, auto_delete (all false here).
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    // Each dot in the body stands for one second of simulated work.
    sleep(substr_count($msg->body, '.'));
    echo "[x] Done\n";
};
// Arguments: queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback.
// no_ack is true here, so the callback never acknowledges messages explicitly.
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}
// Release the channel and connection once the consume loop ends.
$channel->close();
$connection->close();
$ php new_task.php "A very hard task which takes two seconds.."
[x] SentA very hard task which takes two seconds..
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World!
[x] Done
[x] Received Hello World!
[x] Done
[x] Received A very hard task which takes two seconds..
[x] Done
worker.phpのshellを複数立てると、分散して処理される
[vagrant@localhost mailer]$ php new_task.php First message.
[x] SentFirstmessage.
[vagrant@localhost mailer]$ php new_task.php Second message..
[x] SentSecondmessage..
[vagrant@localhost mailer]$ php new_task.php Third message...
[x] SentThirdmessage...
[vagrant@localhost mailer]$ php new_task.php Fourth message....
[x] SentFourthmessage....
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Secondmessage..
[x] Done
[x] Received Fourthmessage....
[x] Done
すげえ
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Producer (durable variant): publishes one persistent task message, built from
// the command-line arguments, to the durable 'task_queue' queue.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Third argument (durable) is true; the worker must declare the queue the same way.
$channel->queue_declare('task_queue', false, true, false, false);
// Join the arguments with a space so an unquoted multi-word task keeps its word boundaries.
$data = implode(' ', array_slice($argv, 1));
// Compare against '' rather than empty(): empty('0') is true and would silently
// replace a literal "0" payload with the default text.
if ($data === '') {
    $data = "Hello World!";
}
// Mark the message persistent. The constant is DELIVERY_MODE_PERSISTENT;
// the previous spelling (DELVERY_...) is undefined and fails at runtime.
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
// Publish with an empty exchange name and 'task_queue' as the routing key.
$channel->basic_publish($msg, '', 'task_queue');
echo '[x] Sent ', $data, "\n";
$channel->close();
$connection->close();
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Worker (durable variant): takes tasks from 'task_queue', sleeps one second
// per '.' in the body, and acknowledges each message after the work is done.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";

// Handler invoked for every delivered message.
$handleTask = function ($message) {
    $body = $message->body;
    echo ' [x] Received ' . $body . "\n";
    // One second of simulated work per dot in the payload.
    $seconds = substr_count($body, '.');
    sleep($seconds);
    echo "[x] Done\n";
    // Acknowledge only after the simulated work has completed.
    $message->ack();
};

// Prefetch count of 1 (second argument).
$channel->basic_qos(null, 1, null);
// Fourth argument (no_ack) is false: the handler acknowledges explicitly.
$channel->basic_consume('task_queue', '', false, false, false, false, $handleTask);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
queueの振る舞いは、わかったようでまだよくわからない。