Wednesday 11 June 2014

BeeHive Series - Part 3: BeeHive 0.5, RabbitMQ and more

Level [T4]

BeeHive is a friction-free library for building Reactor Cloud Actors - effortlessly. It defines abstractions for the message, the queue and the actors, and all you have to do is define your actors and connect the dots using subscriptions. If this is the first time you are reading about BeeHive, you may want to have a look at the previous posts, but basically a BeeHive Actor (technically a Processor Actor) is very simple:
[ActorDescription("TopicName-SubscriptionName")]
public class MyActor : IProcessorActor
{
  public Task<IEnumerable<Event>> ProcessAsync(Event evnt)
  {
    // impl
  }
}
All you do is consume a message, do some work and then return - typically one, sometimes zero and rarely many - events back.
A few key things to note here.

Event

First of all, an Event is an immutable, unique and timestamped message which documents a significant business event. It has a string body which is normally a JSON serialisation of the actual message object - but it does not have to be.

Messages are usually arbitrary bytes, so why is the body a string here? While it would have been possible to use byte[], if you need to send binary blobs or you need custom serialisation, you are probably doing something wrong. Bear in mind, BeeHive is targeted at solutions that require scale, High Availability and linearisation. If you need to attach a big binary blob, just drop it in a key-value store using IKeyValueStore and put the link in your message. If it is small, use Base64 encoding. Also, your messages need to be very simple DTOs (and by simple I do not mean small, but a class with getters and setters); if you are having problems serialising them then, again, you are doing something wrong.
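
For illustration, here is a minimal sketch of such a DTO and how it could be wrapped in an Event. The OrderArrived type is made up, and wrapping the DTO in the Event constructor mirrors the samples later in this series (treat the exact overloads as an assumption):

// A simple DTO: just getters and setters, trivially JSON-serialisable.
public class OrderArrived
{
    public string OrderId { get; set; }
    public string CustomerId { get; set; }
    public DateTimeOffset ArrivedAt { get; set; }

    // For anything big (documents, images), store the blob elsewhere
    // (e.g. via IKeyValueStore) and carry only the key/link here.
    public string AttachmentKey { get; set; }
}

// The DTO gets serialised (normally to JSON) and carried as the Event's string body.
var evnt = new Event(new OrderArrived
{
    OrderId = "1234",
    CustomerId = "ABCD",
    ArrivedAt = DateTimeOffset.UtcNow
});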

Queue naming

BeeHive uses a naming convention for queues, topics and subscriptions. Basically it is in the format of TopicName-SubscriptionName. So there are a few rules with this:
  • Understandably, TopicName or SubscriptionName should not contain hyphens
  • If the value of TopicName and SubscriptionName is the same, it is a simple queue and not a publish-subscribe queue. For example, "OrderArrived-OrderArrived"
  • If you leave off the SubscriptionName then you are referring to the topic. For example "OrderArrived-".
The queue name is represented by the QueueName class. If you need to construct queue names, you can use its static methods:

var n1 = QueueName.FromSimpleQueueName("SimpleQ"); // "SimpleQ-SimpleQ"
var n2 = QueueName.FromTopicName("Topic"); // "Topic-"
var n3 = QueueName.FromTopicAndSubscriptionName("Topic", "Sub"); // "Topic-Sub"

There is a QueueName property on the Event class. This property defines where to send the event message. The value must be the name of the topic or of the simple queue.
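
Purely as an illustration, reusing the hypothetical OrderArrived DTO from above and assuming the property accepts the conventional string name described here (an assumption, not the confirmed API):

// Send this event to the "OrderArrived" topic.
var evnt = new Event(new OrderArrived { OrderId = "1234" })
{
    QueueName = "OrderArrived-"
};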

IEventQueueOperator

This interface got a bit of a makeover in this release. I had not been happy with the interface as it had some inconsistencies - especially in terms of creating queues. Thanks to Adam Hathcock, who reminded me, this is now done.

With QueueName now able to differentiate between topics and simple queues, this value needs to be either the name of the simple queue (in the example above, "SimpleQ") or the conventional topic name (in the example above, "Topic-").

So here are the interfaces as they stand now:

public interface ISubscriptionOperator<T>
{
    Task<PollerResult<T>> NextAsync(QueueName name);
    Task AbandonAsync(T message);
    Task CommitAsync(T message);
    Task DeferAsync(T message, TimeSpan howLong);
}

public interface ITopicOperator<T>
{
    Task PushAsync(T message);
    Task PushBatchAsync(IEnumerable<T> messages);
}

public interface IQueueOperator<T> : ITopicOperator<T>, ISubscriptionOperator<T>
{
    Task CreateQueueAsync(QueueName name);
    Task DeleteQueueAsync(QueueName name);
    Task<bool> QueueExists(QueueName name);
}

public interface IEventQueueOperator : IQueueOperator<Event>
{
}
The main changes were made to IQueueOperator<T>, which now gets passed the QueueName - making it simpler.
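
To get a feel for how these contracts fit together, here is a rough polling sketch. The queueOperator variable and the PollerResult<T> members used here (IsSuccessful, Result) are assumptions for the sake of illustration, not the confirmed API:

// Assumed: queueOperator is an IEventQueueOperator wired to your broker of choice.
var queueName = QueueName.FromTopicAndSubscriptionName("OrderArrived", "MyActor");
await queueOperator.CreateQueueAsync(queueName);

var poll = await queueOperator.NextAsync(queueName);
if (poll.IsSuccessful)             // assumed member of PollerResult<T>
{
    var message = poll.Result;     // assumed member of PollerResult<T>
    try
    {
        // ... hand the message to the processor actor ...
        await queueOperator.CommitAsync(message);    // done - remove it from the queue
    }
    catch (Exception)
    {
        await queueOperator.AbandonAsync(message);   // put it back for a retry
    }
}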

RabbitMQ Roadmap

BeeHive targets cloud frameworks. IEventQueueOperator and the main data structures have so far been implemented for Azure; next up is AWS.

Amazon Web Services (AWS) provides the Simple Queue Service (SQS), which only supports simple send-receive scenarios and not Publish-Subscribe cases. With this in mind, it is most likely that other message brokers will be used on AWS, although a custom implementation of pub-sub based on the Simple Notification Service (SNS) has been reported. Considering RabbitMQ is by far the most popular message broker out there (is it not?), it is sensible to pick this implementation first.

The RabbitMQ client for .NET has a very simple API and working with it is very easy. However, the connection implementation leaves a lot to be desired. EasyNetQ has a sophisticated connection implementation that covers refreshing dead connections and catering for round-robin in High-Availability scenarios. Using a full framework just for its connection handling is not really an option, hence I need to implement something similar.
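
As an aside, here is roughly what a basic declare-and-publish looks like against the raw RabbitMQ .NET client (a minimal sketch; the host name, queue name and payload are made up):

// requires the RabbitMQ.Client NuGet package
// (using RabbitMQ.Client; using System.Text;)
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // declare a plain, non-durable queue and push a single message onto it
    channel.QueueDeclare("OrderArrived-OrderArrived", durable: false,
        exclusive: false, autoDelete: false, arguments: null);

    var body = Encoding.UTF8.GetBytes("{\"OrderId\":\"1234\"}");
    channel.BasicPublish(exchange: "", routingKey: "OrderArrived-OrderArrived",
        basicProperties: null, body: body);
}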

So for now, I am releasing an alpha version without the HA and connection refresh to get community feedback. So please do ping me with what you think.

Since this is a pre-release, you need to use -Pre to get it installed:

PM> Install-Package BeeHive.RabbitMQ -Pre

Tuesday 3 June 2014

Cancelling an async HTTP request Task sends TCP RESET packet

Level [T4]

This blog post did not just happen. In fact, nothing ever just happens. There is a story behind everything, and this one is no different. Looking back it feels like a nice find, but as the story was unfolding I was running around like a headless chicken. Here we have the luxury of hindsight, so let's take advantage of it.

TLDR; If you are a sensible HTTP client and make your HTTP requests using cancellable async Tasks by passing a CancellationToken, you could find your IP blocked by legacy bridge devices blacklisting clients sending TCP RESET packets.

So here is how it started ...

So we were supposed to go live on Monday - some Monday. Talking of live, it was not really live - it was only to internal users but considering the high profile of the project, it felt like the D-Day. All VPs knew of the release and were waiting to see a glimpse of the project. Despite the high profile, it was not properly resourced, I despite being so called architect , pretty much singled handedly did all the API and the middleware connecting the Big Data outputs with the Single Page Application.

We could not finish going live on Monday, so it moved to Tuesday. On Tuesday morning we were all ready and I set up my machine's screens like a trader's, with all the performance monitors up, watching the users come in. Since we were on Azure, elasticity was an option, although the number of internal users could hardly make a dent in the 3 worker roles. So we did go live, and I could see traffic building up, and all looked fine. Until ... it did not.

I saw requests queuing up and page loads taking longer and longer, until the site was completely frozen. And we had to take it down. And that was not good.

Server Analysis

I brought up DebugView and was lucky to see this (actual IP and site names anonymised):

[1240] w3wp.exe Error: 0 :
[1240] <html>
[1240] <h1>Access Administratively Blocked</h1>
[1240] <br>URL : 'http://www.xyz.com/services/blahblah'
[1240] <br>Client IP address : 'xyz.xx.yy.zzz'
[1240] </html>

So we were being blocked! Something was blocking us, and this could be because we had used a UI data endpoint as a Data API. Well, I knew it was not good, but as I said we had limited time and in reality that data endpoint was meant to support our live traffic.

So after a lot of to and fro with our service delivery and some third-party support, we were told that our software was recognised as malicious since it was sending way too many TCP RESET packets. Whaa?? No one ain't sending no TCP whatever packets; we are using a high-level language (C#) and the latest HttpClient implementation. We were actually using many optimisation techniques such as async calls, parallelisation, etc. to make the code as efficient as possible. We also used short timeout + retry, which is Netflix's approach to improving performance.
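
For context, the short timeout + retry pattern looked roughly like the sketch below - a simplified illustration, not the production code (the method name, 300ms timeout and retry count are made up). As it turned out, every cancelled attempt sends a TCP RST:

// Each attempt gets its own short timeout via CancellationTokenSource;
// an attempt that takes too long is cancelled and retried.
static async Task<string> GetWithShortTimeoutAndRetryAsync(
    HttpClient client, string url, int maxAttempts = 3)
{
    for (var attempt = 1; ; attempt++)
    {
        var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
        try
        {
            var response = await client.GetAsync(url, cts.Token);
            return await response.Content.ReadAsStringAsync();
        }
        catch (OperationCanceledException)   // thrown when the token fires
        {
            if (attempt >= maxAttempts)
                throw;
            // otherwise loop and retry - and note that the cancelled
            // attempt has just sent a TCP RESET to the server
        }
    }
}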

But what is a TCP RESET packet? Basically, a RESET packet is one that has the RESET flag set (which is otherwise unset) and tells the server to drop the TCP connection immediately and reclaim all the resources associated with it. There is an RFC from back in 2002 that considers RESET harmful. Wikipedia's article argues that when used as designed it is useful, but a forged RESET can disrupt the communication between client and server. And Microsoft's TechNet blog on the topic says "RESET is actually a good thing".

And in essence, I would agree with Microsoft's (and Wikipedia's) account that sending a RESET packet is what a responsible client would do. Let's imagine you are browsing a site over a really bad wifi connection. The page takes too long to load and you, frustrated by the slow connection, cancel by pressing the X button. At this point, a responsible browser should tell the server it has changed its mind and is not interested in the response. This lets the server use its resources for a client that is actually waiting for the response.

Now going back to the problem at hand, I am not a TCP expert by any stretch - I have always used higher-level constructs and never had to go down so deep in the OSI model. But my surprise was: what was different now with my code that, with a handful of calls, I was getting blocked, while the live clients worked fine with a significantly larger number of calls?

I had a hunch that it probably had to do with some of the patterns I had been using on the server. And to shorten the suspense, the answer came from the analysis of TCP packets when cancelling an async HTTP Task. The live code uses traditional synchronous calls - none of the fancy patterns I used. So let's look at some sample code that cancels the task if it takes too long:

var client = new HttpClient();
var buffer = new byte[5 * 1000 * 1000];
// you might have to use a different timeout or address
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
try
{
    var result = client.GetAsync("http://www.google.com",
        cts.Token).Result;
    var s = result.Content.ReadAsStreamAsync().Result;

    var result1 = s.ReadAsync(buffer, 0, buffer.Length, cts.Token).Result;
    ConsoleWriteLine(ConsoleColor.Green, "Got it");    // small helper that writes in colour
}
catch (Exception e)
{
    ConsoleWriteLine(ConsoleColor.Red, "error! " + e); // small helper that writes in colour
}

In this snippet, we call the Google server and set a 300ms timeout (you might have to modify the timeout or the address based on your connection speed in order to see the cancellation). Here is the Wireshark proof:



As you can see above, a TCP RESET packet has been sent - provided you have set the parameters such that the request does not complete before its timeout and gets cancelled. Try this with a longer timeout, or use the synchronous WebClient instead, and you will never see this RST packet.
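
For comparison, a minimal synchronous equivalent using WebClient (which cannot be cancelled mid-request) would look something like this - just an illustrative sketch:

// Synchronous call: no cancellation token, so no mid-flight cancellation
// and no RST packet - the call simply blocks until the response arrives.
using (var webClient = new System.Net.WebClient())
{
    var html = webClient.DownloadString("http://www.google.com");
    Console.WriteLine("Got " + html.Length + " characters");
}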

Now the question is: should a network appliance pick up on this responsible cancellation and treat it as an attack? By no means. But in my case it did, and it is very likely that it could do the same with yours.

My solution came by whitelisting my IP against "TCP RESET attacks". After all, I was only trying to help the server.

Conclusion

Cancelling an async HTTP Task in HttpClient results in sending a TCP RESET packet, which is considered malicious by some network appliances, resulting in your IP being blacklisted.

PS. The network appliance belonged to our third-party infrastructure provider, whose security was managed by yet another third party - it was not in Azure. The real solution would have been to remove such a crazy rule but, anyhow, we developers don't always get what we want.


Monday 2 June 2014

BeeHive Series - Part 2 - Importing file from blob storage to ElasticSearch sample

[Level T1]

In the previous post, we introduced BeeHive and talked about an example usage where we check news feeds and send a notification if a keyword is found. In this post, we look at another example. You can find the source code in the BeeHive GitHub repo - just open up the BeeHive.Samples.sln file.

Processing files

Let's imagine we receive files in a particular blob location and we need to import/process them into the system. These files arrive in a particular folder structure and we need to watch the root folder. Then we need to pick them up, extract each row and send each record off to be processed - in this case, to be loaded into an ElasticSearch cluster.
ElasticSearch is a horizontally-scalable and highly-available indexing and search technology. It runs on Windows, Linux and OSX, is easy to set up and free to use. You can download the installer from http://www.elasticsearch.org/

NewFileArrived

So here we design a system that watches the location and, when it finds files, raises a NewFileArrived event. This is a simple enough process, yet what if we have multiple actors watching the location (very likely in a cloud scenario where the same process runs on many machines)? In that case we will receive multiple NewFileArrived events.
BeeHive provides pulsers that help you with your concurrency problems. FolderWatcherActor can subscribe to a topic that is fed by a pulser. In fact, in a BeeHive world, you could have pulsers that raise events at different intervals - such as FiveMinutesPassed, AnHourPassed, ADayPassed, etc. - and, based on the requirement, your actors could subscribe to any of these. The beauty of message-based scheduling is that only a single instance of the actor will receive the message.
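
Wiring the watcher to a pulse topic is then just a matter of the naming convention from Part 3 - a sketch with made-up topic and subscription names:

// Subscribes this actor to the "FiveMinutesPassed" pulse topic,
// under a subscription dedicated to the folder watcher.
[ActorDescription("FiveMinutesPassed-FolderWatcher")]
public class FolderWatcherActor : IProcessorActor
{
    public Task<IEnumerable<Event>> ProcessAsync(Event evnt)
    {
        // impl - the sweep shown below
    }
}
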
Raising the NewFileArrived event is not enough, though. When the actor wakes up again on the next pulse message and the file is still there, it will send another NewFileArrived event. We can protect against this by:
  • Making the processing idempotent
  • Keeping track of the files received
  • Marking files by creating a status file next to them
We choose the last option so we can use the same status file further down the line. So, after identifying a file, we create a file with the same name plus .status and write the status number into it - here 1.

public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var events = new List<Event>();

    // list everything under the sweep root folder
    var items = (await _dynamoStore.ListAsync(
        _configurationValueProvider.GetValue(Constants.SweepRootPathKey)))
        .ToArray();

    // pair each file with its ".status" companion; groups with a single
    // member are files that have not yet been marked as picked up
    var notProcessed = items.Where(x => !x.IsVirtualFolder)
        .GroupBy(z => z.Id.Replace(Constants.StatusPostfix, ""))
        .Where(f => f.Count() == 1)
        .Select(w => w.Single());

    foreach (var blob in notProcessed)
    {
        events.Add(new Event(new NewFileArrived()
        {
            FileId = blob.Id
        }));

        // mark the file as picked up by creating its ".status" companion
        await _dynamoStore.InsertAsync(new SimpleBlob()
        {
            Id = blob.Id + Constants.StatusPostfix,
            Body = new MemoryStream(BitConverter.GetBytes(1)) // status 1
        });
    }

    return events;
}

Process the file: fan-out the records

After receiving the NewFileArrived event, we copy the file locally, split it into records and fan out the records as ImportRecordExtracted events. We also send an ImportFileProcessed event.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var newFileArrived = evnt.GetBody<NewFileArrived>();
    var blob = await _dynamoStore.GetAsync(newFileArrived.FileId);
    var reader = new StreamReader(blob.Body);
    string line = string.Empty;
    var events = new List<Event>();
    while ((line = reader.ReadLine()) != null)
    {
        var fields = line.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
        events.Add(new Event( new ImportRecordExtracted()
        {
            Id = fields[0],
            Content = fields[2],
            IndexType = fields[1]
        }));
    }

    events.Add(new Event(new ImportFileProcessed()
    {
        FileId = newFileArrived.FileId
    }));

    return events;
}

ImportFileProcessed

The actor receiving this event will delete the file and the status file.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importFileProcessed = evnt.GetBody<ImportFileProcessed>();
    var statusFile = importFileProcessed.FileId + Constants.StatusPostfix;

    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = importFileProcessed.FileId
    });
    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = statusFile
    });

    return new Event[0];
}

ImportRecordExtracted

Based on the type of the record, we "upsert" the record into the appropriate type under the import index in our ElasticSearch cluster.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importRecordExtracted = evnt.GetBody<ImportRecordExtracted>();
    var elasticSearchUrl = _configurationValueProvider.GetValue(Constants.ElasticSearchUrlKey);

    var client = new HttpClient();
    var url = string.Format("{0}/import/{1}/{2}", elasticSearchUrl,
        importRecordExtracted.IndexType,
        importRecordExtracted.Id);
    var responseMessage = await client.PutAsJsonAsync(url, importRecordExtracted);

    if (!responseMessage.IsSuccessStatusCode)
    {
        throw new ApplicationException("Indexing failed. " 
            + responseMessage.ToString());
    }

    return new[]
    {
        new Event(new NewIndexUpserted()
        {
            IndexUrl = url
        }) 
    };
}

NewIndexUpserted

While we currently do not need to know when we add or update an index entry in ElasticSearch, this can later be used by other processes, so it is best to provision the event. As we said before, BeeHive events are meaningful business milestones that may or may not be used by your current system.

Here are our indexed records when browsing to http://localhost:9200/import/_search:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 14,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "import",
      "_type" : "D",
      "_id" : "4",
      "_score" : 1.0, "_source" : {"Id":"4","IndexType":"D","Content":"These are controlled by min_term_freq"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "9",
      "_score" : 1.0, "_source" : {"Id":"9","IndexType":"E","Content":"There are other parameters such as min_word_length"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "11",
      "_score" : 1.0, "_source" : {"Id":"11","IndexType":"E","Content":"In order to give more weight to more interesting terms"}
    }, {
      "_index" : "import",
      "_type" : "A",
      "_id" : "2",
      "_score" : 1.0, "_source" : {"Id":"2","IndexType":"A","Content":"clauses in a bool query of interesting terms extracted from some provided text. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "7",
      "_score" : 1.0, "_source" : {"Id":"7","IndexType":"D","Content":"controlled by percent_terms_to_match. The terms are extracted from like_text "}
    }, {
      "_index" : "import",
      "_type" : "H",
      "_id" : "14",
      "_score" : 1.0, "_source" : {"Id":"14","IndexType":"H","Content":"score times some boosting factor boost_terms."}
    }, {
      "_index" : "import",
      "_type" : "B",
      "_id" : "3",
      "_score" : 1.0, "_source" : {"Id":"3","IndexType":"B","Content":"The interesting terms are selected with respect to their tf-idf scores. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "8",
      "_score" : 1.0, "_source" : {"Id":"8","IndexType":"D","Content":"which is analyzed by the analyzer associated with the field"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "10",
      "_score" : 1.0, "_source" : {"Id":"10","IndexType":"E","Content":"max_word_length or stop_words to control what terms should be considered as interesting. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "5",
      "_score" : 1.0, "_source" : {"Id":"5","IndexType":"D","Content":"The number of interesting terms is controlled by max_query_terms. "}
    } ]
  }
}


Cleanup processes

In the absence of transactions, we have to design our business processes for failure. BeeHive promotes an approach where every process is broken down into its smallest elements, each implemented in an actor.

Sometimes it is necessary to design processes that look out for the highly unlikely (yet possible) event of a failure where an actor has done its work but the events it returned never make it back to the service bus. In the case of inserting the new index entry, this is not a problem since we use PUT and the operation is idempotent. However, it could be a problem in the file-processing case, where a status file is created but the NewFileArrived event never makes it back to the service bus. Here, a crash-unlocker process that checks the timestamp of the status file and deletes it if older than, say, 1 day is all that is needed.
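
Such a crash unlocker could itself be a pulse-driven actor along these lines - a sketch only: the ADayPassed-CrashUnlocker name and the LastModified property on the blob items are assumptions, not part of the sample code:

// Runs on a daily pulse and deletes stale ".status" files so the
// corresponding data files get swept and processed again.
[ActorDescription("ADayPassed-CrashUnlocker")]
public class CrashUnlockerActor : IProcessorActor
{
    public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
    {
        var items = (await _dynamoStore.ListAsync(
            _configurationValueProvider.GetValue(Constants.SweepRootPathKey)))
            .ToArray();

        var staleStatusFiles = items
            .Where(x => x.Id.EndsWith(Constants.StatusPostfix))
            .Where(x => x.LastModified < DateTimeOffset.UtcNow.AddDays(-1)); // assumed property

        foreach (var statusFile in staleStatusFiles)
        {
            await _dynamoStore.DeleteAsync(new SimpleBlob() { Id = statusFile.Id });
        }

        return new Event[0];
    }
}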

Conclusion

We can use pulsers to solve the inherent concurrency problem of multiple folder-watcher actors watching the same folder. The fan-out process of breaking a file down into its records and parallelising the processing is one of the key benefits of cloud actors.