PHP SlimQ - RabbitMQ Job Queue

I recently wrote a little helper package for dispatching jobs to a RabbitMQ queue. I routinely deal with RabbitMQ at work, and I use it in some side projects. It's quite nice to be able to queue jobs now and process them later using a consumer and cron.

SlimQ

SlimQ is a simple RabbitMQ queue wrapper.

There are three moving parts in this package.

  • A Producer - Something which produces Jobs/Messages
  • A Consumer - Something which consumes Jobs/Messages
  • A Job - Something which implements the JobInterface

Communication happens over an Exchange and a Queue. Using the empty string should work just fine for simple queues, but feel free to customize this as you need!

Code Examples

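use PhpAmqpLib\Connection\AMQPStreamConnection;

// Connect to RabbitMQ (5672 is the default AMQP port).
$connection = new AMQPStreamConnection(
    $host,
    5672,
    $userName,
    $password,
    $vhost
);

// Open a channel on the connection created above.
$channel = $connection->channel();

// Publish through the exchange named 'exchange'.
$publisher = new SlimQ\Messaging\QueuePublisher('exchange', $channel);

// Queue JobClass to be invoked later with these arguments.
$publisher->publish(JobClass::class, ['arg1' => 'value']);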

The JobClass might look like this:

class JobClass implements JobInterface
{
    protected $args;

    public function __invoke(array $args)
    {
        $this->args = $args;

        return true;
    }

    /**
     * @return mixed
     */
    public function getArgs()
    {
        return $this->args;
    }
}

When the consumer receives a message for JobClass, it grabs an instance of that class from the Dependency Container and invokes it, passing in the array of arguments.
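To make that dispatch step concrete, here is a minimal sketch of what a consumer could do with a single message. The JSON payload shape (`job` and `args` keys), the `TinyContainer` class, `EchoJob`, and the `encodeJob()` and `dispatch()` helpers are all assumptions made for illustration, not SlimQ's actual internals. The `JobInterface` below is only a stand-in for the package's interface.

```php
<?php
declare(strict_types=1);

// Stand-in for the package's JobInterface (assumed shape: an invokable job).
interface JobInterface
{
    public function __invoke(array $args);
}

// Hypothetical job used only for this sketch.
final class EchoJob implements JobInterface
{
    public function __invoke(array $args)
    {
        return 'hello ' . $args['name'];
    }
}

// Hypothetical bare-bones container: maps a class name to a factory closure.
final class TinyContainer
{
    /** @var array<string, callable> */
    private $factories = [];

    public function set(string $id, callable $factory): void
    {
        $this->factories[$id] = $factory;
    }

    public function get(string $id)
    {
        if (!isset($this->factories[$id])) {
            throw new RuntimeException("No entry for {$id}");
        }
        return ($this->factories[$id])();
    }
}

// Assumed wire format: {"job": "<class name>", "args": {...}}.
function encodeJob(string $jobClass, array $args): string
{
    return json_encode(['job' => $jobClass, 'args' => $args]);
}

// Decode one message body, resolve the job from the container, and invoke it.
function dispatch(TinyContainer $container, string $body)
{
    $payload = json_decode($body, true);
    if (!is_array($payload) || !isset($payload['job'], $payload['args'])) {
        throw new InvalidArgumentException('Malformed job payload');
    }
    $job = $container->get($payload['job']);
    if (!$job instanceof JobInterface) {
        throw new InvalidArgumentException('Not a job: ' . $payload['job']);
    }
    return $job($payload['args']);
}

$container = new TinyContainer();
$container->set(EchoJob::class, function () {
    return new EchoJob();
});

$body = encodeJob(EchoJob::class, ['name' => 'rabbit']);
$result = dispatch($container, $body);
echo $result, PHP_EOL; // prints "hello rabbit"
```

Resolving the job through the container rather than calling `new` directly is what lets a job declare constructor dependencies, such as a repository, and have them wired in before it is invoked.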

The consumer sits and listens for Jobs to be published. You can keep the consumer running continuously in a blocking state, or run it periodically from cron.
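The cron-style mode can be sketched as a bounded drain loop: take messages until the queue is empty or a limit is reached, then exit so the next cron run picks up the rest. In this sketch `drainQueue()`, `$fetch`, and `$handle` are hypothetical names, and an in-memory array stands in for the broker. With php-amqplib, `$fetch` could wrap `basic_get()`, which returns null when the queue is empty. A blocking consumer would keep waiting for new messages instead of returning.

```php
<?php
declare(strict_types=1);

/**
 * Cron-style drain: process at most $maxJobs messages, then return.
 *
 * @param callable $fetch  returns the next message body, or null when empty
 * @param callable $handle processes one message body
 * @return int number of messages processed in this run
 */
function drainQueue(callable $fetch, callable $handle, int $maxJobs): int
{
    $processed = 0;
    while ($processed < $maxJobs) {
        $body = $fetch();
        if ($body === null) {
            break; // queue is empty; let the next cron run check again
        }
        $handle($body);
        $processed++;
    }
    return $processed;
}

// In-memory stand-in for a broker queue (assumption for this sketch only).
$queue = ['job-1', 'job-2', 'job-3'];
$fetch = function () use (&$queue) {
    return array_shift($queue); // array_shift returns null on an empty array
};

$seen = [];
$handle = function (string $body) use (&$seen) {
    $seen[] = $body;
};

$firstRun = drainQueue($fetch, $handle, 2);  // stops at the limit
$secondRun = drainQueue($fetch, $handle, 2); // stops when the queue is empty

echo $firstRun, ' ', $secondRun, PHP_EOL; // prints "2 1"
```

Capping each run keeps a single cron invocation from running indefinitely when the queue is busy. The right limit depends on how long each job takes relative to the cron interval.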

More concrete example

//DI-Container.php
    AMQPChannel::class => function (ContainerInterface $container)
    {
        $connection = new AMQPStreamConnection(
            $_ENV['RABBIT_HOST'],
            $_ENV['RABBIT_PORT'],
            $_ENV['RABBIT_USER'],
            $_ENV['RABBIT_PASSWORD'],
            $_ENV['RABBIT_VHOST']
        );

        $channel = $connection->channel();

        $channel->queue_declare(QUEUE_NAME, false, true, false, false);
        $channel->exchange_declare(EXCHANGE_NAME, 'direct', false, true, false);
        $channel->queue_bind(QUEUE_NAME, EXCHANGE_NAME);

        register_shutdown_function(function (\PhpAmqpLib\Connection\AbstractConnection $connection)
        {
            $connection->close();
        }, $connection);

        return $channel;
    }

//producer.php
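// Assumption: the app is bootstrapped the same way as in consumer.php.
$app = new \Space\Core\SpaceApp();
$container = $app->getContainer();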

$producer = $container->get(QueuePublisher::class);

$repo = $container->get(\Space\Domain\Game\Repositories\GameRepository::class);

$game = new \Space\Domain\Game\Entities\Game(new \Space\Core\Identity\Uuid());
$repo->store($game);

$producer->publish(
    \Space\Jobs\CreateMap::class, ['id' => $game->getId()->toString()]
);

//consumer.php
$app = new \Space\Core\SpaceApp();
$container = $app->getContainer();

$consumer = $container->get(\Space\Core\Messaging\QueueConsumer::class);

$consumer->consume();

//CreateMap.php
class CreateMap implements JobInterface
{
    const QUADRANT_LENGTH = 8;
    /**
     * @var GameRepository
     */
    private $gameRepository;

    /**
     * CreateMap constructor.
     *
     * @param GameRepository $gameRepository
     */
    public function __construct(GameRepository $gameRepository)
    {
        $this->gameRepository = $gameRepository;
    }

    public function __invoke(array $args)
    {
        /** @var Game $game */
        $game = $this->gameRepository->find(new Uuid($args['id']));
        //.....
    }
}

I hope this boilerplate code gives you a good start on writing some consumers and producers!

Written by Glenn Eggleton on Tuesday October 3, 2017