Ensuring that Messages are Consumed by their Intended Recipient
This tutorial builds on the simple Microservice application that we built in the previous tutorial. Everything looks good so far, but what happens when we release this to production, and our application is consumed by multiple customers? Routing problems and message-correlation issues begin to rear their ugly heads. Our current example is simplistic. Consider a deployed application that performs work that is much more complex than our example.
Now we are faced with a problem: how do we ensure that any given message is received by its intended recipient only? Consider the following process flow:
It is possible that outbound messages published from the SimpleMath Microservice may not arrive at the ASP.NET application in the same order in which the ASP.NET application initially published the corresponding request to the SimpleMath Microservice.
RabbitMQ provides for this scenario in the form of Correlation IDs. A Correlation ID is essentially a unique value that the ASP.NET application assigns to each request message, and that is retained throughout the entire process flow. Once the SimpleMath Microservice has processed the request, it copies the Correlation ID into the associated response message and publishes that message to the response Queue.
Upon receipt of any given message, the ASP.NET application inspects the message contents, extracts the Correlation ID, and compares it to the Correlation ID of its original request. Consider the following pseudo-code:
Message message = new Message();
message.CorrelationID = new CorrelationID();

RabbitMQAdapter.Instance.Publish(message.ToJson(), "MathInbound");

string response;
BasicDeliverEventArgs args;
var responded = RabbitMQAdapter.Instance.TryGetNextMessage("MathOutbound", out response, out args, 5000);

if (responded)
{
    Message m = Parse(response);
    if (m.CorrelationID == message.CorrelationID)
    {
        // This message is the intended response associated with the original request
    }
    else
    {
        // This message is not the intended response, and is associated with a different request
        // todo: Put this message back in the Queue so that its intended recipient may receive it...
    }
}

throw new HttpResponseException(HttpStatusCode.BadGateway);
What’s wrong with this solution?
It’s possible that any given message may be bounced around indefinitely, without ever reaching its intended recipient. Such a scenario is unlikely, but possible. Regardless, given multiple Microservices, messages will regularly be consumed by Microservices for which they were not intended, and every such message must be put back in the Queue. This is an obvious inefficiency: it is very difficult to control from a performance perspective, and impossible to predict in terms of scaling.
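To get a feel for that inefficiency, here is a minimal in-memory sketch. It is not RabbitMQ code: the shared response Queue is modelled by a plain Queue<string> holding Correlation IDs, and the SharedQueueDemo class and its method are hypothetical names introduced for illustration. It counts how many messages a consumer has to put back before it finds its own.

```csharp
using System;
using System.Collections.Generic;

public static class SharedQueueDemo
{
    // Dequeues until the message with the wanted Correlation ID turns up,
    // re-queueing every other message at the back of the queue.
    // Returns the number of messages that had to be re-queued.
    public static int RequeuesUntilMatch(Queue<string> sharedQueue, string wantedId)
    {
        var requeues = 0;
        var attempts = sharedQueue.Count; // inspect each waiting message at most once

        while (attempts-- > 0)
        {
            var correlationId = sharedQueue.Dequeue();
            if (correlationId == wantedId)
            {
                return requeues; // found our own response
            }

            sharedQueue.Enqueue(correlationId); // not ours: put it back
            requeues++;
        }

        return requeues; // our response has not arrived yet
    }
}
```

With three responses waiting and ours at the back, the consumer handles two messages that belong to someone else before it reaches its own. In a real deployment each of those extra steps is a network round trip to the broker.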
But this is the generally accepted solution. What else can we do?
An alternative, but discouraged, solution is to use a dedicated Queue for each request:
Whoa! Are you suggesting that we create a new Queue for each request?!?
Yes, so let’s park that idea right there – it’s essentially a solution that won’t scale. We would place an unnecessary amount of pressure on RabbitMQ in order to fulfil this design. A new Queue for every inbound HTTP request is simply unmanageable.
Or, is it?
What if we could manage this? Imagine a dedicated pool of Queues, made available to inbound requests, such that each Queue was returned to the pool upon request completion. This might sound far-fetched, but this is essentially the way that database connection-pooling works. Here is the new flow:
Let’s walk through the code, starting with the QueuePool itself:
public class QueuePool
{
    private static readonly QueuePool _instance = new QueuePool(
        () => new RabbitMQQueue { Name = Guid.NewGuid().ToString(), IsNew = true });

    private readonly Func<AMQPQueue> _amqpQueueGenerator;
    private readonly ConcurrentBag<AMQPQueue> _amqpQueues;

    static QueuePool() {}

    public static QueuePool Instance { get { return _instance; } }

    private QueuePool(Func<AMQPQueue> amqpQueueGenerator)
    {
        _amqpQueueGenerator = amqpQueueGenerator;
        _amqpQueues = new ConcurrentBag<AMQPQueue>();

        var manager = new RabbitMQQueueMetricsManager(false, "localhost", 15672, "guest", "guest");
        var queueMetrics = manager.GetAMQPQueueMetrics();

        foreach (var queueMetric in queueMetrics.Values)
        {
            Guid queueName;
            var isGuid = Guid.TryParse(queueMetric.QueueName, out queueName);

            if (isGuid)
            {
                _amqpQueues.Add(new RabbitMQQueue {IsNew = false, Name = queueName.ToString()});
            }
        }
    }

    public AMQPQueue Get()
    {
        AMQPQueue queue;
        var queueIsAvailable = _amqpQueues.TryTake(out queue);

        return queueIsAvailable ? queue : _amqpQueueGenerator();
    }

    public void Put(AMQPQueue queue)
    {
        _amqpQueues.Add(queue);
    }
}
QueuePool is a singleton that retains a reference to a thread-safe collection of Queue objects, a ConcurrentBag<AMQPQueue>. The most important aspect of this is the thread safety: the collection synchronises concurrent access internally, so when several incoming HTTP requests call Get at the same time, no two of them can take the same Queue. In other words, any given request that extracts a Queue is guaranteed to have exclusive access to that Queue until it puts the Queue back.
Note the private constructor. QueuePool is initialised once, by the first inbound HTTP request that accesses it. During initialisation it calls the HTTP API exposed by the RabbitMQ management plugin, which returns a list of all active Queues. You can mimic this call as follows:
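The exclusivity guarantee can be checked with a small stand-alone sketch. It assumes nothing about RabbitMQ: the pool holds plain strings instead of Queue objects, and the ExclusiveTakeDemo class is a hypothetical name used only here. Several parallel workers each take one item from a ConcurrentBag, and the method reports how many distinct items were handed out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ExclusiveTakeDemo
{
    // Fills a bag with queueCount names, then lets queueCount parallel
    // workers take one name each. Returns the number of distinct names taken.
    public static int DistinctTakes(int queueCount)
    {
        var pool = new ConcurrentBag<string>(
            Enumerable.Range(0, queueCount).Select(i => "queue-" + i));
        var taken = new ConcurrentBag<string>();

        Parallel.For(0, queueCount, _ =>
        {
            string name;
            if (pool.TryTake(out name))
            {
                taken.Add(name); // record what this worker received
            }
        });

        return taken.Distinct().Count();
    }
}
```

If two workers could ever receive the same item, the distinct count would fall below the number of workers. It should always equal queueCount.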
curl -i -u guest:guest http://localhost:15672/api/queues
The list of returned Queue objects is filtered by name, such that only those Queues that are named in GUID-format are returned. QueuePool expects that all underlying Queues implement this convention in order to separate them from other Queues leveraged by the application.
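The naming convention can be illustrated in isolation. The sketch below applies the same Guid.TryParse check that QueuePool's constructor uses to a plain list of Queue names; the QueueNameFilter class and the sample names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QueueNameFilter
{
    // Keeps only the names that parse as GUIDs, mirroring the check
    // in QueuePool's constructor.
    public static List<string> PooledQueueNames(IEnumerable<string> allQueueNames)
    {
        return allQueueNames
            .Where(name =>
            {
                Guid parsed;
                return Guid.TryParse(name, out parsed);
            })
            .ToList();
    }
}
```

Given the names "Math", "MathOutbound" and "3f2504e0-4f89-11d3-9a0c-0305e82c3301", only the last is treated as a pooled Queue. Note that Guid.TryParse accepts several GUID formats, so it is safest to name pooled Queues with Guid.NewGuid().ToString(), as QueuePool does.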
Now we have a list of Queues that our QueuePool can distribute. Let’s take a look at our updated Math Controller:
var queue = QueuePool.Instance.Get();

try
{
    RabbitMQAdapter.Instance.Publish(string.Concat(number, ",", queue.Name), "Math");

    string message;
    BasicDeliverEventArgs args;
    var responded = RabbitMQAdapter.Instance.TryGetNextMessage(queue.Name, out message, out args, 5000);

    if (responded)
    {
        return message;
    }
}
finally
{
    // Always hand the Queue back, even if publishing or receiving throws
    QueuePool.Instance.Put(queue);
}

throw new HttpResponseException(HttpStatusCode.BadGateway);
Let’s step through the process flow from the perspective of the ASP.NET application:
1. Retrieves exclusive use of the next available Queue from the QueuePool
2. Publishes the numeric input (as before) to SimpleMath Microservice, along with the Queue-name
3. Subscribes to the Queue retrieved from QueuePool, awaiting inbound messages
4. Receives the response from SimpleMath Microservice, which published to the Queue specified in step 2
5. Releases the Queue, which is re-inserted into QueuePool’s underlying collection
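The SimpleMath Microservice side of this exchange is not shown here, but it has to split the payload back into the number and the name of the reply Queue. The following is a minimal sketch of that step only. It assumes the numeric input is an integer, and the MathRequest class is a hypothetical helper, not part of the tutorial's code.

```csharp
using System;

public static class MathRequest
{
    // Splits a payload of the form "<number>,<reply queue name>".
    // Returns false if the payload does not match that shape.
    public static bool TryParse(string payload, out int number, out string replyQueue)
    {
        number = 0;
        replyQueue = null;

        if (payload == null)
        {
            return false;
        }

        // GUID-format Queue names contain no commas, so split on the last one
        var separator = payload.LastIndexOf(',');
        if (separator <= 0 || separator == payload.Length - 1)
        {
            return false;
        }

        if (!int.TryParse(payload.Substring(0, separator), out number))
        {
            return false;
        }

        replyQueue = payload.Substring(separator + 1);
        return true;
    }
}
```

Once the payload has been parsed, the Microservice would publish its result to the Queue named in replyQueue rather than to a shared response Queue.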
Notice the Get method. An attempt is made to retrieve the next available Queue. If all Queues are currently in use, QueuePool will create a new Queue.
Summary
Leveraging QueuePool offers greater reliability in terms of message delivery, as well as more consistent throughput, given that we no longer need to rely on consuming components to re-queue messages that were intended for other consumers.
It offers a degree of predictable scale – performance testing will reveal the optimal number of Queues that the QueuePool should retain in order to achieve sufficient response times.
It is advisable to determine the optimal number of Queues required by your application, so that QueuePool can avoid creating new Queues in the event of pool-exhaustion, reducing overhead.
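One way to find that number during performance testing is to count how often the pool has to create a Queue on demand. The sketch below is an assumption-laden illustration rather than part of QueuePool: CountingPool is a hypothetical generic pool that follows the same Get and Put pattern, starts with a fixed number of items, and records every on-demand creation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class CountingPool<T>
{
    private readonly ConcurrentBag<T> _items = new ConcurrentBag<T>();
    private readonly Func<T> _generator;
    private int _createdOnDemand;

    public CountingPool(Func<T> generator, int initialSize)
    {
        _generator = generator;

        // Pre-populate the pool so that normal load never exhausts it
        for (var i = 0; i < initialSize; i++)
        {
            _items.Add(generator());
        }
    }

    // Number of items created because the pool was empty when Get was called
    public int CreatedOnDemand
    {
        get { return Volatile.Read(ref _createdOnDemand); }
    }

    public T Get()
    {
        T item;
        if (_items.TryTake(out item))
        {
            return item;
        }

        Interlocked.Increment(ref _createdOnDemand);
        return _generator();
    }

    public void Put(T item)
    {
        _items.Add(item);
    }
}
```

If CreatedOnDemand stays at zero under peak test load, the initial size is sufficient. If it keeps climbing, the pool is too small for that load.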