Category Archives: Microservice Architecture

Microservices with C# and RabbitMQ

Fork me on GitHub

Microservices in Action

Overview

Microservices are groupings of lightweight services that are interconnected, yet independent of one another, with no direct coupling or dependency.

Microservices allow flexibility in terms of infrastructure; application traffic is routed to collections of services that may be distributed across CPU, disk, machine and network as opposed to a single monolithic platform designed to manage all traffic.

Anatomy of a Microservice

In its simplest form, a Microservice consists of an event-listener and a message-dispatcher. The event-listener polls a service-bus – generally a durable message-queue – and handles incoming messages. Messages consist of instructions bound in metadata and encoded in a data-interchange format such as JSON or Protobuf.

Anatomy of a Microservice
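As a rough illustration of that anatomy (this is a sketch, not code from the Daishi.AMQP library; the IQueueClient, IMessageDispatcher and Message types are hypothetical placeholders), the skeleton of such a service might look like this:

    // A minimal, hypothetical skeleton: an event-listener that polls a service-bus
    // and a message-dispatcher that acts on each decoded instruction.
    public interface IQueueClient { Message Poll(int timeoutMilliseconds); }
    public interface IMessageDispatcher { void Dispatch(Message message); }
    public sealed class Message { public string Body { get; set; } }

    public sealed class MicroserviceHost {
        private readonly IQueueClient queueClient;
        private readonly IMessageDispatcher dispatcher;
        private volatile bool running;

        public MicroserviceHost(IQueueClient queueClient, IMessageDispatcher dispatcher) {
            this.queueClient = queueClient;
            this.dispatcher = dispatcher;
        }

        public void Run() {
            running = true;
            while (running) {
                // Poll the service-bus; null indicates no message within the timeout.
                var message = queueClient.Poll(50);
                if (message == null) continue;

                // Hand the instruction (e.g., a JSON payload) to the dispatcher.
                dispatcher.Dispatch(message);
            }
        }

        public void Stop() {
            running = false;
        }
    }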

Daishi.AMQP Key Components

Daishi.AMQP Class Architecture

Connecting to RabbitMQ

Everything starts with an abstraction. The Daishi.AMQP library abstracts all AMQP components, and provides RabbitMQ implementations. First, we need to connect to RabbitMQ. Let’s look at the Connect method in the RabbitMQAdapter class:

        public override void Connect() {
            var connectionFactory = new ConnectionFactory {
                HostName = hostName,
                Port = port,
                UserName = userName,
                Password = password,
                RequestedHeartbeat = heartbeat
            };

            if (!string.IsNullOrEmpty(virtualHost)) connectionFactory.VirtualHost = virtualHost;
            _connection = connectionFactory.CreateConnection();
        }

A connection is established on application start-up, and is ideally maintained for the duration of the application’s lifetime.

Consuming Messages

A single running instance (generally an *.exe, or daemon) can connect to RabbitMQ and consume messages in a single-threaded, blocking manner. However, this is not the most scalable solution. Processes that read messages from RabbitMQ must subscribe to a Queue, or Exchange. Once subscribed, RabbitMQ manages message delivery, in terms of even distribution through round-robin, or biased distribution, depending on your Quality of Service (QOS) configuration. Please refer to this post for a more detailed explanation of how this works.

For now, consider that our Microservice executable can spawn multiple consumer processes, each running on a dedicated thread, to consume messages from RabbitMQ in parallel. The AMQPAdapter class contains a method designed to invoke such processes:

        public void ConsumeAsync(AMQPConsumer consumer) {
            if (!IsConnected) Connect();

            var thread = new Thread(o => consumer.Start(this));
            thread.Start();

            while (!thread.IsAlive)
                Thread.Sleep(1);
        }

Notice the input variable of type “AMQPConsumer”. Let’s take a look at that class in more detail:

        public event EventHandler<MessageReceivedEventArgs> MessageReceived;

        public virtual void Start(AMQPAdapter amqpAdapter) {
            stopConsuming = false;
        }

        public void Stop() {
            stopConsuming = true;
        }

        protected void OnMessageReceived(MessageReceivedEventArgs e) {
            var handler = MessageReceived;
            if (handler != null) handler(this, e);
        }

Essentially, the class contains Start and Stop methods, and an event-handler to handle message-delivery. Like most classes in this project, this is an AMQP abstraction. Here is the RabbitMQ implementation:

        protected void Start(AMQPAdapter amqpAdapter, bool catchAllExceptions) {
            base.Start(amqpAdapter);
            try {
                var connection = (IConnection) amqpAdapter.GetConnection();

                using (var channel = connection.CreateModel()) {
                    if (createQueue) channel.QueueDeclare(queueName, true, false, false, queueArgs);
                    channel.BasicQos(0, prefetchCount, false);

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, noAck, consumer);

                    while (!stopConsuming) {
                        try {
                            BasicDeliverEventArgs basicDeliverEventArgs;
                            var messageIsAvailable = consumer.Queue.Dequeue(timeout, out basicDeliverEventArgs);

                            if (!messageIsAvailable) continue;
                            var payload = basicDeliverEventArgs.Body;

                            var message = Encoding.UTF8.GetString(payload);
                            OnMessageReceived(new MessageReceivedEventArgs {
                                Message = message,
                                EventArgs = basicDeliverEventArgs
                            });

                            if (implicitAck && !noAck) channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
                        }
                        catch (Exception exception) {
                            OnMessageReceived(new MessageReceivedEventArgs {
                                Exception = new AMQPConsumerProcessingException(exception)
                            });
                            if (!catchAllExceptions) Stop();
                        }
                    }
                }
            }
            catch (Exception exception) {
                OnMessageReceived(new MessageReceivedEventArgs {
                    Exception = new AMQPConsumerInitialisationException(exception)
                });
            }
        }

Let’s Start from the Start

Connect to a RabbitMQ instance as follows:

            var adapter = RabbitMQAdapter.Instance;

            adapter.Init("hostName", 1234, "userName", "password", 50);
            adapter.Connect();

Notice that the RabbitMQAdapter is accessed through its static Instance property; the adapter is a singleton. RabbitMQ connections in this library are thread-safe, so a single connection will facilitate all requests to RabbitMQ.

RabbitMQ implements the concept of Channels, which are logical segments of the underlying physical Connection. Once a connection is established, Channels can be opened to interface with RabbitMQ. A single RabbitMQ connection can support up to 65,535 Channels, although I would personally scale out client instances rather than establish such a high number of Channels. Let’s look at publishing a message to RabbitMQ:

        public override void Publish(string message, string exchangeName, string routingKey,
            IBasicProperties messageProperties = null) {
            if (!IsConnected) Connect();
            using (var channel = _connection.CreateModel()) {
                var payload = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchangeName, routingKey,
                    messageProperties ?? RabbitMQProperties.CreateDefaultProperties(channel), payload);
            }
        }

Notice the _connection.CreateModel() method call. This establishes a Channel to interface with RabbitMQ. The Channel is encapsulated within a using block; once we’ve completed our operation, the Channel may be disposed. Channels are relatively cheap to create, in terms of resources, and may be created and dropped liberally.

Messages are sent in UTF-8, byte-format. Here is how to publish a message to RabbitMQ:

            var message = "Hello, World!";
            adapter.Publish(message, "queueName");

This method also exposes exchangeName and routingKey parameters through its overloads. These are used to control the flow of messages through RabbitMQ resources. This concept is well documented here.
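For example, assuming an exchange and binding already exist on the broker (the names below are placeholders), a message can be routed through an exchange and routing key rather than published directly to a queue:

            // Hypothetical exchange and routing key; both must be configured on the broker.
            adapter.Publish("Hello, World!", "my.exchange", "my.routing.key");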

Now let’s attempt to read our message back from RabbitMQ:

            string output;
            BasicDeliverEventArgs eventArgs;
            adapter.TryGetNextMessage("queueName", out output, out eventArgs, 50);

The TryGetNextMessage method reads the next message from the specified Queue, when available. It returns false if the Queue is still empty once the specified timeout has elapsed.
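A short sketch of acting on that return value:

            string output;
            BasicDeliverEventArgs eventArgs;

            // Returns false if no message arrives within the 50ms timeout.
            if (adapter.TryGetNextMessage("queueName", out output, out eventArgs, 50))
                Console.WriteLine("Received: {0}", output);
            else
                Console.WriteLine("The Queue was empty when the timeout elapsed.");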

Complete code listing below:

        private static void Main(string[] args) {
            var adapter = RabbitMQAdapter.Instance;

            adapter.Init("hostName", 1234, "userName", "password", 50);
            adapter.Connect();

            var message = "Hello, World!";
            adapter.Publish(message, "queueName");

            string output;
            BasicDeliverEventArgs eventArgs;
            adapter.TryGetNextMessage("queueName", out output, out eventArgs, 50);
        }

Consistent Message Polling

Reading 1 message at a time may not be the most efficient means of consuming messages. I mentioned the AMQPConsumer class at the beginning of this post. The following code outlines a means to continuously read messages from a RabbitMQ Queue:

            var consumer = new RabbitMQConsumerCatchAll("queueName", 10);
            adapter.ConsumeAsync(consumer);

            Console.ReadLine();
            adapter.StopConsumingAsync(consumer);

Note the RabbitMQConsumerCatchAll class instantiation. This class is an implementation of RabbitMQConsumer. Any exception that occurs during consumption is handled by this consumer and surfaced back to the client along the same Channel as valid messages. Alternatively, the RabbitMQConsumerCatchOne implementation can be leveraged instead. Both classes serve the same purpose and differ only in their error-handling logic: RabbitMQConsumerCatchOne will disconnect from RabbitMQ should an exception occur.
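Both valid messages and wrapped exceptions are surfaced through the MessageReceived event shown earlier, so a handler can be attached before consumption begins; for example:

            var consumer = new RabbitMQConsumerCatchAll("queueName", 10);

            // Successful deliveries populate Message; failures populate Exception.
            consumer.MessageReceived += (sender, e) => {
                if (e.Exception != null)
                    Console.WriteLine("Consumer error: {0}", e.Exception.Message);
                else
                    Console.WriteLine("Received: {0}", e.Message);
            };

            adapter.ConsumeAsync(consumer);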

Summary

The Daishi.AMQP library provides a means of easily interfacing with AMQP-driven queuing mechanisms, with built-in support for RabbitMQ, allowing .NET developers to easily integrate solutions with RabbitMQ. Click here for Part 1 in a tutorial series outlining the means to leverage Daishi.AMQP in your .NET application.


Building a Highly Available, Durable in-memory Cache

Overview

Caching strategies have become an integral component in today’s software applications. Distributed computing has resulted in caching strategies that have grown quite complex. Coupled with Cloud computing, caching has become something of a dark art. Let’s walk through the rationale behind a cache, the mechanisms that drive it, and how to achieve a highly available, durable cache, without persisting to disk.

Why We Need a Cache

Providing fast data-access

Data stores are growing larger and more distributed. Caches provide fast read capability and enhanced performance vs. reading from disk. Data distributed across multiple hardware stacks, in multiple geographic locations, can be centralised at locations geographically close to application users.

Absorbing traffic surges

Sudden bursts in traffic can cause contention in terms of data-persistence. Storing data in memory removes the overhead involved in disk I/O operations, easing the burden on network resources and application threads.

Augmenting NoSQL

NoSQL has gained traction to the extent that it is now pervasive. Many NoSQL offerings, such as Couchbase, implement an eventual-consistency model; essentially, data will eventually persist to disk at some point after a write operation is invoked. This is an effective big-data management strategy; however, it introduces potential pitfalls on the consuming application’s side. Consider an operation originating from an application that expects data to be written immediately. The application may not have the luxury of waiting until the data eventually persists. Caching the data ensures almost immediate availability.
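As a rough sketch of that idea (the IDataStore interface below is a hypothetical stand-in for a NoSQL client such as Couchbase or Aerospike), a write can be acknowledged from the cache immediately while persistence completes in the background:

    // Requires System.Collections.Concurrent and System.Threading.Tasks.
    public interface IDataStore {
        Task PersistAsync(string key, string value);
    }

    public sealed class CachedWriter {
        // In-memory cache: reads are served from here immediately.
        private readonly ConcurrentDictionary<string, string> cache =
            new ConcurrentDictionary<string, string>();
        private readonly IDataStore dataStore;

        public CachedWriter(IDataStore dataStore) {
            this.dataStore = dataStore;
        }

        public Task WriteAsync(string key, string value) {
            cache[key] = value;                          // available to readers at once
            return dataStore.PersistAsync(key, value);   // eventual persistence to disk
        }

        public bool TryRead(string key, out string value) {
            return cache.TryGetValue(key, out value);
        }
    }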

Another common design in NoSQL technology is to direct both reads and writes that are associated with the same data segment to the node on which the data segment resides. This minimises node-hopping and ensures efficient data-flow. Caching can further augment this process: a layer of cached metadata in front of the data-store reduces the amount of traffic the NoSQL store has to manage, minimising resource consumption. The following design illustrates the basic structure of a managed cache in a hosted environment using Aerospike – a flash-optimised, in-memory database:

Distributed Cache


High Availability and the Cloud

High availability is a principle applied to hosted solutions, ensuring that the system will remain online, even if only partly, regardless of failure. Failure encompasses not just hardware or software failure, such as disk failure or out-of-memory exceptions, but also controlled failure, such as machine maintenance.

How Super Data Centers Manage Infrastructure

Data Centers, such as those managed by Amazon Web Services and Microsoft Azure, distribute infrastructure across regions – physical locations separated geographically. Infrastructure contained within each region is further segmented into Availability Zones, or Availability Sets. These are physical groupings of hosted services within hardware stacks – e.g., server racks. Hardware is routinely patched, maintained, and upgraded within Data Centers. This is applied in a controlled manner, such that resources contained within Availability Zone/Set X will not be taken offline at the same time as resources contained within Availability Zone/Set Z.

Durability and the Cloud

To achieve high availability in hosted applications, the applications should be distributed across Availability Zones/Sets, at least. To further enhance the degree of availability, applications can be distributed across separate regions. Consider the following design:

Highly available, durable, cloud-based cache

When Things Fall Over

Notice that the design provides 8 Cache servers, distributed evenly across both region and availability zone. Thus, should any given Availability Zone fail, 3 Availability Zones will remain online. In the unlikely event that a Data Center fails, and all Availability Zones fail, the second region will remain online – our application can be said to be highly available.

Note that the design includes AWS Simple Queue Service (SQS) to achieve Cross Data Center Data Replication (XDR). The actual implementation, which I will address in an upcoming post, is slightly more complex, and is simplified here for clarity. Enterprise solutions, such as Aerospike and Couchbase offer XDR as a function.

Traffic is load balanced evenly (or in a more suitable manner) across Availability Zones. A Global DNS service, such as AWS Route 53, directs traffic to each region. In situations where all regions and Availability Zones are available, we might consider distributing traffic based on geographic location. Users based in Ireland can be routed to AWS-Dublin, while German users might be routed to AWS-Frankfurt, for example. Route 53 can be configured to distribute all traffic to live regions, should any given region fail entirely.

Taking Things a Step Further by Minimising PCI DSS Exposure

Applications that handle financial data, such as Merchants, must comply with the requirements outlined by the PCI Data Security Standard. These requirements apply based on your application configuration. For example, storing payment card details on disk requires a higher level of adherence to PCI DSS than offloading the storage effort to a 3rd party.

Requirements for Handling Financial Data

The PCI DSS defines data as 2 logical entities: data-in-transit and data-at-rest. Data-at-rest is essentially data that has been persisted to a data-store. Data-in-transit applies to data stored in RAM, although the requirements do not specify that this data must be transient – that it must have a point of origin and a destination. Therefore, storing data in RAM would, at least from a legal perspective, result in a reduced level of PCI DSS exposure, in that requirements pertaining to storing data on disk, such as encryption, do not apply.

Of course, this raises the question; should sensitive data always be persisted to hard-storage? Or, is storing data in a highly available and durable cache sufficient? I suspect at this point that you might feel compelled to post a strongly-worded comment outlining that this idea is ludicrous – but is it really? Can an in-memory cache, once distributed and durable enough to withstand multiple degrees of failure, operate with the same degree of reliability as a hard data-store? I’d certainly like to prove the concept.

Summary

Caching data allows for increased throughput and optimised application performance. Enhancing this concept further, by distributing your cache across physical machine-boundaries, and further still across multiple geographical locations, results in a highly available, durable in-memory storage mechanism.

Hosting cache servers within close proximity to your customers allows for reduced latency and an enhanced user-experience, as well as providing for several degrees of failure; from component, to software, to Availability Zone/Set, to entire region failure.


JSON# – Tutorial #5: Deserialising Complex Objects

Fork on Github
Download the Nuget package

The previous tutorial focused on deserialising simple JSON objects. This tutorial describes the process of deserialising a more complex object using JSON#.

Let’s use the ComplexObject class that we’ve leveraged in earlier tutorials:

class ComplexObject : IHaveSerialisableProperties {
    public string Name { get; set; }
    public string Description { get; set; }
    public List<ComplexArrayObject> ComplexArrayObjects { get; set; }
    public List<double> Doubles { get; set; }

    public SerialisableProperties GetSerializableProperties() {
        return new SerialisableProperties("complexObject", new List<JsonProperty> {
            new StringJsonProperty {
                Key = "name",
                Value = Name
            },
            new StringJsonProperty {
                Key = "description",
                Value = Description
            }
            }, new List<JsonSerialisor> {
                    new ComplexJsonArraySerialisor("complexArrayObjects",
                        ComplexArrayObjects.Select(c => c.GetSerializableProperties())),
                    new JsonArraySerialisor("doubles",
                        Doubles.Select(d => d.ToString(CultureInfo.InvariantCulture)), JsonPropertyType.Numeric)
            });
    }
}

Let’s instantiate this with some values, and serialise to JSON. I won’t bloat this post with details on how to serialise; that was covered in previous posts. Here is our serialised ComplexObject instance:

{"complexObject":{"name":"Complex Object","description":"A complex object","complexArrayObjects":[{"name":"Array Object #1","description":"The 1st array object"},{"name":"Array Object #2","description":"The 2nd array object"}],"doubles":[1,2.5,10.8]}}

Notice that we have 2 collections: a simple collection of Doubles, and a more complex collection of ComplexArrayObjects. Let’s start with those.

First, create a new class, ComplexObjectDeserialiser, and implement the required constructor and Deserialise method.

Remember this method from the previous tutorial?

    var properties = jsonNameValueCollection.Parse(mergeArrayValues);

This effectively parses the JSON and loads each element into a NameValueCollection. This is fine for simple properties; however, collection-based properties would cause the deserialiser to load each collection-element as a separate item in the returned NameValueCollection, which may be somewhat cumbersome to manage:

Flattened JSON collection

This is where the Boolean parameter mergeArrayValues comes in. It concatenates all collection-based values into a comma-delimited string, and loads this value into the returned NameValueCollection. This is much more intuitive, and allows consuming code to simply split the comma-delimited values and iterate as required.

Compressed JSON Collection
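For example, the merged entry for the doubles collection arrives as a single comma-delimited value, which can be split and converted directly (this sketch assumes the path-based keys used by the deserialiser below, plus System.Linq and System.Globalization):

            // e.g. properties["complexObject.doubles"] == "1,2.5,10.8"
            var doubles = properties["complexObject.doubles"]
                .Split(',')
                .Select(d => Convert.ToDouble(d, CultureInfo.InvariantCulture))
                .ToList();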

Here is the complete code-listing:

class ComplexObjectDeserialiser : Deserialiser<ComplexObject> {
        public ComplexObjectDeserialiser(JsonNameValueCollection jsonNameValueCollection) : base(jsonNameValueCollection) {}

        public override ComplexObject Deserialise(bool mergeArrayValues = false) {
            var properties = jsonNameValueCollection.Parse(mergeArrayValues);

            var complexObject = new ComplexObject {
                Name = properties["complexObject.name"],
                Description = properties["complexObject.description"],
                ComplexArrayObjects = new List<ComplexArrayObject>(),
                Doubles = new List<double>()
            };

            var complexArrayObjectNames = properties["complexObject.complexArrayObjects.name"].Split(',');
            var complexArrayObjectDescriptions = properties["complexObject.complexArrayObjects.description"].Split(',');

            for (var i = 0; i < complexArrayObjectNames.Length; i++) {
                var complexArrayObjectName = complexArrayObjectNames[i];

                complexObject.ComplexArrayObjects.Add(new ComplexArrayObject {
                    Name = complexArrayObjectName,
                    Description = complexArrayObjectDescriptions[i]
                });
            }

            var complexArrayObjectDoubles = properties["complexObject.doubles"].Split(',');

            foreach (var @double in complexArrayObjectDoubles)
                complexObject.Doubles.Add(Convert.ToDouble(@double));

            return complexObject;
        }
    }

As before, we deserialise as follows:

    Json.Deserialise(new ComplexObjectDeserialiser(new StandardJsonNameValueCollection("<JSON string...>")), true);

JSON# does most of the work, loading each serialised element into a NameValueCollection. We simply read from that collection, picking and choosing each element to map to an associated POCO property.
For collection-based properties, we simply retrieve the value associated with the collection’s key, split that value into an array, and loop through the array, loading a new object for each item in the collection, building our ComplexObject step-by-step.

This is the final JSON# post covering tutorial-based material. I’m working on a more thorough suite of performance benchmarks, and will publish the results, as well as offering a more in-depth technical analysis of JSON# in future posts. Please contact me if you would like to cover a specific topic.

The next posts will feature a tutorial series outlining Object Oriented, Test Driven Development in C# and Java. Please follow this blog to receive updates.


JSON# – Tutorial #4: Deserialising Simple Objects

Fork on Github
Download the Nuget package

The previous tutorial focused on serialising complex JSON objects. This tutorial describes the process of deserialisation using JSON#.

The purpose of JSON# is to allow memory-efficient JSON processing. At the root of this is the ability to dissect large JSON files and extract smaller structures from within, as discussed in Tutorial #1.

One thing, among many, that JSON.NET achieves is memory-efficient JSON parsing, using the JsonTextReader component. The last thing that I want to do is reinvent the wheel; to that end, JSON# also provides a simple means of deserialisation, which wraps JsonTextReader. Using JsonTextReader alone requires quite a lot of custom code, so I’ve provided a wrapper to make things simpler and allow reusability.

At the root of the deserialisation process lies the StandardJsonNameValueCollection class. This is an implementation of JsonNameValueCollection, from which custom implementations can be derived, if necessary, in a classic bridge design, leveraged from the Deserialiser component. Very simply, it reads JSON node values and stores them as key-value pairs; the key is the node’s path from the root, and the value is the node’s value. This allows us to cache the JSON object and store it in a manner that provides easy access to its values, without traversing the tree:

class StandardJsonNameValueCollection : JsonNameValueCollection {
    public StandardJsonNameValueCollection(string json) : base(json) {}

    public override NameValueCollection Parse() {
        var parsed = new NameValueCollection();

        using (var reader = new JsonTextReader(new StringReader(json))) {
            while (reader.Read()) {
                if (reader.Value != null && !reader.TokenType.Equals(JsonToken.PropertyName))
                    parsed.Add(reader.Path, reader.Value.ToString());
            }
            return parsed;
        }
    }
}

Let’s work through an example using our SimpleObject class from previous tutorials:

class SimpleObject : IHaveSerialisableProperties {
    public string Name { get; set; }
    public int Count { get; set; }

    public virtual SerialisableProperties GetSerializableProperties() {
        return new SerialisableProperties("simpleObject", new List<JsonProperty> {
            new StringJsonProperty {
                Key = "name",
                Value = Name
            },
            new NumericJsonProperty {
                Key = "count",
                Value = Count
            }
        });
    }
}

Consider this class represented as a JSON object:

{
    "simpleObject": {
        "name": "SimpleObject",
        "count": 1
    }
}

The JSON# JsonNameValueCollection implementation will read each node in this JSON object and return the values in a NameValueCollection. Once stored, we need only provide a mechanism to instantiate a new SimpleObject POCO with the key-value pairs. JSON# provides the Deserialiser class as an abstraction to provide this functionality. Let’s create a class that accepts the JsonNameValueCollection and uses it to populate an associated POCO:

class SimpleObjectDeserialiser : Deserialiser<SimpleObject> {
    public SimpleObjectDeserialiser(JsonNameValueCollection parser) : base(parser) {}

    public override SimpleObject Deserialise() {
        var properties = jsonNameValueCollection.Parse();
        return new SimpleObject {
            Name = properties.Get("simpleObject.name"),
            Count = Convert.ToInt32(properties.Get("simpleObject.count"))
        };
    }
}

This class contains a single method designed to map properties. As you can see from the code snippet above, we read each key as a representation of the corresponding node’s path, and then bind the associated value to a POCO property.
JSON# leverages the SimpleObjectDeserialiser as a bridge to deserialise the JSON string:

const string json = "{\"simpleObject\":{\"name\":\"SimpleObject\",\"count\":1}}";
var simpleObjectDeserialiser = new SimpleObjectDeserialiser(new StandardJsonNameValueCollection(json));
var simpleObject = Json.Deserialise(simpleObjectDeserialiser);

So, why do this when I can just bind my objects dynamically using JSON.NET:

JsonConvert.DeserializeObject(json);

There is nothing wrong with deserialising in the above manner. But let’s look at what happens. First, it will use Reflection to look up CLR metadata pertaining to your object, in order to construct an instance. Again, this is ok, but bear in mind the performance overhead involved, and consider the consequences in a web application under heavy load.

Second, it requires that you decorate your object with serialisation attributes, which lack flexibility in my opinion.
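For reference, that attribute-driven mapping typically looks something like this with JSON.NET (a minimal sketch of SimpleObject decorated for attribute-based deserialisation):

    // using Newtonsoft.Json;
    // JSON.NET attribute decoration: the mapping between JSON keys and CLR
    // properties is fixed by the attributes.
    class SimpleObject {
        [JsonProperty("name")]
        public string Name { get; set; }

        [JsonProperty("count")]
        public int Count { get; set; }
    }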

Third, if your object is quite large – specifically, over 85K in size – you may run into memory issues, as the object will be allocated on the Large Object Heap.

Deserialisation using JSON#, on the other hand, reads the JSON in stream format, byte-by-byte, guaranteeing a small memory footprint, and it does not require Reflection. Instead, your custom implementation of Deserialiser allows a greater degree of flexibility when populating your POCO than simply decorating properties with attributes.

I’ll cover deserialising more complex objects in the next tutorial, specifically, objects that contain complex properties such as arrays.


JSON# – Tutorial #3: Serialising Complex Objects

Fork on Github
Download the Nuget package

The last tutorial focused on serialising simple JSON objects. This tutorial contains a more complex example.

Real-world objects are generally more complex than typical “Hello, World” examples. Let’s build such an object; an object that contains complex properties, such as other objects and collections. We’ll start by defining a sub-object:

class SimpleSubObject : IHaveSerialisableProperties {
    public string Name { get; set; }
    public string Description { get; set; }

    public SerialisableProperties GetSerializableProperties() {
        return new SerialisableProperties("simpleSubObject", new List<JsonProperty> {
            new StringJsonProperty {
                Key = "name",
                Value = Name
            },
            new StringJsonProperty {
                Key = "description",
                Value = Description
            }
        });
    }
}

This object contains 2 simple properties: Name and Description. As before, we implement the IHaveSerialisableProperties interface to allow JSON# to serialise the object. Now let’s define an object with a property that is a collection of SimpleSubObjects:

class ComplexObject : IHaveSerialisableProperties {
    public string Name { get; set; }
    public string Description { get; set; }

    public List<SimpleSubObject> SimpleSubObjects { get; set; }
    public List<double> Doubles { get; set; }

    public SerialisableProperties GetSerializableProperties() {
        return new SerialisableProperties("complexObject", new List<JsonProperty> {
            new StringJsonProperty {
                Key = "name",
                Value = Name
            },
            new StringJsonProperty {
                Key = "description",
                Value = Description
            }
        },
        new List<JsonSerialisor> {
            new ComplexJsonArraySerialisor("simpleSubObjects",
                SimpleSubObjects.Select(c => c.GetSerializableProperties())),
            new JsonArraySerialisor("doubles",
                Doubles.Select(d => d.ToString(CultureInfo.InvariantCulture)), JsonPropertyType.Numeric)
        });
    }
}

This object contains some simple properties, as well as 2 collections: the first, a collection of Double; the second, a collection of SimpleSubObject.

Note the GetSerializableProperties method in ComplexObject. It accepts a collection parameter of type JsonSerialisor, which represents the highest level of abstraction among the core serialisation components in JSON#. In order to serialise our collection of SimpleSubObjects, we leverage an implementation of JsonSerialisor called ComplexJsonArraySerialisor, designed specifically to serialise collections of objects, as opposed to primitive types. Given that each SimpleSubObject in our collection contains an implementation of GetSerializableProperties, we simply pass the result of each method to the ComplexJsonArraySerialisor constructor. It will handle the rest.

We follow a similar process to serialise the collection of Double, in this case leveraging JsonArraySerialisor, another implementation of JsonSerialisor, specifically designed to manage collections of primitive types. We simply provide the collection of Double in their raw format to the serialisor.

Let’s instantiate a new instance of ComplexObject:

var complexObject = new ComplexObject {
    Name = "Complex Object",
    Description = "A complex object",

    SimpleSubObjects = new List<SimpleSubObject> {
        new SimpleSubObject {
            Name = "Sub Object #1",
            Description = "The 1st sub object"
        },
        new SimpleSubObject {
            Name = "Sub Object #2",
            Description = "The 2nd sub object"
        }
    },
    Doubles = new List<double> {
        1d, 2.5d, 10.8d
    }
};

As per the previous tutorial, we serialise as follows:

var writer = new BinaryWriter(new MemoryStream(), new UTF8Encoding(false));
var serialisableProperties = complexObject.GetSerializableProperties();

using (var serialisor = new StandardJsonSerialisationStrategy(writer))
    Json.Serialise(serialisor, new JsonPropertiesSerialisor(serialisableProperties));

Note the use of StandardJsonSerialisationStrategy here. This is the only implementation of JsonSerialisationStrategy, one of the core serialisation components in JSON#. The abstraction exists to provide extensibility, so that different strategies might be applied at runtime, should specific serialisation rules vary across requirements.

In the next tutorial I’ll discuss deserialising objects using JSON#.


Load Balancing a RabbitMQ Cluster

The Problem

Given a cluster of RabbitMQ nodes, we want to achieve effective load-balancing.

The Solution

First, let’s look at the feature at the core of RabbitMQ – the Queue itself.
RabbitMQ Queues are singular structures that do not exist on more than one Node. Let’s say that you have a load-balanced, HA (Highly Available) RabbitMQ cluster as follows:

RabbitMQ Cluster with Load Balancer

Nodes 1 – 3 are replicated across each other, so that snapshots of all HA-compliant Queues are synchronised across each node. Suppose that we log onto the RabbitMQ Admin Console and create a new HA-configured Queue. Our Load Balancer is configured in a Round Robin manner, and in this instance, we are directed to Node #2, for argument’s sake. Our new Queue is created on Node #2. Note: it is possible to explicitly choose the node that you would like your Queue to reside on, but let’s ignore that for the purpose of this example.

Now our new Queue, “NewQueue”, exists on Node #2. Our HA policy kicks in, and the Queue is replicated across all nodes. We start adding messages to the Queue, and those messages are also replicated across each node. Essentially, a snapshot of the Queue is taken, and this snapshot is replicated across each node after a non-determinable period of time has elapsed (it actually occurs as an asynchronous background task, when the Queue’s state changes).

We may look at this from an intuitive perspective and conclude that each node now contains an instance of our new Queue. Thus, when we connect to RabbitMQ, targeting “NewQueue”, and are directed to an appropriate node by our Load Balancer, we might assume that we are connected to the instance of “NewQueue” residing on that node. This is not the case.

I mentioned that a RabbitMQ Queue is a singular structure. It exists only on the node on which it was created, regardless of HA policy. A Queue is always its own master, and consists of 0…N slaves. Based on the above example, “NewQueue” on Node #2 is the Master-Queue, because this is the node on which the Queue was created. It has 2 Slave-Queues – its counterparts on nodes #1 and #3. Let’s assume that Node #2 dies, for whatever reason; let’s say that the entire server is down. Here’s what will happen to “NewQueue”.

  1. Node #2 does not return a heartbeat, and is considered de-clustered
  2. The “NewQueue” master Queue is no longer available (it died with Node #2)
  3. RabbitMQ promotes the “NewQueue” slave instance on either Node #1 or #3 to master

This is standard HA behaviour in RabbitMQ. Let’s look at the default scenario now, where all 3 nodes are alive and well, and the “NewQueue” instance on Node #2 is still master.

  1. We connect to RabbitMQ, targeting “NewQueue”
  2. Our Load Balancer determines an appropriate Node, based on round robin
  3. We are directed to an appropriate node (let’s say, Node #3)
  4. RabbitMQ determines that the “NewQueue” master node is on Node #2
  5. RabbitMQ redirects us to Node #2
  6. We are successfully connected to the master instance of “NewQueue”

Despite the fact that our Queues are replicated across each HA node, there is only one available instance of each Queue, and it resides on the node on which it was created, or in the case of failure, the instance that is promoted to master. RabbitMQ is conveniently routing us to that node in this case:

RabbitMQ Cluster Exhibiting Extra Network-hop

Unfortunately for us, this means that we suffer an extra, unnecessary network hop in order to reach our intended Queue. This may not seem a major issue, but consider that in the above example, with 3 nodes and an evenly-balanced Load Balancer, we are going to incur that extra network hop on approximately 66% of requests. Only one in every three requests (assuming that in any grouping of three unique requests we are directed to a different node) will result in our request being directed to the correct node.

“Does this mean that we can’t implement round robin load-balancing?”
“No, but if we do, it will result in a less than optimal solution.”
“So, what’s the alternative?”

Well, in order to ensure that each request is routed to the correct node, we have two choices:

  • Explicitly connect to the node on which the target Queue resides
  • Spread Queues as evenly as possible across nodes

Both solutions immediately introduce problems. In the first instance, our client application must be aware of all nodes in our RabbitMQ cluster, and must also know where each master Queue resides. If a Node goes down, how will our application know? Not to mention that this design breaks the Single Responsibility principle, and increases the level of coupling in the application.

The second solution offers a design in which our Queues are not linked to single Nodes. Based on our “NewQueue” example, we would not simply instantiate a new Queue on a single Node. Instead, in a 3-node scenario, we might instantiate 3 Queues; “NewQueue1”, “NewQueue2” and “NewQueue3”, where each Queue is instantiated on a separate Node:

RabbitMQ Cluster with Separate Queues


Our client application can now implement, for example, a simple randomising function that selects one of the above Queues and explicitly connects to it. In a web-based application, given 3 separate HTTP requests, each request would target one of the above Queues, and no Queue would feature more than once across all 3 requests. Now we’ve achieved a reasonable balance of load across our cluster, without the use of a traditional Load Balancer.
RabbitMQ Cluster with Randomiser
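A sketch of such a randomising selector, using the Queue names from the example above:

    // Picks one of the per-node Queues at random for each request,
    // spreading load without a traditional Load Balancer.
    public static class QueueSelector {
        private static readonly string[] queueNames = { "NewQueue1", "NewQueue2", "NewQueue3" };
        private static readonly Random random = new Random();
        private static readonly object padlock = new object();

        public static string Next() {
            lock (padlock) {   // Random is not thread-safe
                return queueNames[random.Next(queueNames.Length)];
            }
        }
    }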


But we’re still faced with the same problem; our client application needs to know where our Queues reside. So let’s look at advancing the solution further, so that we can avoid this shortcoming.

Firstly, we need to provide mapping metadata that describes our RabbitMQ infrastructure. Specifically, where Queues reside. This should be a resilient data-source such as a database or cache, as opposed to something like a flat file, because multiple sources (2, at least) may access this data concurrently.

Now introduce an always-on service that polls RabbitMQ, determining whether or not nodes are alive. New Queues should also be registered with this service, and it should keep an up-to-date registry, providing metadata about Nodes and their Queues:

RabbitMQ Cluster with Monitor Service


Our client application, on initial load, should poll this service and retrieve RabbitMQ metadata, which should then be retained for incoming requests. Should a request fail because a Node has become compromised, the client application can poll the Queue Metadata Store, retrieve up-to-date RabbitMQ metadata, and re-route the message to a working Node.
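A rough sketch of that client-side flow (the IQueueMetadataStore and QueueMetadata types below are hypothetical stand-ins for the metadata store and the monitor service that feeds it):

    public interface IQueueMetadataStore { QueueMetadata GetLatest(); }
    public sealed class QueueMetadata { public string PreferredNode { get; set; } }

    public sealed class ResilientPublisher {
        private readonly IQueueMetadataStore metadataStore;
        private QueueMetadata metadata;

        public ResilientPublisher(IQueueMetadataStore metadataStore) {
            this.metadataStore = metadataStore;
            metadata = metadataStore.GetLatest();           // initial load at start-up
        }

        public void Publish(string message) {
            try {
                PublishTo(metadata.PreferredNode, message);
            }
            catch {
                metadata = metadataStore.GetLatest();       // Node compromised: refresh metadata
                PublishTo(metadata.PreferredNode, message); // re-route to a working Node
            }
        }

        private static void PublishTo(string node, string message) {
            // Connect to the given node and publish; omitted for brevity.
        }
    }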

This approach is a design that I’ve had some success with. From a conceptual point-of-view, it forms a small section of an overall Microservice Architecture, which I’ll talk about in a future post.


RabbitMQ QOS vs. Competing Consumers

I recently answered a question on stackoverflow that revolved around this argument:
“Should I scale out my AMQP consumers, tweak the QOS values, or both?”
This is a broad question. The first thing to consider is the actual business process that is facilitated by AMQP. Is your business process time-sensitive? For example, let’s say that you have a process which persists data to a database. How important, from a business perspective, is it that this data is saved immediately?

Quality of Service (QOS)

AMQP Channels contain a property called QOS. The value of this property determines the manner in which AMQP Consumers read messages from Queues. A QOS value of “1” ensures that only a single message at a time will be de-queued. The next message will not be processed until the current message has been handled. Consider the implications of this. Given a Queue that contains 5 messages, and a Consumer, with QOS set to “1”, that reads from that Queue, message #5 will not be processed until messages 1 – 4 have been de-queued and processed:

AMQP Queue with 5 Messages

If it takes 200ms to process each message, then M5 will not be completely processed for 1 second. This figure rises linearly as the Queue grows in length. If a Publisher, or set of Publishers, suddenly loads 1,000 messages onto the Queue, our Consumer’s processing time becomes a matter of minutes rather than seconds.

Single AMQP Consumer
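With the RabbitMQ .NET client, this prefetch value is set per Channel (an open IModel, referred to as channel below) via BasicQos; a QOS value of “1” corresponds to:

            // BasicQos(prefetchSize, prefetchCount, global):
            // prefetchSize = 0 means no byte limit; prefetchCount = 1 means the broker
            // delivers only one unacknowledged message at a time to this Consumer.
            channel.BasicQos(0, 1, false);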

So let’s increase our QOS value to “5”. Now our Consumer reads from the Queue at a rate of 5 messages at a time. Our Queue is now empty. But what’s happening at the Consumer? The Consumer manages its own Shared Queue in memory. This Shared Queue is a local representation of an AMQP Queue. When a Consumer de-queues a message, that message is cached in the Consumer’s Shared Queue, and processed accordingly.

Based on the above example, our Consumer’s Shared Queue now contains messages 1 – 5, and our actual AMQP Queue is empty. This offers a win; remember that our goal with RabbitMQ, and AMQP in general, is to keep our Queues empty, or near as possible, at any given time to reduce resource consumption. In this case, our Queue is empty.

Well, we’ve reduced RabbitMQ’s resource-footprint, but what’s the catch? We’re still faced with the same problem: our messages are processed sequentially, so we haven’t reduced the overall time that it takes to process the messages – we’ve just moved the problem from one context to another.

Competing Consumers

If we could process each message in parallel, then we could reduce our processing time. We can achieve this by adding multiple Consumers. Each message takes 200ms for a single Consumer to process, so 5 Consumers can process 5 messages in 200ms.

Multiple AMQP Consumers

Actually, this won’t happen, or at least is not likely to happen, because we’ve left our QOS value at “5”. Let’s follow the process flow, assuming that we’ve started each Consumer in order of 1 through 5. This is important because, in situations where multiple Consumers are listening to a Queue, RabbitMQ will prioritise delivery of messages in the same order that the Consumers started listening. If Consumer #1 was the first Consumer to start, it will be the first to receive messages.

This is the problem in our case. Remember, a QOS value of “5” means that the Consumer will read 5 messages at a time. So in this case, assuming that our Queue contains 5 messages, Consumer #1 will read all 5 messages and process them sequentially. Consumers 2 – 5 will remain idle and wasted.

Our problem is now that our Consumers are not adequately balanced. To facilitate accurate round-robin style balancing among Consumers, we simply set the QOS value of each Consumer to “1”. This results in the following behaviour:

  • Each Consumer reads 1 message at a time
  • Older Consumers will receive messages first, assuming that they are not busy

Summary

We’ve reduced our overall processing time to 200ms. We can now process 5 messages in 200ms. So why not do this all the time? There are 2 answers to this. The first concerns the technical aspect of the design. Consider that running multiple Consumers costs resources, especially memory. Just remember this before you think about creating thousands of Consumers. The above problem is simplistic, but this design may not suit your infrastructure for real-world problems.

The second answer concerns the business needs of the actual process that we are managing. Let’s say that our process is a data-logging mechanism, and that our messages contain logging metadata. In this case, we may not care whether or not it takes several minutes to save this data. Logging data is rarely referenced until something goes wrong, so our initial single Consumer solution may be adequate. Now consider a business process that persists time-sensitive metadata to a database. In this case, our multiple Consumer solution may better suit our needs.

What if 1 second is an acceptable processing time? Then we can use a combination of both solutions. We can introduce multiple Consumers, and set their QOS values to “5”. The result is that no Consumer will ever process more than 5 messages at a time, and therefore won’t take more than 1 second to process. Assuming that we introduce 5 Consumers, we can process 25 messages per second. Why not just use 5 Consumers with a QOS value of “1”? Won’t this achieve the same result – 25 messages per second? Yes, or at least close enough, but likely not exactly, because there is more network traffic involved, as we’re now reading 25 messages individually as opposed to 25 messages in batches of 5.
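In terms of the Daishi.AMQP consumer covered earlier in this archive (assuming, as in that example, that adapter is the RabbitMQAdapter instance and that the second constructor argument is the prefetch/QOS value), the combined approach might look like this:

            // Five competing Consumers, each with a QOS (prefetch) value of 5:
            // up to 25 messages can be in flight at once, at roughly 200ms per message.
            for (var i = 0; i < 5; i++) {
                var consumer = new RabbitMQConsumerCatchAll("queueName", 5);
                adapter.ConsumeAsync(consumer);
            }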

It is critical to take into account the underlying business process when considering an AMQP solution. Generally, a one-size-fits-all approach for all business processes is too broad an approach.
