new_task.php
// new_task.php: publish a single message to the 'hello' queue.
// The message body is taken from the command-line arguments; when none are
// given, the default body "Hello World!" is sent instead.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue before publishing so the script works even if it does not exist yet.
$channel->queue_declare('hello', false, false, false, false);

// Join the arguments with a single space; joining with '' collapsed
// "First message." into "Firstmessage.".
$data = implode(' ', array_slice($argv, 1));

// Compare against '' instead of using empty(): empty('0') is true, which
// would silently replace a literal "0" task with the default body.
if ($data === '') {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

// An empty exchange name publishes through the default exchange, routed by queue name.
$channel->basic_publish($msg, '', 'hello');

echo '[x] Sent', $data, "\n";

$channel->close();
$connection->close();
worker.php
// worker.php: consume messages from the 'hello' queue and simulate work.
// Each '.' in the message body costs one second of sleep.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue here as well, so the worker can start before any publisher has run.
$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// Invoked once per delivered message.
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    // Simulated workload: one second per '.' in the body.
    sleep(substr_count($msg->body, '.'));
    echo "[x] Done\n";
};

// The fourth argument (no_ack) is true: deliveries are acknowledged automatically,
// so the callback does not call ack().
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

// Release the channel and the connection once consuming has stopped.
$channel->close();
$connection->close();
$ php new_task.php "A very hard task which takes two seconds.."
[x] SentA very hard task which takes two seconds..
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World!
[x] Done
[x] Received Hello World!
[x] Done
[x] Received A very hard task which takes two seconds..
[x] Done
worker.phpのshellを複数立てると、分散して処理される
[vagrant@localhost mailer]$ php new_task.php First message.
[x] SentFirstmessage.
[vagrant@localhost mailer]$ php new_task.php Second message..
[x] SentSecondmessage..
[vagrant@localhost mailer]$ php new_task.php Third message...
[x] SentThirdmessage...
[vagrant@localhost mailer]$ php new_task.php Fourth message....
[x] SentFourthmessage....
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Secondmessage..
[x] Done
[x] Received Fourthmessage....
[x] Done
すげえ
// new_task.php (durable variant): publish a single persistent message to 'task_queue'.
// The message body is taken from the command-line arguments; when none are
// given, the default body "Hello World!" is sent instead.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// The third argument (durable) is true, unlike the earlier 'hello' queue declaration.
$channel->queue_declare('task_queue', false, true, false, false);

// Join the arguments with a single space; joining with '' collapsed
// "First message." into "Firstmessage.".
$data = implode(' ', array_slice($argv, 1));

// Compare against '' instead of using empty(): empty('0') is true, which
// would silently replace a literal "0" task with the default body.
if ($data === '') {
    $data = "Hello World!";
}

// The constant is DELIVERY_MODE_PERSISTENT; the misspelled DELVERY_MODE_PERSISTENT
// is not defined on AMQPMessage and fails at runtime.
$msg = new AMQPMessage(
    $data,
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

// An empty exchange name publishes through the default exchange, routed by queue name.
$channel->basic_publish($msg, '', 'task_queue');

echo '[x] Sent', $data, "\n";

$channel->close();
$connection->close();
// worker.php (acknowledging variant): consume tasks from 'task_queue',
// simulate work, and acknowledge each message after it has been processed.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$ch = $conn->channel();

$ch->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// Handles one delivery: report it, sleep one second per '.', then acknowledge.
$processTask = function ($delivery) {
    echo ' [x] Received ', $delivery->body, "\n";

    $seconds = substr_count($delivery->body, '.');
    sleep($seconds);

    echo "[x] Done\n";

    // Acknowledge only after the simulated work has finished.
    $delivery->ack();
};

// Prefetch count of 1 is requested before consuming starts.
$ch->basic_qos(null, 1, null);

// The fourth argument (no_ack) is false, so the handler acknowledges explicitly.
$ch->basic_consume('task_queue', '', false, false, false, false, $processTask);

while ($ch->is_consuming()) {
    $ch->wait();
}

$ch->close();
$conn->close();
queueの振る舞いだが、わかったようでわからんな。。