Monday 19 March 2018

Business and Log Events, Azure EventHub and Psyfon

TL;DR: If you need to send a large number of events to Azure EventHub from a .NET process or a passthru API, consider using Psyfon.

Over the last two decades, many businesses have transformed themselves by modelling their processes and operations as software (either bespoke or customised off-the-shelf products). These systems turn business processes and transactions into data that can be stored, queried or exchanged. The ROI for such data is very high, and the challenges of building and evolving such systems are widely known. These systems typically generate business events.

Businesses have been turning their attention to the next goal: capturing (and analysing in near real-time) information that is not commonly considered valuable data. Examples include:

- minute user interactions with sites/apps, down to the level of mouse movements and scrolls
- sensor outputs in vehicles or factories
- CCTV streams from municipal cameras, used to predict/forecast traffic
- shopper interactions/behaviour in supermarkets, used to gain insight or provide recommendations

These systems generate what I, for better or worse, call log events. I have explained them in the past, but it is useful to recap how they differ from business events in the table below.

While log events could historically be stored and then analysed in batch mode, there is a growing need to make some sense of the data in real time, in addition to in-depth offline analysis. That is essentially stream processing.

Stream processing is hard. It is not trivial to build resilient processes that reliably handle tons of data in parallel. They must cope with back-pressure, point failures and peaks of activity, and still keep latency to a few seconds or even sub-second. Such systems are already available, for example Apache Flink, Kafka Streams and Spark Streaming. They typically work on top of an Event Store such as Kafka or Azure EventHub.

Azure EventHub has been built for publishing and consuming events at high scale. Its design is not dissimilar to that of Kafka: a replicated, highly available log split into an arbitrary (but fixed) number of partitions, where ordering is guaranteed only at the partition level. You can read from the beginning of the log or from any point in the stream. However, remembering where you last read events from (checkpointing) is left entirely to the consumers.
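The sketch below is a toy, in-memory model in C# that makes the log-and-offset idea concrete. `PartitionedLog` and `CheckpointingConsumer` are hypothetical names and not EventHub SDK types. The point is only that the log stores events per partition and serves reads from any offset, while the consumer alone keeps track of how far it has read.

```csharp
using System;
using System.Collections.Generic;

// A toy, in-memory model of a partitioned log (hypothetical; not the EventHub SDK).
// Each partition is an append-only list, and an event's offset is its index in that list.
public sealed class PartitionedLog
{
    private readonly List<string>[] _partitions;

    public PartitionedLog(int partitionCount)
    {
        // Like an EventHub, the number of partitions is fixed when the log is created.
        _partitions = new List<string>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
            _partitions[i] = new List<string>();
    }

    // Appends to the end of one partition and returns the offset of the new event.
    public long Append(int partition, string payload)
    {
        _partitions[partition].Add(payload);
        return _partitions[partition].Count - 1;
    }

    // Returns every event from the given offset onwards; offset 0 means "from the beginning".
    public IReadOnlyList<string> ReadFrom(int partition, long offset)
    {
        var events = _partitions[partition];
        var result = new List<string>();
        for (long i = offset; i < events.Count; i++)
            result.Add(events[(int)i]);
        return result;
    }
}

// The log does not remember what anyone has read: each consumer keeps its own checkpoint.
public sealed class CheckpointingConsumer
{
    private readonly PartitionedLog _log;
    private readonly int _partition;

    // Offset of the next event this consumer will read.
    public long Checkpoint { get; private set; }

    public CheckpointingConsumer(PartitionedLog log, int partition, long startOffset = 0)
    {
        _log = log;
        _partition = partition;
        Checkpoint = startOffset;
    }

    // Reads everything after the checkpoint, then moves the checkpoint past it.
    public IReadOnlyList<string> Poll()
    {
        var events = _log.ReadFrom(_partition, Checkpoint);
        Checkpoint += events.Count;
        return events;
    }
}
```

In a real consumer, the checkpoint would be persisted (for example to blob storage) after the events have been processed. A restarted consumer could then resume from that checkpoint instead of from the beginning.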

Typically only a single consumer is meant to read from a partition, hence having more partitions is important for improving scalability. Publishing is much more relaxed: a high number of producers can send events to EventHub.

How does EventHub assign events to partitions? You can optionally send a Partition Key, which gets hashed and used to assign the event to a partition. To get the best out of your system, Partition Key values need to be evenly distributed. If you are sending device events, you would most likely use the DeviceId; for customer events, the Customer ID is a natural choice. This ensures that all events for a device or customer are ordered according to the time they arrived at the EventHub.
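The sketch below illustrates two properties of key-based assignment. EventHub's actual hash is internal to the service, so SHA-256 is used here purely as a stand-in, and `KeyRouting` is a hypothetical helper. First, all events with the same key land in the same partition, in arrival order. Second, a low-cardinality key (say, three country codes) can never use more partitions than it has distinct values, however many partitions the EventHub has.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Illustration only: EventHub's real key-to-partition hash is internal to the service.
// SHA-256 stands in for "some stable hash" so the example is deterministic.
public static class KeyRouting
{
    public static int PartitionFor(string partitionKey, int partitionCount)
    {
        using (var sha = SHA256.Create())
        {
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));
            // Read the first 4 bytes as an unsigned integer, then reduce it modulo the partition count.
            uint value = BitConverter.ToUInt32(hash, 0);
            return (int)(value % (uint)partitionCount);
        }
    }

    // Routes events in arrival order; each partition preserves the order in which it received them.
    public static List<(string Key, string Payload)>[] Route(
        IEnumerable<(string Key, string Payload)> events, int partitionCount)
    {
        var partitions = Enumerable.Range(0, partitionCount)
            .Select(_ => new List<(string Key, string Payload)>())
            .ToArray();
        foreach (var e in events)
            partitions[PartitionFor(e.Key, partitionCount)].Add(e);
        return partitions;
    }
}
```

Ordering only holds within a partition. Events for two different devices that share a partition interleave, and events in different partitions have no relative order at all.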

Typically, data pipelines receive the incoming data (usually through a passthru API) and funnel it into these stores. The key point is that these pipelines face the same challenges as stream processing. Microsoft initially advocated exposing EventHub directly to the outside world. Even so, you would most likely want to hide your EventHub behind a passthru API that handles authentication and optimises delivery of events to EventHub by batching. This layer is also useful for handling back-pressure: by buffering events, you can deal with spikes gracefully.

What you cannot do at the API is keep the caller waiting for the event to be committed to EventHub, for a few reasons. First of all, EventHub latency can sometimes be on the order of 100-150 ms. This is completely acceptable for most purposes (other than High-Frequency Trading!). However, keeping clients waiting means more power consumption for publishers, many of whom are phones and other low-power devices sending many events per hour. Another reason is that EventHub works best when you send events in batches, which means waiting until your buffer is full before committing the batch.
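Here is a minimal sketch of that buffering layer. It assumes a bounded in-memory queue (.NET's `BlockingCollection<T>`) and a `sendBatch` delegate that stands in for the actual call to EventHub. `BufferedForwarder` is a hypothetical name, not Psyfon's API. The request handler only enqueues and returns. When the buffer is full, `TryAccept` returns false so the API can shed load instead of making callers wait.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical buffering layer for a passthru API (not Psyfon's API).
// Callers get an answer as soon as the event is queued; a background loop sends batches.
public sealed class BufferedForwarder : IDisposable
{
    private readonly BlockingCollection<byte[]> _buffer;
    private readonly Func<IReadOnlyList<byte[]>, Task> _sendBatch; // stand-in for the EventHub call
    private readonly int _maxBatchCount;
    private readonly Task _pump;

    public BufferedForwarder(int capacity, int maxBatchCount, Func<IReadOnlyList<byte[]>, Task> sendBatch)
    {
        // Bounded capacity is the back-pressure limit: beyond it, new events are rejected.
        _buffer = new BlockingCollection<byte[]>(capacity);
        _maxBatchCount = maxBatchCount;
        _sendBatch = sendBatch;
        _pump = Task.Run(() => PumpAsync());
    }

    // Called from the request handler. Never blocks; returns false when the buffer is full,
    // so the API can reject the request (e.g. with HTTP 503) rather than keep the caller waiting.
    public bool TryAccept(byte[] payload) => _buffer.TryAdd(payload);

    private async Task PumpAsync()
    {
        // Blocks until an event is available; ends once CompleteAdding is called and the buffer is drained.
        foreach (var first in _buffer.GetConsumingEnumerable())
        {
            var batch = new List<byte[]>(_maxBatchCount) { first };
            // Greedily take whatever else is already buffered, up to the batch limit.
            while (batch.Count < _maxBatchCount && _buffer.TryTake(out var next))
                batch.Add(next);
            await _sendBatch(batch);
        }
    }

    // Stops accepting new events and waits until everything already buffered has been sent.
    public void Dispose()
    {
        _buffer.CompleteAdding();
        _pump.Wait();
        _buffer.Dispose();
    }
}
```

Retries, error handling and per-partition grouping are deliberately left out. The bounded capacity is what turns a traffic spike into quick rejections rather than unbounded memory growth.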

Batching is already supported out of the box by the EventHub client:
var batch = new EventDataBatch("myPartitionKey");
batch.TryAdd(eventData); // keep adding until method returns false 
await client.SendAsync(batch);

But did you notice something? All events within a batch must have the same Partition Key. It is understandable that Microsoft made this decision for performance reasons. All such events are sent to the same partition, whereas otherwise the batch would have to wait for every partition involved to respond successfully. But it makes batching far less useful. As said earlier, the Partition Key must have widely diverse values, such as Customer ID or Device ID. There is no guarantee that an event arriving at the API from a customer will soon be followed by enough events from the same customer to fill a batch and make batching worthwhile. That is before even requiring those events to arrive at exactly the same web-head.
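A back-of-the-envelope way to see the problem is to take one buffering window of events and split it into batches two ways: by Partition Key and by partition. Comparing the batch sizes shows the difference. The helper and the data are hypothetical, and taking the numeric suffix modulo the partition count stands in for a real hash.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration, not a benchmark.
public static class BatchSizeIllustration
{
    // Splits one window of events into batches according to a grouping rule
    // and returns the size of each resulting batch.
    public static List<int> BatchSizes<TGroup>(IEnumerable<string> partitionKeys, Func<string, TGroup> groupOf)
        => partitionKeys.GroupBy(groupOf).Select(g => g.Count()).ToList();

    // One window in which every event comes from a different customer.
    public static (List<int> PerKey, List<int> PerPartition) CompareForDistinctCustomers(int events, int partitions)
    {
        var keys = Enumerable.Range(0, events).Select(i => $"customer-{i}").ToList();
        var perKey = BatchSizes(keys, key => key);
        // "% partitions" on the numeric suffix stands in for hashing the key to a partition.
        var perPartition = BatchSizes(keys, key => int.Parse(key.Substring("customer-".Length)) % partitions);
        return (perKey, perPartition);
    }
}
```

Take 1,000 events from 1,000 different customers over 32 partitions. Batching by key yields 1,000 batches of one event each. Batching by partition yields 32 batches of 31 or 32 events.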

The solution is to send the events directly to partitions. But since the hashing takes place inside EventHub, how could we know which partition a given Partition Key gets allocated to? The implementation is opaque and cannot be reproduced outside EventHub. That is why we have to hash the Partition Keys ourselves and send batches directly to the partitions. All we need is a hashing algorithm capable of distributing Partition Keys uniformly across partitions. It turns out that most hashing algorithms, including MD5, easily achieve this. Some of them, MD5 included, are cryptographically broken, but that does not matter here: we only need an even spread, not security. MD5 is very quick and efficient, hence a good fit.
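Below is a sketch of client-side partition assignment with MD5, as described above. `Md5Partitioner` is a hypothetical helper and not necessarily how Psyfon implements it. It hashes the key, reads the first 8 bytes of the 16-byte digest as an unsigned integer, and takes that value modulo the partition count.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical sketch of hashing a Partition Key to a partition index on the client side.
public static class Md5Partitioner
{
    public static int GetPartitionIndex(string partitionKey, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        // HashAlgorithm instances are not thread-safe, so this sketch creates one per call.
        using (var md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));
            // MD5 output is close to uniform, so reducing 8 of its bytes modulo the
            // partition count spreads keys evenly across partitions.
            ulong value = BitConverter.ToUInt64(hash, 0);
            return (int)(value % (ulong)partitionCount);
        }
    }
}
```

The resulting index picks the partition to send the batch to; EventHub partition IDs are, by default, the strings "0" to "N-1". A high-throughput dispatcher would probably cache one `MD5` instance per thread rather than creating one per call.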


Now, all of what I have said so far (batching, buffering and hashing) has been implemented in an Open Source project called Psyfon. The library supports both .NET Standard 2.0 and .NET Framework 4.5.2. All you have to do is create a single instance of BufferingEventDispatcher per process, start it and add events to it:

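var singletonDispatcher = new BufferingEventDispatcher("<connection string>");
singletonDispatcher.Start(); // assumed method name, following "start it" above

// and somewhere else in the code where events are generated
var ed = new EventData(mySerialisedEventAsByteArray);
singletonDispatcher.Add(ed, partitionKey); // assumed signature: the event plus its Partition Key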

You can set a maximum batch size in bytes (according to the size of your events) and a maximum number of seconds to wait before committing. Whichever limit is reached first, the batch is committed to its partition. I have tested it at high scale, and a single process had no issue sending 5000 EPS (events per second) to EventHub. I will be publishing the results of a more extended test soon.
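The "size or time, whichever comes first" rule can be sketched as a per-partition buffer. `PartitionBuffer`, `maxBytes` and `maxAge` are hypothetical names rather than Psyfon's actual settings. The current time is passed in explicitly so the behaviour is easy to test; in practice, a timer would call `FlushIfDue` periodically.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-partition buffer: a batch is released when adding the next event would
// exceed the byte limit, or when its oldest event has waited longer than the time limit.
public sealed class PartitionBuffer
{
    private readonly int _maxBytes;
    private readonly TimeSpan _maxAge;
    private readonly List<byte[]> _pending = new List<byte[]>();
    private int _pendingBytes;
    private DateTime _firstEventAtUtc;

    public PartitionBuffer(int maxBytes, TimeSpan maxAge)
    {
        _maxBytes = maxBytes;
        _maxAge = maxAge;
    }

    // Adds an event. Returns the batch to commit if the size limit was hit, otherwise null;
    // in that case the new event becomes the first event of the next batch.
    public IReadOnlyList<byte[]> Add(byte[] payload, DateTime nowUtc)
    {
        IReadOnlyList<byte[]> ready = null;
        if (_pending.Count > 0 && _pendingBytes + payload.Length > _maxBytes)
            ready = TakeAll();
        if (_pending.Count == 0)
            _firstEventAtUtc = nowUtc;
        _pending.Add(payload);
        _pendingBytes += payload.Length;
        return ready;
    }

    // Returns the pending batch once its oldest event has waited at least maxAge, otherwise null.
    public IReadOnlyList<byte[]> FlushIfDue(DateTime nowUtc)
        => _pending.Count > 0 && nowUtc - _firstEventAtUtc >= _maxAge ? TakeAll() : null;

    private IReadOnlyList<byte[]> TakeAll()
    {
        var batch = _pending.ToArray();
        _pending.Clear();
        _pendingBytes = 0;
        return batch;
    }
}
```

Under heavy traffic, the byte limit triggers commits and batches stay full. Under light traffic, the time limit bounds how long any single event can sit in memory.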

While my use case was a passthru API, this can equally be used for dispatching monitoring and instrumentation events to EventHub. PerfIt, another Open Source library, will benefit from this very soon, so watch this space.
