Category Archive RabbitMQ

By phunsanit

CodeIgniter: php-amqplib

ลองเขียน custom driver สำหรับติดต่อกับ RabbitMQ / AMQP ยังไม่สมบูรณ์แต่ก็ดีกว่าการที่ต้องมา connect ผ่าน php-amqplib ทุกครั้งที่จะใช้ตามปกติ

  1. config file ก่อนตามระเบียบ
    <?php
    defined('BASEPATH') or exit('No direct script access allowed');
    
    /*
    | -------------------------------------------------------------------
    | RABBITMQ CONNECTIVITY SETTINGS
    | -------------------------------------------------------------------
    | Named connection profiles for the RabbitMQ driver. The driver's
    | connect($dsnId) method looks up $config['RabbitMQ']['connects'][$dsnId]
    | ('default' when no id is given) and passes the values below to the
    | php-amqplib AMQPStreamConnection constructor.
    |
    | -------------------------------------------------------------------
    | EXPLANATION OF VARIABLES
    | -------------------------------------------------------------------
    | The values shown after "=>" are the ones used by the 'default'
    | profile below; keys without a value must be supplied per environment.
    | Semantics are those of php-amqplib -- consult its documentation for
    | the authoritative meaning (units are presumably seconds for timeouts
    | and heartbeat -- TODO confirm against the installed version).
    |
    |    ['connection_timeout'] => 3.0,   timeout while opening the socket
    |    ['context'] => null,             optional stream context resource
    |    ['heartbeat'] => 0,              heartbeat interval
    |    ['host'],                        broker host name or IP address
    |    ['insist'] => false,             AMQP "insist" flag
    |    ['keepalive'] => false,          enable keepalive on the socket
    |    ['locale'] => 'en_US',           locale sent during the handshake
    |    ['login_method'] => 'AMQPLAIN',  authentication mechanism
    |    ['login_response'] => null,      pre-built login response, if any
    |    ['password'],                    broker password
    |    ['port'],                        broker port
    |    ['read_write_timeout'] => 3.0,   timeout for socket reads/writes
    |    ['user'],                        broker user name
    |    ['vhost'] => '/',                virtual host to open
     */
    $config['RabbitMQ']['connects'] = [
        'default' => [
            'connection_timeout' => 3.0,
            'context' => null,
            'heartbeat' => 0,
            'host' => '127.0.0.1',
            'insist' => false,
            'keepalive' => false,
            'locale' => 'en_US',
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'password' => 'guest',
            'port' => 5672,
            'read_write_timeout' => 3.0,
            'user' => 'guest',
            'vhost' => '/',
        ],
    ];
    
    /*
    | -------------------------------------------------------------------
    | RABBITMQ DEBUG SETTINGS
    | -------------------------------------------------------------------
    | When true, the RabbitMQ driver defines the AMQP_DEBUG constant as
    | true in its constructor.
    |
     */
    $config['RabbitMQ']['debug'] = false;
    
  2. ถึงคิวไฟล์ driver ที่จะเป็นตัวกลางระหว่าง codeigniter และ php-amqplib
    <?php
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * CodeIgniter library that opens a php-amqplib stream connection plus one
     * channel from a named profile in the RabbitMQ config file.
     *
     * NOTE(review): the class extends AMQPStreamConnection but never calls the
     * parent constructor; the live connection is the separate object created in
     * connect() and stored in $this->connection (a property inherited from the
     * parent hierarchy). Inherited methods therefore do not act on that live
     * connection. The inheritance is kept only so existing type checks keep
     * working -- consider switching to plain composition.
     */
    class RabbitMQ extends AMQPStreamConnection
    {
        /** @var object CodeIgniter super-object returned by get_instance(). */
        public $CI;
    
        /** @var array Settings taken from $config['RabbitMQ'] ('connects', 'debug'). */
        public $config = [];
    
        /** @var object|null Channel opened by connect(); null while disconnected. */
        public $channel;
    
        /**
         * @param array $config Loaded config array; the 'RabbitMQ' key is used.
         */
        public function __construct($config = array())
        {
            $this->CI = &get_instance();
    
            // Tolerate a missing config section instead of raising an undefined-index notice.
            $this->config = (isset($config['RabbitMQ']) && is_array($config['RabbitMQ']))
                ? $config['RabbitMQ']
                : [];
    
            // Constants cannot be redefined; guard so a second instance stays silent.
            if (!empty($this->config['debug']) && !defined('AMQP_DEBUG')) {
                define('AMQP_DEBUG', true);
            }
        }
    
        /**
         * Open a connection and a channel using the named connection profile.
         *
         * Optional keys missing from the profile fall back to the defaults
         * documented in the config file; host, port, user and password are required.
         *
         * @param string $dsnId Key under $config['RabbitMQ']['connects'].
         * @throws \InvalidArgumentException When the profile is missing or incomplete.
         */
        public function connect($dsnId = 'default')
        {
            if (!isset($this->config['connects'][$dsnId]) || !is_array($this->config['connects'][$dsnId])) {
                throw new \InvalidArgumentException('RabbitMQ connection "' . $dsnId . '" is not configured');
            }
    
            $defaults = [
                'connection_timeout' => 3.0,
                'context' => null,
                'heartbeat' => 0,
                'insist' => false,
                'keepalive' => false,
                'locale' => 'en_US',
                'login_method' => 'AMQPLAIN',
                'login_response' => null,
                'read_write_timeout' => 3.0,
                'vhost' => '/',
            ];
            $dsn = array_merge($defaults, $this->config['connects'][$dsnId]);
    
            foreach (['host', 'port', 'user', 'password'] as $required) {
                if (!isset($dsn[$required])) {
                    throw new \InvalidArgumentException(
                        'RabbitMQ connection "' . $dsnId . '" is missing the "' . $required . '" setting'
                    );
                }
            }
    
            // Argument order follows the AMQPStreamConnection constructor.
            $this->connection = new AMQPStreamConnection(
                $dsn['host'],
                $dsn['port'],
                $dsn['user'],
                $dsn['password'],
                $dsn['vhost'],
                $dsn['insist'],
                $dsn['login_method'],
                $dsn['login_response'],
                $dsn['locale'],
                $dsn['connection_timeout'],
                $dsn['read_write_timeout'],
                $dsn['context'],
                $dsn['keepalive'],
                $dsn['heartbeat']
            );
    
            $this->channel = $this->connection->channel();
        }
    
        /**
         * Close the channel and then the connection; a no-op for whichever of
         * the two was never opened, so it is safe to call before connect().
         */
        public function disconnect()
        {
            if (isset($this->channel)) {
                $this->channel->close();
                $this->channel = null;
            }
    
            if (isset($this->connection)) {
                $this->connection->close();
                $this->connection = null;
            }
        }
    
        /**
         * Build a persistent AMQP message whose body is the JSON encoding of $datas.
         *
         * @param mixed $datas Any JSON-encodable value.
         * @return AMQPMessage
         * @throws \InvalidArgumentException When $datas cannot be JSON-encoded.
         */
        public function setMessageJson($datas)
        {
            $msg_body = json_encode($datas);
    
            // json_encode() returns false on failure; do not publish that as a body.
            if ($msg_body === false) {
                throw new \InvalidArgumentException(
                    'Unable to JSON-encode message body (json error code ' . json_last_error() . ')'
                );
            }
    
            $properties = [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ];
    
            return new AMQPMessage($msg_body, $properties);
        }
    
    }
  3. ทีนี้ก็ controller ที่จะทำงานให้
    <?php
    defined('BASEPATH') or exit('No direct script access allowed');
    
    /**
     * Demo controller for the RabbitMQ driver.
     *
     * send() publishes one JSON message to the 'customers' fanout exchange;
     * receive() runs a blocking consumer on the 'invoices' queue and appends
     * every decoded message to RabbitMQReceive.txt.
     */
    class Messaging extends CI_Controller
    {
        public function __construct()
        {
            parent::__construct();
    
            $this->load->driver('RabbitMQ');
        }
    
        /**
         * Consume messages from the queue until the channel has no consumers left.
         *
         * Each message is JSON-decoded, appended to RabbitMQReceive.txt and then
         * acknowledged so the broker can delete it.
         */
        public function receive()
        {
            // The consumer loop blocks indefinitely; lift PHP's execution time limit.
            set_time_limit(0);
    
            $this->rabbitmq->connect();
    
            $exchange_name = 'customers';
            $queue_name = 'invoices';
    
            // Declare the same durable queue that send() uses, so binding and
            // consuming work even when no message has been sent yet.
            $this->declareTopology($exchange_name, $queue_name);
    
            $callback = function ($msg) {
    
                $datas = json_decode($msg->body, true);
                // Append and close in one call rather than leaking an fopen() handle.
                file_put_contents('RabbitMQReceive.txt', print_r($datas, true), FILE_APPEND);
    
                // Simulated work: one second per '.' character in the raw body.
                sleep(substr_count($msg->body, '.'));
    
                /* acknowledge, so the broker deletes the message */
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };
    
            // basic_consume(queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback)
            // no_ack is false: messages stay on the broker until basic_ack is sent.
            $this->rabbitmq->channel->basic_consume($queue_name, '', false, false, false, false, $callback);
    
            // wait() blocks for the next AMQP frame and dispatches it to the callback.
            while (count($this->rabbitmq->channel->callbacks)) {
                $this->rabbitmq->channel->wait();
            }
    
            // Closes the channel and the connection held by the driver.
            $this->rabbitmq->disconnect();
        }
    
        /**
         * Publish one sample JSON message and echo the payload that was sent.
         */
        public function send()
        {
            $this->rabbitmq->connect();
    
            $exchange_name = 'customers';
            $queue_name = 'invoices';
    
            // Bind the queue here as well; otherwise the exchange has nowhere to
            // route the message unless receive() has already created the binding.
            $this->declareTopology($exchange_name, $queue_name);
    
            $datas = [
                'rand' => rand(0, 100),
                // 'i' is minutes; 'm' would repeat the month number.
                'time' => date('Y-m-d H:i:s'),
            ];
    
            // Persistent JSON message built by the driver.
            $msg = $this->rabbitmq->setMessageJson($datas);
    
            // basic_publish(msg, exchange, routing_key)
            $this->rabbitmq->channel->basic_publish($msg, $exchange_name, $queue_name);
    
            echo '<br><pre>' . print_r($datas, true) . '</pre>';
    
            $this->rabbitmq->disconnect();
    
        }
    
        /**
         * Declare the fanout exchange and the queue (both with durable = true)
         * and bind the queue to the exchange. Shared by send() and receive() so
         * both sides always declare identical settings.
         *
         * @param string $exchange_name Exchange to declare.
         * @param string $queue_name    Queue to declare and bind.
         */
        private function declareTopology($exchange_name, $queue_name)
        {
            // exchange_declare(exchange, type, passive, durable, auto_delete)
            $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);
    
            // queue_declare(queue, passive, durable, exclusive, auto_delete)
            $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false);
    
            $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name);
        }
    
    }
    
  4. ทดสอบโดยเรียก http://localhost/CodeIgniter-3.1.3/messaging/send และ http://localhost/CodeIgniter-3.1.3/messaging/receive