New Relic Insights .NET Client

A lightweight C# client for New Relic Insights

  • Upload New Relic Insights events on-demand
  • Upload in batches, as a scheduled task
  • Minimal CPU usage (1 thread)
  • Proxy-aware

Upload a Single Event

Create the Event

Create a class that implements NewRelicInsightsEvent:

public class CustomNewRelicInsightsEvent : NewRelicInsightsEvent
{
	[JilDirective(Name = "name")]
	public string Name { get; set; }

	[JilDirective(Name = "count")]
	public int Count { get; set; }

	[JilDirective(Name = "unixTimeStamp")]
	public int UnixTimeStamp => 
           (int) DateTime.UtcNow
                 .Subtract(new DateTime(1970, 1, 1))
                 .TotalSeconds;

	/// <summary>
	/// EventType is the New Relic Insights Event Grouping. It determines the
	/// database to which the event will persist.
	/// </summary>
	/// <remarks>
	/// EventType must be serialised in camel case in order to be correctly
	/// interpreted by New Relic Insights. Apply the following attribute to
	/// the EventType property in your implementation:
	/// [JilDirective(Name = "eventType")]
	/// </remarks>
	[JilDirective(Name = "eventType")]
	public string EventType { get; set; }
}

Note: NewRelic.NET leverages Jil for serialisation. Though JilDirective attributes are not required for custom properties, the EventType property requires the specified JilDirective.
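
The UnixTimeStamp property above computes seconds since the epoch by hand and casts the result to int. As a minimal sketch, assuming .NET Framework 4.6 or later (or .NET Core), the built-in DateTimeOffset.ToUnixTimeSeconds method yields the same value as a long, which sidesteps the 32-bit overflow in 2038. The UnixTime helper and its method names are hypothetical and are not part of NewRelic.NET:

```csharp
using System;

// Hypothetical helper; not part of NewRelic.NET.
public static class UnixTime
{
    // Seconds elapsed since 1970-01-01T00:00:00Z for the given UTC instant.
    public static long FromUtc(DateTime utc)
    {
        return new DateTimeOffset(DateTime.SpecifyKind(utc, DateTimeKind.Utc))
            .ToUnixTimeSeconds();
    }

    // Current Unix timestamp, in seconds.
    public static long Now()
    {
        return DateTimeOffset.UtcNow.ToUnixTimeSeconds();
    }
}
```

A property such as `public long UnixTimeStamp => UnixTime.Now();` could then replace the manual calculation, if a 64-bit value suits your event schema.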

Initialise New Relic Metadata

NewRelicInsightsClient.Instance.NewRelicInsightsMetadata.AccountID = 
	"{New Relic Account ID}";
NewRelicInsightsClient.Instance.NewRelicInsightsMetadata.APIKey = 
	"{New Relic API key}";
NewRelicInsightsClient.Instance.NewRelicInsightsMetadata.URI =
	new Uri("https://insights-collector.newrelic.com/v1/accounts");

Upload the Event

    var customNewRelicInsightsEvent = new CustomNewRelicInsightsEvent
    {
        Name = "TEST",
        Count = id,
        EventType = "Test"
    };

Synchronously

    NewRelicInsightsClient.Instance.UploadEvents(
	new NewRelicInsightsEvent[]
	{ 
		customNewRelicInsightsEvent 
	}, 
	new HttpClientFactory());

Asynchronously

    NewRelicInsightsClient.Instance.UploadEventsAsync(
	new NewRelicInsightsEvent[]
	{ 
		customNewRelicInsightsEvent 
	}, 
	new HttpClientFactory());

Upload Events in Batches

Start the Event Cache

if (!NewRelicInsightsClient.Instance.HasStarted)
{
	NewRelicInsightsClient.Instance.Initialise();
}

Add Events to the Cache

NewRelicInsightsClient.Instance
    .AddNewRelicInsightEvent(customNewRelicInsightsEvent);

Event Upload Frequency

Batch-upload occurs every minute, by default, unless a custom frequency is specified. The following example sets the upload-frequency to 10 minutes:

NewRelicInsightsClient.Instance.RecurringTaskInterval = 10;

Proxy Server

The following example indicates that a proxy should be leveraged when invoking HTTP requests to New Relic:

NewRelicInsightsClient.Instance.NewRelicInsightsMetadata
    .UseWebProxy = true;
NewRelicInsightsClient.Instance.NewRelicInsightsMetadata
    .WebProxy = new WebProxy("127.0.0.1:8080");

Custom HTTP Timeout

Outbound HTTP requests to New Relic can be restricted to a specific timeout (5 seconds in this example) as follows:

NewRelicInsightsClient.Instance.NewRelicInsightsMetadata
    .UseNonDefaultTimeout = true;
NewRelicInsightsClient.Instance.NewRelicInsightsMetadata
    .NonDefaultTimeout = new TimeSpan(0,0,5);

Otherwise, the default HttpClient timeout of 100 seconds applies.
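
To illustrate what the proxy and timeout settings amount to at the HTTP level, the following is a rough sketch of an HttpClient configured with both. It is not the library's own HttpClientFactory, whose internals may differ; the ProxiedHttpClient class is a hypothetical name used purely for illustration:

```csharp
using System;
using System.Net;
using System.Net.Http;

// Hypothetical factory; the library's own HttpClientFactory may differ.
public static class ProxiedHttpClient
{
    public static HttpClient Create(string proxyAddress, TimeSpan timeout)
    {
        // Route all outbound requests through the given proxy.
        var handler = new HttpClientHandler
        {
            Proxy = new WebProxy(proxyAddress),
            UseProxy = true
        };

        // HttpClient.Timeout defaults to 100 seconds when it is not set.
        return new HttpClient(handler) { Timeout = timeout };
    }
}
```

For example, `ProxiedHttpClient.Create("http://127.0.0.1:8080", TimeSpan.FromSeconds(5))` returns a client that sends requests via the local proxy and abandons any request that takes longer than 5 seconds.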

Restricting Batch Upload Size

The default batch-upload size is 1,000 events. That is, a batch consisting of no more than 1,000 events will be uploaded upon each cache upload-cycle. The following example indicates that the entire cache should be emptied upon every upload-cycle:

NewRelicInsightsClient.Instance.NewRelicInsightsMetadata
    .CacheUploadLimit = int.MaxValue;
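
The effect of an upload limit can be sketched with a simple in-memory queue. This is an assumption about how such a cache might behave rather than the library's actual implementation; EventCache and TakeBatch are hypothetical names:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical in-memory cache; the library's internal cache may differ.
public class EventCache<T>
{
    private readonly ConcurrentQueue<T> _events = new ConcurrentQueue<T>();

    public int Count { get { return _events.Count; } }

    public void Add(T item)
    {
        _events.Enqueue(item);
    }

    // Removes and returns at most 'limit' events, oldest first.
    // Anything beyond the limit stays queued for the next upload-cycle.
    public List<T> TakeBatch(int limit)
    {
        var batch = new List<T>();
        T item;
        while (batch.Count < limit && _events.TryDequeue(out item))
        {
            batch.Add(item);
        }
        return batch;
    }
}
```

With 2,500 cached events and a limit of 1,000, the first cycle takes 1,000 events and leaves 1,500 behind, whereas a limit of int.MaxValue drains the cache in a single cycle.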

Connect with me:

RSS, GitHub, Twitter, LinkedIn, YouTube, Google+

HSTS Supercookies with ASP.NET

HSTS, or HTTP Strict Transport Security, is essentially a means of ensuring that your connection is secure. It is a feature of modern browsers designed to prevent, for example, man-in-the-middle attacks, in which you request a secure resource, such as https://mybank.com, and are redirected by a malicious third party over a non-secure connection to http://mybank.com. Note the missing “s” in the second URL scheme.

How HSTS Works

Browsers typically solve this problem by storing security preferences in a small data structure. In its simplest form, this is a key-value pair index, where the key is the resource URL and the value is a boolean variable indicating whether or not the connection to the associated resource should be established in a secure manner:

_____________________
 google.com    | 1 |
 bing.com      | 1 |
 apple.com     | 1 |
 ____________________

Note that the above example indicates that all requests to google.com, bing.com, and apple.com should be made in a secure manner, over HTTPS. We can infer, then, that sites without an entry in the HSTS database allow non-secure connections. Viewed in tabular format, this resembles the following, where entries for yahoo.com and wordpress.com do not exist in the browser’s HSTS database and are therefore shown as 0:

_____________________
 google.com    | 1 |
 yahoo.com     | 0 |
 bing.com      | 1 |
 wordpress.com | 0 |
 apple.com     | 1 |
 ____________________

When read as a single value, the complete sequence of boolean values for this table reads as “10101”. It is therefore possible to leverage this table to store arbitrary binary values.

However, it is the sites themselves that determine whether or not they should be accessed over a secure connection. This is achieved by returning an HTTP 301 response to requests established over non-secure channels. The HTTP response also includes a reference to the secure URL. The browser will honour this response by redirecting to the secure URL, which returns the following HTTP header:

Strict-Transport-Security: max-age=31536000

Note that the above max-age parameter may be set as required; the above is simply an example.

The browser, upon receiving this response, will add an entry to its HSTS database, indicating that all future requests should be established over a secure channel.

How to Hack HSTS

In order to “save” a binary value to the HSTS database, we need to control the URL entries that will reside within the database. Let’s assume that I own the following 4 domains:

1.supercookies.com
2.supercookies.com
3.supercookies.com
4.supercookies.com

I configure each of these sites to indicate that connections should only be established over secure channels. Imagine, then, that I create a website that contains a JavaScript file that creates a random 4-digit binary value – in this case, “1010”.

In order to “save” this value, my JavaScript file should contain a function that connects to both 1.supercookies.com and 3.supercookies.com. This will create the following entries in the HSTS database:

_____________________________
 1.supercookies.com    | 1 |
 3.supercookies.com    | 1 |
_____________________________

Taking the other two domains into account, our view of all four domains, expressed in tabular format, is as follows:

_____________________________
 1.supercookies.com    | 1 |
 2.supercookies.com    | 0 |
 3.supercookies.com    | 1 |
 4.supercookies.com    | 0 |
_____________________________

In other words, a custom endpoint in each domain that simply returns a boolean value indicating whether or not the inbound HTTP request is secure will tell us whether or not there is an entry in the browser’s HSTS database for that domain. For example, if we invoke a connection to http://1.supercookies.com (note the non-secure HTTP scheme), then we would expect the browser to force a redirect to the secure equivalent of that URL (https://1.supercookies.com). Thus, if our endpoint returns a positive boolean, we can infer that this domain is present in the browser’s HSTS database. Otherwise, the domain is not present, and our endpoint will return a negative boolean. By establishing connections to each domain, we can build a series of boolean values; in this case, “1010”.
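
The mapping between a binary value and the set of domains can be sketched as follows. This is an illustrative helper only; the HstsBits class and its method names are hypothetical, and the host names simply follow the supercookies.com example above:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating the encoding described above.
public static class HstsBits
{
    // Returns the hosts to visit over HTTPS in order to store 'bits' (e.g. "1010").
    // Host numbering is 1-based, so bit i maps to "{i + 1}.supercookies.com".
    public static List<string> HostsToSet(string bits)
    {
        var hosts = new List<string>();
        for (var i = 0; i < bits.Length; i++)
        {
            if (bits[i] == '1') hosts.Add((i + 1) + ".supercookies.com");
        }
        return hosts;
    }

    // Rebuilds the bit string from each host's probe result, where true means
    // that a plain-HTTP request arrived over HTTPS (an HSTS entry exists).
    public static string FromProbes(IEnumerable<bool> arrivedSecurely)
    {
        var chars = new List<char>();
        foreach (var secure in arrivedSecurely) chars.Add(secure ? '1' : '0');
        return new string(chars.ToArray());
    }
}
```

Storing “1010” therefore means visiting hosts 1 and 3, and probing all four hosts afterwards yields true, false, true, false, which reads back as “1010”.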

Practical Example with ASP.NET Web API

Add the following ASP.NET Web API Controller method to write an entry to the HSTS database for the domain that hosts your ASP.NET application:

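public HttpResponseMessage Write()
{
    HttpResponseMessage response;

    if (Request.RequestUri.Scheme.Equals("https"))
    {
        // Secure request: instruct the browser to record an HSTS entry (max-age of one year).
        response = Request.CreateResponse(HttpStatusCode.NoContent);
        response.Headers.Add("Strict-Transport-Security", "max-age=31536000");

        return response;
    }

    // Non-secure request: redirect to the same URL over HTTPS. Changing only the
    // scheme avoids rewriting any "http" text that appears elsewhere in the URL.
    var secureUri = new UriBuilder(Request.RequestUri)
    {
        Scheme = Uri.UriSchemeHttps,
        Port = Request.RequestUri.IsDefaultPort ? -1 : Request.RequestUri.Port
    };

    response = Request.CreateResponse(HttpStatusCode.MovedPermanently);
    response.Headers.Location = secureUri.Uri;

    return response;
}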

In the simplest case, the above method returns an HTTP 301 that instructs the browser to redirect to the secure equivalent of the origin URL. Upon redirecting, the browser receives the HSTS header, which results in an entry in the HSTS database for the domain that hosts your ASP.NET application.

Add the following method in order to read the HSTS entry (if present) for the domain that hosts your ASP.NET application:

public class HSTSResponse
{
    public bool IsSet { get; set; }
}

public HSTSResponse Read()
{
    if (Request.RequestUri.Scheme.Equals("https"))
    {
        return new HSTSResponse
        {
            IsSet = true
        };
    }
    return new HSTSResponse();
}

This method returns a positive boolean value if the inbound HTTP request is secure, implying that the upstream browser contains an entry in its HSTS database for the domain that hosts your ASP.NET application.

Generating Tracking IDs


It is not necessary to compile or run the source code – simply browse to the included index.html file in order to demonstrate the process. You can, of course, run the application locally if you wish.

The complete code leverages 4 external websites, as per the above example, in order to generate a binary value and indirectly store it in the HSTS database. Leveraging 4 external websites yields a total of 2^4 (16) possible unique values – hardly enough to constitute a unique tracking mechanism. However, consider that if we own 32 external domains, we can control 2^32 (roughly 4.3 billion) unique tracking IDs using this method. Note that the tracking ID in the sample code is rendered as Base-36 for legibility.
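
As a quick check on the arithmetic, n domains that each store one bit can represent 2^n distinct IDs. The sketch below computes that capacity and renders a value in Base-36. It is not taken from the sample code; the TrackingId class and its method names are hypothetical:

```csharp
using System;
using System.Text;

// Hypothetical helper; not taken from the sample code.
public static class TrackingId
{
    private const string Digits = "0123456789abcdefghijklmnopqrstuvwxyz";

    // Number of distinct IDs that 'domains' one-bit entries can represent (2^domains).
    public static ulong Capacity(int domains)
    {
        if (domains < 0 || domains > 63) throw new ArgumentOutOfRangeException("domains");
        return 1UL << domains;
    }

    // Renders a value in Base-36 using the digits 0-9 followed by a-z.
    public static string ToBase36(ulong value)
    {
        if (value == 0) return "0";
        var sb = new StringBuilder();
        while (value > 0)
        {
            sb.Insert(0, Digits[(int)(value % 36)]);
            value /= 36;
        }
        return sb.ToString();
    }
}
```

Here `TrackingId.Capacity(4)` is 16 and `TrackingId.Capacity(32)` is 4,294,967,296, while the largest 32-bit ID, 4,294,967,295, renders as the 7-character string “1z141z3”.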

Why use the HSTS database as a storage mechanism

Cookies can be removed, edited, and faked. Leveraging the HSTS database as a storage mechanism potentially reduces the possibility that your tracking ID will be deleted. While this style of design is generally considered unscrupulous, the purpose of this post is to educate; whether or not this mechanism should be implemented in the wild is a matter of opinion that I leave up to the reader.

Building Stateful Services with Azure Service Fabric

Azure Service Fabric offers two modes of operation: stateful and stateless. Both implementations allow for microservice-style application development. This tutorial focuses on building a simple, stateful microservice; that is, a microservice that maintains a degree of state between calls such that underlying objects retain their properties’ state after a client applies changes to the object.

Why Stateful

Stateful services provide the best of both worlds in terms of scalability and reliability. Azure Service Fabric encapsulates the scaling process to a great degree, by handling virtualisation and hardware-provisioning. The framework also handles load-balancing, failover, and state-synchronisation. The following is a step-by-step guide to implementing the simplest stateful service possible with Azure Service Fabric. The guide assumes that you have completed the steps involved in configuring your development environment.

1. Create a new Azure Service Fabric project

Create a new Azure Service Fabric project called “MyApplication”, selecting the Stateful Reliable Actor type from the list of available templates:

Stateful Reliable Actor Template

2. Add Custom Method Stubs

Locate the “ISimpleActor” interface and note the existing method stubs. This interface and its associated implementation are created automatically and include two built-in method stubs. Add the following stubs so that the interface includes a “Name” property accessor and modifier:

    public interface ISimpleActor : IActor
    {
        Task<int> GetCountAsync();

        Task SetCountAsync(int count);

        Task<string> GetNameAsync();

        Task SetNameAsync(string name);
    }

Note that all method stubs are asynchronous by default. We have added two new methods: a simple name property accessor and modifier.

3. Add Name Property to State

Locate the ActorState class and modify so that the class includes a “Name” property:

        [DataContract]
        internal sealed class ActorState
        {
            [DataMember]
            public int Count { get; set; }

            [DataMember]
            public string Name { get; set; }

            public override string ToString()
            {
                return string.Format(CultureInfo.InvariantCulture, "SimpleActor.ActorState[Count = {0}]", Count);
            }
        }

4. Add Custom Methods

Add the following implementation methods to the SimpleActor class so that it satisfies the ISimpleActor interface:

        public Task<string> GetNameAsync()
        {
            return Task.FromResult(State.Name);
        }

        public Task SetNameAsync(string name)
        {
            State.Name = name;
            return Task.FromResult(true);
        }

5. Manage in Service Fabric Explorer

Deploy the application using Visual Studio:

Build -> Deploy Solution

Note that the application is now running locally in Service Fabric Explorer:

Azure Service Fabric Cluster Manager

6. Create Proxy Client

Add a new Console Application to the solution and install the Microsoft.ServiceFabric.Services NuGet package:

Azure Service Fabric NuGet Package

Note: You may need to change the Target Framework property to 4.5.1, and also the Platform Target to x64:

Modify Target Framework

Modify Platform Target

Finally, include a reference to the SimpleActor.Interfaces project, and add the following to the Main method in the Program class:

            var actorId = ActorId.NewId();

            var simpleActor =
                ActorProxy.Create<ISimpleActor>(actorId, "fabric:/MyApplication");

            // Block until the name has been stored before reading it back.
            simpleActor.SetNameAsync("Bob").Wait();
            var name = simpleActor.GetNameAsync().Result;

            Console.WriteLine("Hello, " + name);
            Console.ReadLine();

Summary

Run the program and observe that the text “Hello, Bob” is printed to the command window. The Name property retains its state across calls. Subsequent accessor calls to the SimpleActorService will return the value “Bob”, unless that value is explicitly modified. Note that the value is synchronised across all running instances and nodes.

.NET CLR Explained

Inside the CLR

Overview

This post aims to provide a summary cheat sheet that outlines the .NET CLR, its key components and features, and inner workings as a frame of reference for developers of every calibre.

Key Points

  • Provides services such as garbage-collection, exception-handling, debugging, and security
  • Compilers output metadata in a format recognisable by the CLR
  • Executes code that has been compiled by a .NET-compliant compiler
  • Automatic memory-management – arguably the primary feature of the CLR
  • Once compiled, all CLR-based languages interface seamlessly

Managed Execution

Managed Execution consists of the following steps:

  • Loading managed code
  • Executing managed code
  • Providing automatic memory-management through garbage-collection

Managed Code to MSIL

Microsoft Intermediate Language (MSIL) is the common denominator to which all CLR-compliant languages are transformed upon compilation. MSIL can be described as follows:

  • A set of CPU-independent instructions
  • Contains instructions for loading, storing, constructing, and executing methods
  • Removes the need for type libraries or registry entries
  • Exposes metadata, used to generate native code, through portable executable (PE) files

MSIL to Native Code

Native code is code that can be executed directly by a specific CPU architecture and operating system. Generally speaking, any given application will target one or more such platforms. In order to run on them, MSIL must be compiled further to native code; that is, code that is specific to a particular environment, or collection of environments. Native code can be described as follows:

  • Runs on OS architecture supported by the underlying MSIL compiler
  • MSIL must be compiled to native code that targets OS architecture
  • This can be achieved either prior to (AOT compiler), or during (JIT compiler) code-execution

JIT Compiler

  • Converts MSIL to native code at runtime
  • Stores native code in memory
  • Does not load entire assembly
  • Stubs un-called MSIL directives until invoked
  • Loads MSIL stubs to memory when invoked

AOT Compiler (NGEN.exe)

  • Converts MSIL to native code prior to assembly-execution
  • Loads the entire assembly
  • Persists the generated code to disk in the Native Image Cache

Code Verification

MSIL is subject to a verification process, unless explicitly overridden by policy. The verification process ensures that MSIL adheres to and upholds the contract defined by the CLR in terms of allowing managed-execution. This helps ensure that code is type-safe, and protected from corruption.

Summary

The following topics provide brief descriptions of key CLR features:

Running Code

  • Provides infrastructure to facilitate managed-execution of code
  • JIT compiles methods as they are invoked
  • Subscribes executing assemblies to garbage-collection, security, debugging services, etc.

Automatic Memory Management

One of the most seismic paradigm shifts in the field of software development has been the advent of frameworks that automatically manage memory. This feature shields developers from the complexities and pitfalls of manual memory-management, increasing productivity. The following sections outline key features of automatic memory management in the CLR:

Allocating Memory

  • Objects are saved to memory in a contiguous manner (side-by-side)
  • Pointers provide access to Reference Types on the Managed Heap
  • Access to objects is fast and reliable

Releasing Memory

  • Garbage Collector releases memory assigned to objects that are no longer in use
  • Application Roots point to objects currently in memory
  • Unreachable objects are removed from memory
  • Heap-compaction occurs in cases where significant numbers of unreachable objects exist
  • Objects of 85,000 bytes or more are assigned to the Large Object Heap

Generations and Performance

  • Managed Heap is divided into 3 Generations
  • Segmentation allows for more effective Heap-compaction
  • New objects are assigned to Generation 0
  • Objects that survive garbage-collection are promoted upwards through Generations
  • Garbage-collection occurs when Generation 0 is full
  • Most objects live and die in Generation 0
  • Certain objects, such as database connections, tend to survive multiple garbage-collection cycles
  • Objects in Generation 2 remain there until they are deemed to be unreachable
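
Generation promotion can be observed directly with GC.GetGeneration. The following is a small sketch; GenerationDemo is a hypothetical class, and forcing collections with GC.Collect is done here purely for demonstration, not as a recommended practice:

```csharp
using System;

// Hypothetical demo class for observing GC generations.
public static class GenerationDemo
{
    // Returns the generation of one small object when first allocated and
    // after each of two forced collections that it survives.
    public static int[] TrackSurvivor()
    {
        var survivor = new object();
        var generations = new int[3];

        generations[0] = GC.GetGeneration(survivor);
        GC.Collect();
        generations[1] = GC.GetGeneration(survivor);
        GC.Collect();
        generations[2] = GC.GetGeneration(survivor);

        GC.KeepAlive(survivor); // keep the reference rooted until this point
        return generations;
    }

    // Reports the generation of a freshly allocated byte array of the given length.
    public static int GenerationOfNewArray(int byteCount)
    {
        return GC.GetGeneration(new byte[byteCount]);
    }
}
```

On the standard runtime, TrackSurvivor typically reports 0, then 1, then 2 as the object survives each collection. A 100,000-byte array is reported as Generation 2 straight away, because the Large Object Heap is collected together with Generation 2.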

Unmanaged Resources

  • Require explicit removal from memory
  • Examples include handles to the OS file system, network connections, etc.
  • Clients explicitly release unmanaged resources via Dispose methods
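
A minimal sketch of explicit release follows, assuming a wrapper around a file handle. LogFile is a hypothetical class used only to illustrate the IDisposable contract:

```csharp
using System;
using System.IO;

// Hypothetical wrapper that owns a file handle, an unmanaged resource.
public sealed class LogFile : IDisposable
{
    private readonly FileStream _stream;

    public bool IsDisposed { get; private set; }

    public LogFile(string path)
    {
        _stream = new FileStream(path, FileMode.Append, FileAccess.Write);
    }

    public void Write(byte[] bytes)
    {
        if (IsDisposed) throw new ObjectDisposedException("LogFile");
        _stream.Write(bytes, 0, bytes.Length);
    }

    // Releases the underlying OS file handle deterministically,
    // rather than waiting for the garbage collector.
    public void Dispose()
    {
        if (IsDisposed) return;
        _stream.Dispose();
        IsDisposed = true;
    }
}
```

Wrapping the instance in a using statement, for example `using (var log = new LogFile(path)) { log.Write(bytes); }`, guarantees that Dispose is called when the block exits, even if an exception is thrown.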

Leveraging Azure Service Bus with C#

Microsoft Azure offers Azure Service Bus as a means of leveraging, among other things, the Decoupled Middleware design pattern in your application. This post outlines a step-by-step guide to implementation, assuming that you have already established an Azure account and have initialised an associated Service Bus.

Start with the Abstraction

This library abstracts the concept of a Service Bus to a level that is not restricted to MS Azure alone. The ServiceBus and ServiceBusAdapter classes serve as base classes from which an implementation for any Service Bus can be derived. Having said that, this library explicitly implements concrete classes that are specific to MS Azure Service Bus.

The MS Azure Service Bus

The MSAzureServiceBus class provides a succinct means of interfacing with an MS Azure Service Bus, consuming messages as they arrive. Upon initialisation, MSAzureServiceBus requires that a delegate be established that determines appropriate behaviour to invoke in the event of inbound new messages. Very simply, this functionality is exposed as follows:

Incoming Message-handling

public override event EventHandler<MessageReceivedEventArgs<BrokeredMessage>> MessageReceived;

private void OnMessageReceived(MessageReceivedEventArgs<BrokeredMessage> e) {
    var handler = MessageReceived;
    if (handler != null) handler(this, e);
}

Duplicate Message-handling

Similarly, behaviour applying to duplicate messages, that is, messages that have already been processed by MSAzureServiceBus, can also be established:

public override event EventHandler<MessageReceivedEventArgs<BrokeredMessage>> DuplicateMessageReceived;

private void OnDuplicateMessageReceived(MessageReceivedEventArgs<BrokeredMessage> e) {
    var handler = DuplicateMessageReceived;
    if (handler != null) handler(this, e);
}

Receiving Messages Explicitly

Bootstrapping delegates aside, MSAzureServiceBus provides a method designed to retrieve the next available message from the MS Service Bus. This method may be invoked on demand, or as part of a continuous loop, polling the MS Service Bus and consuming new messages immediately after they become available.

        protected override void ReceiveNextMessage(string publisherName, TimeSpan timeout, bool autoAcknowledge) {
            var message = serviceBusAdapter.ReceiveNextMessage(publisherName, timeout, autoAcknowledge);
            if (message == null) return;

            var isValidMessage = messageValidator.ValidateMessageId(message.MessageId);

            if (isValidMessage) {
                messageValidator.AddMessageIdToCache(message.MessageId);
                OnMessageReceived(new BrokeredMessageReceivedEventArgs(message));
            }
            else {
                OnDuplicateMessageReceived(new BrokeredMessageReceivedEventArgs(message));
            }
        }
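
The routing decision can be sketched independently of the Azure SDK. The following is a simplified illustration, not the library's implementation; MessageRouter is a hypothetical class that passes plain string IDs in place of BrokeredMessage instances:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, library-independent sketch of the routing logic.
public class MessageRouter
{
    private readonly HashSet<string> _seen = new HashSet<string>();

    public event EventHandler<string> MessageReceived;
    public event EventHandler<string> DuplicateMessageReceived;

    // Raises MessageReceived for a first-time ID, and
    // DuplicateMessageReceived for an ID that has been routed before.
    public void Route(string messageId)
    {
        // HashSet.Add returns false when the ID is already present.
        var handler = _seen.Add(messageId) ? MessageReceived : DuplicateMessageReceived;
        if (handler != null) handler(this, messageId);
    }
}
```

Routing the IDs “a”, “a”, and “b” in that order raises MessageReceived twice and DuplicateMessageReceived once.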

The MS Azure ServiceBus Adapter

The MSAzureServiceBusAdapter class is a Bridge that encapsulates the underlying mechanisms required to establish a connection to MS Azure Service Bus, and to send and receive messages. Let’s consider the functionality in that order:

Initialising a Connection

Firstly, we must establish a NamespaceManager based on an appropriate connection-string associated with our MS Azure Service Bus account:

            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

Now we return a reference to a desired Topic, creating the Topic if it does not already exist:

            _topic = !_namespaceManager.TopicExists(topicName) ?
                _namespaceManager.CreateTopic(topicName) : _namespaceManager.GetTopic(topicName);

Lastly, we create a Subscription to the Topic, if one does not already exist:

            if (!_namespaceManager.SubscriptionExists(_topic.Path, subscriptionName))
                _namespaceManager.CreateSubscription(_topic.Path, subscriptionName);

The Complete Listing

        public override void Initialise(string topicName) {
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

            _topic = !_namespaceManager.TopicExists(topicName) ?
                _namespaceManager.CreateTopic(topicName) : _namespaceManager.GetTopic(topicName);

            if (!_namespaceManager.SubscriptionExists(_topic.Path, subscriptionName))
                _namespaceManager.CreateSubscription(_topic.Path, subscriptionName);

            _isInitialised = true;
        }

It’s worth noting that all methods pertaining to MSAzureServiceBusAdapter will implicitly invoke the Initialise method if a connection to MS Azure Service Bus has not already been established.

Sending Messages

This library offers the means to send messages in the form of BrokeredMessage objects to MS Azure Service Bus. Firstly, we must establish a connection, if one does not already exist:

if (!_isInitialised) Initialise(topicName);

Finally, initialise a TopicClient, if one has not already been established, and simply send the message as-is, in BrokeredMessage-format:

            if (_topicClient == null)
                _topicClient = TopicClient.Create(topicName);
            _topicClient.Send(message);

The Complete Listing

        public override void SendMessage(string topicName, BrokeredMessage message) {
            if (!_isInitialised) Initialise(topicName);

            if (_topicClient == null)
                _topicClient = TopicClient.Create(topicName);
            _topicClient.Send(message);
        }

Receiving Messages

Messages are consumed from MS Azure Service Bus in a serial manner, one after the other. Once again, we must initially establish a connection, if one does not already exist:

            if (!_isInitialised)
                Initialise(topicName);

Next, we initialise a SubscriptionClient, if one has not already been established, and define a BrokeredMessage instance, the desired method return-type:

            if (_subscriptionClient == null)
                _subscriptionClient = SubscriptionClient.Create(topicName, subscriptionName);

            BrokeredMessage message = null;

Next, we return the next available message, or null, if there are no available messages:

                message = _subscriptionClient.Receive(timeout);
                if (message == null)
                    return null;

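Note that this method defines an “autoAcknowledge” parameter. If true, the method acknowledges consumption of the message by calling Complete before returning it. If false, the message is returned as-is, and the caller must acknowledge it explicitly: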

                if (!autoAcknowledge) return message;
                message.Complete();

Finally, we return or abandon the message, depending on whether or not an Exception occurred:

            catch (Exception) {
                if (message != null) message.Abandon();
                throw;
            }
            return message;

The Complete Listing

        public override BrokeredMessage ReceiveNextMessage(string topicName, TimeSpan timeout, bool autoAcknowledge = false) {
            if (!_isInitialised)
                Initialise(topicName);

            if (_subscriptionClient == null)
                _subscriptionClient = SubscriptionClient.Create(topicName, subscriptionName);

            BrokeredMessage message = null;

            try {
                message = _subscriptionClient.Receive(timeout);
                if (message == null)
                    return null;
                if (!autoAcknowledge) return message;
                message.Complete();
            }
            catch (Exception) {
                if (message != null) message.Abandon();
                throw;
            }
            return message;
        }

A Practical Example

Let’s build a small Console Application to demonstrate the concept. Our application will interface with MS Azure Service Bus and continuously poll for messages until the application terminates:

            var serviceBus = new MSAzureServiceBus(new MSAzureServiceBusAdapter(), new MessageValidator());
            serviceBus.MessageReceived += serviceBus_MessageReceived;

            private static void serviceBus_MessageReceived(object sender, MessageReceivedEventArgs<BrokeredMessage> e) {
                Console.WriteLine(e.Message.MessageId);
            }

Message Validation

Notice the MessageValidator instance in the above code snippet. Let’s pause for a moment and consider the mechanics.

Messages contain message identifiers in GUID format. Our application retains an index that maps these identities. Incoming messages are validated by comparing the incoming message ID to those IDs stored within the index. If a match is found, the message is determined to be a duplicate, and appropriate action can be taken.

Here we can see that our inbound message IDs are stored in a simple HashSet of type String. Incidentally, we leverage a HashSet here to achieve constant-time, or O(1), lookups on average. Essentially, the time taken to perform a lookup will remain constant (external factors such as garbage collection aside) regardless of HashSet size:

private readonly HashSet<string> _cache = new HashSet<string>();

public IEnumerable<string> Cache { get { return _cache; } }

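Message IDs are formatted to remove all hyphens, if any exist, both when added and when looked up, so that the same standard is applied to message IDs, regardless of format:

        public void AddMessageIdToCache(string messageId) {
            _cache.Add(Normalise(messageId));
        }

        // Returns true when the ID has not been seen before, i.e. the message is not a duplicate.
        public bool ValidateMessageId(string messageId) {
            return !_cache.Contains(Normalise(messageId));
        }

        // Strips hyphens so that every GUID format maps to the same cache key.
        private static string Normalise(string messageId) {
            return messageId.Replace("-", string.Empty);
        }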
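
The value of normalising IDs before caching them can be shown with a short, self-contained sketch. NormalisingMessageValidator and TryRegister are hypothetical names, and the case-insensitive comparison is an added assumption rather than something the library is known to do:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical validator; combines the lookup and the insert in one call.
public class NormalisingMessageValidator
{
    // Assumption: IDs that differ only by letter case refer to the same message.
    private readonly HashSet<string> _cache =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase);

    // Canonical form: hyphens removed, so hyphenated and compact GUIDs compare equal.
    private static string Normalise(string messageId)
    {
        return messageId.Replace("-", string.Empty);
    }

    // Returns true the first time an ID is seen, and false for a duplicate.
    public bool TryRegister(string messageId)
    {
        return _cache.Add(Normalise(messageId));
    }
}
```

Registering a GUID in hyphenated form and then again in compact form returns true followed by false, since both strings normalise to the same cache key.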

Once initialised, the application will continuously poll MS Azure Service Bus until the return key is pressed:

            serviceBus.StartListening("TestTopic", new TimeSpan(0, 0, 1), true);
            Console.WriteLine("Listening to the Service Bus. Press Enter to quit...");

            Console.ReadLine();
            serviceBus.StopListening();

            Console.WriteLine("Disconnecting...");

The Complete Listing

    internal class Program {
        private static void Main(string[] args) {
            var serviceBus = new MSAzureServiceBus(new MSAzureServiceBusAdapter(), new MessageValidator());
            serviceBus.MessageReceived += serviceBus_MessageReceived;

            serviceBus.StartListening("TestTopic", new TimeSpan(0, 0, 1), true);
            Console.WriteLine("Listening to the Service Bus. Press Enter to quit...");

            Console.ReadLine();
            serviceBus.StopListening();

            Console.WriteLine("Disconnecting...");
        }

        private static void serviceBus_MessageReceived(object sender, MessageReceivedEventArgs<BrokeredMessage> e) {
            Console.WriteLine(e.Message.MessageId);
        }
    }

Simply add a new message to your MS Azure Service Bus instance. The application will consume the message and display the message ID on-screen.

Automatically Recover from Connectivity Failure in Go

Go provides a very effective means of executing HTTP requests, exposed through the net/http package. In certain scenarios, it may be favourable to provide recovery mechanisms that allow Go applications to function as normal in the event of HTTP connectivity failure.

Assume the following process flow:

Application with dependency

Our Go application has an explicit dependency on System 1. Let’s assume that communication between both systems occurs over HTTP, and that our Go application must be highly available, in that it must remain functional if downstream systems fail:

Application with fallback dependencies

Now we must configure our Go application so that, in the event of System 1 failure, it automatically attempts to repeat the failing HTTP request to Backup 1.

Similarly, should the connection to Backup 1 become unavailable, our Go application should attempt to repeat the failing HTTP request to Backup 2.

Chain of Responsibility

Chain of Responsibility

The Chain of Responsibility design pattern provides a suitable solution. The pattern defines a series of handlers, each one designed to execute a specific task. The task is passed along the chain until a handler successfully completes the task, or all handlers fail to complete the task.
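To make the pattern concrete, here is a minimal sketch in plain Go. It does not use package fallback; the Handler type, RunChain, and the two sample handlers are hypothetical names introduced only for illustration. The task is offered to each handler in turn until one succeeds, or all of them fail.

```go
package main

import (
	"errors"
	"fmt"
)

// Handler attempts a task and returns an error if it cannot complete it.
type Handler func(task string) (string, error)

// RunChain offers the task to each handler in turn. It stops at the first
// handler that succeeds; if none do, it returns the last error seen.
func RunChain(task string, handlers ...Handler) (string, error) {
	err := errors.New("no handlers configured")
	for _, handle := range handlers {
		var result string
		if result, err = handle(task); err == nil {
			return result, nil
		}
	}
	return "", fmt.Errorf("all handlers failed: %w", err)
}

// unavailable simulates a handler whose downstream system is offline.
func unavailable(task string) (string, error) {
	return "", errors.New("system unavailable")
}

// backup simulates a handler that completes the task.
func backup(task string) (string, error) {
	return "backup completed " + task, nil
}

func main() {
	result, err := RunChain("order-42", unavailable, unavailable, backup)
	fmt.Println(result, err)
}
```

The caller sees a single result; which handler produced it is an internal detail of the chain.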

Package fallback is designed to achieve this pattern, and to provide redundancy in the event of connectivity failure. The process flow pertaining to the above example is as follows:

Fallback process flow

Builder

Builder Design Pattern

Package fallback handles the execution of HTTP requests by wrapping each request in a ConnectionBuilder, as per the Builder design pattern. This allows the developer control over the manner in which underlying HTTP requests are constructed. Each ConnectionBuilder controls an underlying Connection – a wrapper for the actual HTTP request.

Each HTTP request is wrapped in a ConnectionBuilder, and managed by a ConnectionManager, the Director component. Once the Chain of Responsibility has been established, the ExecuteHTTPRequest method is executed on the first underlying Connection. This method attempts to execute the underlying HTTP request. Should the request fail, the method invokes ExecuteHTTPRequest on the next Connection in the chain, in a recursive manner, until a request succeeds, or all requests fail.
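The recursive hand-off can be sketched with the standard library alone. The Connection type below is a hypothetical stand-in, not the type exposed by package fallback, and the three endpoints are local test servers created with net/http/httptest rather than real systems.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Connection is a hypothetical stand-in for the package's Connection type:
// one HTTP endpoint plus an optional fallback.
type Connection struct {
	Name     string
	URL      string
	Fallback *Connection
}

// Execute issues a GET request. On a transport error or a non-2xx status it
// recursively delegates to the fallback, if there is one.
func (c *Connection) Execute(client *http.Client) (string, int, error) {
	resp, err := client.Get(c.URL)
	if err == nil {
		resp.Body.Close()
		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			return c.Name, resp.StatusCode, nil
		}
		err = fmt.Errorf("%s returned HTTP %d", c.Name, resp.StatusCode)
	}
	if c.Fallback == nil {
		return c.Name, 0, err
	}
	return c.Fallback.Execute(client)
}

// newServer starts a local test server that always answers with status.
func newServer(status int) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(status) }))
}

// demo builds System 1 -> Backup 1 -> Backup 2, where only Backup 2 is healthy.
func demo() (string, int, error) {
	system1, backup1, backup2 := newServer(500), newServer(503), newServer(200)
	defer system1.Close()
	defer backup1.Close()
	defer backup2.Close()

	chain := &Connection{"System 1", system1.URL,
		&Connection{"Backup 1", backup1.URL,
			&Connection{"Backup 2", backup2.URL, nil}}}
	return chain.Execute(http.DefaultClient)
}

func main() {
	fmt.Println(demo())
}
```

The caller only ever invokes Execute on the head of the chain; each Connection decides for itself whether to answer or to delegate.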

Installation

go get github.com/daishisystems/fallback

Sample Code

First, we need to establish some data structures:

    // BasicResponse represents the response issued from the first successful
    // HTTP request, if applicable.
    type BasicResponse struct {
        Text   string
        Detail string
    }

    // BasicError represents the error issued from the last unsuccessful
    // HTTP request, if applicable.
    type BasicError struct {
        Code    int
        Message string
    }

    // PostBody represents the body issued with an HTTP POST request.
    type PostBody struct {
        Name   string
        Amount int
    }

    basicResponse := &BasicResponse{}
    basicError := &BasicError{}

    postBody := PostBody{
        "Random", 100,
    }

Next, we initialise a ConnectionManager (the Director in our Builder implementation) and pass a ConnectionBuilder (the Builder itself). This particular ConnectionBuilder, in this example, is designed to execute successfully:

    passPath := "http://demo7227109.mockable.io/get-basic"
    failPath2 := "http://demo7227109.mockable.io/fail-basic"
    failPath1 := "http://demo7227109.mockable.io/fail-basic-post"

    connectionManager := fallback.ConnectionManager{}

    // This Connection will execute last, and will succeed.
    passBuilder := fallback.NewConnectionBuilder("PASS", "GET", passPath, true,
        nil, nil, &basicResponse, &basicError, nil)
    connectionManager.CreateConnection(passBuilder)

Now add 2 more ConnectionBuilders to the chain, both of which, in this example, are designed to fail execution:

    // This Connection will be the 2nd Connection to execute, and will fail.
    failBuilder2 := fallback.NewConnectionBuilder("FAIL2", "POST", failPath2,
        true, nil, nil, &basicResponse, &basicError, passBuilder.Connection)
    connectionManager.CreateConnection(failBuilder2)

    // This Connection will be the 1st Connection to execute, and will fail.
    failBuilder1 := fallback.NewConnectionBuilder("FAIL1", "POST", failPath1,
        true, postBody, nil, &basicResponse, &basicError,
        failBuilder2.Connection)
    connectionManager.CreateConnection(failBuilder1)

Finally, invoke the first ConnectionBuilder in the chain. In this example, both first and second ConnectionBuilders are designed to fail. Execution recursively falls back to passBuilder, which executes successfully:

    // Each Connection will be invoked in a recursive manner until a
    // Connection succeeds, or all Connections fail. Please refer to the Chain
    // of Responsibility design for more information.
    statusCode, err := failBuilder1.Connection.ExecuteHTTPRequest()
    if err != nil {
        panic(err)
    }

    fmt.Printf("HTTP status code: %d\n", statusCode)
    fmt.Printf("Text: %s\n", basicResponse.Text)
    fmt.Printf("Detail: %s", basicResponse.Detail)

The fall-back process is encapsulated within package fallback; consuming clients are unaware of failures, and instead are guaranteed success, as long as at least one HTTP connection remains viable.

Summary

Package fallback enhances the durability of your API by automatically recovering from connectivity failure. It achieves this by providing an enhanced degree of redundancy to HTTP requests, introducing a Chain of Responsibility, consisting of a series of fallback HTTP requests designed to augment an initial HTTP request. Should the initial HTTP request fail, the next fallback HTTP request in the chain will execute.

Any number of fallback HTTP requests can be chained sequentially. Redundancy is achieved by executing each fallback HTTP request in a recursive manner until one of the requests succeeds, or all requests fail.


Simple Date Handling in Go

Package time provides functionality focused on managing times and dates. The package consists of a wide range of features designed to facilitate a Gregorian calendar. Here is a canonical example:

package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
	fmt.Printf("Time is set to %s\n", t.Local())
}

Last Day of the Month

Very nice, neat, and concise. Let’s say that I want to calculate the last day of the month, for any given month in any given year. For example, I know that 2012 was a leap year, and February therefore had 29 days. Here is how such a calculation would look in C#:

var lastDayofMonth = DateTime.DaysInMonth(2012, 02);

And in Java:

// Java 7
SimpleDateFormat dateFormat = new SimpleDateFormat("MM/dd/yyyy");
Date convertedDate = dateFormat.parse(date);
Calendar c = Calendar.getInstance();
c.setTime(convertedDate);
c.set(Calendar.DAY_OF_MONTH, c.getActualMaximum(Calendar.DAY_OF_MONTH));

// Java 8
LocalDate date = LocalDate.of(2012, Month.FEBRUARY, 1);

System.out.printf("The last day of February is: %s%n",
	date.with(TemporalAdjusters.lastDayOfMonth()));

That’s a little verbose. Let’s try JavaScript:

return new Date(2012, 2, 0).getDate();

How does Go stack up against these idioms?

m := time.February
t := time.Date(2012, m+1, 0, 0, 0, 0, 0, time.UTC)
fmt.Println(2012, m, t.Day())

That’s quite succinct, but not very intuitive. Essentially we’re incrementing the month value by 1, and setting the day value to zero. This causes the underlying date mechanisms to roll back the date to the last day of the previous month – remember, we specified the month value as February, and incremented this value by 1. Now our month value is equal to March. Setting the day value to 0 effectively rolls the date back to the last day of February; in this case, the 29th.
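The rollover trick can be wrapped in a small helper to make the intent explicit. This is a sketch using only package time; daysIn is a hypothetical name, not part of any package discussed here.

```go
package main

import (
	"fmt"
	"time"
)

// daysIn returns the number of days in month m of the given year by asking
// for day 0 of the following month, which time.Date normalises to the last
// day of m.
func daysIn(year int, m time.Month) int {
	return time.Date(year, m+1, 0, 0, 0, 0, 0, time.UTC).Day()
}

func main() {
	fmt.Println(daysIn(2012, time.February)) // 29
	fmt.Println(daysIn(2015, time.February)) // 28
	fmt.Println(daysIn(2015, time.December)) // 31: month 13 rolls into the next year
}
```

Because time.Date also normalises out-of-range months, the helper works for December without a special case.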

This method gets the job done, at the expense of clarity. The above example in C# is arguably the most intuitive. It would be nice if we could calculate the last day of the month in a similar manner in Go.

Fortunately the Go Month package provides this functionality. The package is designed with developer productivity in mind, encapsulating some of the more verbose features of underlying Go packages, and exposing them intuitively. Let’s install the package and take a look at some examples:

go get github.com/daishisystems/month

    // The last numeric day of January is 31
    m := month.January
    fmt.Printf("The last numeric day of %s is %d\n", m, m.LastDay(2015))

    // The last numeric day of February is 28
    m = month.February
    fmt.Printf("The last numeric day of %s is %d\n", m, m.LastDay(2015))

    // The last numeric day of February is 29
    m = month.February
    fmt.Printf("The last numeric day of %s is %d\n", m, m.LastDay(2008))

    // The last numeric day of July is 31
    m = month.July
    fmt.Printf("The last numeric day of %s is %d\n", m, m.LastDay(2015))

What exactly does LastDay do? Is it just a wrapper that encapsulates built-in Go functionality?

No, it’s a bespoke method that doesn’t rely on existing date or time functionality. In fact, the package does not depend on any other packages at all. Here is the method in detail:

func (m Month) LastDay(year uint16) int {

	if m == 2 {

		isLeapYear := year%4 == 0 &&
			(year%100 != 0 || year%400 == 0)

		if isLeapYear {
			return 29
		}
		return 28
	}
	return monthDays[int(m)]
}

The method accepts a year value. This is necessary, given that the month of February contains 29 days in a leap year. Notice that I refer to the code snippet as a method. Methods and functions are not interchangeable terms in Go. Specifically, a method must have a receiver; a type to which the method is applied. In this case, our receiver is of type Month. Think of Go methods as being similar to instance methods on a class in C# or Java.
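The difference between a method and a function is easiest to see side by side. The Quarter type and both routines below are hypothetical, introduced purely to illustrate receivers; they are not part of the month package.

```go
package main

import "fmt"

// Quarter is a hypothetical type used only for this illustration.
type Quarter int

// FirstMonth is a method: Quarter is its receiver.
func (q Quarter) FirstMonth() int {
	return (int(q)-1)*3 + 1
}

// firstMonthOf is a plain function: the value is passed as an argument.
func firstMonthOf(q Quarter) int {
	return (int(q)-1)*3 + 1
}

func main() {
	q := Quarter(3)
	fmt.Println(q.FirstMonth())  // called on the value
	fmt.Println(firstMonthOf(q)) // called with the value
}
```

Both calls compute the same result; only the call syntax and the association with the type differ.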

The package contains a static collection, monthDays, that maps each month (represented as an integer) to its number of days; only February is omitted. Month names are held in a separate collection:

var monthNames = [12]string{
	"January",
	"February",
	"March",
	"April",
	"May",
	"June",
	"July",
	"August",
	"September",
	"October",
	"November",
	"December",
}

LastDay simply returns the value in the monthDays map (Dictionary in C#, HashMap in Java) associated with the specified month, if the month is any month other than February. Otherwise, the method determines whether or not the year is a leap year. A leap year is defined as being:

  • Evenly divisible by 4
  • Not evenly divisible by 100, unless it is also evenly divisible by 400

LastDay employs this calculation as follows:

isLeapYear := year%4 == 0 &&
    (year%100 != 0 || year%400 == 0)

Very simply, if the year is determined to be a leap year, LastDay returns 29, otherwise, 28:

if isLeapYear {
    return 29
}
return 28
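Putting the pieces together, the following self-contained sketch reproduces the behaviour described above. The layout of monthDays is an assumption (the package's actual declaration is not shown in this post), and IsLeapYear is a hypothetical helper extracted for readability.

```go
package main

import "fmt"

// Month mirrors the package's receiver type for this sketch (1 = January).
type Month int

// monthDays is an assumed layout: days per month, with February left out
// because its length depends on the year.
var monthDays = map[int]int{
	1: 31, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31,
	8: 31, 9: 30, 10: 31, 11: 30, 12: 31,
}

// IsLeapYear applies the Gregorian rule: divisible by 4, and not by 100
// unless also by 400.
func IsLeapYear(year uint16) bool {
	return year%4 == 0 && (year%100 != 0 || year%400 == 0)
}

// LastDay returns the last numeric day of month m in the given year.
func (m Month) LastDay(year uint16) int {
	if m == 2 {
		if IsLeapYear(year) {
			return 29
		}
		return 28
	}
	return monthDays[int(m)]
}

func main() {
	for _, year := range []uint16{1900, 2000, 2012, 2015} {
		fmt.Printf("%d: leap=%t, February has %d days\n",
			year, IsLeapYear(year), Month(2).LastDay(year))
	}
}
```

The years 1900 and 2000 exercise the century rule: 1900 is divisible by 100 but not 400, so it is not a leap year, whereas 2000 is.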

Summary

The Go Month Package provides functionality in an intuitive format, has no dependencies on existing date or time packages (or any packages at all), and very little verbosity. Please reach out and contact me for questions, suggestions, or to just talk tech in general.


PayPal Express Checkout with PaySharp.NET

Express Checkout is the optimal solution for Enterprise Merchants to leverage PayPal, whose APIs are broad and intricate. This post aims to simplify the design and implementation of Express Checkout using a custom .NET library called PaySharp.NET.

Express Checkout is primarily composed of the following PayPal processes:

  1. SetExpressCheckout
  2. GetExpressCheckoutDetails
  3. DoExpressCheckoutPayment

The process flow is as follows:

PayPal Express Checkout in action

Prerequisites

  • .NET Framework 4.5 or above

Installation

PM> Install-Package Daishi.PaySharp

Getting Started with PayPal Express Checkout

Register a Business Account with PayPal. PaySharp.NET requires the following prerequisite PayPal metadata:

  • Username
  • Password
  • Signature
  • ExpressCheckoutURI

ExpressCheckoutURI should refer to the PayPal Sandbox for all non-production environments. Each PayPal account is also associated with a Secure Merchant ID, which can be included in the Subject field (see code samples below) if, for example, your application supports multiple currencies. This is an optional field, and is not a prerequisite to fulfilling an Express Checkout transaction.

Explanation of Terms

SetExpressCheckout

Establishes a PayPal session based on Merchant credentials, and returns an Access Token pertaining to that session.

GetExpressCheckoutDetails

Returns a definitive collection of metadata that describes the PayPal user (name, address, etc.).

DoExpressCheckoutPayment

Collects the payment by transferring the transaction amount from the User’s account to the Merchant account.

Running the Test Harness

PaySharp.NET is covered by a range of Unit Tests, included with each build. To provide a greater degree of reliability, the SDK contains a Test Harness project. This project will execute a full Express Checkout transaction when invoked as follows:

  1. Locate App.config in Daishi.PaySharp.TestHarness
  2. Enter appropriate values for User, Password, Signature, and Subject (if applicable)
  3. Run the project (F5)
  4. Press any key when prompted
  5. SetExpressCheckout executes and returns a PayPal Access Token
  6. Open your web browser and navigate to https://www.sandbox.paypal.com/cgi-bin/webscr?cmd=_express-checkout&token=EC-1UR71602HJ0009422. Note that the token parameter is set to the value returned in Step 5
  7. Log in to PayPal
  8. Your browser will redirect to http://www.example.com/success.html?token=EC-1UR71602HJ0009422&PayerID=783CTW8EXK5AE. Note the PayerID parameter
  9. Copy the PayerID parameter from Step 8 and return to the Test Harness Command Prompt
  10. Press any key when prompted, and input the PayerID parameter from Step 8
  11. GetExpressCheckoutDetails is invoked
  12. Press any key when prompted
  13. DoExpressCheckoutPayment is invoked, successfully completing the transaction
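In a real application the PayerID is read from the redirect rather than copied by hand. The sketch below (in Go, using only the standard library, and not part of PaySharp.NET) extracts both query parameters from the return URL shown in the steps above; parseReturnURL is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/url"
)

// parseReturnURL extracts the token and PayerID query parameters that are
// appended to the return URL after the buyer logs in.
func parseReturnURL(raw string) (token, payerID string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	q := u.Query()
	token, payerID = q.Get("token"), q.Get("PayerID")
	if token == "" || payerID == "" {
		return "", "", fmt.Errorf("missing token or PayerID in %q", raw)
	}
	return token, payerID, nil
}

func main() {
	token, payerID, err := parseReturnURL(
		"http://www.example.com/success.html?token=EC-1UR71602HJ0009422&PayerID=783CTW8EXK5AE")
	fmt.Println(token, payerID, err)
}
```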

Sample Code

SetExpressCheckout

                var user = ConfigurationManager.AppSettings["User"];
                var password = ConfigurationManager.AppSettings["Password"];
                var signature = ConfigurationManager.AppSettings["Signature"];
                var subject = ConfigurationManager.AppSettings["Subject"];

                var payPalAdapter = new PayPalAdapter();

                var setExpresscheckout =
                    payPalAdapter.SetExpressCheckout(
                        new SetExpressCheckoutPayload {
                            User = user,
                            Password = password,
                            Signature = signature,
                            Version = "108.0",
                            Amount = "19.95",
                            Subject = subject,
                            LocaleCode = "en-IE",
                            CurrencyCode = "EUR",
                            CancelUrl = "http://www.example.com/cancel.html",
                            ReturnUrl = "http://www.example.com/success.html",
                            PaymentRequestName = "TEST",
                            PaymentRequestDescription = "TEST BOOKING"
                        },
                        Encoding.UTF8,
                        ConfigurationManager.AppSettings["ExpressCheckoutURI"]);

                string accessToken;
                PayPalError payPalError;

                var ok = PayPalUtility.TryParseAccessToken(setExpresscheckout,
                    out accessToken, out payPalError);
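For readers curious about what a helper such as TryParseAccessToken might do internally, here is a rough Go sketch. It assumes the raw response is a URL-encoded list of name-value pairs, and the field names ACK, TOKEN, and L_LONGMESSAGE0 are assumptions based on PayPal's NVP format; none of this is taken from the PaySharp.NET source.

```go
package main

import (
	"fmt"
	"net/url"
)

// parseAccessToken reads TOKEN from a URL-encoded name-value response.
// The field names used here are assumptions, not PaySharp.NET internals.
func parseAccessToken(response string) (string, error) {
	fields, err := url.ParseQuery(response)
	if err != nil {
		return "", err
	}
	if ack := fields.Get("ACK"); ack != "Success" {
		return "", fmt.Errorf("request failed with ACK=%q: %s",
			ack, fields.Get("L_LONGMESSAGE0"))
	}
	token := fields.Get("TOKEN")
	if token == "" {
		return "", fmt.Errorf("response contained no TOKEN")
	}
	return token, nil
}

func main() {
	// A made-up response in the assumed format.
	token, err := parseAccessToken("TOKEN=EC%2d1UR71602HJ0009422&ACK=Success&VERSION=108%2e0")
	fmt.Println(token, err)
}
```

Returning an error alongside the token plays the same role as the out payPalError parameter in the C# listing.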

GetExpressCheckoutDetails

                var getExpressCheckoutDetails = payPalAdapter
                    .GetExpressCheckoutDetails(
                        new GetExpressCheckoutDetailsPayload {
                            User = user,
                            Password = password,
                            Signature = signature,
                            Version = "108.0",
                            AccessToken = accessToken,
                            Subject = subject,
                            PayerID = payerID
                        },
                        ConfigurationManager.AppSettings["ExpressCheckoutURI"]);

                CustomerDetails customerDetails;

                ok = PayPalUtility.TryParseCustomerDetails(
                    getExpressCheckoutDetails, out customerDetails,
                    out payPalError);

DoExpressCheckoutPayment

                var doExpressCheckoutPayment = payPalAdapter
                    .DoExpressCheckoutPayment(
                        new DoExpressCheckoutPaymentPayload {
                            User = user,
                            Password = password,
                            Signature = signature,
                            Version = "108.0",
                            AccessToken = accessToken,
                            Subject = subject,
                            PayerID = payerID,
                            PaymentRequestAmt = "19.95",
                            PaymentRequestCurrencyCode = "EUR"
                        },
                        ConfigurationManager.AppSettings["ExpressCheckoutURI"]);

                TransactionResults transactionResults;

                ok = PayPalUtility.TryParseTransactionResults(
                    doExpressCheckoutPayment, out transactionResults,
                    out payPalError);

API Documentation

The API is fully documented; a *.chm Help-file is included with every build. If you prefer to view the API documentation in a web-based format, such as HTML, you can run the Sandcastle tool against any branch in order to generate the requisite files.

Note: You will likely need to unblock the Help-file as part of Windows security measures.

FAQ

Does this library support C# Async?

Yes, there are asynchronous equivalents of each synchronous method exposed by the SDK.

I get weird errors from PayPal

Generally, PayPal issues intuitive error messages. Less intuitive error messages are usually returned as a result of uninitialized payload properties. In the case of SetExpressCheckout, scan through the properties in SetExpressCheckoutPayload and ensure that each property is set to an appropriate value.

Can I Fork this project?

By all means. I’m happy to contribute to any extensions.

What’s next?

A set of extensible components that make it easier for developers to create and augment objects proprietary to downstream systems, such as Fraud Prevention, Booking & Reservation, and Back-office Accounting systems.


Microservices in C# Part 5: Autoscaling


Balancing demand and processing power

Autoscaling Microservices

In the previous tutorial, we demonstrated how throughput increases when multiple instances of SimpleMathMicroservice are invoked to handle a greater number of concurrent inbound HTTP requests. We experimented with various configurations, increasing the count of simultaneously running instances of SimpleMathMicroservice until the law of diminishing returns set in.

This is a perfectly adequate configuration for applications that absorb a consistent number of inbound HTTP requests over any given extended period of time. Most web applications, of course, do not adhere to this model. Instead, traffic tends to fluctuate, depending on several factors, not least of which is the type of business that the web application facilitates.

This presents a significant problem, in that we cannot manually throttle the number of concurrently running Microservice instances on-demand, as traffic dictates. We need an automated mechanism to scale our Microservice instances adequately.

Autoscaling involves more than simply increasing the count of running instances during heavy load. It also involves the graceful termination of superfluous instances, or instances that are no longer necessary to meet the demands of the application as load is reduced. Daishi.AMQP provides just such features, which we’ll cover in detail.

QueueWatch

QueueWatch is a mechanism that allows the monitoring of RabbitMQ Queues in real time. It achieves this by polling the RabbitMQ Management API (mentioned in Part #3) at regular intervals, returning metadata that describes the current state of each Queue.

Metadata

RabbitMQ exposes important metadata pertaining to each Queue. This metadata is presented in a user-friendly manner in the RabbitMQ Management Console:

Message Rates

These metrics represent the rates at which messages are processed by RabbitMQ. “Publish” illustrates the rate at which messages are introduced to the server, while “Deliver” represents the rate at which messages are dispatched to listening consumers (Microservices, in our case).

This information is readily available in the RabbitMQ Management API. QueueWatch effectively harvests this information, comparing the values retrieved in the latest poll with those retrieved in the previous, to monitor the flow of messages through RabbitMQ. QueueWatch can determine whether or not any given Queue is idling, overworked, or somewhere in between.
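The poll-to-poll comparison can be illustrated with a small sketch, written here in Go rather than C#. The QueueMetric fields, the Classify function, and the thresholds are all hypothetical; QueueWatch's actual rules are more involved.

```go
package main

import "fmt"

// QueueMetric is a hypothetical snapshot of one Queue taken during a poll.
type QueueMetric struct {
	Length      int     // messages waiting in the Queue
	PublishRate float64 // messages/second entering the Queue
	DeliverRate float64 // messages/second dispatched to consumers
}

// Classify compares two consecutive polls. A Queue counts as busy when its
// length grew by more than thresholdPct percent, quiet when it is empty
// and nothing is being published, and stable otherwise.
func Classify(previous, current QueueMetric, thresholdPct float64) string {
	if current.Length == 0 && current.PublishRate == 0 {
		return "quiet"
	}
	if previous.Length > 0 {
		growth := float64(current.Length-previous.Length) / float64(previous.Length) * 100
		if growth > thresholdPct {
			return "busy"
		}
	} else if current.Length > 0 {
		return "busy" // a backlog appeared since the last poll
	}
	return "stable"
}

func main() {
	previous := QueueMetric{Length: 100, PublishRate: 50, DeliverRate: 50}
	current := QueueMetric{Length: 150, PublishRate: 80, DeliverRate: 50}
	fmt.Println(Classify(previous, current, 20))
}
```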

Once a Queue is determined to be under heavy load, QueueWatch triggers an event, and dispatches an AutoScale message to the Microservice consuming the heavily-laden Queue. The Microservice can then instantiate more AMQPConsumer instances in order to drain the Queue sufficiently.

Just Show Me the Code

Create a new Microservice called QueueWatchMicroservice, an implementation of Microservice, and add the following code to the Init method:

            var amqpQueueMetricsManager = new RabbitMQQueueMetricsManager(false, "localhost", 15672, "paul", "password");

            AMQPQueueMetricsAnalyser amqpQueueMetricsAnalyser = new RabbitMQQueueMetricsAnalyser(
                new ConsumerUtilisationTooLowAMQPQueueMetricAnalyser(
                    new ConsumptionRateIncreasedAMQPQueueMetricAnalyser(
                        new DispatchRateDecreasedAMQPQueueMetricAnalyser(
                            new QueueLengthIncreasedAMQPQueueMetricAnalyser(
                                new ConsumptionRateDecreasedAMQPQueueMetricAnalyser(
                                    new StableAMQPQueueMetricAnalyser()))))), 20);

            AMQPConsumerNotifier amqpConsumerNotifier = new RabbitMQConsumerNotifier(RabbitMQAdapter.Instance, "monitor");
            RabbitMQAdapter.Instance.Init("localhost", 5672, "paul", "password", 50);

            _queueWatch = new QueueWatch(amqpQueueMetricsManager, amqpQueueMetricsAnalyser, amqpConsumerNotifier, 5000);
            _queueWatch.AMQPQueueMetricsAnalysed += QueueWatchOnAMQPQueueMetricsAnalysed;

            _queueWatch.StartAsync();

There’s a lot to talk about here. Firstly, remember that the primary function of QueueWatch is to poll the RabbitMQ Management API. In doing so, QueueWatch returns several metrics pertaining to each Queue. We need to decide which metrics we are interested in.

Metrics are represented by implementations of AMQPQueueMetricAnalyser, and chained together as per the Chain of Responsibility Design Pattern. Each link in the chain is executed until a predefined performance condition is met. For example, let’s consider the ConsumerUtilisationTooLowAMQPQueueMetricAnalyser. This implementation of AMQPQueueMetricAnalyser inspects the ConsumerUtilisation metric, and determines whether the value is less than 99%, in which case, there are not enough consuming Microservices to adequately drain the Queue. At this point, a ConsumerUtilisationTooLow value is returned, the chain of execution ends, and QueueWatch issues an AutoScale directive:

        public override void Analyse(AMQPQueueMetric current, AMQPQueueMetric previous, ConcurrentBag<AMQPQueueMetric> busyQueues, ConcurrentBag<AMQPQueueMetric> quietQueues, int percentageDifference) {
            if (current.ConsumerUtilisation >= 0 && current.ConsumerUtilisation < 99) {
                current.AMQPQueueMetricAnalysisResult = AMQPQueueMetricAnalysisResult.ConsumerUtilisationTooLow;
                busyQueues.Add(current);
            }
            else analyser.Analyse(current, previous, busyQueues, quietQueues, percentageDifference);
        }
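The same chaining idea can be expressed compactly with function composition. The following Go sketch is only an analogy for the C# analysers above: the Metric fields, analyser names, and thresholds are illustrative assumptions, and each analyser either returns a verdict or delegates to the next link.

```go
package main

import "fmt"

// Metric holds the values inspected by this sketch; field names are illustrative.
type Metric struct {
	ConsumerUtilisation float64 // percentage; negative when unknown
	QueueLength         int
}

// Analyser returns a verdict, or delegates to the next link in the chain.
type Analyser func(current, previous Metric) string

// UtilisationTooLow flags Queues whose ConsumerUtilisation is below 99%.
func UtilisationTooLow(next Analyser) Analyser {
	return func(current, previous Metric) string {
		if current.ConsumerUtilisation >= 0 && current.ConsumerUtilisation < 99 {
			return "ConsumerUtilisationTooLow"
		}
		return next(current, previous)
	}
}

// QueueLengthIncreased flags Queues that grew by more than pct percent.
func QueueLengthIncreased(pct int, next Analyser) Analyser {
	return func(current, previous Metric) string {
		if current.QueueLength*100 > previous.QueueLength*(100+pct) {
			return "QueueLengthIncreased"
		}
		return next(current, previous)
	}
}

// Stable terminates the chain.
func Stable(current, previous Metric) string { return "Stable" }

// chain is evaluated outermost-first, mirroring the nested constructors.
var chain = UtilisationTooLow(QueueLengthIncreased(20, Stable))

func main() {
	fmt.Println(chain(Metric{80, 10}, Metric{100, 10}))
	fmt.Println(chain(Metric{100, 150}, Metric{100, 100}))
	fmt.Println(chain(Metric{100, 100}, Metric{100, 100}))
}
```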

Scale-Out Directive

Scaling out

QueueWatch must issue Scale-Out directives through dedicated Queues in order to adhere to the Decoupled Middleware design. QueueWatch should not know anything about the downstream Microservices, and should instead communicate through AMQP, specifically, through a dedicated Exchange.

Each Microservice must now listen to 2 Queues. E.g., SimpleMathMicroservice will continue listening to the Math Queue, as well as a Queue called AutoScale, for the purpose of demonstration. SimpleMathMicroservice will receive Scale-Out directives through this Queue. We should modify SimpleMathMicroservice accordingly:

        public void Init() {
            _adapter = RabbitMQAdapter.Instance;
            _adapter.Init("localhost", 5672, "guest", "guest", 50);

            _rabbitMQConsumerCatchAll = new RabbitMQConsumerCatchAll("Math", 10);
            _rabbitMQConsumerCatchAll.MessageReceived += OnMessageReceived;

            _autoScaleConsumerCatchAll = new RabbitMQConsumerCatchAll("AutoScale", 10);
            _autoScaleConsumerCatchAll.MessageReceived += _autoScaleConsumerCatchAll_MessageReceived;

            _consumers.Add(_rabbitMQConsumerCatchAll);

            _adapter.Connect();
            _adapter.ConsumeAsync(_autoScaleConsumerCatchAll);
            _adapter.ConsumeAsync(_rabbitMQConsumerCatchAll);
        }

Create a Topic Exchange called “monitor”. QueueWatch will publish to this Exchange, which will route the message to an appropriate Queue. Now create a binding between the monitor Exchange and the AutoScale Queue:

Exchange Binding

Note that the Routing Key is the name of the Queue being monitored. If QueueWatch determines that the Math Queue is under load, then it will issue a Scale-Out directive to the monitor Exchange, with a Routing Key of “Math”. The monitor Exchange will react by routing the Scale-Out directive to the AutoScale Queue, to which an explicit binding exists. SimpleMathMicroservice consumes the Scale-Out directive and reacts appropriately, by instantiating a new AMQPConsumer:

            if (e.Message.Contains("scale-out")) {
                var consumer = new RabbitMQConsumerCatchAll("Math", 10);
                _adapter.ConsumeAsync(consumer);
                _consumers.Add(consumer);
            }
            else {
                if (_consumers.Count <= 1) return;
                var lastConsumer = _consumers[_consumers.Count - 1];

                _adapter.StopConsumingAsync(lastConsumer);
                _consumers.RemoveAt(_consumers.Count - 1);
            }
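The scale-out and scale-in bookkeeping is simple enough to model in isolation. This Go sketch is a hypothetical model of the handler above, in which plain integer IDs stand in for real AMQP consumers; it shows that the consumer count grows on each scale-out message and never drops below one.

```go
package main

import (
	"fmt"
	"strings"
)

// Pool tracks the consumers attached to a Queue. Integer IDs stand in for
// real AMQP consumers in this sketch.
type Pool struct {
	consumers []int
	nextID    int
}

// NewPool starts with a single consumer, which is never removed.
func NewPool() *Pool {
	return &Pool{consumers: []int{0}, nextID: 1}
}

// Handle reacts to an AutoScale message and returns the resulting consumer
// count. Any message that is not a scale-out is treated as a scale-in.
func (p *Pool) Handle(message string) int {
	if strings.Contains(message, "scale-out") {
		p.consumers = append(p.consumers, p.nextID)
		p.nextID++
	} else if len(p.consumers) > 1 {
		p.consumers = p.consumers[:len(p.consumers)-1]
	}
	return len(p.consumers)
}

func main() {
	pool := NewPool()
	for _, message := range []string{"scale-out", "scale-out", "scale-in", "scale-in", "scale-in"} {
		fmt.Printf("%-9s -> %d consumer(s)\n", message, pool.Handle(message))
	}
}
```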

Summary

QueueWatch provides a means of returning key RabbitMQ Queue metrics at regular intervals, in order to determine whether demand, in terms of the number of running Microservice instances, is waxing or waning. QueueWatch also provides a means of reacting to such events, by publishing AutoScale notifications to downstream Microservices, so that they can scale accordingly, providing sufficient processing power at any given instant. The process is simplified as follows:

  1. QueueWatch returns metrics describing each Queue
  2. Queue metrics are compared against the last batch returned by QueueWatch
  3. AutoScale messages are dispatched to a Monitor Exchange
  4. AutoScale messages are routed to the appropriate Queue
  5. AutoScale messages are consumed by the intended Microservices
  6. Microservices scale appropriately, based on the AutoScale message

Next Steps

  • Prevent a “bounce” effect as traffic arbitrarily fluctuates for reasons not pertaining to application usage, such as network slow-down, or hardware failure
  • The current implementation compares metrics in a very simple fashion. Future implementations will instead graph metric metadata, and react to more thoroughly defined thresholds


Microservices in C# Part 4: Scaling Out


Scaling Out

Scaling out our Microservices

So far, we have

  • established a simple Microservice
  • abstracted and sufficiently covered the Microservice core logic in terms of tests
  • created a reusable Microservice template
  • implemented the queue-pooling concept to ensure reliable message delivery
  • run simple load tests to adequately size Queue resources

Now it’s time to scale out. Here’s how our design currently looks:

Our current design

This design is fine for demonstration purposes, but requires augmentation to facilitate production release. Consider that the current design will only service a single request at any given time, and will service requests in a FIFO manner, assuming that no hardware failure, or otherwise, occurs.

Even under ideal conditions, assuming that each request takes exactly 1 second to complete, given 100 inbound HTTP requests, the 1st request will complete in 1 second. The final, 100th request, will complete in 100 seconds.
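The arithmetic generalises to any number of instances. The Go sketch below is a simplified model, assuming identical instances, a fixed processing time per request, and no messaging overhead; completionTime is a hypothetical helper.

```go
package main

import "fmt"

// completionTime returns the time, in seconds, at which the nth request
// (1-based) finishes, given `workers` identical instances that each take
// secondsPerRequest and pull from a shared FIFO queue.
func completionTime(n, workers int, secondsPerRequest float64) float64 {
	batch := (n + workers - 1) / workers // ceiling division
	return float64(batch) * secondsPerRequest
}

func main() {
	fmt.Println(completionTime(100, 1, 1.0))  // one instance, 1s per request
	fmt.Println(completionTime(100, 1, 0.5))  // one instance, twice as fast
	fmt.Println(completionTime(100, 10, 1.0)) // ten instances, original speed
}
```

Under these assumptions, halving the processing time halves the wait for the 100th request, whereas running ten instances at the original speed cuts it by a factor of ten.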

Clearly, this is less than ideal. Intuitively, we might consider optimising the processing speed of our Microservice. Certainly this will help, but does little to solve the problem. Let’s say that our engineers work tirelessly to cut response times in half:

Working tirelessly to shatter response-times!

Even if they achieve this, in a batch of 100 requests, the 100th request will still take 50 seconds to complete. Instead, let’s focus on serving multiple requests in a concurrent, and potentially parallel manner. Our augmented design will be as follows:

Augmented design

Notice that instead of a single instance of SimpleMathMicroservice, there are now multiple instances running. How many instances do we need? That depends on 2 factors – response times and something called Quality-of-Service (QOS).

Quality of Service

Quality of Service is a feature of AMQP that defines the level of service exhibited by AMQP Channels at any given time. QOS is expressed as a percentage; 100% suggests that any given channel is utilised to maximum effect. Essentially, we need to avoid downtime in terms of channel-usage. Downtime can be described as the period of time that a Microservice is idle, or not doing work.

Typically, such scenarios occur when a Microservice is waiting on messages in transit, or is itself transmitting message-receipt acknowledgements to the Message Bus. For more information on QOS, please refer to this post. For the moment, we’re going to begin with the most intuitive design possible, without delving deeply into the complexities of QOS, and related concepts such as prefetch-count.

To that end, we are going to deploy multiple instances of our SimpleMathMicroservice (10, to be exact), and retain the default message-delivery mechanism – to read each message from a Queue one-at-a-time. In order to achieve this, we must modify our application slightly, specifically, the Global.asax.cs file. First, add a simple collection to house multiple running SimpleMathMicroservice instances:

private readonly List<SimpleMathMicroservice> _simpleMathMicroservices = new List<SimpleMathMicroservice>();

Now, instantiate 10 unique instances of SimpleMathMicroservice, initialise each instance, and add it to the collection:

            for (var i = 0; i < 10; i++) {
                var simpleMathMicroservice = new SimpleMathMicroservice();
                _simpleMathMicroservices.Add(simpleMathMicroservice);

                simpleMathMicroservice.Init();
            }

Finally, modify the Application_End function such that it gracefully shuts down each SimpleMathMicroservice instance:

            foreach (var simpleMathMicroservice in _simpleMathMicroservices) {
                simpleMathMicroservice.Shutdown();
            }

Now, on startup, 10 instances of SimpleMathMicroservice will be invoked, and will each actively listen to the Math Queue.

Message Distribution

SimpleMathMicroservice leverages a component called AMQPConsumer within the Daishi.AMQP library that defines the manner in which SimpleMathMicroservice will read messages from any given Queue. AMQPConsumer exposes a constructor that accepts a value called prefetchCount:

        protected AMQPConsumer(string queueName, int timeout, ushort prefetchCount = 1, bool noAck = false,
            bool createQueue = true, bool implicitAck = true, IDictionary<string, object> queueArgs = null) {
            this.queueName = queueName;
            this.prefetchCount = prefetchCount;
            this.noAck = noAck;
            this.createQueue = createQueue;
            this.timeout = timeout;
            this.implicitAck = implicitAck;
            this.queueArgs = queueArgs;
        }

Notice the default prefetchCount value of 1. This default setting results in behaviour that allows the component to read messages one-at-a-time. It also ensures that RabbitMQ will distribute messages evenly, in a round-robin manner, among consumers. Now our application is configured to process multiple requests in a concurrent manner.
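To make the round-robin idea concrete, here is a minimal simulation. It does not use RabbitMQ at all; it assumes an idealised broker in which every consumer acknowledges at the same rate, so message i simply goes to consumer i modulo the consumer count. A real broker with a prefetch count of 1 will also skip any consumer that still holds an unacknowledged message. RoundRobinDemo and Distribute are names invented for this sketch.

```csharp
using System;

public static class RoundRobinDemo
{
    // Assigns message i to consumer (i mod consumerCount) and returns
    // the number of messages that each consumer received.
    public static int[] Distribute(int messageCount, int consumerCount)
    {
        var received = new int[consumerCount];
        for (var message = 0; message < messageCount; message++)
        {
            received[message % consumerCount]++;
        }
        return received;
    }

    public static void Main()
    {
        // 25 messages shared among 10 consumers.
        var received = Distribute(25, 10);
        Console.WriteLine(string.Join(",", received)); // 3,3,3,3,3,2,2,2,2,2
    }
}
```

No consumer ever holds more than one message more than any other, which is the "even" distribution described above.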

Concurrency and Parallelism

Can our application now be described as parallel? That depends. Concurrency is essentially the act of performing multiple tasks on a single CPU, or core. Parallelism, on the other hand, can be described as the act of performing multiple tasks, or multiple stages of a single task, across multiple cores.

By this definition, our application certainly operates in a concurrent manner. But does it also operate in a parallel manner? That depends. Running the application on a single-core machine obviously prohibits parallelism. Running on multiple cores will very likely result in parallel processing. Under the hood, the Daishi.AMQP library invokes a new thread for each Microservice operation that consumes messages from a Queue:

        public void ConsumeAsync(AMQPConsumer consumer) {
            if (!IsConnected) Connect();

            var thread = new Thread(o => consumer.Start(this));
            thread.Start();

            while (!thread.IsAlive)
                Thread.Sleep(1);
        }

“Wait, you shouldn’t invoke threads manually! That’s what ThreadPool.QueueUserWorkItem() is for!”

ThreadPool.QueueUserWorkItem() runs work on pool threads, which are background threads intended for short-lived work items. Each consumer runs a long-lived, blocking loop, so it is given a dedicated foreground thread instead. That way it does not tie up a pool thread for the lifetime of the application, and the runtime will not terminate it simply because every other foreground thread has exited.
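The difference between the two kinds of thread can be observed directly through Thread.IsBackground. The sketch below is independent of Daishi.AMQP; ThreadKindDemo and its method names are invented for illustration.

```csharp
using System;
using System.Threading;

public static class ThreadKindDemo
{
    // Reports whether a thread created with `new Thread(...)` is a background thread.
    public static bool DedicatedThreadIsBackground()
    {
        var isBackground = true;
        var thread = new Thread(() => isBackground = Thread.CurrentThread.IsBackground);
        thread.Start();
        thread.Join(); // wait for the thread to record its own state
        return isBackground;
    }

    // Reports whether a work item queued to the ThreadPool runs on a background thread.
    public static bool PoolThreadIsBackground()
    {
        var isBackground = false;
        using (var done = new ManualResetEventSlim(false))
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                isBackground = Thread.CurrentThread.IsBackground;
                done.Set();
            });
            done.Wait(); // block until the work item has run
        }
        return isBackground;
    }

    public static void Main()
    {
        Console.WriteLine(DedicatedThreadIsBackground()); // False
        Console.WriteLine(PoolThreadIsBackground());      // True
    }
}
```

A thread created with `new Thread(...)` is a foreground thread unless IsBackground is set explicitly, which matches the way ConsumeAsync starts each consumer.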

Assuming that batches of newly created threads run (or are context-switched) across multiple cores, one could argue that our application exhibits parallel processing behaviour.

Run an ApacheBench load test against the running application:

ab -n 10000 -c 10 http://localhost:46653/api/math/1500

While the test is running, refer to the Math Queue in the RabbitMQ Administrator interface:

http://localhost:15672/#/queues/%2F/Math

Notice the number of Consumers (10) and the Consumer Utilisation figure. This figure represents the QOS value associated with the Queue. It should settle at the 100% mark for the duration of the test, indicating that all 10 SimpleMathMicroservice instances are constantly busy, and not idle:

Quality of Service

Next Steps

Modify the number of running SimpleMathMicroservice instances, and apply load tests to each setting. Ideally, push the number of running instances upwards in reasonable increments (batches of 5-10) and observe the response times, comparing each run against the last.

Response times should improve incrementally, then plateau, and ultimately degrade as you increase the number of running instances. This is an indication that your application has reached critical mass, based on the law of diminishing returns. The instance count at which response times stop improving is the number of SimpleMathMicroservice instances that you should deploy in order to achieve optimal throughput.
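Once you have recorded a mean response time for each setting, choosing the best one is a simple minimum search. The sketch below assumes you have copied the figures by hand from each ApacheBench run; the numbers shown are placeholders for illustration, not measurements, and InstanceCountPicker is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class InstanceCountPicker
{
    // Returns the instance count with the lowest mean response time.
    // Ties are broken in favour of the smaller instance count.
    public static int PickOptimal(IDictionary<int, double> meanResponseMsByInstanceCount)
    {
        return meanResponseMsByInstanceCount
            .OrderBy(pair => pair.Value)
            .ThenBy(pair => pair.Key)
            .First()
            .Key;
    }

    public static void Main()
    {
        // Placeholder figures only: instance count -> mean response time (ms).
        var results = new Dictionary<int, double>
        {
            { 5, 42.0 }, { 10, 30.0 }, { 15, 24.0 }, { 20, 24.5 }, { 25, 31.0 }
        };
        Console.WriteLine(PickOptimal(results)); // 15
    }
}
```

Breaking ties towards the smaller count reflects the plateau described above: if adding instances no longer improves response times, the extra instances only consume resources.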
