[RabbitMQ] Publish/Subscribe

Listing exchanges
$ sudo rabbitmqctl list_exchanges
Listing exchanges …
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
…done.

emit_log.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('logs', 'fanout', false, false, false);

$data = implode('', array_slice($argv, 1));
if(empty($data)){
	$data = "info:Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');

echo '[x]Sent', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('logs', 'fanout', false, false, false);

list($queue_name,,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
  echo ' [x]', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();

$ php receive_logs.php
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: NOT_FOUND – no queue ‘logs’ in vhost ‘/’ in /home/vagrant/dev/mailer/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:216

なんやこれは。。。よーわからん。。

[RabbitMQ] Work Queues

new_task.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$data = implode('', array_slice($argv, 1));
if(empty($data)){
	$data = "Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo '[x] Sent', $data, "\n";


$channel->close();
$connection->close();

worker.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body,'.'));
  echo "[x] Done\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$ php new_task.php “A very hard task which takes two seconds..”
[x] SentA very hard task which takes two seconds..

$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World!
[x] Done
[x] Received Hello World!
[x] Done
[x] Received A very hard task which takes two seconds..
[x] Done

worker.phpのshellを複数立てると、分散して処理される
[vagrant@localhost mailer]$ php new_task.php First message.
[x] SentFirstmessage.
[vagrant@localhost mailer]$ php new_task.php Second message..
[x] SentSecondmessage..
[vagrant@localhost mailer]$ php new_task.php Third message…
[x] SentThirdmessage…
[vagrant@localhost mailer]$ php new_task.php Fourth message….
[x] SentFourthmessage….

$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Secondmessage..
[x] Done
[x] Received Fourthmessage….
[x] Done
すげえ

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode('', array_slice($argv, 1));
if(empty($data)){
	$data = "Hello World!";
}
$msg = new AMQPMessage(
	$data,
	array('delivery_mode'=> AMQPMessage::DELVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo '[x] Sent', $data, "\n";


$channel->close();
$connection->close();
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body,'.'));
  echo "[x] Done\n";
  $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

queueの振る舞いだが、わかったようでわからんな。。

RabbitMQとは

RabbitMQとは
-> pivotalによって開発されているオープンソースメッセージブローカー
-> AMQP, STOMP, MQTT, HTTPに対応

AMAPとは?
– メッセージ転送に使われる通信プロトコル

Exchange
– Exchange は、生成されたMessageを受け取る役割
Binding
– ExchangeとMessageQueueの対応付け
Message Queue
– Messageを蓄積し、Consumer Applicationnに引き渡す

Publisher Applicationによって生成されたMessageは、Exchangeに渡される
ExchangeはBindingに基づいて、Messageを適切なMessage Queueに引き渡す
Consumer Applicationによって、Message QueueからMessageが取り出される

Direct
– Direct な Exchange は、Messageに付与されているrouting keyと、bindingに設定されているbinding key を見て、routing key = binding key となるような、Message Queue に Message を配送
Fanout
– Fanout なExchangeは、bindされているMessage Queueすべてに受け取ったメッセージの配送
Topic
– メッセージにrouting keyを指定して、そのkeyを元に配送するMessage Queueを選択

RabbitMQは、オープンソースでErlangで記述

公式サイト: https://www.rabbitmq.com/

ちょっと理解するのに時間がかかりそう

RabbitMQでMailerQを実行する

$ sudo yum install php-pecl-amqp

setting.php

$address = 'amqp://guest:guest@192.168.33.10';
$outbox = 'outbox_test';
$resultbox = 'results_test';

$recipientDomain = 'gmail.com';
$recipientEmail = 'hogehoge@gmail.com';
$fromAddress = 'info@hpscript.com';

send.php

require_once('settings.php');

try {
	$connection = new AMQPConnection(array(
		'host' = $hostname,
		'login' => $username,
		'password' => $password
	));

	$connection->connect();
}
catch(AMQPException $exception){
	echo "Could not establish a connection to the RabbitMQ server.\n";
}

try {
	$channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $exception){
	echo "Connection to the broker was lost(creating channel).\n";
}

try {
	$exchange = new AMQPExchange($channel);
	$exchange->setName('exchange1');
	$exchange->setType('fanout');
	$exchange->declareExchange();
} catch (AMQPExchangeException $exception){
	echo "CHannel is not connected to broker(declaring exchange).\n";
} catch (AMQPConnectionException $exception){
	echo "Connection to the broker was lost(declaring exchange).\n";
}

try {
	$queue = new AMQPQueue($channel);
	$queue->setName($outbox);
	$queue->setFlags(AMQP_DURABLE);
	$queue->declareQueue();
	$queue->bind('exchange1','key2');
} catch (AMQPQueueException $exception){
	echo "Channel is not connected to a broker(creating queue).\n";
} catch (AMQPConnectionException $exception){
	echo "Connection to the broker was lost.(creating queue)/\n";
}

$jsonMessage = json_encode(array(
	'envelope'=>$fromAddress,
	'recipient' => $recipientEmail,
	'mime' => "From:".$fromAddress."\r\n"
		. "To:". $recipientEmail . "\r\n"
		. "Subject: Example subject\r\n\r\n"
		. "This is the example message text"
));

try {
	$message = $exchange->publish($jsonMessage,'key2');
} catch (AMQPExchangeException $exception){
	echo "Channel is not connected to a broker(publishing message on queue).\n";
} catch (AMQPConnectionException $exception){
	echo "Connection to the broker was lost(publishing message on queue).\n";
} catch (AMQPChannelException $exception){
	echo "The channel is not open(publishing message on queue).\n";
}

$connection->disconnect();
require_once('settings.php');

try {
	$connection = new AMQPConnection(array(
		'host' => $hostname,
		'login' => $username,
		'password' => $password,
		'vhost' => $vhost

	));

	$connection->connect();
} catch (AMQPException $exception){
	echo "Could not establish a connection to the RabbitMQ servier.\n";
}

try {
	$channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $exception){
	echo "Connection to the broker was lost(creating channel).\n";
}

try {
	$queue = new AMQPQueue($channel);
	$queue->setName($resultbox);
	$queue->setFlags(AMQP_DURABLE);
	$queue->bind('exchange1','key2');
	$queue->declareQueue();
} catch(AMQPQueueException $exception){
	echo "Channel is not connected to a broker(creating queue).\n";
} catch (AMQPConnectionException $exception){
	echo "Connection to the broker was lost.(creating queue)/\n";
}

while ($envelope = $queue->get()){
	echo "Received:\n";
	echo $envelope->getBody(). "\n";
	echo "----------------------------------------------\n\n";
}

$connection->disconnect();

$ php result.php
PHP Fatal error: Uncaught Error: Class ‘AMQPConnection’ not found in /home/vagrant/dev/mailer/result.php:6
Stack trace:
#0 {main}
thrown in /home/vagrant/dev/mailer/result.php on line 6 

ん? どういうこと?
RabbitMQの使い方がよくわかってないな。。

[AmazonLinux2] RabbitMQをインストールして実行する

$ sudo yum -y update
// epelをインストール
$ sudo yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
$ sudo yum-config-manager –enable epel
// erlangをインストール
$ sudo yum install erlang –enablerepo=epel
$ sudo yum install yum-plugin-versionlock
$ sudo yum versionlock gcc-*
$ sudo yum install -y socat
$ sudo yum install logrotate
$ sudo rpm –import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
$ sudo yum install rabbitmq-server

$ sudo rabbitmq-plugins enable rabbitmq_management
$ chkconfig rabbitmq-server on
$ sudo systemctl start rabbitmq-server
$ systemctl status rabbitmq-server
● rabbitmq-server.service – RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 火 2021-02-16 09:31:04 JST; 3s ago
Main PID: 28564 (beam)
CGroup: /system.slice/rabbitmq-server.service
├─28564 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -K true -A30 -P 1048576 — -root /u…
├─28636 inet_gethost 4
└─28637 inet_gethost 4
$ sudo rabbitmqctl list_queues
Listing queues …
…done.

$ php send.php
[x] sent ‘Hello World!’
$ php recieve.php
[*] Waiting for messages. To exit press CTR+C
[x] Received Whats up

ぎゃあああああああああああああああああああああああ
助けてくれええええええええええええええええええええええええええええ

RabbitMQの送受信

送信側
send.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// コネクションの確立
$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Whats up');

$channel->basic_publish($msg, '', 'hello');

echo "[x] sent 'Hello World!'\n";

$channel->close();

受信側

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo "[*] Waiting for messages. To exit press CTR+C\n";

$callback = function($msg){
	echo '[x] Received ', $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)){
	$channel->wait();
}

