Monthly Archives: July 2015

Microservices in C# Part 2: Consistent Message Delivery

Fork me on GitHub

Microservice Architecture

Microservice Architecture

Ensuring that Messages are Consumed by their Intended Recipient

This tutorial builds on the simple Microservice application that we built in the previous tutorial. Everything looks good so far, but what happens when we release this to production, and our application is consumed by multiple customers? Routing problems and message-correlation issue begin to rear their ugly heads. Our current example is simplistic. Consider a deployed application that performs work that is much more complex than our example.

Now we are faced with a problem; how to ensure that any given message is received by its intended recipient only. Consider the following process flow:

potential for mismatched message-routing

potential for mismatched message-routing

It is possible that outbound messages published from the SimpleMath Microservice may not arrive at the ASP.NET application in the same order in which the ASP.NET application initially published the corresponding request to the SimpleMath Microservice.

RabbitMQ has built-in safeguards against this scenario in the form of Correlation IDs. A Correlation ID is essentially a unique value assigned by the ASP.NET application to inbound messages, and retained throughout the entire process flow. Once processed by the SimpleMath Microservice, the Correlation ID is inserted into the associated response message, and published to the response Queue.

Upon receipt of any given message, the ASP.NET inspects the message contents, extracts the Correlation ID and compares it to the original Correlation ID. Consider the following pseudo-code:

            Message message = new Message();
            message.CorrelationID = new CorrelationID();

            RabbitMQAdapter.Instance.Publish(message.ToJson(), "MathInbound");

            string response;
            BasicDeliverEventArgs args;

            var responded = RabbitMQAdapter.Instance.TryGetNextMessage("MathOutbound", out response, out args, 5000);

            if (responded) {
                Message m = Parse(response);
                if (m.CorrelationID == message.CorrelationID) {
                    // This message is the intended response associated with the original request
                }
                else {
                    // This message is not the intended response, and is associated with a different request
                    // todo: Put this message back in the Queue so that its intended recipient may receive it...
                }
            }
            throw new HttpResponseException(HttpStatusCode.BadGateway);

What’s wrong with this solution?

It’s possible that any given message may be bounced around indefinitely, without ever reaching its intended recipient. Such a scenario is unlikely, but possible. Regardless, it is likely, given multiple Microservices, that messages will regularly be consumed by Microservices to whom the message was not intended to be delivered. This is an obvious inefficiency, and very difficult to control from a performance perspective, and impossible to predict in terms of scaling.

But this is the generally accepted solution. What else can we do?

An alternative, but discouraged solution is to invoke a dedicated Queue for each request:

dedicated queue per inbound request

dedicated queue per inbound request

Whoa! Are you suggesting that we create a new Queue for each request?!?

Yes, so let’s park that idea right there – it’s essentially a solution that won’t scale. We would place an unnecessary amount of pressure on RabbitMQ in order to fulfil this design. A new Queue for every inbound HTTP request is simply unmanageable.

Or, is it?

What if we could manage this? Imagine a dedicated pool of Queues, made available to inbound requests, such that each Queue was returned to the pool upon request completion. This might sound far-fetched, but this is essentially the way that database connection-pooling works. Here is the new flow:

consistent message routing using queue-pooling

consistent message routing using queue-pooling

Let’s walk through the code, starting with the QueuePool itself:

    public class QueuePool {
        private static readonly QueuePool _instance = new QueuePool(
            () => new RabbitMQQueue {
                Name = Guid.NewGuid().ToString(),
                IsNew = true
            });

        private readonly Func<AMQPQueue> _amqpQueueGenerator;
        private readonly ConcurrentBag<AMQPQueue> _amqpQueues;

        static QueuePool() {}

        public static QueuePool Instance { get { return _instance; } }

        private QueuePool(Func<AMQPQueue> amqpQueueGenerator) {
            _amqpQueueGenerator = amqpQueueGenerator;
            _amqpQueues = new ConcurrentBag<AMQPQueue>();

            var manager = new RabbitMQQueueMetricsManager(false, "localhost", 15672, "guest", "guest");
            var queueMetrics = manager.GetAMQPQueueMetrics();

            foreach (var queueMetric in queueMetrics.Values) {
                Guid queueName;
                var isGuid = Guid.TryParse(queueMetric.QueueName, out queueName);

                if (isGuid) {
                    _amqpQueues.Add(new RabbitMQQueue {IsNew = false, Name = queueName.ToString()});
                }
            }
        }

        public AMQPQueue Get() {
            AMQPQueue queue;

            var queueIsAvailable = _amqpQueues.TryTake(out queue);
            return queueIsAvailable ? queue : _amqpQueueGenerator();
        }

        public void Put(AMQPQueue queue) {
            _amqpQueues.Add(queue);
        }
    }

