Queues
Innmind uses the AMQP protocol to build queues.
You first need to install a server that implements this protocol (1). The best-known one is RabbitMQ.
- Only version 0.9 is supported (1.0 is a completely different protocol).
Installation
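A minimal sketch, assuming the library is installed with Composer under the package name `innmind/amqp`. The version constraint is left out here, so pin the one that matches your project:

```sh
composer require innmind/amqp
```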
Usage
    use Innmind\AMQP\{
        Factory,
        Command\DeclareExchange,
        Command\DeclareQueue,
        Command\Bind,
        Model\Exchange\Type,
    };
    use Innmind\Socket\Internet\Transport;
    use Innmind\TimeContinuum\Earth\Period\Second;
    use Innmind\Url\Url;

    $client = Factory::of($os)
        ->build(
            Transport::tcp(),
            Url::of('amqp://guest:guest@localhost:5672/'),
            Second::of(1)->asElapsedPeriod(), // heartbeat
        )
        ->with(DeclareExchange::of('crawler', Type::direct))
        ->with(DeclareQueue::of('parser'))
        ->with(Bind::of('crawler', 'parser'));
This builds the basis of an AMQP client. As is, it does nothing until it is run (more on that below). The client is immutable and each call to `with` returns a new instance. In this case the `$client` will create an exchange named `crawler`, create a queue named `parser` and route every message published to `crawler` directly to `parser`.
Tip
You can head to the RabbitMQ tutorials to understand exchanges, queues and how to route your messages between the two.
The first step is to publish messages before trying to consume them.
    use Innmind\AMQP\{
        Model\Basic\Message,
        Command\Publish,
        Failure,
    };
    use Innmind\Immutable\Str;

    // The message body is wrapped in an immutable string
    $message = Message::of(
        Str::of('https://github.com'),
    );

    $client
        ->with(Publish::one($message)->to('crawler'))
        ->run(null) #(1)
        ->match(
            static fn() => null, // success
            static fn(Failure $failure) => handleFailure($failure),
        );
- For now don't worry about this `null`, just know that it's required.
The client only executes its commands when the `run` method is called. In this case, because we reuse the client from above, it creates the exchange and the queue, binds them together, and then publishes one message that ends up in the queue.
If everything works fine, `run` returns an `Either` with `null` on the right side. If any error occurs, it returns a `Failure` on the left side.
Info
Using a client that always declares the exchanges and queues it requires allows for a hot declaration of your infrastructure when you use the client. If the exchanges, queues and bindings already exist, it silently continues, since the structure on the AMQP server is already the one you expect.
Then to consume the queue:
    use Innmind\AMQP\{
        Command\Consume,
        Model\Basic\Message,
        Consumer\Continuation,
        Failure,
    };

    $client
        ->with(Consume::of('parser')->handle(
            static function(
                int $count, #(1)
                Message $message,
                Continuation $continuation,
            ): Continuation {
                doStuff($message);

                if ($count === 42) {
                    return $continuation->cancel($count);
                }

                return $continuation->continue($count + 1);
            },
        ))
        ->run(0) #(2)
        ->match(
            static fn(int $count) => var_dump($count),
            static fn(Failure $failure) => handleFailure($failure),
        );
- This argument is a carried value between each call of this function.
- This is the initial value passed to the function handling the messages.
Here we reuse the client from the first example to make sure we do have a `parser` queue to work on. Then we consume the queue, meaning we wait for incoming messages and call the function each time one arrives. This function behaves like a reduce operation where the initial value is `0` and is incremented each time a message is received. On the 43rd message the carried value has reached `42`, so we handle the message and ask the client to stop consuming the queue.
At this point the `run` method returns `42` on the right side of the `Either`, or a failure on the left side.
In this case the carried value is an `int`, but you can use any type you want.
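The reduce-like behaviour can be sketched in plain PHP without any AMQP server. This is only an illustration of the carried value and the early stop, not how the library is implemented: the `consume` function and the `[bool, int]` return convention are hypothetical stand-ins for the client and the `Continuation` object.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the consumer loop (no AMQP involved).
// $handler receives the carried value and a message, and returns
// [bool $keepGoing, int $carry], mimicking continue()/cancel().
function consume(iterable $messages, int $carry, callable $handler): int
{
    foreach ($messages as $message) {
        [$keepGoing, $carry] = $handler($carry, $message);

        if (!$keepGoing) {
            break; // equivalent of cancelling the consumer
        }
    }

    return $carry;
}

// 100 fake messages waiting in the "queue"
$messages = array_fill(0, 100, 'https://github.com');
$handled = 0;

$result = consume(
    $messages,
    0, // initial carried value, like run(0)
    static function (int $count, string $message) use (&$handled): array {
        ++$handled; // the message is handled before deciding to stop

        return $count === 42
            ? [false, $count]     // stop and keep the carried value
            : [true, $count + 1]; // carry the incremented value forward
    },
);

var_dump($result, $handled); // int(42) and int(43)
```

The carried value ends at 42 while 43 messages have been handled, which is why the consumer stops on the 43rd message.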
Tip
If you only need to pull one message from the queue, you should use `Innmind\AMQP\Command\Get` instead of `Consume`.
Tip
When consuming a queue, by default the server sends as many messages as it can through the socket so there is no wait time before handling the next message. However, depending on the throughput of your program, it can send too many messages in advance.
To prevent network saturation you can use `Innmind\AMQP\Command\Qos::of(100)`, where `100` is the number of messages to send in advance. Add this command before adding the `Consume` command to the client.
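As a rough sketch, this could look like the following. It assumes that `Qos` is added through `with` like the other commands, and it reuses the `$client` and the hypothetical `doStuff` and `handleFailure` functions from the examples above, so it is not runnable on its own:

```php
use Innmind\AMQP\{
    Command\Qos,
    Command\Consume,
    Model\Basic\Message,
    Consumer\Continuation,
    Failure,
};

$client
    // Assumption: Qos is chained with `with`, before Consume
    ->with(Qos::of(100)) // at most 100 messages sent in advance
    ->with(Consume::of('parser')->handle(
        static function(
            int $count,
            Message $message,
            Continuation $continuation,
        ): Continuation {
            doStuff($message);

            return $continuation->continue($count + 1);
        },
    ))
    ->run(0)
    ->match(
        static fn(int $count) => var_dump($count),
        static fn(Failure $failure) => handleFailure($failure),
    );
```

A lower value reduces the number of messages buffered on the client side, at the cost of more waiting between messages for fast consumers.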