[RabbitMQ] Work Queues

new_task.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$data = implode('', array_slice($argv, 1));
if(empty($data)){
	$data = "Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo '[x] Sent', $data, "\n";


$channel->close();
$connection->close();

worker.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body,'.'));
  echo "[x] Done\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$ php new_task.php “A very hard task which takes two seconds..”
[x] SentA very hard task which takes two seconds..

$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World!
[x] Done
[x] Received Hello World!
[x] Done
[x] Received A very hard task which takes two seconds..
[x] Done

worker.phpのshellを複数立てると、分散して処理される
[vagrant@localhost mailer]$ php new_task.php First message.
[x] SentFirstmessage.
[vagrant@localhost mailer]$ php new_task.php Second message..
[x] SentSecondmessage..
[vagrant@localhost mailer]$ php new_task.php Third message…
[x] SentThirdmessage…
[vagrant@localhost mailer]$ php new_task.php Fourth message….
[x] SentFourthmessage….

$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Secondmessage..
[x] Done
[x] Received Fourthmessage….
[x] Done
すげえ

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode('', array_slice($argv, 1));
if(empty($data)){
	$data = "Hello World!";
}
$msg = new AMQPMessage(
	$data,
	array('delivery_mode'=> AMQPMessage::DELVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo '[x] Sent', $data, "\n";


$channel->close();
$connection->close();
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body,'.'));
  echo "[x] Done\n";
  $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

queueの振る舞いだが、わかったようでわからんな。。