QueuePool is a static class that retains a reference to a synchronised collection of Queue objects. The most important aspect of this is that the collection is synchronised, and therefore thread-safe. Under the hood, incoming HTTP requests obtain mutually exclusive locks in order to extract a Queue from the collection. In other words, any given request that extracts a Queue is guaranteed to have exclusive access to that Queue.

Note the private constructor. Upon start-up (QueuePool will be initialised by the first inbound HTTP request) and will invoke a call to the RabbitMQ HTTP API, returning a list of all active Queues. You can mimic this call as follows:

curl -i -u guest:guest http://localhost:15672/api/queues

The list of returned Queue objects is filtered by name, such that only those Queues that are named in GUID-format are returned. QueuePool expects that all underlying Queues implement this convention in order to separate them from other Queues leveraged by the application.

Now we have a list of Queues that our QueuePool can distribute. Let’s take a look at our updated Math Controller:

            var queue = QueuePool.Instance.Get();
            RabbitMQAdapter.Instance.Publish(string.Concat(number, ",", queue.Name), "Math");

            string message;
            BasicDeliverEventArgs args;

            var responded = RabbitMQAdapter.Instance.TryGetNextMessage(queue.Name, out message, out args, 5000);
            QueuePool.Instance.Put(queue);

            if (responded) {
                return message;
            }
            throw new HttpResponseException(HttpStatusCode.BadGateway);

Let’s step through the process flow from the perspective of the ASP.NET application:

  1. Retrieves exclusive use of the next available Queue from the QueuePool
  2. Publishes the numeric input (as before) to SimpleMath Microservice, along with the Queue-name
  3. Subscribes to the Queue retrieved from QueuePool, awaiting inbound messages
  4. Receives the response from SimpleMath Microservice, which published to the Queue specified in step #2
  5. Releases the Queue, which is re-inserted into QueuePool’s underlying collection

Notice the Get method. An attempt is made to retrieve the next available Queue. If all Queues are currently in use, QueuePool will create a new Queue.

Summary

Leveraging QueuePool offers greater reliability in terms of message delivery, as well as consistent throughput speeds, given that we no longer need rely on consuming components to re-queue messages that were intended for other consumers.

It offers a degree of predictable scale – performance testing will reveal the optimal number of Queues that the QueuePool should retain in order to achieve sufficient response times.

It is advisable to determine the optimal number of Queues required by your application, so that QueuePool can avoid creating new Queues in the event of pool-exhaustion, reducing overhead.

Connect with me:

RSSGitHubTwitter
LinkedInYouTubeGoogle+

Microservices in C# Part 1: Building and Testing

Fork me on GitHub

Microservice Architecture

Microservice Architecture

Overview

I’m often asked how to design and build a Microservice framework. It’s a tricky concept, considering loose level of coupling between Microservices. Consider the following scenario outlining a simple business process that consists of 3 sub-processes, each managed by a separate Microservice:

Microservice Architecture core concept

Microservice Architecture core concept

In order to test this process, it would seem that we need a Service Bus, or at the very least, a Service Bus mock in order to link the Microservices. How else will the Microservices communicate? Without a Service Bus, each Microservice is effectively offline, and cannot communicate with any component outside its own context. Let’s examine that concept a little bit further…maybe there is a way that we can establish at least some level of testing without a Service Bus.

A Practical Example

First, let’s expand on the previous tutorial and actually implement a simple Microservice-based application. The application will have 2 primary features:

  • Take an integer-based input and double its value
  • Take a text-based input and reverse it

Both features will be exposed through Microservices. Our first task is to define the Microservice itself. At a fundamental level, Microservices consist of a simple set of functionality:

Anatomy of a Microservice

Anatomy of a Microservice

