Showing posts with label Cloud. Show all posts

Friday, 3 April 2015

Utilisation and High Availability analysis: Containers for Microservices

Microservices? Is this not the same SOA principles repackaged and sold under a different label? Not this time; I will address this question in another post. But if you are considering Microservices for your architecture, beware of the cost and availability concerns. In this post we will look at how using containers (such as Docker) can help you improve your cloud utilisation, decrease costs and above all improve availability.

Elephant in the room: most of the cloud resources are under-utilised

We almost universally underestimate how long it takes to build a software feature. I am not sure whether it is because we feel our time is more precious than money, but for hardware the reverse is almost always true: we overestimate the hardware requirements of our systems. Historically this could have been useful, since commissioning hardware in enterprises is usually a long and painful process, and the estimate also had to cover business growth over the years and planned contingency for spikes.
But in an elastic environment such as the cloud? Well, it seems we still do that. In the UK alone, £1bn is wasted on unused or under-utilised cloud resources.

Some of this is avoidable by using the elasticity of the cloud and scaling up and down as needed. Many cloud vendors provide such functionality out of the box with little or no coding. But many companies already do that, so why is the waste so high?

From personal experience I can give you a few reasons why my systems do that...

Instance Redundancy

Redundancy is one of the biggest killers in computing costs. And things do not change a lot in the cloud: vendors' availability SLAs are usually defined in the context of redundancy and, to be frank, some of it is purely cloud related. For example, on Azure you need to have your VMs in an "availability set" to qualify for the VM SLA. In other words, at least 2 VMs are needed, since your VMs could be taken down for patching at any time, but within an availability set this is guaranteed not to happen to all machines at the same time.

The problem is, unless you are a company with a massive number of customers, even a small VM instance could suffice for your needs - and even in a big company with many internal services, some services might not need a big resource allocation.

Looking from another angle, adopting Microservices means you can iterate your services more quickly and release more often. The catch is that clients will not be able to upgrade at the same time, so you have to be prepared to run multiple versions of the same service/microservice. Old versions of the API cannot be decommissioned until all clients are weaned off them and moved to the newer versions. Translation? Well, some of your versions will have to run on a shoestring budget to justify their existence.

Containerisation helps you to tap into this resource, reducing the cost by running multiple services on the same VM. A system usually requires at least 2 or 3 active instances - allowing for redundancy. Small services loaded into containers can be co-located on the same instances allowing for higher utilisation of the resources and reduction of cost.

Improved utilisation by service co-location



This ain't rocket science...

Resource Redundancy

Most services have different resource requirements. Whether Network, Disk, CPU or memory, some resources are used more heavily than others. A service encapsulating an algorithm will be mainly CPU-heavy while an HTTP API could benefit from local caching of resources. While cloud vendors provide different VM setups that can be geared for memory, Disk IO or CPU, a system still usually leaves a lot of redundant resources.

Possibly best explained in the picture below. No rocket science here either, but mixing services that have different resource allocation profiles gives us the best utilisation.


Co-location of Microservices having different resource allocation profile


And what's that got to do with Microservices?

Didn't you just see it?! Building smaller services pushes you towards building and deploying more services, many of which need the High Availability provided by redundancy but not the price tag associated with it.

Docker is absolutely a must-have if you are doing Microservices or you are paying through the nose for your cloud costs. At QCon London 2015, John Wilkes from Google explained how they "start over 2 billion containers per week". In fact, to be able to take advantage of the spare resources on the VMs, they tend to mix their Production and Batch processes. One difference here is that the Production processes require locked, allocated resources while the Batch processes take whatever is left. They analysed the optimum percentages that minimise errors while keeping utilisation high.

Containerisation and availability

As we discussed, optimising utilisation becomes a big problem when you have many many services - and their multiple versions - to run. But what would that mean in terms of Availability? Does containerisation improve or hinder your availability metrics? I have not been able to find much in the literature but as I will explain below, even if you do not have small services requiring VM co-location, you are better off co-locating and spreading the service onto more machines. And it even helps you achieve higher utilisation.

By spreading your architecture across more Microservices, the availability of your overall service (the one the customer sees) becomes the product of the availability of each Microservice. For instance, if you have 10 Microservices each with an availability of 4 9s (99.99%), the overall availability drops to 3 9s (99.9%). And if you have 100 Microservices, which is not uncommon, this drops to only two 9s (99%). In these terms, you would need to strive for a very high Microservice availability.
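The arithmetic is easy to check with a few lines of Python. This is a minimal sketch that assumes every Microservice fails independently and that the customer-facing service needs all of them to be up; the function name `overall_availability` is made up for illustration.

```python
def overall_availability(per_service: float, count: int) -> float:
    """Availability of `count` services that must all be up at once.

    Assumes independent failures, so the individual availabilities multiply.
    """
    return per_service ** count


for count in (1, 10, 100):
    result = overall_availability(0.9999, count)
    print(f"{count:>3} services at 99.99% each -> {result:.4%} overall")
```

With 10 services the result is roughly 99.9% and with 100 services roughly 99%, which is where the "lose one 9 per tenfold increase" rule of thumb comes from.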

Hardware failure is very common and for many components it goes above 1% (Annualised Failure Rate). Defining hardware and platform availability in respect to system availability is not very easy. But for simplicity and the purpose of this study, let's assume failure risk of 1% - at the end of the day our resultant downtime will scale accordingly.

If service A is deployed onto 3 VMs and one VM goes down (1%), the other two instances will have to bear the extra load until another instance is spawned - which will take some time. Capacity planning can leave enough spare resources to deal with this situation, but if two VMs go down (0.01%), it will most likely bring down the service as it would not be able to cope with the extra load. If the Mean Time to Recovery is 20 minutes, this alone will use up around 40% of the yearly downtime that 4 9s allows your Microservice (about 53 minutes)! If you have worked hard in this field, you would know how difficult it is to gain those 9s, and losing them like that is not an option.
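To put the 20-minute recovery into perspective, here is a small sketch of the yearly downtime budget implied by an availability target. It assumes a 365-day year; the function name `downtime_budget_minutes` is illustrative only.

```python
MINUTES_PER_YEAR = 365 * 24 * 60  # 525,600 minutes, ignoring leap years


def downtime_budget_minutes(availability: float) -> float:
    """Minutes of downtime per year allowed by an availability target."""
    return MINUTES_PER_YEAR * (1 - availability)


budget = downtime_budget_minutes(0.9999)  # four 9s
outage = 20                               # one recovery, MTTR taken from the text

print(f"Yearly budget at 99.99%: {budget:.1f} minutes")
print(f"One {outage}-minute outage uses {outage / budget:.0%} of it")
```

A single incident of this kind consumes a large share of the whole year's allowance, so two of them would leave almost nothing for anything else.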

So what's the solution? This diagram should speak louder than words:

Service A and B co-located in containers, can tolerate more VM failures

By using containers and co-locating services, we spread instances more thinly and can tolerate more failures. In the example above, our services can tolerate 2 or maybe even 3 VM failures at the same time.
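The effect can be estimated with a binomial calculation. The sketch below assumes each VM is down independently with the 1% probability used earlier, and compares a service on 3 dedicated VMs (down when 2 fail together) with the same service spread over 6 shared VMs (down when 3 fail together). The 6-VM layout is a hypothetical example, not a reading of the diagram. Note that the 0.01% quoted earlier is the chance of two particular VMs being down together; counting any two out of three gives a somewhat larger figure.

```python
from math import comb


def prob_at_least_down(vms: int, failures: int, p: float = 0.01) -> float:
    """Probability that at least `failures` of `vms` machines are down at once.

    Assumes each VM is down independently with probability p.
    """
    return sum(
        comb(vms, k) * p**k * (1 - p) ** (vms - k)
        for k in range(failures, vms + 1)
    )


# Dedicated: 3 VMs, two simultaneous failures bring the service down.
dedicated = prob_at_least_down(vms=3, failures=2)
# Co-located (hypothetical): 6 shared VMs, three simultaneous failures needed.
co_located = prob_at_least_down(vms=6, failures=3)

print(f"dedicated 3 VMs:  {dedicated:.4%}")
print(f"co-located 6 VMs: {co_located:.4%}")
print(f"improvement:      {dedicated / co_located:.0f}x")
```

Under these assumptions the co-located layout is more than ten times less likely to lose the service, without adding any VMs beyond what two separately hosted services would already need.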

Conclusion

Containerisation (or Docker if you will) is a must if you are considering Microservices. It helps you with increasing utilisation, bringing down cloud costs and, above all, improving your availability.


Sunday, 5 October 2014

What should I do?

[Level C1]

TL;DR: I was charged for a huge egress on one of my VMs and I have no way of knowing what caused it, or whether it was an infrastructure glitch that had nothing to do with the VM.

OK, here is the snippet of the last email I received back:

"I understand what you’re saying. Because this involves a non-windows VM, we wouldn’t be able to determine what caused this. we can only validate the usage, and as you already know, the data usage seems quite appropriate, comparing to our logs. Had this been a Windows machine, we could have engaged another team(s) to have this matter looked into. As of now, I am afraid, this is all we have. You might want to check with Ubuntu support to see what has caused this."

The story started two weeks ago. I have, you know, an MSDN account courtesy of my work which provides around £95/mo of free Windows Azure credit - for which I am really grateful. It has allowed me to run some kinda pre-startup stuff on a shoestring. I recently realised my free credit can only take me so far, so I started using Azure services more liberally, knowing that I was going to be charged. At the end of the day, nothing valuable comes out of nothing. But before doing that, I also registered for AWS and, as you know, it provides some level of free services which I again took advantage of.

But I have not said anything about the problem yet. It was around the end of the month and I knew my remaining credit would be enough to carry me to the next month. Then I noticed my credit panel turning orange from green (this is quite handy, telling you with the rate of usage you will soon run out of credit) which I thought was bizarre and then next day I realised all my services had disappeared. Totally gone! Bang! I had run out of credit...

This was a Saturday and I spent Saturday and Sunday reinstating my services. So I learnt the lesson that I need to remove the spending cap, which is not the reason why you are reading this. The reason I ran out of credit was egress (= data out) from one of my Linux boxes... this box used to have an egress of a few MB to a few hundred MB a day at most, and it suddenly shot up to 175GB and 183GB! OK, either there is a mistake or my box has been hacked into - with the latter more likely.

Here is the egress from that "renegade" Linux box:
8/30/2014 "Data Transfer Out (GB)" "GB" 0.004967
8/31/2014 "Data Transfer Out (GB)" "GB" 0.006748
9/1/2014 "Data Transfer Out (GB)" "GB" 0.001735
9/2/2014 "Data Transfer Out (GB)" "GB" 0.17618
9/3/2014 "Data Transfer Out (GB)" "GB" 0.003499
9/4/2014 "Data Transfer Out (GB)" "GB" 0.013394
9/5/2014 "Data Transfer Out (GB)" "GB" 0.016147
9/6/2014 "Data Transfer Out (GB)" "GB" 0.005412
9/7/2014 "Data Transfer Out (GB)" "GB" 0.005803
9/8/2014 "Data Transfer Out (GB)" "GB" 0.001547
9/9/2014 "Data Transfer Out (GB)" "GB" 0.003044
9/10/2014 "Data Transfer Out (GB)" "GB" 0.002179
9/11/2014 "Data Transfer Out (GB)" "GB" 0.02876
9/12/2014 "Data Transfer Out (GB)" "GB" 0.008922
9/13/2014 "Data Transfer Out (GB)" "GB" 0.28983
9/14/2014 "Data Transfer Out (GB)" "GB" 0.099229
9/15/2014 "Data Transfer Out (GB)" "GB" 0.002653
9/16/2014 "Data Transfer Out (GB)" "GB" 0.00191
9/17/2014 "Data Transfer Out (GB)" "GB" 0.00182
9/18/2014 "Data Transfer Out (GB)" "GB" 175.69292
9/19/2014 "Data Transfer Out (GB)" "GB" 182.974478

This box was running an ElasticSearch instance which had barely 1GB of data. And yes, it was not protected so it could have been hacked into. So what I did, with a bunch of bash commands which I conveniently copied and pasted from Google searches, was to create a list of the files that were changed on the box, ordered by date, and send it to the support. There was nothing suspicious there - and the support team did not find it any more useful [in fact the comment was that it was "poorly formatted", I assume due to the difference in new line character in Linux :) ].

So it seemed less likely that it was hacked but maybe someone has been running queries against the ElasticSearch which had been secured only by its obscurity. But hang on! If that were the case, the ingress should somehow correspond:
8/30/2014 "Data Transfer In (GB)" "GB" 0.004335
8/31/2014 "Data Transfer In (GB)" "GB" 0.005579
9/1/2014 "Data Transfer In (GB)" "GB" 0.000744
9/2/2014 "Data Transfer In (GB)" "GB" 0.021571
9/3/2014 "Data Transfer In (GB)" "GB" 0.002983
9/4/2014 "Data Transfer In (GB)" "GB" 0.002571
9/5/2014 "Data Transfer In (GB)" "GB" 0.002961
9/6/2014 "Data Transfer In (GB)" "GB" 0.001994
9/7/2014 "Data Transfer In (GB)" "GB" 0.001642
9/8/2014 "Data Transfer In (GB)" "GB" 0.000483
9/9/2014 "Data Transfer In (GB)" "GB" 0.001879
9/10/2014 "Data Transfer In (GB)" "GB" 0.002022
9/11/2014 "Data Transfer In (GB)" "GB" 0.017067
9/12/2014 "Data Transfer In (GB)" "GB" 0.002644
9/13/2014 "Data Transfer In (GB)" "GB" 0.347959
9/14/2014 "Data Transfer In (GB)" "GB" 0.089146
9/15/2014 "Data Transfer In (GB)" "GB" 0.000404
9/16/2014 "Data Transfer In (GB)" "GB" 0.001912
9/17/2014 "Data Transfer In (GB)" "GB" 0.001733
9/18/2014 "Data Transfer In (GB)" "GB" 0.012967
9/19/2014 "Data Transfer In (GB)" "GB" 0.021446

which it does on all days other than the 18th and 19th. This made me think that it was perhaps all a mistake, and maybe an Azure infrastructure agent or something had gone out of control and started doing this.

So I asked the support to start investigating the issue. It took a week for them to get back to me, and the investigation provided only the hourly breakdown (I was hoping for more: perhaps some kind of explanation, or the IP address all this egress was going to). The pattern is also bizarre. For example, on the 18th (the day before my credit ran out):
2014-09-18T00:00:00 2014-09-18T01:00:00 DataTrOut 166428 External
2014-09-18T01:00:00 2014-09-18T02:00:00 DataTrOut 374040 External
2014-09-18T02:00:00 2014-09-18T03:00:00 DataTrOut 2588121384 External
2014-09-18T03:00:00 2014-09-18T04:00:00 DataTrOut 539993671 External
2014-09-18T04:00:00 2014-09-18T05:00:00 DataTrOut 1128216 External
2014-09-18T05:00:00 2014-09-18T06:00:00 DataTrOut 25462 External
2014-09-18T06:00:00 2014-09-18T07:00:00 DataTrOut 18308 AM2
2014-09-18T06:00:00 2014-09-18T07:00:00 DataTrOut 63250 External
2014-09-18T07:00:00 2014-09-18T08:00:00 DataTrOut 24588 External
2014-09-18T08:00:00 2014-09-18T09:00:00 DataTrOut 82296 External
2014-09-18T09:00:00 2014-09-18T10:00:00 DataTrOut 59362 External
2014-09-18T10:00:00 2014-09-18T11:00:00 DataTrOut 10573316727 External
2014-09-18T11:00:00 2014-09-18T12:00:00 DataTrOut 11443247791 External
2014-09-18T12:00:00 2014-09-18T13:00:00 DataTrOut 13854724048 External
2014-09-18T13:00:00 2014-09-18T14:00:00 DataTrOut 8115190263 External
2014-09-18T14:00:00 2014-09-18T15:00:00 DataTrOut 13748807057 External
2014-09-18T15:00:00 2014-09-18T16:00:00 DataTrOut 10389478694 External
2014-09-18T16:00:00 2014-09-18T17:00:00 DataTrOut 19979259451 External
2014-09-18T17:00:00 2014-09-18T18:00:00 DataTrOut 21398993891 External
2014-09-18T18:00:00 2014-09-18T19:00:00 DataTrOut 22843598777 External
2014-09-18T19:00:00 2014-09-18T20:00:00 DataTrOut 23087199863 External
2014-09-18T20:00:00 2014-09-18T21:00:00 DataTrOut 16958070173 External
2014-09-18T21:00:00 2014-09-18T22:00:00 DataTrOut 13126214430 External
2014-09-18T22:00:00 2014-09-18T23:00:00 DataTrOut 352327 External
2014-09-18T23:00:00 2014-09-19T00:00:00 DataTrOut 358377 External
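One thing that can be verified independently is whether the hourly breakdown adds up to the daily figure on the bill. The sketch below sums the byte counts from the rows above and converts them to GB, assuming the bill's "GB" means 2^30 bytes (an assumption on my part; the variable names are illustrative).

```python
# Hourly "DataTrOut" byte counts for 2014-09-18, copied from the rows above.
# The 06:00 hour has two rows (one to AM2, one External), hence 25 values.
HOURLY_BYTES = [
    166428, 374040, 2588121384, 539993671, 1128216, 25462,
    18308, 63250, 24588, 82296, 59362,
    10573316727, 11443247791, 13854724048, 8115190263,
    13748807057, 10389478694, 19979259451, 21398993891,
    22843598777, 23087199863, 16958070173, 13126214430,
    352327, 358377,
]

BYTES_PER_GB = 2**30  # assumption: billing "GB" is 2^30 bytes

total_gb = sum(HOURLY_BYTES) / BYTES_PER_GB
busy_hours = [b for b in HOURLY_BYTES if b > BYTES_PER_GB]

print(f"Total for the day: {total_gb:.2f} GB")
print(f"Hours above 1 GB:  {len(busy_hours)}")
```

The total comes out very close to the 175.69292 GB listed for 9/18/2014, so the hourly and daily numbers are at least consistent with each other. That confirms the metering is self-consistent, but says nothing about where the traffic went.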

So what should I do?

So first of all, I have now put the ElasticSearch box behind a proxy and access to it requires authentication with the proxy. And better to do it now rather than later. And the ES box now is protected by IPSec.

But really the big question is: when you are on the cloud and you don't own any of the infrastructure or its monitoring, how can you make sure you are being charged fairly? My £40 bill for the egress is not huge but it makes me wonder, what if it happens again? What would I do?

There are also other questions: would that have been different on another provider? I am not really sure [although at least they could have opened a file with Linux line ending :) ] but the usage of a cloud platform requires building a trust relationship which is essential. I really appreciate the general attitude of Azure (and Microsoft) towards Open Source in embracing everything non-Windows and I think it is the right direction, but I think the support model should be also developed in line with that. AWS is a more mature platform but have you seen anything like this there? And if yes, how was your experience?


Monday, 2 June 2014

BeeHive Series - Part 2 - Importing file from blob storage to ElasticSearch sample

[Level T1]

In the previous post, we introduced BeeHive and talked about an example usage where we check news feeds and send a notification if a keyword is found. In this post, we look at another example. You can find the source code in the BeeHive Github repo. Just open up BeeHive.Samples.sln file.

Processing files

Let's imagine we receive files in a particular blob location and we need to import/process them into the system. These files arrive in a particular folder structure and we need to watch the root folder. Then we need to pick them up, extract each row and send each record to be processed - in this case to be loaded onto an ElasticSearch cluster.
ElasticSearch is a horizontally-scalable and highly-available indexing and search technology. It runs on Windows, Linux and OSX, is easy to set up and is free to use. You can download the installer from http://www.elasticsearch.org/

NewFileArrived

So here, we design a system that watches the location and when it finds the files, it raises NewFileArrived event. This is a simple enough process yet what if we have multiple actors watching the location (very likely for a cloud scenario where the same process runs on many machines)? In this case we will receive multiple NewFileArrived events.
BeeHive provides pulsers that help you with your concurrency problems. FolderWatcherActor can subscribe to a topic that is fed by a pulser. In fact, in a BeeHive world, you could have pulsers that fire at different intervals and raise events such as FiveMinutesPassed, AnHourPassed, ADayPassed, etc. and, based on the requirement, your actors could subscribe to any of these. The beauty of message-based scheduling is that only a single instance of the actor will receive the message.
Raising the NewFileArrived event is not enough. When the actor wakes up again by receiving the next message and the file is still there, it will send another NewFileArrived event. We can protect against this by:
1) Making processing idempotent
2) Keeping track of files received
3) Marking files by creating a status file next to them
We choose the last option so we can use the same status file further down. So after identifying the file, we create a file with the same name plus .status and write the status number, here 1.

public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var events = new List<Event>();
    var items = (await _dynamoStore.ListAsync(
        _configurationValueProvider.GetValue(Constants.SweepRootPathKey)))
        .ToArray();

    var notProcessed = items.Where(x => !x.IsVirtualFolder)
        .GroupBy(z => z.Id.Replace(Constants.StatusPostfix, ""))
        .Where(f => f.Count() == 1)
        .Select(w => w.Single());

    foreach (var blob in notProcessed)
    {
        events.Add(new Event(new NewFileArrived()
        {
            FileId = blob.Id
        }));
        await _dynamoStore.InsertAsync(new SimpleBlob()
        {
            Id = blob.Id + Constants.StatusPostfix,
            Body = new MemoryStream(BitConverter.GetBytes(1)) // status 1
        });
    }

    return events;
}

Process the file: fan-out the records

After receiving the NewFileArrived event, we read the file, split it into records and fan out the records as ImportRecordExtracted events. We also send an ImportFileProcessed event.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var newFileArrived = evnt.GetBody<NewFileArrived>();
    var blob = await _dynamoStore.GetAsync(newFileArrived.FileId);
    var reader = new StreamReader(blob.Body);
    string line = string.Empty;
    var events = new List<Event>();
    while ((line = reader.ReadLine())!= null)
    {
        var fields = line.Split(new []{','},StringSplitOptions.RemoveEmptyEntries);
        events.Add(new Event( new ImportRecordExtracted()
        {
            Id = fields[0],
            Content = fields[2],
            IndexType = fields[1]
        }));
    }

    events.Add(new Event(new ImportFileProcessed()
    {
        FileId = newFileArrived.FileId
    }));

    return events;
}

ImportFileProcessed

The actor receiving this event will delete the file and the status file.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importFileProcessed = evnt.GetBody<ImportFileProcessed>();
    var statusFile = importFileProcessed.FileId + Constants.StatusPostfix;

    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = importFileProcessed.FileId
    });
    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = statusFile
    });

    return new Event[0];
}

ImportRecordExtracted

Based on the type of the record, we "upsert" the record in the appropriate index in our ElasticSearch cluster.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importRecordExtracted = evnt.GetBody<ImportRecordExtracted>();
    var elasticSearchUrl = _configurationValueProvider.GetValue(Constants.ElasticSearchUrlKey);

    var client = new HttpClient();
    var url = string.Format("{0}/import/{1}/{2}", elasticSearchUrl,
        importRecordExtracted.IndexType,
        importRecordExtracted.Id);
    var responseMessage = await client.PutAsJsonAsync(url, importRecordExtracted);

    if (!responseMessage.IsSuccessStatusCode)
    {
        throw new ApplicationException("Indexing failed. " 
            + responseMessage.ToString());
    }

    return new[]
    {
        new Event(new NewIndexUpserted()
        {
            IndexUrl = url
        }) 
    };
}

NewIndexUpserted

While we currently do not need to know when we add or update an index in the ElasticSearch, this can later be used by other processes, so it is best to provision the event. As we said before, BeeHive events are meaningful business milestones that may or may not be used by your current system.

Here are our indexes when browsing to http://localhost:9200/import/_search

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 14,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "import",
      "_type" : "D",
      "_id" : "4",
      "_score" : 1.0, "_source" : {"Id":"4","IndexType":"D","Content":"These are controlled by min_term_freq"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "9",
      "_score" : 1.0, "_source" : {"Id":"9","IndexType":"E","Content":"There are other parameters such as min_word_length"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "11",
      "_score" : 1.0, "_source" : {"Id":"11","IndexType":"E","Content":"In order to give more weight to more interesting terms"}
    }, {
      "_index" : "import",
      "_type" : "A",
      "_id" : "2",
      "_score" : 1.0, "_source" : {"Id":"2","IndexType":"A","Content":"clauses in a bool query of interesting terms extracted from some provided text. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "7",
      "_score" : 1.0, "_source" : {"Id":"7","IndexType":"D","Content":"controlled by percent_terms_to_match. The terms are extracted from like_text "}
    }, {
      "_index" : "import",
      "_type" : "H",
      "_id" : "14",
      "_score" : 1.0, "_source" : {"Id":"14","IndexType":"H","Content":"score times some boosting factor boost_terms."}
    }, {
      "_index" : "import",
      "_type" : "B",
      "_id" : "3",
      "_score" : 1.0, "_source" : {"Id":"3","IndexType":"B","Content":"The interesting terms are selected with respect to their tf-idf scores. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "8",
      "_score" : 1.0, "_source" : {"Id":"8","IndexType":"D","Content":"which is analyzed by the analyzer associated with the field"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "10",
      "_score" : 1.0, "_source" : {"Id":"10","IndexType":"E","Content":"max_word_length or stop_words to control what terms should be considered as interesting. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "5",
      "_score" : 1.0, "_source" : {"Id":"5","IndexType":"D","Content":"The number of interesting terms is controlled by max_query_terms. "}
    } ]
  }
}


Cleanup processes

In the absence of transactions, we have to design business processes for failure. BeeHive promotes an approach in which every process is broken down into its smallest elements and each element is implemented in an actor.

Sometimes it is necessary to design processes that look for the highly unlikely (yet possible) failure where an actor has done its work but the events it returned never make it back to the service bus. In the case of inserting the new index, this is not a problem since we use PUT and the process is idempotent. However, it could be a problem when processing a file, where a status file is created but NewFileArrived never makes it back to the service bus. In this case, a crash unlocker process that checks the timestamp of the status file and deletes it if it is older than e.g. 1 day is all that is needed.
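The crash unlocker logic is small enough to sketch. The example below is in Python rather than C# and works on an in-memory map of blob ids to timestamps instead of a real blob store; `STATUS_POSTFIX`, `MAX_AGE` and `find_stale_status_files` are hypothetical names, not part of BeeHive.

```python
from datetime import datetime, timedelta

STATUS_POSTFIX = ".status"   # hypothetical, mirrors Constants.StatusPostfix
MAX_AGE = timedelta(days=1)  # "older than e.g. 1 day" from the text


def find_stale_status_files(last_modified: dict, now: datetime) -> list:
    """Return ids of status files whose timestamp is older than MAX_AGE.

    `last_modified` maps blob ids to their last-modified timestamps.
    """
    return sorted(
        blob_id
        for blob_id, stamp in last_modified.items()
        if blob_id.endswith(STATUS_POSTFIX) and now - stamp > MAX_AGE
    )


now = datetime(2014, 6, 2, 12, 0)
blobs = {
    "import/a.csv": now - timedelta(days=3),
    "import/a.csv.status": now - timedelta(days=3),     # stuck since a crash
    "import/b.csv": now - timedelta(minutes=5),
    "import/b.csv.status": now - timedelta(minutes=5),  # still in flight
}

for blob_id in find_stale_status_files(blobs, now):
    # Removing the marker makes the file look unprocessed again, so the
    # folder watcher raises NewFileArrived for it on its next run.
    del blobs[blob_id]

print(sorted(blobs))
```

Only the marker is deleted, never the data file, so the worst case is that a file is processed twice, which the idempotent PUT further down the chain already tolerates.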

Conclusion

We can use pulsers to solve the inherent concurrency problem of multiple folder watcher actors watching the same folder. The fan-out process of breaking a file down into its records and parallelising the processing is one of the key benefits of cloud actors.



Sunday, 13 April 2014

Reactive Cloud Actors: no-nonsense MicroServices

[Level C3]

This post is not directly about MicroServices. If that is why you are reading it, you might as well stop now. Apparently, we are still waiting for the definition to be finally ratified. The definition, as it stands now, is blurry - Martin Fowler admits. This post is about Actors - the cloud ones - you know. By the time you finish reading it, I hope to have made it clear how Reactive Cloud Actors are the real MicroServices, rather than the RESTful Imperative MicroServices, light as they may be.

Watching Fred George deliver an excellent talk on High-Performance Bus inspired me to start working on the actors. I am still working on a final article on the subject, but this post is basically a primer on that - as well as an announcement of the BeeHive mini-framework. The next section on actors is taken from that article, which covers the essential theoretical background. Before we start, let's make it clear that the term Reactive is not used in the context of Reactive Extensions (Rx) or Frameworks, only in contrast to imperative (RPC-based) actors. Also, RPC-based is not used in contrast to RESTful; it simply means a system which relies on command and query messages rather than events.

UPDATE: The article is now published on InfoQ.

Actors

Carl Hewitt, along with Peter Bishop and Richard Steiger, published an article back in 1973 that proposed a formalism that identified a single class of objects, i.e. Actors, as the building blocks of systems designed to implement Artificial Intelligence algorithms.

According to Hewitt an actor, in response to a message, can:
  1. Send a finite number of messages to other actors
  2. Create a finite number of other actors
  3. Decide on the behaviour to be used for the next message it receives
Any combination of these actions can occur concurrently and in response to messages arriving in any order - as such, there is no constraint with regard to ordering and an actor implementation must be able to handle messages arriving out of band. However, I believe it is best to separate these responsibilities as below.

Processor Actor

In a later description of the Actor Model, the first constraint is re-defined as "send a finite number of messages to the address of other actors". Addressing is an integral part of the model that decouples actors and limits the knowledge actors have of each other to a mere token (i.e. an address). Familiar implementations of addressing include Web Services endpoints, Publish/Subscribe queue endpoints and email addresses. Actors that respond to a message by using the first constraint can be called Processor Actors.

Factory Actor

The second constraint makes actors capable of creating other actors; we conveniently call these Factory Actors. Factory actors are important elements of a message-driven system where an actor consumes from a message queue and creates handlers based on the message type. Factory actors control the lifetime of the actors they create and have a deeper knowledge of them - compared to processor actors, which know merely an address. It is useful to separate factory actors from processing ones - in line with the single responsibility principle.

Stateful Actor

The third constraint gives us the Stateful Actor. Actors capable of the third constraint have a memory that allows them to react differently to subsequent messages. Such actors can be subject to a myriad of side-effects. Firstly, when we talk about "subsequent messages" we inherently assume an ordering while, as we said, there is no constraint with regard to ordering: an out of band message arrival can lead to complications. Secondly, all aspects of CAP apply to this memory, making a consistent yet highly available and partition tolerant memory impossible to achieve. In short, it is best to avoid stateful actors.

Modelling a Processor Actor

"Please open your eyes, Try to realise, I found out today we're going wrong, We're going wrong" - Cream
[Mind you there is only a glimpse of Ginger Baker visible while the song is heavily reliant on Ginger's drumming. And yeah, this goes back to a time when Eric played Gibson and not his signature Strat]


This is where most of us can go wrong. We do that, sometimes for 4 years - without realising it. This is by no means a reference to a certain project [... cough ... Orleans ... cough] that has been brewing (Strange Brew pun intended) for 4 years and has come up with imperative, RPC-based, RESTfully coupled Micro-APIs. We know it, doing simple is hard - and we go wrong, i.e. we do the opposite: build really complex frameworks.

I was chatting away on Twitter with a few friends and I was saying "if you need a full-blown and complex framework to do actors, you are probably doing it wrong". All you need is a few interfaces, and some helpers doing the boilerplate stuff. This stuff ain't no rocket science, let's not turn it into one.

The essence of the Reactive Cloud Actor is the interface below (part of BeeHive mini-framework introduced below):
    /// <summary>
    /// Processes an event.
    /// 
    /// Queue name containing the messages that the actor can process.
    /// It can be in the format of [queueName] or [topicName]-[subscriptionName]
    /// </summary>
    public interface IProcessorActor : IDisposable
    {
        /// <summary>
        /// Asynchronous processing of the message
        /// </summary>
        /// <param name="evnt">Event to process</param>
        /// <returns>Typically contains 0-1 messages. Exceptionally more than 1</returns>
        Task<IEnumerable<Event>> ProcessAsync(Event evnt);

    }

Yes, that is all. All of your business can be captured by the universal method above. Don't you believe that? Just have a look at a non-trivial eCommerce example implemented using this single method.

So why Reactive (Event-based) and not Imperative (RPC-based)? Because in a reactive actor system, each actor only knows about its own step and what it does itself, and has no clue about the next steps or the rest of the system - i.e. actors are decoupled, leading to independence which facilitates actor Application Lifecycle Management and DevOps deployment.
As can be seen above, Imperative actors know about their actor dependencies while Reactive actors have no dependency other than the queues, basic data structure stores and external systems. Imperative actors communicate with other actors via a message store/bus and invoke method calls. We have been doing this for years in different Enterprise Service Bus integrations; this one only brings it to a micro level, which makes the pain even worse.

So let's bring an example: fraud check of an order.

Imperative
PaymentActor, after a successful payment for an order, calls the FraudCheckActor. FraudCheckActor calls external fraud check systems. After identifying a fraud, it calls CancelOrderActor to cancel the order. So as you can see, PaymentActor knows about and depends on FraudCheckActor. In the same way, FraudCheckActor depends on CancelOrderActor. They are coupled.

Reactive
PaymentActor, upon successful payment, raises a PaymentAuthorised event. FraudCheckActor is one of its subscribers: after receiving this event it checks for fraud and, if fraud is detected, raises a FraudDetected event. CancelOrderActor subscribes to some events, including FraudDetected, upon receiving which it cancels the order. None of these actors knows about the others. They are decoupled.
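To make the reactive variant concrete, here is a minimal sketch of what a FraudCheckActor could look like. It is not BeeHive's actual implementation: the `Event` class is a cut-down stand-in, the interface is repeated only so the snippet compiles on its own, and the `isFraudulent` delegate is a hypothetical placeholder for the call to an external fraud check system.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Cut-down stand-in for the real Event type (illustration only).
public sealed class Event
{
    public string EventType { get; set; }
    public string Body { get; set; }
}

// Repeated from above so that the sketch is self-contained.
public interface IProcessorActor : IDisposable
{
    Task<IEnumerable<Event>> ProcessAsync(Event evnt);
}

// Reacts to PaymentAuthorised. It knows nothing about who raised the
// event or who will consume FraudDetected.
public sealed class FraudCheckActor : IProcessorActor
{
    // Hypothetical wrapper around an external fraud check system.
    private readonly Func<string, Task<bool>> _isFraudulent;

    public FraudCheckActor(Func<string, Task<bool>> isFraudulent)
    {
        _isFraudulent = isFraudulent;
    }

    public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
    {
        // Assumption of this sketch: the body carries just the order id.
        bool fraud = await _isFraudulent(evnt.Body);
        if (!fraud)
        {
            return new Event[0]; // nothing to report: zero events
        }

        // One resulting event; subscribers (e.g. CancelOrderActor) decide what to do.
        return new[] { new Event { EventType = "FraudDetected", Body = evnt.Body } };
    }

    public void Dispose()
    {
    }
}
```

Note that the actor never references CancelOrderActor: the only contract is the event type it raises.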

So which one is better? By the way, none of this is new - we have been doing it for years. But it is important to identify why we should avoid the first, so as not to "go wrong".

Reactive Cloud Actors proposal


After categorising the actors, here I propose the following constraints for Reactive Cloud Actors:
  • A reactive system that communicates by sending events
  • Events are defined as a time-stamped, immutable, unique and eternally-true piece of information
  • Events have types
  • Events are stored in Highly-Available cloud storage queues that support topics
  • Queues must support delaying
  • Processor Actors react to receiving a single event, do some processing and then send back usually one (sometimes zero and rarely more than one) event
  • Processor Actors have type - implemented as a class
  • Processing should involve minimal number of steps, almost always a single step
  • Processing of the events is designed to be Idempotent
  • Each Processor Actor can receive one or more event types - all of which are defined by the Actor Description
  • Factory Actors are responsible for managing the lifetime of Processor Actors
  • Actors are deployed to cloud nodes. Each node contains one Factory Actor and can create one or more Processor Actors depending on its configuration. Grouping of actors depends on cost vs. ease of deployment.
  • In addition to events, there are other Basic Data Structures that contain state and are stored in Highly-Available cloud storage (See below on Basic Data Structures)
  • There are no Stateful Actors. All state is managed by the Basic Data Structures and events.
  • This forms an evolvable web of events which can define flexible workflows
Breaking down all the processes into single steps is very important. A Highly-Available yet Eventually-Consistent system can handle delays but cannot easily bind multiple steps into a single transaction.
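The constraints above can be sketched as a tiny in-memory event pump. This is only an illustration under simplifying assumptions, not how BeeHive works internally: the `InMemoryPump` class and its `handlers` dictionary are hypothetical, a plain `Queue<Event>` stands in for the Highly-Available cloud queue, and skipping already-seen event ids is just one simple way to make redelivery harmless.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Cut-down stand-in for the real Event type (illustration only).
public sealed class Event
{
    public Event()
    {
        Id = Guid.NewGuid().ToString("N");
    }

    public string Id { get; private set; }
    public string EventType { get; set; }
    public string Body { get; set; }
}

public static class InMemoryPump
{
    // handlers maps an event type to the single step that reacts to it.
    // Returns the event types in the order they were processed.
    public static async Task<List<string>> RunAsync(
        Event first,
        IDictionary<string, Func<Event, Task<IEnumerable<Event>>>> handlers)
    {
        var queue = new Queue<Event>();       // stand-in for a cloud queue
        var seenIds = new HashSet<string>();  // stand-in for a durable store
        var trace = new List<string>();
        queue.Enqueue(first);

        while (queue.Count > 0)
        {
            Event evnt = queue.Dequeue();

            // A redelivered event is skipped, so processing it twice has no effect.
            if (!seenIds.Add(evnt.Id))
            {
                continue;
            }

            trace.Add(evnt.EventType);

            Func<Event, Task<IEnumerable<Event>>> handler;
            if (!handlers.TryGetValue(evnt.EventType, out handler))
            {
                continue; // no subscriber: this branch of the web ends here
            }

            // Single step: one event in, usually zero or one event out.
            foreach (Event next in await handler(evnt))
            {
                queue.Enqueue(next);
            }
        }

        return trace;
    }
}
```

Each handler sees only its own event and returns the events it raises; the workflow emerges from who subscribes to what, not from any actor calling another.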

So how can we implement this? Is this gonna work?

Introducing BeeHive


BeeHive is a vendor-agnostic Reactive Actor mini-framework I have been working on over the last three months. It is implemented in C# but frankly could be done in any language supporting Asynchronous programming (promises) such as Java or node.

The cloud implementation has so far been done only for Azure, but supporting another cloud vendor basically means implementing 4-5 interfaces. It also comes with an In-Memory implementation, which is targeted only at demos. This framework is not meant to be used as an in-process actor framework.

It implements the Prismo eCommerce example, an imaginary eCommerce system that has been implemented for both In-Memory and Azure. This example is non-trivial and has some tricky scenarios that have to implement Scatter-Gather sagas. There is also a Boomerang pattern event that turns a multi-step process into regurgitating an event a few times until all steps are done (this requires another post).

An event is modelled as:

[Serializable]
public sealed class Event : ICloneable
{

    public static readonly Event Empty = new Event();

    public Event(object body)
        : this()
    {        
       ...
    }

    public Event()
    {
        ...
    }

    /// <summary>
    /// Marks when the event happened. Normally a UTC datetime.
    /// </summary>
    public DateTimeOffset Timestamp { get; set; }

    /// <summary>
    /// Normally a GUID
    /// </summary>
    public string Id { get; private set; }

    /// <summary>
    /// Optional URL to the body of the message if Body can be retrieved 
    /// from this URL
    /// </summary>
    public string Url { get; set; }

    /// <summary>
    /// Content-Type of the Body. Usually a Mime-Type
    /// Typically body is a serialised JSON and content type is application/[.NET Type]+json
    /// </summary>
    public string ContentType { get; set; }
        
    /// <summary>
    /// String content of the body.
    /// Typically a serialised JSON
    /// </summary>
    public string Body { get; set; }

    /// <summary>
    /// Type of the event. This must be set at the time of creation of event before PushAsync
    /// </summary>
    public string EventType { get; set; }

    /// <summary>
    /// Underlying queue message (e.g. BrokeredMessage in case of Azure)
    /// </summary>
    public object UnderlyingMessage { get; set; }

    /// <summary>
    /// This MUST be set by the Queue Operator upon Creation of message usually in NextAsync!!
    /// </summary>
    public string QueueName { get; set; }


    public T GetBody<T>()
    {
       ...
    }

    public object Clone()
    {
        ...
    }
}

As can be seen, Body has been defined as a string since BeeHive uses JSON serialisation. This can be made flexible, but in reality events should normally contain a small amount of data, mainly basic data types such as GUID, integer, string, boolean and DateTime. Any binary data should be stored in Azure Blob Storage or S3, with only its path referenced in the event.
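As a rough illustration of the string Body and the `application/[.NET Type]+json` content type, here is a sketch of wrapping a small payload and reading it back. It is not BeeHive's code: `SketchEvent`, its `From` helper and the `PaymentAuthorised` payload are hypothetical, `System.Text.Json` is used only for convenience (the framework's own serialiser may well differ), and defaulting the event type to the payload's type name is an assumption of this sketch.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload: small, and made only of basic data types.
public sealed class PaymentAuthorised
{
    public Guid OrderId { get; set; }
    public decimal Amount { get; set; }
}

// Cut-down stand-in for the Event class above (illustration only).
public sealed class SketchEvent
{
    public string EventType { get; set; }
    public string ContentType { get; set; }
    public string Body { get; set; }

    // Serialises the payload to JSON and records its type in the content type.
    public static SketchEvent From<T>(T body)
    {
        return new SketchEvent
        {
            EventType = typeof(T).Name, // assumption of this sketch
            ContentType = "application/" + typeof(T).Name + "+json",
            Body = JsonSerializer.Serialize(body)
        };
    }

    // Reads the payload back from the JSON string.
    public T GetBody<T>()
    {
        return JsonSerializer.Deserialize<T>(Body);
    }
}
```

A consumer only needs the event type to decide whether it cares, and calls `GetBody<T>()` only when it actually needs the payload.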

BeeHive Basic Data Structures

This part of BeeHive is a work in progress but nearly done: it defines a minimal set of Basic Data Structures (and their stores) to cover all the data needs of Reactive Actors. These structures are defined as interfaces that can be implemented for different cloud platforms. The list as it stands now:
  • Topic-based and simple Queues
  • Key-Values
  • Collections
  • Keyed Lists
  • Counters

Some of these data structures hold entities within the system which should implement a simple interface:

public interface IHaveIdentity
{
    Guid Id { get; }
}

Optionally, entities could be Concurrency-Aware for updates and deletes in which case they will implement an additional interface:

public interface IConcurrencyAware
{
    DateTimeOffset? LastModofied { get; }

    string ETag { get; }
}
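To show how a Concurrency-Aware entity might be used, here is a minimal in-memory sketch of an optimistic concurrency check based on the ETag. The `Order` entity and `InMemoryOrderStore` are hypothetical and not part of BeeHive; the two interfaces are repeated (with the property name exactly as shown above) only so the snippet compiles on its own, and a real store would delegate the check to the cloud storage service.

```csharp
using System;
using System.Collections.Generic;

// Repeated from above so that the sketch is self-contained.
public interface IHaveIdentity
{
    Guid Id { get; }
}

public interface IConcurrencyAware
{
    DateTimeOffset? LastModofied { get; }
    string ETag { get; }
}

// Hypothetical entity.
public sealed class Order : IHaveIdentity, IConcurrencyAware
{
    public Guid Id { get; set; }
    public string Status { get; set; }
    public DateTimeOffset? LastModofied { get; set; }
    public string ETag { get; set; }
}

// Hypothetical store: an update succeeds only if the caller's copy
// carries the ETag of the version currently stored.
public sealed class InMemoryOrderStore
{
    private readonly Dictionary<Guid, Order> _items = new Dictionary<Guid, Order>();

    public void Insert(Order order)
    {
        _items.Add(order.Id, Stamp(order));
    }

    // Returns a copy, so callers cannot mutate the stored version directly.
    public Order Get(Guid id)
    {
        Order stored = _items[id];
        return new Order
        {
            Id = stored.Id,
            Status = stored.Status,
            LastModofied = stored.LastModofied,
            ETag = stored.ETag
        };
    }

    public bool TryUpdate(Order order)
    {
        Order current;
        if (!_items.TryGetValue(order.Id, out current) || current.ETag != order.ETag)
        {
            return false; // changed (or deleted) since this copy was read
        }

        _items[order.Id] = Stamp(order);
        return true;
    }

    // Stores a copy with a fresh ETag and modification time.
    private static Order Stamp(Order order)
    {
        return new Order
        {
            Id = order.Id,
            Status = order.Status,
            LastModofied = DateTimeOffset.UtcNow,
            ETag = Guid.NewGuid().ToString("N")
        };
    }
}
```

If two actors read the same order and both try to update it, the second update is rejected and that actor can re-read and retry, which fits with idempotent, single-step processing.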


Prismo eCommerce sample

Prismo eCommerce is an imaginary eCommerce company that receives orders and processes them. The order taking relies on an eventually consistent (and delayed) read model of the inventory, hence orders can be accepted for items that are out of stock. The process waits until all items are in stock (or, for items that are out of stock, until they arrive back in stock) before it sends the order to fulfilment.

Prismo eCommerce states (solid), transitions (arrows) and processes (dashed)

This sample has been implemented both In-Memory and for Windows Azure. In both cases, tracing can be used to see from the outside what is happening. In this sample all actors are configured to run in a single worker role, although they can each run in their own roles. I might provide a UI to show the status of each order as it goes through its statuses.

Conclusion

Reactive Cloud Actors are the way to implement decoupled MicroServices. By individualising actor definitions (Processor Actor and Factory Actor) and avoiding Stateful Actors, we can build resilient and Highly Available cloud-based systems. Such systems will comprise evolvable webs of events, each of which defines a business capability. I don't know about you, but this is how I am going to build my cloud systems.

Watch this space, the article is on its way.