SQS stands for Simple Queue Service, is a highly scalable and reliable distributed queueing system, which can be used to make applications asynchronous.
I will show you the simplest way of integrating the SQS, installing AWS SDK, Creating a Queue and a Worker to manage the queues and push tasks to Queues.
Install and use PHP AWS SDK
Please read my previous article about that here How to use Amazon PHP AWS SDK
Creating Worker and Queue
To create a queue and worker is very easy, you need the SDK instance and SQS instance
This script has to be run from the command line in order to start multiple workers to handle as much as you need tasks.
// worker.php
// $this->config is array with Amazon settings that are mentioned in the previous article about installing SDK
$sdk = new Sdk([
'region' => $this->config['amazon']['sqs']['region'],
'version' => 'latest',
]);
// Create the SQS instance with the right credentials
$sqs = $sdk->createSqs([
'region' => $this->config['amazon']['sqs']['region'],
'version' => 'latest',
'credentials' => [
'key' => $this->config['amazon']['sqs']['access_key'],
'secret' => $this->config['amazon']['sqs']['secret_access_key'],
],
]);
// You need to specify a Queue name, string, you can have multiple queues, and this depends on how much tasks you need to handle, in the most cases, 1 will do the perfect job
$queueName = $this->config['amazon']['sqs']['workers']['queue'];
// I'm using Symfony 2 console, that's why I have $output object
$output->writeln('[*] Waiting for SQS messages. - ' . date(self::DATEFORMAT));
// We need to make a while loop in order to stay awake and wait for messeges/tasks from the Queue
while (true) {
try {
// Create the queue, this is done the first time, after that it just doesn't return anything, also you can create the Queue from the Amazon SQS profile
$sqs->createQueue(['QueueName' => $queueName]);
// Get the queue URL from the queue name.
$result = $sqs->getQueueUrl(['QueueName' => $queueName]);
$queueUrl = $result->get('QueueUrl');
// Receive a message from the queue
$result = $sqs->receiveMessage([
'QueueUrl' => $queueUrl,
'WaitTimeSeconds' => 1 // Enable long polling
]);
if ($result['Messages'] == null) {
// No message to process so continue to looooooooooooooooooooop
continue;
}
// Get the message information
$message = array_pop($result['Messages']);
$queueHandle = $message['ReceiptHandle'];
$messageBody = $message['Body'];
$output->writeln("[x] Received ". $messageBody . ' - ' . date(self::DATEFORMAT));
// Do what you want to do with the task, here I'm processing user uploaded videos with FFMPEG making some GIFs and additional stuff
// My $messageBody contains video SLUG, according to it I can get the right video from the database and do the processing on it
$this->videoService->process($messageBody);
// Delete the message, it is processed
$sqs->deleteMessage(array(
'QueueUrl' => $queueUrl,
'ReceiptHandle' => $queueHandle
));
$output->writeln('[x] Done - ' . date(self::DATEFORMAT));
} catch (Exception $e) {
// Always log the errors, this scripts runs in commandline and if something happens, you will need the info
$output->writeln('[*] Error receiving message to SQS queue - ' . date(self::DATEFORMAT) . $e->getMessage());
}
}
Adding tasks to the Queue
// sqsPushToQueue.php
try {
$sdk = new Sdk([
'region' => $this->config['amazon']['sqs']['region'],
'version' => 'latest',
]);
$sqs = $sdk->createSqs([
'region' => $this->config['amazon']['sqs']['region'],
'version' => 'latest',
'credentials' => [
'key' => $this->config['amazon']['sqs']['access_key'],
'secret' => $this->config['amazon']['sqs']['secret_access_key'],
],
]);
$queueName = $this->config['amazon']['sqs']['workers']['queue'];
// Get the queue URL from the queue name.
$result = $sqs->getQueueUrl(['QueueName' => $queueName]);
$queueUrl = $result->get('QueueUrl');
// Send the message
$sqs->sendMessage(array(
'QueueUrl' => $queueUrl,
'MessageBody' => $slug // As already mentioned above, I'm sending the slug that I will need to get the right video to process
));
$this->logger->debug('SQS: data sent! ' . $slug);
} catch (Exception $e) {
$this->logger->err('SQS: Error sending message to queue ' . $e->getMessage());
}
That’s it, if you have any comments, please fill them below.