Wednesday, 6 August 2014

Thank you Microsoft, nine months on ...

Level [C1]

I felt that, almost a year after my blog post Thank you Microsoft and so long, it was the right time to look back and reflect on the decision I made back then. If you have not read the post, well, in a nutshell, I decided to gradually move towards non-Microsoft technologies - mainly due to Microsoft's lack of innovation, especially in the Big Data space.

A few things have changed since then. TLDR; generally it really feels like I made the right decision to diversify. Personally, I have learnt a lot and at the same time had a lot of fun. I have built a bridge to the other side and can easily communicate with non-Microsoft peers and translate my skills. On the technology landscape, however, there have been some major changes that make me feel having a hybrid skillset is much more important than a complete shift to any particular platform.

I will first look at the technology landscape as it stands now and then share my personal journey so far in adopting alternative platforms and technologies.

A point on the predictions

OK, I made some predictions in the previous post, such as "In 5 years, all data problems become Big Data problems". Some felt this was completely wrong - which could be the case - and left some not very nice messages. At the end of the day, predictions are free (and that is why I like them) and you could do the same. I am sharing my views; take them for what they are worth to you. I have a track record of making predictions, some of which came true and some did not. I predicted a massive financial crash in 2011 which did not happen; instead we got one of the biggest bull markets ever (well, my view is they pumped money into the economy and artificially made the bull market) and I lost some money. On the other hand, back in 2010 I predicted in my StackOverflow profile something that I think is now called the Internet Of Things, so I guess I was lucky (by the way, I am predicting a financial crash in the next couple of months). Anyway, take it easy :)

Technology Horizon

The New Microsoft

Since I wrote the blog post, a lot has changed, especially at Microsoft. It now has a new CEO and a radically different view on Open Source. Releasing the source of a big chunk of the .NET Framework is a harbinger of a shift whose size is difficult to guess at the moment. Is it a mere gesture? I doubt it: this adoption is the result of years of internal campaigning from the likes of Phil Haack and Scott Hanselman, and it has finally worked its way up the hierarchy.

But adopting Open Source is not just a community service gesture; it also has important financial significance. With the rate of change in the industry, you need to keep an army of developers constantly working on and improving products at this scale. No company is big enough on its own to build what can be built by an organic and healthy ecosystem. So crowd-sourcing is an important technique to improve your product without paying for the time spent. It is probably true that the community around your product is the real IP of most cloud platforms, and not so much the actual code.

Microsoft is also relinquishing its push strategy towards its Operating System and, to be honest, I am not surprised at all. Many have talked about the WebOS, but the reality is we have already had it for the last couple of years. Your small smartphone or tablet comes to life when it is connected - enabling you to do most of what you can do on the laptop/PC, with the only limitation being the screen size. On the other hand, Microsoft has released the web version of Office and, to be fair, it is capable of doing pretty much everything you can do in the desktop versions, and sometimes it does it better. So for the majority of consumers, all you need is the WebOS. It feels that the value of a desktop operating system becomes less and less important when most of the applications you use daily are web-based or cloud-based.


Cloud and Azure

I have been doing a lot of Azure both at work and outside it. Apart from HDInsight, I think Azure is expanding at a phenomenal rate in both features and reliability, and this is where I feel Microsoft is closing the innovation gap. It is mind-blowing to look at the list of new features that come out of Azure every month.

Focusing mainly on the PaaS products, I think the future of Azure in terms of adoption by smaller companies is looking more and more attractive compared to AWS, which has traditionally been the IaaS platform of choice. Companies like Netflix have built their entire software empire on AWS, but they had an army of great developers to write the tooling and integration.

All in all, I feel Azure is here to stay and might even overtake AWS in the next 5 years. The decider will be the pace of innovation.

Non-Hadoop platforms

A recent trend that could change the balance is the proliferation of non-Hadoop approaches to Big Data, which will favour Microsoft and Google. With Hadoop 2.0 trying to further abstract the algorithm away from the resource management, I think there is an opportunity for Microsoft to jump in and plug in a whole host of Microsoft languages in a real way - it was possible to use C# and F# before, but no one really used them.

Microsoft announced the release of AzureML, which is the PaaS offering of Machine Learning on the Azure platform. It is early to say, but it looks like this could be used for smaller-than-big-data machine learning and analysis. This platform is basically a productionisation of the Machine Learning platform behind the Bing search engine.

Another sign that Hadoop's elephant is getting old is Google's announcement to drop MapReduce: "We invented it and now we are retiring it". Basically, in-memory processing looks more and more appealing due to the need for quicker feedback cycles and for speeding up processes. It also seems there is a resurgence of focus on in-memory grid computing, perhaps as a result of the popularity of Actor frameworks.

In terms of technologies, Spark and to a degree Storm are getting a lot of traction and the next few months are essential to confirm the trend. These still very much come from a JVM ecosystem but there is potential in building competitor products.

Personal progress

MacBook

This is the first thing I did after making the decision 9 months ago: I bought a MacBook. I was probably the farthest thing from an Apple fanboy, but well, it has put its hooks in me too now. I wasn't sure if I should get a Windows laptop and run a Linux VM on it, or buy a MacBook and run a Windows VM. Funnily enough, and despite my presumptions, I found the second option to be cheaper. In fact, I could not find a Windows UltraBook with 16 GB of RAM, and that is what I needed to be able to comfortably run a VM. So buying a 13.3" MacBook Pro proved both economical (in the light of what you get back for the money) and the right choice - since you want your VM to be your secondary platform.

Initially I did not like OSX, but it helped me get better at using the command line - albeit the OSX variants of the Linux commands. Six months on, similar to what some of my twitter friends had said, I don't think I will ever go back to Windows.

I have used the Mac for everything apart from Visual Studio and the occasional Visio - also, using some Azure tools had to be done on Windows. I think I now spend probably only 20% of my time in Windows, the rest in Linux (Azure VMs) and OSX.

Linux, Shell scripting and command line

I felt ignorant when I discovered the wealth of command line tools at my disposal in OSX and Linux. find, grep, sort, sed, tail, head, etc. - just amazing stuff. I admit, for some there might be Windows equivalents that I have not heard of (which I doubt), but they make it so much easier to automate tasks and manage your servers. So I have been working on understanding services on Linux and OSX, learning about Apache and how to configure it... I am no expert by any stretch, but it has been fun and I have learnt a lot.
And yes, I did use VIM - and yes, I did find it difficult to exit the first time :) I am not mad about it; I just have to use it on the Linux VMs I manage, for configs, etc., but I cannot see myself using it for development - at least not anytime soon.

Languages

As I said then, I had decided to start with some JVM languages. Scala felt like the right choice at the time and, knowing more about it now, even more so. It is powerful, yet all the wealth of Java libraries is at your fingertips. It is widely adopted (and Clojure, the second candidate, not so much). Erlang is probably not appropriate now and Go is non-JVM, so I am happy with that decision.

Having said that, I could not learn a lot of it. Instead I had to focus on Python for a personal NLP project - well, as you know, most NLP and data science tools are in Python. I had to learn to code in it, understand its OOP and functional sides, its versioning and distribution, and above all how to serve REST APIs (using Flask and Flask-RESTful) for interop with my other C# code.
My view on it? Python is simple and has nice built-in support for the important data structures (list, dict, tuple, etc.), making it ideal for working with data. So it is a very useful language, but it is nowhere near as elegant as Scala or even C#. So for complex stuff, I would still rather code in C#, until I properly pick up Scala again. I am also not very comfortable with distributing non-compiled code - although that is what we normally do in JavaScript (minification aside), perhaps another point of similarity between the two.

Apart from these, I have still been doing a ton of C#, as I had predicted in the previous blog post. I have been working on a Cloud Actor Mini-Framework called BeeHive, which I am currently using myself. I still enjoy writing C# and am planning to try out Mono as well (.NET on OSX and Linux). Having said that, I feel tools and languages are best used in their native platform and ecosystem, so I am not sure if Mono would be a viable option for me.

Conclusion

All-in-all I think by embracing the non-Microsoft world, I have made the right decision. A new world has been suddenly opened up for me, a lot of exciting things to learn and to do. I wish I had done this earlier.

Do I think I will completely abandon my previous skills? I really doubt it: the future is not mono-colour, it is a democratised, hybrid one, where different skillsets will cross-pollinate and produce better software. I feel having a hybrid skillset is becoming more and more important, and if you are looking to position yourself better as a developer/architect, this is the path you need to take. Currently cross-platform/hybrid skills are a plus; in 5 years they will be a necessity.

Wednesday, 11 June 2014

BeeHive Series - Part 3: BeeHive 0.5, RabbitMQ and more

Level [T4]

BeeHive is a friction-free library for building Reactive Cloud Actors - effortlessly. It defines abstractions for the message, the queue and the actors, and all you have to do is define your actors and connect the dots using subscriptions. If this is the first time you are reading about BeeHive, have a look at the previous posts, but basically a BeeHive actor (technically a Processor Actor) is very simple:
[ActorDescription("TopicName-SubscriptionName")]
public class MyActor : IProcessorActor
{
  public Task<IEnumerable<Event>> ProcessAsync(Event evnt)
  {
    // impl
  }
}
All you do is consume a message, do some work and then return typically one, sometimes zero and rarely many events back.
A few key things to note here.

Event

First of all, an Event is an immutable, unique and timestamped message which documents a significant business event. It has a string body which is normally a JSON serialisation of the actual message object - but it does not have to be.

Messages are usually arbitrary bytes, so why is it a string here? While it was possible to use byte[], if you need to send binary blobs or you need custom serialisation, you are probably doing something wrong. Bear in mind, BeeHive is targeted at solutions that require scale, High Availability and linearisation. If you need to attach a big binary blob, just drop it in a key value store using IKeyValueStore and put the link in your message. If it is small, use Base64 encoding. Also, your messages need to be very simple DTOs (and by simple I do not mean small, but a class with getters and setters); if you are having problems serialising them then, again, you are doing something wrong.
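For example, instead of embedding a large payload in the event, the event can carry a pointer to it. A rough sketch of the idea (ReportReady is a hypothetical DTO here, and the exact store method name is an assumption rather than the documented IKeyValueStore API):

var blobId = Guid.NewGuid() + ".pdf";
await keyValueStore.InsertAsync(new SimpleBlob
{
    Id = blobId,
    Body = reportStream // the large binary payload lives in the store, not in the event
});

// the event body stays a small DTO that merely points at the blob
var evnt = new Event(new ReportReady { BlobId = blobId });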

Queue naming

BeeHive uses a naming convention for queues, topics and subscriptions. Basically it is in the format of TopicName-SubscriptionName. So there are a few rules with this:
  • Understandably, TopicName or SubscriptionName should not contain hyphens
  • If the value of TopicName and SubscriptionName is the same, it is a simple queue and not a publish-subscribe queue. For example, "OrderArrived-OrderArrived"
  • If you leave off the SubscriptionName then you are referring to the topic. For example "OrderArrived-".
The queue name is represented by the QueueName class. If you need to construct queue names, use its static methods:

var n1 = QueueName.FromSimpleQueueName("SimpleQ"); // "SimpleQ-SimpleQ"
var n2 = QueueName.FromTopicName("Topic"); // "Topic-"
var n3 = QueueName.FromTopicAndSubscriptionName("topic", "Sub"); // "Topic-Sub"

There is a QueueName property on the Event class. This property defines where to send the event message. The queue name must be the name of the topic or simple queue name.
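For example, targeting the topic explicitly might look like this (OrderArrived is a hypothetical event type; by convention the trailing hyphen addresses the topic rather than a simple queue):

var evnt = new Event(new OrderArrived { OrderId = "123" })
{
    QueueName = "OrderArrived-" // the "OrderArrived" topic, per the convention above
};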

IEventQueueOperator

This interface got a makeover in this release. I had not been happy with the interface as it had some inconsistencies - especially in terms of creating queues. Thanks to Adam Hathcock who reminded me; now this is done.

With QueueName's ability to differentiate topics from simple queues, this value needs to be either the name of the simple queue (in the example above, "SimpleQ") or the conventional topic name (in the example above, "Topic-").

So here are the interfaces as they stand now:

public interface ISubscriptionOperator<T>
{
    Task<PollerResult<T>> NextAsync(QueueName name);
    Task AbandonAsync(T message);
    Task CommitAsync(T message);
    Task DeferAsync(T message, TimeSpan howLong);
}

public interface ITopicOperator<T>
{
    Task PushAsync(T message);
    Task PushBatchAsync(IEnumerable<T> messages);
}

public interface IQueueOperator<T> : ITopicOperator<T>, ISubscriptionOperator<T>
{
    Task CreateQueueAsync(QueueName name);
    Task DeleteQueueAsync(QueueName name);
    Task<bool> QueueExists(QueueName name);
}

public interface IEventQueueOperator : IQueueOperator<Event>
{
}
The main changes were made to IQueueOperator<T>, which now gets passed the QueueName - making it simpler.
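For instance, a typical usage now looks like this (queueOperator stands for any IEventQueueOperator implementation, and the queue name is illustrative):

var name = QueueName.FromSimpleQueueName("OrderArrived"); // "OrderArrived-OrderArrived"
await queueOperator.CreateQueueAsync(name);        // create the simple queue if it is not there
var polled = await queueOperator.NextAsync(name);  // poll for the next Event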

RabbitMQ Roadmap

BeeHive targets cloud frameworks. IEventQueueOperator and the main data structures have been implemented for Azure. Next up is AWS.

Amazon Web Services (AWS) provides the Simple Queue Service (SQS), which only supports simple send-receive scenarios and not Publish-Subscribe cases. With this in mind, it is most likely that other message brokers will be used, although a custom implementation of pub-sub based on the Simple Notification Service (SNS) has been reported. Considering RabbitMQ is by far the most popular message broker out there (is it not?), it is sensible to pick this implementation first.

The RabbitMQ client for .NET has a very simple API and working with it is very easy. However, the connection implementation leaves a lot to be desired. EasyNetQ has a sophisticated connection implementation that covers refreshing dead connections and catering for round-robin in High-Availability scenarios. Using a full framework just for the connection is not really an option, hence I need to implement something similar.
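To give an idea of the kind of thing I mean - and this is only a rough sketch, not BeeHive's actual implementation, and without any of the HA/round-robin handling - a minimal self-healing connection could look like this:

using System;
using RabbitMQ.Client;

// rough sketch of a lazily created, self-healing connection
public class ResilientConnection : IDisposable
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private readonly object _padlock = new object();

    public ResilientConnection(ConnectionFactory factory)
    {
        _factory = factory;
    }

    public IConnection GetOpenConnection()
    {
        lock (_padlock)
        {
            // (re)create the connection if it has never been opened or has died
            if (_connection == null || !_connection.IsOpen)
                _connection = _factory.CreateConnection();

            return _connection;
        }
    }

    public void Dispose()
    {
        if (_connection != null && _connection.IsOpen)
            _connection.Close();
    }
}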

So for now, I am releasing an alpha version without the HA and connection refresh, to get community feedback. So please do ping me with what you think.

Since this is a pre-release, you need to use -Pre to get it installed:

PM> Install-Package BeeHive.RabbitMQ -Pre

Tuesday, 3 June 2014

Cancelling an async HTTP request Task sends TCP RESET packet

Level [T4]

This blog post did not just happen. In fact, nothing ever just happens. There is a story behind everything and this one is no different. Looking back, it feels like a nice find, but as the story was unfolding, I was running around like a headless chicken. Here we have the luxury of hindsight, so let's take advantage of it.

TLDR; If you are a sensible HTTP client and make your HTTP requests using cancellable async Tasks by passing a CancellationToken, you could find your IP blocked by legacy bridge devices blacklisting clients sending TCP RESET packets.

So here is how it started ...

So we were supposed to go live on Monday - some Monday. Talking of live, it was not really live - it was only to internal users, but considering the high profile of the project, it felt like D-Day. All the VPs knew of the release and were waiting to catch a glimpse of the project. Despite the high profile, it was not properly resourced, and I, despite being the so-called architect, pretty much single-handedly did all the API and the middleware connecting the Big Data outputs with the Single Page Application.

We could not finish going live on Monday so it moved to Tuesday. On Tuesday morning we were all ready and I had set up my machine's screen like a trader's, with all the performance monitors up, watching the users. Being on Azure, elasticity was an option, although the number of internal users could hardly make a dent in the 3 worker roles. So we did go live, and I could see traffic building up and all looked fine. Until ... it did not.

I saw requests queuing up and page loads taking longer and longer, until the site was completely frozen. We had to take it down. And that was not good.

Server Analysis

I brought up DebugView and was lucky to see this (actual IP and site names anonymised):

[1240] w3wp.exe Error: 0 :
[1240] <html>
[1240] <h1>Access Administratively Blocked</h1>
[1240] <br>URL : 'http://www.xyz.com/services/blahblah'
[1240] <br>Client IP address : 'xyz.xx.yy.zzz'
[1240] </html>

So we are being blocked! Something is blocking us, and this could be because we used a UI data endpoint as a Data API. Well, I knew it was not good, but as I said we had limited time, and in reality that data endpoint was meant to support our live traffic.

So after a lot of to and fro with our service delivery team and some third party support, we were told that our software was recognised as malicious since it was sending way too many TCP RESET packets. Whaa?? No one ain't sending no TCP whatever packets; we are using a high level language (C#) and the latest HttpClient implementation. We were actually using many optimising techniques, such as async calls, parallelisation, etc., to make the code as efficient as possible. We also used short timeouts + retries, which is Netflix's approach to improving performance.

But what are TCP RESET packets? Basically a RESET packet is one that has the RESET flag set (which is otherwise unset) and tells the server to drop the TCP connection immediately and reclaim all the resources associated with it. There is an RFC from back in 2002 that considers RESET harmful. Wikipedia's article argues that, when used as designed, it is useful, but a forged RESET can disrupt the communication between the client and server. And Microsoft's TechNet blog on the topic says "RESET is actually a good thing".

And in essence, I would agree with Microsoft's (and Wikipedia's) account that sending a RESET packet is what a responsible client would do. Let's imagine you are browsing a site using a really bad wifi connection. The page takes too long to load and you, frustrated by the slow connection, cancel by pressing the X button. At this point, a responsible browser should tell the server it has changed its mind and is not interested in the response. This lets the server use its resources for a client that is actually waiting for the server's response.

Now going back to the problem at hand, I am not a TCP expert by any stretch - I have always used higher level constructs and never had to go so deep down the OSI model. But my surprise was: what was different about my code that with a handful of calls I was getting blocked, while the live clients worked with no problem making a significantly larger number of calls?

I had a hunch that it probably had to do with some of the patterns I had been using on the server. And to shorten the suspense, the answer came from the analysis of TCP packets when cancelling an async HTTP Task. The live code uses traditional synchronous calls - none of the fancy patterns I used. So let's look at some sample code that cancels the task if it takes too long:

var client = new HttpClient();
var buffer = new byte[5 * 1000 * 1000];
// you might have to use different timeout or address
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
try
{
    var result = client.GetAsync("http://www.google.com",
    cts.Token).Result;
    var s = result.Content.ReadAsStreamAsync().Result;

    var result1 = s.ReadAsync(buffer, 0, buffer.Length, cts.Token).Result;
    ConsoleWriteLine(ConsoleColor.Green, "Got it");
}
catch (Exception e)
{
    ConsoleWriteLine(ConsoleColor.Red, "error! " + e);
}

In this snippet, we call the Google server and set a 300ms timeout (you might have to modify the timeout or the address based on your connection speed in order to see the cancellation). Here is the Wireshark proof:



As you can see above, a TCP RESET packet has been sent - provided you have set the parameters so that the request does not complete before its timeout and gets cancelled. You can try this with a longer timeout, or use a WebClient (which is synchronous), and you will never see this RST packet.
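For comparison, the synchronous equivalent never produces the RST, simply because there is no cancellation in play:

using (var webClient = new WebClient())
{
    // blocks until the full response arrives - no cancellation, hence no RST packet
    var bytes = webClient.DownloadData("http://www.google.com");
    Console.WriteLine("Got {0} bytes", bytes.Length);
}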

Now the question is, should a network appliance pick up on this responsible cancellation and treat it as an attack? By no means. But in my case it did, and it is very likely that it could do the same with yours.

My solution came by whitelisting my IP against "TCP RESET attacks". After all, I was only trying to help the server.

Conclusion

Cancelling an async HTTP Task in HttpClient results in sending a TCP RESET packet, which is considered malicious by some network appliances, resulting in your IP being blacklisted.

PS. The network appliance belonged to our 3rd-party infrastructure provider, whose security is managed by yet another third party - it was not in Azure. The real solution would have been to remove such a crazy rule but, anyhow, we developers don't always get what we want.


Monday, 2 June 2014

BeeHive Series - Part 2 - Importing file from blob storage to ElasticSearch sample

[Level T1]

In the previous post, we introduced BeeHive and talked about an example usage where we check news feeds and send a notification if a keyword is found. In this post, we look at another example. You can find the source code in the BeeHive GitHub repo - just open up the BeeHive.Samples.sln file.

Processing files

Let's imagine we receive files in a particular blob location and we need to import/process them into the system. These files arrive in a particular folder structure and we need to watch the root folder. Then we need to pick them up, extract each row and send each record to be processed - in this case to be loaded onto an ElasticSearch cluster.
ElasticSearch is a horizontally-scalable and highly-available indexing and search technology. It runs on Windows, Linux and OSX, is easy to set up and is free to use. You can download the installer from http://www.elasticsearch.org/

NewFileArrived

So here, we design a system that watches the location and, when it finds files, raises a NewFileArrived event. This is a simple enough process, yet what if we have multiple actors watching the location (very likely in a cloud scenario where the same process runs on many machines)? In that case we would receive multiple NewFileArrived events.
BeeHive provides pulsers that help you with such concurrency problems. FolderWatcherActor can subscribe to a topic that is fed by a pulser. In fact, in a BeeHive world, you could have pulsers that fire at different intervals and raise events such as FiveMinutesPassed, AnHourPassed, ADayPassed, etc., and based on the requirement, your actors could be subscribing to any of these. The beauty of message-based scheduling is that only a single instance of the actor will receive the message.
Raising the NewFileArrived event is not enough. When the actor wakes up again by receiving the next message and the file is still there, it will send another NewFileArrived event. We can protect against this by:
  1. Making the processing idempotent
  2. Keeping track of files received
  3. Marking files by creating a status file next to them
We choose the last option so we can use the same status file further down the line. So after identifying the file, we create a file with the same name plus .status and write the status number into it, here 1.

public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var events = new List<Event>();
    var items = (await _dynamoStore.ListAsync(
        _configurationValueProvider.GetValue(Constants.SweepRootPathKey)))
        .ToArray();

    var notProcessed = items.Where(x => !x.IsVirtualFolder)
        .GroupBy(z => z.Id.Replace(Constants.StatusPostfix, ""))
        .Where(f => f.Count() == 1)
        .Select(w => w.Single());

    foreach (var blob in notProcessed)
    {
        events.Add(new Event(new NewFileArrived()
        {
            FileId = blob.Id
        }));
        await _dynamoStore.InsertAsync(new SimpleBlob()
        {
            Id = blob.Id + Constants.StatusPostfix,
            Body = new MemoryStream(BitConverter.GetBytes(1)) // status 1
        });
    }

    return events;
}

Process the file: fan-out the records

After receiving the NewFileArrived event, we copy the file locally, split it into records and fan out the records as ImportRecordExtracted events. We also send an ImportFileProcessed event.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var newFileArrived = evnt.GetBody<NewFileArrived>();
    var blob = await _dynamoStore.GetAsync(newFileArrived.FileId);
    var reader = new StreamReader(blob.Body);
    string line = string.Empty;
    var events = new List<Event>();
    while ((line = reader.ReadLine())!= null)
    {
        var fields = line.Split(new []{','},StringSplitOptions.RemoveEmptyEntries);
        events.Add(new Event( new ImportRecordExtracted()
        {
            Id = fields[0],
            Content = fields[2],
            IndexType = fields[1]
        }));
    }

    events.Add(new Event(new ImportFileProcessed()
    {
        FileId = newFileArrived.FileId
    }));

    return events;
}

ImportFileProcessed

The actor receiving this event will delete the file and the status file.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importFileProcessed = evnt.GetBody<ImportFileProcessed>();
    var statusFile = importFileProcessed.FileId + Constants.StatusPostfix;

    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = importFileProcessed.FileId
    });
    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = statusFile
    });

    return new Event[0];
}

ImportRecordExtracted

Based on the type of the record, we "upsert" the record in the appropriate index in our ElasticSearch cluster.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importRecordExtracted = evnt.GetBody<ImportRecordExtracted>();
    var elasticSearchUrl = _configurationValueProvider.GetValue(Constants.ElasticSearchUrlKey);

    var client = new HttpClient();
    var url = string.Format("{0}/import/{1}/{2}", elasticSearchUrl,
        importRecordExtracted.IndexType,
        importRecordExtracted.Id);
    var responseMessage = await client.PutAsJsonAsync(url, importRecordExtracted);

    if (!responseMessage.IsSuccessStatusCode)
    {
        throw new ApplicationException("Indexing failed. " 
            + responseMessage.ToString());
    }

    return new[]
    {
        new Event(new NewIndexUpserted()
        {
            IndexUrl = url
        }) 
    };
}

NewIndexUpserted

While we currently do not need to know when we add or update an index in ElasticSearch, this could later be used by other processes, so it is best to provision the event. As we said before, BeeHive events are meaningful business milestones that may or may not be used by your current system.

Here are our indexes when browsing to http://localhost:9200/import/_search

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 14,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "import",
      "_type" : "D",
      "_id" : "4",
      "_score" : 1.0, "_source" : {"Id":"4","IndexType":"D","Content":"These are controlled by min_term_freq"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "9",
      "_score" : 1.0, "_source" : {"Id":"9","IndexType":"E","Content":"There are other parameters such as min_word_length"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "11",
      "_score" : 1.0, "_source" : {"Id":"11","IndexType":"E","Content":"In order to give more weight to more interesting terms"}
    }, {
      "_index" : "import",
      "_type" : "A",
      "_id" : "2",
      "_score" : 1.0, "_source" : {"Id":"2","IndexType":"A","Content":"clauses in a bool query of interesting terms extracted from some provided text. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "7",
      "_score" : 1.0, "_source" : {"Id":"7","IndexType":"D","Content":"controlled by percent_terms_to_match. The terms are extracted from like_text "}
    }, {
      "_index" : "import",
      "_type" : "H",
      "_id" : "14",
      "_score" : 1.0, "_source" : {"Id":"14","IndexType":"H","Content":"score times some boosting factor boost_terms."}
    }, {
      "_index" : "import",
      "_type" : "B",
      "_id" : "3",
      "_score" : 1.0, "_source" : {"Id":"3","IndexType":"B","Content":"The interesting terms are selected with respect to their tf-idf scores. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "8",
      "_score" : 1.0, "_source" : {"Id":"8","IndexType":"D","Content":"which is analyzed by the analyzer associated with the field"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "10",
      "_score" : 1.0, "_source" : {"Id":"10","IndexType":"E","Content":"max_word_length or stop_words to control what terms should be considered as interesting. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "5",
      "_score" : 1.0, "_source" : {"Id":"5","IndexType":"D","Content":"The number of interesting terms is controlled by max_query_terms. "}
    } ]
  }
}


Cleanup processes

In the absence of transactions, we have to design our business processes for failure. BeeHive promotes an approach where every process is broken down into its smallest elements, each implemented in an actor.

Sometimes it is necessary to design processes that look out for the highly unlikely (yet possible) event of a failure where an actor has done its work but the events it returned never make it back to the service bus. In the case of inserting the new index, this is not a problem since we use PUT and the process is idempotent. However, it could be a problem in the file processing case, where a status file is created but NewFileArrived never makes it back to the service bus. In this case, a crash unlocker process that checks the timestamp of the status file and deletes it if it is older than, say, 1 day, is all that is needed.
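A rough sketch of such a crash unlocker actor is below. It reuses the store calls from the sweeper above, although the LastModified property on the listed blobs is an assumption here (a daily pulser event would wake this actor up):

public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var items = (await _dynamoStore.ListAsync(
        _configurationValueProvider.GetValue(Constants.SweepRootPathKey)))
        .ToArray();

    // status files older than a day whose processing presumably crashed
    var staleStatusFiles = items.Where(x => !x.IsVirtualFolder
        && x.Id.EndsWith(Constants.StatusPostfix)
        && x.LastModified < DateTimeOffset.UtcNow.AddDays(-1));

    foreach (var statusFile in staleStatusFiles)
    {
        // delete only the status file so the data file gets swept up again
        await _dynamoStore.DeleteAsync(new SimpleBlob { Id = statusFile.Id });
    }

    return new Event[0];
}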

Conclusion

We can use pulsers to solve the inherent concurrency problem of multiple folder watcher actors watching the same folder. The fan-out process of breaking a file down into its records and parallelising the processing is one of the key benefits of cloud actors.



Thursday, 22 May 2014

BeeHive Series - Part 1 - Getting started with BeeHive

[Level T1]

I feel that BeeHive is the most important Open Source project I have been involved in so far. Not because it is my latest project, but because I feel the potential is huge.

The InfoQ article that came out on Monday is pretty much a theoretical braindump on the Reactive Actor Model. I had expected it to stir up a lot of controversy, as the claims I am making are pretty big - this has not happened yet. Maybe the text does not flow well, or it is just early. In any case, the idea is clear: stick to Processor Actors and build a web of loosely connected events to fulfil a system's business requirements while maintaining fluid evolvability. However, I fear the article was perhaps too dry and long and did not fully demonstrate the potential.

Hence I have set out to start a series on BeeHive and show the idea in some tangible scenarios - "show me the codz" - with the code easily accessible on GitHub. So now, let's roll on.

BeeHive

BeeHive makes it ridiculously simple (though not necessarily easy) to build decoupled actors that together achieve a business goal. By using topic-based subscription, you can easily add actors that feed on an existing event and do something extra.
Here we will build such a system with a few lines of code. You can find the full solution in the samples folder in the source. The code below is mainly snippets (e.g. it does not include the Dispose() methods, to remove clutter).

Scenario 1: News Feed Keyword Notification

Let's imagine you are interested in knowing all the breaking news from one or several news feeds and would like to be notified when a certain keyword occurs in the news. In this case we design a series of reactive actors that achieve this while allowing for other functionality to be built on top of the existing topic-based queues. We use Windows Azure for this example. In order to achieve this, we need to regularly (e.g. every 5 minutes) check the news feed (RSS, ATOM, etc.), examine the newly arrived items, look for the keyword(s) of interest and then perhaps tweet, send an email or an SMS text.

Pulsers: Activities on regular intervals

BeeHive works on a reactive event-based model. Instead of building components that regularly do some work, we can have generic components that regularly fire an event. These events then can be subscribed to by one or more actors to fire off the processing by a chain of decoupled actors.
BeeHive Pulsers do exactly that. At regular intervals, they are woken up to fire off their events. The simplest pulsers are the assembly-attribute ones:
[assembly: SimpleAutoPulserDescription("NewsPulse", 5 * 60)]
The code above sets up a pulser that, every 5 minutes, sends an event of type NewsPulse with an empty body.

Dissemination from a list: Fan-out pattern

Next we set up an actor to look up a list of feeds (one per line) and send an event per feed. We have stored this list in blob storage, which is abstracted as IKeyValueStore in BeeHive.
[ActorDescription("NewsPulse-Capture")]
public class NewsPulseActor : IProcessorActor
{
  private IKeyValueStore _keyValueStore;
  public NewsPulseActor(IKeyValueStore keyValueStore)
  {
    _keyValueStore = keyValueStore;
  }
  public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
  {
    var events = new List<Event>();
    var blob = await _keyValueStore.GetAsync("newsFeeds.txt");
    var reader = new StreamReader(blob.Body);
    string line = string.Empty;
    while ((line = reader.ReadLine())!=null)
    {
      if (!string.IsNullOrEmpty(line))
      {
        events.Add(new Event(new NewsFeedPulsed(){Url = line}));
      }
    }
    return events;
  }
}
The ActorDescription attribute used above signifies that the actor will receive its events from the Capture subscription of the NewsPulse topic. We will be setting up all the topics later using a single line of code.
So we are publishing a NewsFeedPulsed event for each news feed. When we construct an Event object using the typed event instance, EventType and QueueName are set to the type name of the object passed - which is the preferred approach, for consistency.

Feed Capture: another Fan-out

Now we will consume these events in the next actor in the chain. NewsFeedPulseActor will subscribe to the NewsFeedPulsed event and use the URL to get the latest RSS feed and look for the latest news. To prevent duplicate notifications, we need to know the most recent item we checked last time. We store this offset in a storage. For this use case, we choose ICollectionStore<T>, whose Azure implementation uses Azure Table Storage.
[ActorDescription("NewsFeedPulsed-Capture")]
public class NewsFeedPulseActor : IProcessorActor
{
    private ICollectionStore<FeedChannel> _channelStore;
    public NewsFeedPulseActor(ICollectionStore<FeedChannel> channelStore)
    {
        _channelStore = channelStore;
    }
    public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
    {
      var newsFeedPulsed = evnt.GetBody<NewsFeedPulsed>();
      var client = new HttpClient();
      var stream = await client.GetStreamAsync(newsFeedPulsed.Url);
      var feedChannel = new FeedChannel(newsFeedPulsed.Url);
      var feed = SyndicationFeed.Load(XmlReader.Create(stream));
      var offset = DateTimeOffset.MinValue;
      if (await _channelStore.ExistsAsync(feedChannel.Id))
      {
        feedChannel = await _channelStore.GetAsync(feedChannel.Id);
        offset = feedChannel.LastOffset;
      }
      feedChannel.LastOffset = feed.Items.Max(x => x.PublishDate);
      await _channelStore.UpsertAsync(feedChannel);
      return feed.Items.OrderByDescending(x => x.PublishDate)
        .TakeWhile(y => offset < y.PublishDate)
        .Select(z => new Event(new NewsItemCaptured(){Item = z}));
    }
}
Here we read the URL from the event, capture the RSS and get the last offset from the storage. We then send the captured feed items back as events for whoever is interested. At the end, we set the new offset.

Keyword filtering and Notification

At this stage we need to subscribe to NewsItemCaptured and check the content for specific keywords. This is only one potential subscription out of many. For example one actor could be subscribing to the event to store these for further retrieval, another to process for trend analysis, etc.
So for the sake of simplicity, let's hardcode the keyword (in this case "Ukraine") but it could have been equally loaded from a storage or a file - as we did with the list of feeds.
[ActorDescription("NewsItemCaptured-KeywordCheck")]
public class NewsItemKeywordActor : IProcessorActor
{
    private const string Keyword = "Ukraine";
    public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
    {
        var newsItemCaptured = evnt.GetBody<NewsItemCaptured>();
        if (newsItemCaptured.Item.Title.Text.ToLower()
            .IndexOf(Keyword.ToLower()) >= 0)
        {
            return new Event[]
            {
                new Event(new NewsItemContainingKeywordIentified()
                {
                    Item = newsItemCaptured.Item,
                    Keyword = Keyword
                }),
            };
        }
        return new Event[0];
    }
}
Now we can have several actors listening for NewsItemContainingKeywordIentified and sending different notifications; here we implement a simple Trace-based one:

[ActorDescription("NewsItemContainingKeywordIentified-TraceNotification")]
public class TraceNotificationActor : IProcessorActor
{
  public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
  {
    var keywordIentified = evnt.GetBody<NewsItemContainingKeywordIentified>();
    Trace.TraceInformation("Found {0} in {1}",
        keywordIentified.Keyword, keywordIentified.Item.Links[0].Uri);
    return new Event[0];
  }
}

Setting up the worker role

If you have an Azure account, you need a storage account, Azure Service Bus and a worker role (even an Extra Small instance would suffice). If not, you can use the development emulators, although for the Service Bus you need to use Service Bus for Windows. Just bear in mind that with the local emulators and Service Bus for Windows you have to use particular versions of the Azure SDK - the latest versions usually do not work.
We can get a list of assembly pulsers by the code below:

_pulsers = Pulsers.FromAssembly(Assembly.GetExecutingAssembly())
    .ToList();

We also need to create an Orchestrator to set up the factory actors, and call SetupAsync() to set up all the topics and subscriptions.
We also need to register our classes with the Dependency Injection framework.
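Very roughly, the worker role start-up then reads like this (the Orchestrator resolution and the Start calls below are illustrative only - consult the sample's worker role for the exact types and signatures):

_pulsers = Pulsers.FromAssembly(Assembly.GetExecutingAssembly())
    .ToList();

var orchestrator = container.Resolve<Orchestrator>(); // container = your DI framework of choice
orchestrator.SetupAsync().Wait(); // creates the topics and subscriptions
orchestrator.Start();             // factory actors start feeding events to processor actors

foreach (var pulser in _pulsers)
    pulser.Start();               // pulsers begin firing their interval events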

Now we are ready!

After running the application in debug mode, here is what you see in the output window:
Found Ukraine
Obviously, we can add another actor subscribing to this event to send emails, SMS messages, you name it. The point being, it is a piece of cake. Checking the links, they indeed are about Ukraine:


And this one:


In the next post, we will discuss another scenario.

Saturday, 19 April 2014

CacheCow 0.5 Released

[Level T1]

OK, finally CacheCow 0.5 (actually 0.5.1, as 0.5 was released by mistake and pulled quickly) is out. So here is the list of changes, some of which are breaking. The breaking changes, however, are very easy to fix and, if you do not need the new features, you can happily stay on the stable 0.4 versions. If you have moved to 0.5 and you see issues, please let me know ASAP. For more information, see my other post on the Alpha release.

So here are the features:

Azure Caching for CacheCow Server and Client

Thanks to Ugo Lattanzi, we now have CacheCow storage in Azure Caching. This covers both client and server. Considering that calls to Azure Cache take around 1ms (based on some ad hoc tests I have done; do not quote this as a proper benchmark), this makes it a really good option if you have deployed your application in Azure. You just need to specify the cacheName, otherwise the "default" cache will be used.

Complete re-design of cache invalidation

I have now put some sense into cache invalidation. The point is that a strong ETag is generated for a particular representation of the resource, while cache invalidation happens on the resource, including all of its representations. For example, if you send the application/xml representation of the resource, an ETag is generated for that particular representation. As such, the application/json representation will get its own ETag. However, on PUT both need to be invalidated. Likewise, in case of a PUT or DELETE on a resource (let's say /api/customer/123), the collection resource (/api/customer) needs to be invalidated, since the collection will be different.

But how would we find out whether a resource is a collection or a single item? I have implemented some logic that infers whether the item is a collection or not - based on common and conventional route design in ASP.NET Web API. If your routing is very custom, this will not work.

Another aspect is hierarchical routes. When a resource is invalidated, its parent resource will be invalidated as well. For example, in case of a PUT to /api/customer/123/order/456, the customer resource will change too - if orders are returned as part of the customer. So in this case, not only the order collection resource but also the customer needs to be invalidated. This is currently done in CacheCow.Server and works for conventional routes.

Using MemoryCache for InMemory stores (both server and client)

Previously I had been using dictionaries for the InMemory stores. The problem with Dictionary is that it just grows until the system runs out of memory, while MemoryCache limits its memory use when the system comes under memory pressure.

Dependency of CachingHandler on HttpConfiguration

As I announced before, you need to pass the configuration to the CachingHandler on the server. This should be an easy change, but a breaking one.
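If I have the signature right, the registration now looks something along these lines (treat the constructor argument as the gist of the change rather than gospel):

var config = GlobalConfiguration.Configuration;
config.MessageHandlers.Add(new CachingHandler(config));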


Future roadmap and 0.6

I think we are now at a point where CacheCow requires a re-write for the features I want to introduce. One such feature is content-based ETag generation and cache invalidation. Content-based ETag generation is there and can be used now, but without content-based invalidation it is not of much use, especially since it in fact generates a weak ETag. Considering the changes required to achieve this, almost a re-write is needed.

Please keep the feedback coming. Thanks for using and supporting CacheCow!


Sunday, 13 April 2014

Reactive Cloud Actors: no-nonsense MicroServices

[Level C3]

This post is not directly about MicroServices. If that is why you are reading it, you might as well stop now. Apparently, we are still waiting for the definition to be finally ratified. The definition, as it stands now, is blurry - as Martin Fowler admits. This post is about Actors - the cloud ones, you know. By the time you finish reading it, I hope I will have made it effortlessly clear how Reactive Cloud Actors are the real MicroServices, rather than RESTful Imperative MicroServices, however lightweight.

Watching Fred George deliver an excellent talk on the High-Performance Bus inspired me to start working on the actors. I am still working on a final article on the subject, but this post is basically a primer on it - as well as an announcement of the BeeHive mini-framework. The next section on actors is taken from that article, which covers the essential theoretical background. Before we start, let's make it clear that the term Reactive is not used in the context of Reactive Extensions (Rx) or frameworks, only in contrast to imperative (RPC-based) actors. Also, RPC-based is not used in contrast to RESTful; it simply means a system which relies on command and query messages rather than events.

UPDATE: The article is now published on infoq here

Actors

Carl Hewitt, along with Peter Bishop and Richard Steiger, published an article back in 1973 that proposed a formalism that identified a single class of objects, i.e. Actors, as the building blocks of systems designed to implement Artificial Intelligence algorithms.

According to Hewitt an actor, in response to a message, can:
  1. Send a finite number of messages to other actors
  2. Create a finite number of other actors
  3. Decide on the behaviour to be used for the next message it receives
Any combination of these actions can occur concurrently and in response to messages arriving in any order - as such, there is no constraint with regard to ordering and an actor implementation must be able to handle messages arriving out of band. However, I believe it is best to separate these responsibilities as below.

Processor Actor

In a later description of the Actor Model, the first constraint is re-defined as "send a finite number of messages to the address of other actors". Addressing is an integral part of the model: it decouples actors and limits the knowledge actors have of each other to a mere token (i.e. the address). Familiar implementations of addressing include Web Service endpoints, Publish/Subscribe queue endpoints and email addresses. Actors that respond to a message by using the first constraint can be called Processor Actors.

Factory Actor

The second constraint makes actors capable of creating other actors, which we conveniently call Factory Actors. Factory actors are important elements of a message-driven system where an actor consumes from a message queue and creates handlers based on the message type. Factory actors control the lifetime of the actors they create and have a deeper knowledge of them - compared to processor actors, which know a mere address. It is useful to separate factory actors from processing ones - in line with the single responsibility principle.

Stateful Actor

The third constraint defines the Stateful Actor. Actors capable of the third constraint have a memory that allows them to react differently to subsequent messages. Such actors can be subject to a myriad of side-effects. Firstly, when we talk about "subsequent messages" we inherently assume an ordering, while, as we said, there is no constraint with regard to ordering: an out-of-band message arrival can lead to complications. Secondly, all aspects of CAP apply to this memory, making a consistent yet highly available and partition-tolerant store impossible to achieve. In short, it is best to avoid stateful actors.

Modelling a Processor Actor

"Please open your eyes, Try to realise, I found out today we're going wrong, We're going wrong" - Cream
[Mind you there is only a glimpse of Ginger Baker visible while the song is heavily reliant on Ginger's drumming. And yeah, this goes back to a time when Eric played Gibson and not his signature Strat]


This is where most of us can go wrong. We do that, sometimes for 4 years - without realising it. This is by no means a reference to a certain project [... cough ... Orleans ... cough] that has been brewing (Strange Brew pun intended) for 4 years and has come up with imperative, RPC-based, RESTfully coupled Micro-APIs. We know it, doing simple is hard - and we go wrong, i.e. we do the opposite: we build really complex frameworks.

I was chatting away on twitter with a few friends and I was saying "if you need a full-blown and complex framework to do actors, you are probably doing it wrong". All you need is a few interfaces and some helpers doing the boilerplate stuff. This stuff ain't no rocket science, let's not turn it into one.

The essence of the Reactive Cloud Actor is the interface below (part of BeeHive mini-framework introduced below):
    /// <summary>
    /// Processes an event.
    /// 
    /// Queue name containing the messages that the actor can process messages of.
    /// It can be in the format of [queueName] or [topicName]-[subscriptionName]
    /// </summary>
    public interface IProcessorActor : IDisposable
    {
        /// <summary>
        /// Asynchronous processing of the message
        /// </summary>
        /// <param name="evnt">Event to process</param>
        /// <returns>Typically contains 0-1 messages. Exceptionally more than 1</returns>
        Task<IEnumerable<Event>> ProcessAsync(Event evnt);

    }

Yes, that is all. All of your business can be captured by the universal method above. Don't you believe it? Just have a look at a non-trivial eCommerce example implemented using this single method.

So why Reactive (event-based) and not Imperative (RPC-based)? Because in a reactive actor system, each actor only knows about its own step and what it does itself, and has no clue about the next steps or the rest of the system - i.e. actors are decoupled, leading to an independence which facilitates actor Application Lifecycle Management and DevOps deployment.
As can be seen above, imperative actors know about their actor dependencies, while reactive actors have no dependency other than the queues, basic data structure stores and external systems. Imperative actors communicate with other actors via a message store/bus and invoke method calls. We have been doing this for years in different Enterprise Service Bus integrations; this one only brings it to a micro level, which makes the pain even worse.

So let's bring an example: fraud check of an order.

Imperative
PaymentActor, after a successful payment for an order, calls the FraudCheckActor. FraudCheckActor calls external fraud check systems. After identifying a fraud, it calls CancelOrderActor to cancel the order. So as you can see, PaymentActor knows about and depends on FraudCheckActor. In the same way, FraudCheckActor depends on CancelOrderActor. They are coupled.

Reactive
PaymentActor, upon successful payment, raises a PaymentAuthorised event. FraudCheckActor is one of its subscribers; after receiving this event it checks for fraud and, if one is detected, raises a FraudDetected event. CancelOrderActor subscribes to some events, including FraudDetected, upon receiving which it cancels the order. None of these actors know about the others. They are decoupled.
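Expressed with the BeeHive interface from above, the reactive FraudCheckActor boils down to something like this (PaymentAuthorised, FraudDetected and IFraudCheckService are illustrative types, not part of BeeHive):

[ActorDescription("PaymentAuthorised-FraudCheck")]
public class FraudCheckActor : IProcessorActor
{
    private readonly IFraudCheckService _fraudService; // wraps the external fraud check systems

    public FraudCheckActor(IFraudCheckService fraudService)
    {
        _fraudService = fraudService;
    }

    public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
    {
        var paymentAuthorised = evnt.GetBody<PaymentAuthorised>();
        var isFraud = await _fraudService.CheckAsync(paymentAuthorised.OrderId);

        // raise FraudDetected only when fraud is found; otherwise return no events
        return isFraud
            ? new[] { new Event(new FraudDetected { OrderId = paymentAuthorised.OrderId }) }
            : new Event[0];
    }

    public void Dispose()
    {
    }
}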

So which one is better? By the way, none of this is new - we have been doing it for years. But it is important to identify why we should avoid the first and not to "go wrong".

Reactive Cloud Actors proposal


After categorising the actors, here I propose the following constraints for Reactive Cloud Actors:
  • A reactive system that communicates by sending events
  • Events are defined as a time-stamped, immutable, unique and eternally-true piece of information
  • Events have types
  • Events are stored in a Highly-Available cloud storage queues allowing topics
  • Queues must support delaying
  • Processor Actors react to receiving a single event, do some processing and then send back usually one (sometimes zero and rarely more than one) event
  • Processor Actors have type - implemented as a class
  • Processing should involve minimal number of steps, almost always a single step
  • Processing of the events are designed to be Idempotent
  • Each Processor Actor can receive one or more event types - all of which defined by Actor Description
  • Factory Actors are responsible for managing the lifetime of processor actors
  • Actors are deployed to cloud nodes. Each node contains one Factory Actor and can create one or more Processor Actor depending on its configuration. Grouping of actors depends on cost vs. ease of deployment.
  • In addition to events, there are other Basic Data Structures that contain state and are stored in Highly-Available cloud storage (See below on Basic Data Structures)
  • There are no Stateful Actors. All state is managed by the Basic Data Structures and events.
  • This forms an evolvable web of events which can define flexible workflows
Breaking down all the processes into single steps is very important. A Highly-Available yet Eventually-Consistent system can handle delays but cannot easily bind multiple steps into a single transaction.

So how can we implement this? Is this gonna work?

Introducing BeeHive


BeeHive is a vendor-agnostic Reactive Actor mini-framework I have been working on over the last three months. It is implemented in C# but frankly could be done in any language supporting asynchronous programming (promises), such as Java or node.

The cloud implementation currently exists only for Azure, but supporting another cloud vendor is basically a matter of implementing 4-5 interfaces. It also comes with an In-Memory implementation, which is only targeted at demos. This framework is not meant to be used as an in-process actor framework.

It implements the Prismo eCommerce example, an imaginary eCommerce system, for both In-Memory and Azure. This example is non-trivial and has some tricky scenarios that have to implement Scatter-Gather sagas. There is also a Boomerang pattern event that turns a multi-step process into regurgitating an event a few times until all steps are done (this requires another post).

An event is modelled as:

[Serializable]
public sealed class Event : ICloneable
{

    public static readonly Event Empty = new Event();

    public Event(object body)
        : this()
    {        
       ...
    }

    public Event()
    {
        ...
    }

    /// <summary>
    /// Marks when the event happened. Normally a UTC datetime.
    /// </summary>
    public DateTimeOffset Timestamp { get; set; }

    /// <summary>
    /// Normally a GUID
    /// </summary>
    public string Id { get; private set; }

    /// <summary>
    /// Optional URL to the body of the message if Body can be retrieved 
    /// from this URL
    /// </summary>
    public string Url { get; set; }

    /// <summary>
    /// Content-Type of the Body. Usually a Mime-Type
    /// Typically body is a serialised JSON and content type is application/[.NET Type]+json
    /// </summary>
    public string ContentType { get; set; }
        
    /// <summary>
    /// String content of the body.
    /// Typically a serialised JSON
    /// </summary>
    public string Body { get; set; }

    /// <summary>
    /// Type of the event. This must be set at the time of creation of event before PushAsync
    /// </summary>
    public string EventType { get; set; }

    /// <summary>
    /// Underlying queue message (e.g. BrokeredMessage in case of Azure)
    /// </summary>
    public object UnderlyingMessage { get; set; }

    /// <summary>
    /// This MUST be set by the Queue Operator upon Creation of message usually in NextAsync!!
    /// </summary>
    public string QueueName { get; set; }


    public T GetBody<T>()
    {
       ...
    }

    public object Clone()
    {
        ...
    }
}

As can be seen, Body has been defined as a string, since BeeHive uses JSON serialisation. This can be made flexible, but in reality events should normally contain a small amount of data, mainly of basic data types such as GUID, integer, string, boolean and DateTime. Any binary data should be stored in Azure Blob Storage or S3 and the path referenced here.

BeeHive Basic Data Structures

This is a work-in-progress, yet nearly done, part of BeeHive that defines a minimal set of Basic Data Structures (and their stores) to cover all the data needs of Reactive Actors. These structures are defined as interfaces that can be implemented for different cloud platforms. The list as it stands now:
  • Topic-based and simple Queues
  • Key-Values
  • Collections
  • Keyed Lists
  • Counters

Some of these data structures hold entities within the system which should implement a simple interface:

public interface IHaveIdentity
{
    Guid Id { get; }
}

Optionally, entities could be Concurrency-Aware for updates and deletes in which case they will implement an additional interface:

public interface IConcurrencyAware
{
    DateTimeOffset? LastModofied { get; }

    string ETag { get; }
}


Prismo eCommerce sample

Prismo eCommerce is an imaginary eCommerce company that receives orders and processes them. The order taking relies on an eventually consistent (and delayed) read model of the inventory, hence orders can be accepted for items that are out of stock. The process waits until all items are in stock (or, if out of stock, until they arrive back in stock) before it sends the order to fulfilment.

Prismo eCommerce states (solid), transitions (arrows) and processes (dashed)

This sample has been implemented both In-Memory and for Windows Azure. In both cases, the tracing can be used to see what is happening outside. In this sample all actors are configured to run in a single worker role, although they can each run in their own roles. I might provide a UI to show the status of each order as they go through statuses.

Conclusion

Reactive Cloud Actors are the way to implement decoupled MicroServices. By individualising actor definitions (Processor Actor and Factory Actor) and avoiding Stateful Actors, we can build resilient and Highly Available cloud-based systems. Such systems will comprise evolvable webs of events, where each web defines a business capability. I don't know about you, but this is how I am going to build my cloud systems.

Watch this space, the article is on its way.