To implement RabbitMQ in our project we need to write the code for producer and consumer.
What is Producer?
Producer produces the messages and sends it to exchange. Exchange receives the messages and routes it to the corresponding queue.
What is Consumer?
Consumer consumes the messages from queue and processes it.
The message produced by producer is not directly sent to queue, instead it goes to specified exchange. Exchange routes the message to correct queue with the help of routing keys. The message stays in the queue until it is received by consumer. Then Consumer handles the message.
Producers and consumers are written by developers.
Following is the simple example of producer and consumer.
Producer:
require_once __DIR__ . ‘/vendor/autoload.php’; //import libraries
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
public function publish() { $this->writeLog("TestController:: index::starting to make connection with rabbitmq .."); try { //make connection with credentials $connection=new AMQPStreamConnection("hostname", producer-port-number, "username", "password", "vhost"); $resultData = print_r($connection, 1); $this->writeLog("TestController:: index::connection object value with rabbitmq :: $resultData"); } catch (Exception $e) { $resultData = print_r($e, 1); $this->writeLog("TestController:: index::exception while making connection with rabbitmq : $resultData"); } if ($connection) //if connection establishes successfully { try { $channel = $connection->channel();//create channel //declare a queue $channel->queue_declare('test_queue', false, false, false, false); $msg = new AMQPMessage(‘test message’); //create message $channel->basic_publish($msg, '', 'test_queue'); //publish message //echo " [x] Sent 'Hello World!'\n"; $channel->close(); //close channel $connection->close(); //close connection } catch (Exception $e) { $resultData = print_r($e, 1); $this->writeLog("TestController:: index::exception while making channel with rabbitmq connection : $resultData"); } }
Consumer:
The code in consumer uses almost same code as in producer except receiving messages.
require_once(‘/vendor/autoload.php’); //import library
public function consume() { //define('AMQP_DEBUG', true); $this->writeLog("TestController:: index::starting to make connection with rabbitmq .."); try { $connection = new AMQPStreamConnection("hostname", consumer-port-number, "username", "password", "vhost"); $resultData = print_r($connection, 1); $this->writeLog(TeseController:: index::connection object value with rabbitmq :: $resultData"); } catch (Exception $e) { $resultData = print_r($e, 1); $this->writeLog("TestController:: index::exception while making connection with rabbitmq : $resultData"); } if ($connection) { try { $channel = $connection->channel(); $channel->queue_declare('test_queue', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $channel->basic_consume('test_queue', '', false, true, false, false, array($this, 'processOrder') ); //consume message and calls the process order while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } catch (Exception $e) { $resultData = print_r($e, 1); $this->writeLog("TestController:: index::exception while making channel with rabbitmq connection : $resultData"); } } } Function processOrder(){ //here we can declare any task we want to process in background }
So, in this way, you can process your background tasks using RabbitMQ so that they can be processed asynchronously.