$ php recieve.php
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPIOException: stream_socket_client(): unable to connect to tcp://192.168.33.10:5672 (Connection refused) in /home/vagrant/dev/mailer/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:111

サーバにRabbitMQをインストールしないとダメなのか。。

MailerQを使いたい

MailerQを使ってメール配信したい。
公式サイトを見ると、AMQPが必要らしい。
で、AMQPにはbcmatchが必要とのことで、bcmatchをインストールする

$ php -v
PHP 7.4.11 (cli) (built: Oct 21 2020 19:12:26) ( NTS )
$ sudo yum list | grep php
// 省略
php.x86_64 7.4.14-1.amzn2 amzn2extra-php7.4
php-bcmath.x86_64 7.4.14-1.amzn2 amzn2extra-php7.4
$ sudo yum install php-bcmath

AMQPとは?
-> Advanced Message Queuing Protocol(AMQP)の略
-> メッセージ指向ミドルウェアのオープンスタンダードなアプリケーション層プロトコル
-> AMQPはメッセージングプロバイダとクライアントに、HTTPなどの手法と同じように異なるベンダ間で正しく相互運用できるような振る舞いを要求

Github: https://github.com/php-amqplib/php-amqplib
This library is a pure PHP implementation of the AMQP 0-9-1 protocol. It’s been tested against RabbitMQ.
The library was used for the PHP examples of RabbitMQ in Action and the official RabbitMQ tutorials.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.