Consider each Microservice daemon in the above diagram. Essentially, each they consist of

  • a Message Dispatcher (publishes messages to a service bus
  • an Event Listener (receives messages from a service bus
  • Proprietary business logic (to handle inbound and outbound messages)

Let’s design a simple contract that encapsulates this:

    internal interface Microservice {
        void Init();
        void OnMessageReceived(object sender, MessageReceivedEventArgs e);
        void Shutdown();
    }

Each Microservice implementation should expose this functionality. Let’s start with a simple math-based Microservice:

    public class SimpleMathMicroservice : Microservice {
        private RabbitMQAdapter _adapter;
        private RabbitMQConsumerCatchAll _rabbitMQConsumerCatchAll;

        public void Init() {
            _adapter = RabbitMQAdapter.Instance;
            _adapter.Init("localhost", 5672, "guest", "guest", 50);

            _rabbitMQConsumerCatchAll = new RabbitMQConsumerCatchAll("Math", 10);
            _rabbitMQConsumerCatchAll.MessageReceived += OnMessageReceived;

            _adapter.Connect();
            _adapter.ConsumeAsync(_rabbitMQConsumerCatchAll);
        }

        public void OnMessageReceived(object sender, MessageReceivedEventArgs e) {
            var input = Convert.ToInt32(e.Message);
            var result = Functions.Double(input);

            _adapter.Publish(result.ToString(), "MathResponse");
        }

        public void Shutdown() {
            if (_adapter == null) return;

            if (_rabbitMQConsumerCatchAll != null) {
                _adapter.StopConsumingAsync(_rabbitMQConsumerCatchAll);
            }

            _adapter.Disconnect();
        }
    }

Functionality in Detail

Init()

Establishes a connection to RabbitMQ. Remember, as per the previous tutorial, we need only a single connection. This connection is a TCP pipeline designed to funnel all communications to RabbitMQ from the Microservice, and back. Notice the RabbitMQConsumerCatchAll implementation. Here we’ve decided that in the event of an exception occurring, our Microservice will catch each exception and deal with it accordingly. Alternatively, we could have implemented RabbitMQConsumerCatchOne, which would cause the Microservice to disengage from the RabbitMQ Queue that it is listening to (essentially a Circuit Breaker, which I’ll talk about in a future post). In this instance, the Microservice is listening to a Queue called “Math”, to which messages will be published from external sources.

OnMessageReceived()

Our core business logic, in this case, multiplying an integer by 2, is implemented here. Once the calculation is complete, the result is dispatched to a Queue called “MathResponse”.

Shutdown()

Gracefully closes the underlying connection to RabbitMQ.

Unit Testing

There are several moving parts here. How do we test this? Let’s extract the business logic from the Microservice. Surely testing this separately from the application will result in a degree of confidence in the inner workings of our Microservice. It’s a good place to start.
Here is the core functionality in our SimpleMathMicroservice (Functions class in Daishi.Math):

        public static int Double(int input) {
            return input * 2;
        }

Introducing a Unit Test as follows ensures that our logic behaves as designed:

    [TestFixture]
    internal class MathTests {
        [Test]
        public void InputIsDoubled() {
            const int input = 5;
            var output = Functions.Double(input);

            Assert.AreEqual(10, output);
        }
    }

Now our underlying application logic is sufficiently covered from a Unit Testing perspective. Let’s focus on the application once again.

Entrypoint

How will users leverage our application? Do they interface with the Microservice framework through a UI? No, like all web applications, we must provide an API layer, exposing a HTTP channel through which users interact with our application. Let’s create a new ASP.NET application and edit Global.asax as follows:

            # region Microservice Init
            _simpleMathMicroservice = new SimpleMathMicroservice();
            _simpleMathMicroservice.Init();

            #endregion

            #region RabbitMQAdapter Init

            RabbitMQAdapter.Instance.Init("localhost", 5672, "guest", "guest", 100);
            RabbitMQAdapter.Instance.Connect();

            #endregion

We’re going to run an instance of our SimpleMathMicroservice alongside our ASP.NET application. This is fine for the purpose of demonstration, however each Microservice should run in its own context, as a daemon (*.exe in Windows) in a production equivalent. In the above code snippet, we initialise our SimpleMathMicroservice and also establish a separate connection to RabbitMQ to allow the ASP.NET application to publish and receive messages. Essentially, our SimpleMathService instance will run silently, listening for incoming messages on the “Math” Queue. Our adjacent ASP.NET application will publish messages to the “Math” Queue, and attempt to retrieve responses from SimpleMathService by listening to the “MathResponse” Queue. Let’s implement a new ASP.NET Controller class to achieve this:

    public class MathController : ApiController {
        public string Get(int id) {
            RabbitMQAdapter.Instance.Publish(id.ToString(), "Math");

            string message;
            BasicDeliverEventArgs args;
            var responded = RabbitMQAdapter.Instance.TryGetNextMessage("MathResponse", out message, out args, 5000);

            if (responded) {
                return message;
            }
            throw new HttpResponseException(HttpStatusCode.BadGateway);
        }
    }

Navigating to http://localhost:{port}/api/math/100 will initiate the following process flow:

  • ASP.NET application publishes the integer value 100 to the “Math” Queue
  • ASP.NET application immediately polls the “MathResponse” Queue, awaiting a response from SimpleMathMicroservice
  • SimpleMathMicroservice receives the message, and invokes Math.Functions.Double on the integer value 100
  • SimpleMathService publishes the result to “MathResponse”
  • ASP.NET application receives and returns the response to the browser.

Summary

In this example, we have provided a HTTP endpoint to access our SimpleMathMicroservice, and have abstracted SimpleMathMicroservice’ core logic, and applied Unit Testing to achieve sufficient coverage. This is an entry-level requirement in terms of building Microservices. The next step, which I will cover in Part 2, focuses on ensuring reliable message delivery.

Connect with me:

RSSGitHubTwitter
LinkedInYouTubeGoogle+