Consume generic SQS messages in Laravel

tzelleke • February 9, 2022

Laravel supports AWS SQS as a queue backend but expects SQS messages to be generated in a specific format that includes the job handler class and a serialized job. This blog post shows how Laravel's implementation can be extended to consume generic SQS messages that originate outside of Laravel, for instance S3 event notifications sent when a file is uploaded to an S3 bucket.
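
To make the difference concrete, here is a rough sketch contrasting the two body shapes. The Laravel payload is abbreviated, the S3 notification is trimmed to a handful of fields, the bucket and key values are made up, and the helper looksLikeLaravelPayload() is hypothetical and exists only for illustration.

```php
<?php

// Hypothetical helper: a decoded SQS body is treated as Laravel-generated
// when it carries both the "job" and "data" keys.
function looksLikeLaravelPayload(array $body): bool
{
    return isset($body['job'], $body['data']);
}

// Abbreviated shape of a body produced by Laravel's own queue.
$laravelBody = [
    'displayName' => 'App\Jobs\DemoJob',
    'job' => 'Illuminate\Queue\CallQueuedHandler@call',
    'data' => [
        'commandName' => 'App\Jobs\DemoJob',
        'command' => '(serialized job object, elided)',
    ],
];

// Trimmed shape of an S3 event notification (example values are invented).
$s3Body = [
    'Records' => [[
        'eventSource' => 'aws:s3',
        'eventName' => 'ObjectCreated:Put',
        's3' => [
            'bucket' => ['name' => 'example-bucket'],
            'object' => ['key' => 'uploads/report.csv', 'size' => 1024],
        ],
    ]],
];

var_dump(looksLikeLaravelPayload($laravelBody)); // bool(true)
var_dump(looksLikeLaravelPayload($s3Body));      // bool(false)
```

The S3 body has no handler class and no serialized job, which is why the stock SQS driver cannot process it.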

Tip: A dedicated package already implements this functionality and more. See https://github.com/dusterio/laravel-plain-sqs on GitHub to learn more.

Overview

Custom queue class

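In our custom queue class we need to override the pop() method so that the raw message body is decoded and wrapped in an instance of the configured job class before it is handed to the worker.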

// app/Queue/SqsGenericQueue.php
<?php

namespace App\Queue;

use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\Jobs\SqsJob;
use Illuminate\Queue\SqsQueue;

class SqsGenericQueue extends SqsQueue
{
    public function pop($queue = null)
    {
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $queue = $this->getQueue($queue),
            'AttributeNames' => ['ApproximateReceiveCount'],
        ]);

        if (!is_null($response['Messages']) && count($response['Messages']) > 0) {
            $message = $response['Messages'][0];
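            // The body is plain JSON from an external producer, not a Laravel job payload.
            $body = json_decode($message['Body'], true);

            // Resolve the job class from this connection's 'job_class' config entry.
            $jobClass = $this->container['config']->get(
                "queue.connections.{$this->connectionName}.job_class"
            );

            // Wrap the decoded body in a job and swap in a payload the worker can run.
            $job = $this->container->makeWith($jobClass, ['message' => $body]);
            $message['Body'] = $this->createPayload($job, $this->default);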

            return new SqsJob(
                $this->container,
                $this->sqs,
                $message,
                $this->connectionName,
                $queue,
            );
        }
    }
}

Custom connector class

In our custom connector we need to override the connect() method. The body mirrors the parent implementation; the only change is the returned queue class, which is now our custom SqsGenericQueue instead of Illuminate\Queue\SqsQueue.

// app/Queue/Connectors/SqsGenericConnector.php
<?php

namespace App\Queue\Connectors;

use App\Queue\SqsGenericQueue;
use Aws\Sqs\SqsClient;
use Illuminate\Contracts\Queue\Queue;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Support\Arr;

class SqsGenericConnector extends SqsConnector
{
    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if (! empty($config['key']) && ! empty($config['secret'])) {
            $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
        }

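        // Pass suffix and after_commit through as well so the values
        // defined on the connection in config/queue.php are not ignored.
        return new SqsGenericQueue(
            new SqsClient($config),
            $config['queue'],
            $config['prefix'] ?? '',
            $config['suffix'] ?? '',
            $config['after_commit'] ?? null,
        );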
    }
}

Registration with the service provider

We need to register our custom connector with Laravel's queue manager in a service provider.

// app/Providers/AppServiceProvider.php
use App\Queue\Connectors\SqsGenericConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    ...
    public function register()
    {
        $this->app->afterResolving(QueueManager::class, function (QueueManager $manager) {
            $manager->addConnector('sqs-generic', function () {
                return new SqsGenericConnector();
            });
        });
    }
}

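Finally, define a new connection in config/queue.php that uses our custom connector. The driver value must match the name passed to addConnector(), and job_class names the job that handles each incoming message.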

// config/queue.php
<?php

return [
    ...
    'connections' => [

        'sqs-generic' => [
            'driver' => 'sqs-generic',
            'key' => env('AWS_ACCESS_KEY_ID'),
            'secret' => env('AWS_SECRET_ACCESS_KEY'),
            'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
            'queue' => env('SQS_QUEUE', 'default'),
            'suffix' => env('SQS_SUFFIX'),
            'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
            'after_commit' => false,
            'job_class' => \App\Jobs\DemoJob::class,
        ],
        ...
    ],
    ...
];
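
The job_class entry above points to App\Jobs\DemoJob, which this post does not show. The following is a minimal sketch of what such a job could look like, assuming the messages are S3 event notifications and the code runs inside a Laravel application. The constructor parameter has to be named $message because pop() passes ['message' => $body] to makeWith(); the objectKeys() helper and the log message are hypothetical.

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;

class DemoJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    /** @var array Decoded SQS message body. */
    public $message;

    // Nullable because json_decode() yields null when the body is not valid JSON.
    public function __construct(?array $message)
    {
        $this->message = $message ?? [];
    }

    // Hypothetical helper: collect the object keys from an S3 event notification.
    public function objectKeys(): array
    {
        $keys = [];

        foreach ($this->message['Records'] ?? [] as $record) {
            if (isset($record['s3']['object']['key'])) {
                // S3 URL-encodes object keys in event notifications.
                $keys[] = urldecode($record['s3']['object']['key']);
            }
        }

        return $keys;
    }

    public function handle()
    {
        foreach ($this->objectKeys() as $key) {
            Log::info('S3 object uploaded', ['key' => $key]);
        }
    }
}
```

With the connection defined, a worker started with php artisan queue:work sqs-generic should pick up messages from this connection and run the job for each one.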