$ composer require php-amqplib/php-amqplib
$ ls
composer.json composer.lock vendor

send_msg.php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.33.10', 8000, 'password', '/my_vhost');

$channel = $connection->channel();
$channel->queue_declar('Hello_World', false, false, false, false);

$msg = new AMQPMessage('Hello RabbitMQ World!');
$channel->basic_publish($msg, '', 'Hello_World');
echo " [x] Sent 'Hello_World'\n";

$channel->close();
$connection->close();

$ php send_msg.php
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPIOException: stream_socket_client(): unable to connect to tcp://192.168.33.10:8000 (Connection refused) in /home/vagrant/dev/mailer/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:111

ん? なんやこれ、どういうことだ。。。

[Laravel8.x] Mailgun + お名前.com + Route53 のメール送信設定方法

Laravel8.x + Mailgun + お名前.com + Route53

まず、Mailgunでカスタムドメインを設定する必要がある
そうすると、DNSに、TXT、MX、CNAMEを登録しろ、と出てくる

なるほど、これをお名前.comに登録すれば良いのね、ということで、お名前.comのDNS設定で追加して、Verify DNS Settingsを押しても一向にverifyされない。。。

何故だ? 24時間くらい待った方が良いの? 常識的にそんな訳ないよね。。お名前に問い合わせしようかな。。と考えていたが、
TXT、MX、CNAMは、お名前側ではなく、Route53のHosted zonesのcreate recordで設定して上手くいった。
設定内容は、record nameと”Enter This Value”をvalueに入れていく。

mxレコードの場合は、valueに”10 mxa.mailgun.org”と入れる。

これで、再度Verify DNS Settingsを押すと、verifyされる。

で、設定したドメインのSMTP credentialのページに行き、ログインの箇所とReset Passwordでパスワードを取得して、この内容をlaravelのenvに記載する

laravel .env
L maildriverはsmtpのままで大丈夫

MAIL_MAILER=smtp
MAIL_HOST=smtp.mailgun.org
MAIL_PORT=587
MAIL_USERNAME=postmaster@${domain name}
MAIL_PASSWORD=${domain password}
MAIL_ENCRYPTION=null
MAIL_FROM_ADDRESS=null
MAIL_FROM_NAME=""

これでOK
送信テストを行う

まあ、Amazon SESの申請が降りたから、mailgun使わなくて良いんだけど、SESの申請が落ちた時はこちらを使う。
取り敢えず、MailgunのTXT、MX、CNAMEはお名前.comではなく、Route53側で設定するということ。

mailstrapで開発してて、さあSTG、商用環境にデプロイしよか、って時にメール送信できないとか、恐怖でしかないわ、ホンマに。

[Laravel8.x] SendGridでメール送信する

$ composer require sendgrid/sendgrid

.env

SENDGRID_API_KEY=""
FROM_EMAIL=test@gmail.com
FROM_NAME=HPSCRIPT

controller

public function test(){
    	$this->sendMail();
    	return view('index');
    }

public function sendMail(){
    	$email = new \SendGrid\Mail\Mail();
    	$email->setFrom(getenv('FROM_EMAIL'), getenv('FROM_NAME'));
    	$email->setSubject("test");
    	$email->addTo('***');

    	$sendGrid = new \SendGrid(getenv('SENDGRID_API_KEY'));
    	$email->addContent(
            "text/plain",
            strval(
                view(
                    'index'
                )
            )
        );
    	$email->addContent(
    		"text/html",
    		strval(
    			view(
    				'index'
    			)
    		)
    	);
    	
    	try {
    		$sendGrid->send($email);
    		return true;
    	} catch (Exception $e) {
    		echo $e;
            // Log::debug($e->getMessage());
            return false;
        }
    }

メール配信を無料ではなくてちゃんと月額課金の契約して行くと、ステージ変わった感があるな。
感慨深いものがある。

Mailgunで送信ドメインをカスタマイズしたい

Sending -> Domains -> Add new domain

テスト用として試したいだけなので、ドメインは「お名前.com」で取得してもうすぐ期限が切れそうな「free-engineer.work」を使いたいと思う。

「mg.free-engineer.work」で入力する。

すると、
「1. Go to your DNS provider」と表示される。
続いて、
TXT, MX, CNAMEを入力していきます。

Once you make the above DNS changes it can take 24-48hrs for those changes to propagate. We will email you to let you know once your domain is verified.

反映までに結構時間がかかりそうなので、それまで別のことをしたいと思う。