Monday 2 June 2014

BeeHive Series - Part 2 - Importing file from blob storage to ElasticSearch sample

[Level T1]

In the previous post, we introduced BeeHive and talked about an example usage where we check news feeds and send a notification if a keyword is found. In this post, we look at another example. You can find the source code in the BeeHive Github repo. Just open up BeeHive.Samples.sln file.

Processing files

Let's imagine we receive files in a particular blob location and we need to import/process them into the system. These files arrive in a particular folder structure and we need to watch the root folder. Then we need to pick them up, extract each row and send each record to be processed - in this case to be loaded onto an ElasticSearch cluster.
ElasticSearch is a horizontally-scalable and highly-available indexing and search technology. It runs on Windows, Linux and OSX, easy to setup and free to use. You can download the installer from http://www.elasticsearch.org/

NewFileArrived

So here, we design a system that watches the location and when it finds the files, it raises NewFileArrived event. This is a simple enough process yet what if we have multiple actors watching the location (very likely for a cloud scenario where the same process runs on many machines)? In this case we will receive multiple NewFileArrived events.
BeeHive provides pulsers that help you with your concurrency problems. FolderWatcherActor can subscribe to a topic that is fed by a pulser. In fact, in a BeeHive world, you could have pulsers that raise events at different intervals and raise events such as FiveMinutesPassedAnHourPassedADayPassed, etc and based on the requirement, your actors could be subscribing to any of these. Beauty ofmessage-based scheduling is that only a single instance of the actor will be receiving the message.
Raising the NewFileArrived event is not enough. When the actor wakes up again by receiving the next message and the file is there, it will send another NewFileArrived error. We can protect against this by:
1) Making processing Idempotent 2) Keep track of files received 3) Mark files by creating a status file next to them
We choose the last option so we can use the same status file further down. So after identifying the file, we create a file with the same name plus .status and write the status number, here 1.

public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var events = new List<Event>();
    var items = (await _dynamoStore.ListAsync(
        _configurationValueProvider.GetValue(Constants.SweepRootPathKey)))
        .ToArray();

    var notProcessed = items.Where(x => !x.IsVirtualFolder)
        .GroupBy(z => z.Id.Replace(Constants.StatusPostfix, ""))
        .Where(f => f.Count() == 1)
        .Select(w => w.Single());

    foreach (var blob in notProcessed)
    {
        events.Add(new Event(new NewFileArrived()
        {
            FileId = blob.Id
        }));
        await _dynamoStore.InsertAsync(new SimpleBlob()
        {
            Id = blob.Id + Constants.StatusPostfix,
            Body = new MemoryStream(BitConverter.GetBytes(1)) // status 1
        });
    }

    return events;
}

Process the file: fan-out the records

After receiving the NewFileArrived, we copy the file locally and split the file to the records and fan out the records with ImportRecordExtracted. We also send a ImportFileProcessed event.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var newFileArrived = evnt.GetBody<NewFileArrived>();
    var blob = await _dynamoStore.GetAsync(newFileArrived.FileId);
    var reader = new StreamReader(blob.Body);
    string line = string.Empty;
    var events = new List<Event>();
    while ((line = reader.ReadLine())!= null)
    {
        var fields = line.Split(new []{','},StringSplitOptions.RemoveEmptyEntries);
        events.Add(new Event( new ImportRecordExtracted()
        {
            Id = fields[0],
            Content = fields[2],
            IndexType = fields[1]
        }));
    }

    events.Add(new Event(new ImportFileProcessed()
    {
        FileId = newFileArrived.FileId
    }));

    return events;
}

ImportFileProcessed

The actor receiving this event will delete the file and the status file.
public async Task<IEnumerable<Event>> ProcessAsync(Event evnt)
{
    var importFileProcessed = evnt.GetBody<ImportFileProcessed>();
    var statusFile = importFileProcessed.FileId + Constants.StatusPostfix;

    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = importFileProcessed.FileId
    });
    await _dynamoStore.DeleteAsync(new SimpleBlob()
    {
        Id = statusFile
    });

    return new Event[0];
}

ImportRecordExtracted

Based on the type of the record, we "upsert" the record in the appropriate index in our ElasticSearch cluster.
public async Task> ProcessAsync(Event evnt)
{
    var importRecordExtracted = evnt.GetBody();
    var elasticSearchUrl = _configurationValueProvider.GetValue(Constants.ElasticSearchUrlKey);

    var client = new HttpClient();
    var url = string.Format("{0}/import/{1}/{2}", elasticSearchUrl,
        importRecordExtracted.IndexType,
        importRecordExtracted.Id);
    var responseMessage = await client.PutAsJsonAsync(url, importRecordExtracted);

    if (!responseMessage.IsSuccessStatusCode)
    {
        throw new ApplicationException("Indexing failed. " 
            + responseMessage.ToString());
    }

    return new[]
    {
        new Event(new NewIndexUpserted()
        {
            IndexUrl = url
        }) 
    };
}

NewIndexUpserted

While we currently do not need to know when we add or update an index in the ElasticSearch, this can later be used by other processes, so it is best to provision the event. As we said before, BeeHive events are meaningful business milestones that may or may not be used by your current system.

Here are our indexes when browsing to http://localhost:9200/import/_search

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 14,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "import",
      "_type" : "D",
      "_id" : "4",
      "_score" : 1.0, "_source" : {"Id":"4","IndexType":"D","Content":"These are controlled by min_term_freq"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "9",
      "_score" : 1.0, "_source" : {"Id":"9","IndexType":"E","Content":"There are other parameters such as min_word_length"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "11",
      "_score" : 1.0, "_source" : {"Id":"11","IndexType":"E","Content":"In order to give more weight to more interesting terms"}
    }, {
      "_index" : "import",
      "_type" : "A",
      "_id" : "2",
      "_score" : 1.0, "_source" : {"Id":"2","IndexType":"A","Content":"clauses in a bool query of interesting terms extracted from some provided text. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "7",
      "_score" : 1.0, "_source" : {"Id":"7","IndexType":"D","Content":"controlled by percent_terms_to_match. The terms are extracted from like_text "}
    }, {
      "_index" : "import",
      "_type" : "H",
      "_id" : "14",
      "_score" : 1.0, "_source" : {"Id":"14","IndexType":"H","Content":"score times some boosting factor boost_terms."}
    }, {
      "_index" : "import",
      "_type" : "B",
      "_id" : "3",
      "_score" : 1.0, "_source" : {"Id":"3","IndexType":"B","Content":"The interesting terms are selected with respect to their tf-idf scores. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "8",
      "_score" : 1.0, "_source" : {"Id":"8","IndexType":"D","Content":"which is analyzed by the analyzer associated with the field"}
    }, {
      "_index" : "import",
      "_type" : "E",
      "_id" : "10",
      "_score" : 1.0, "_source" : {"Id":"10","IndexType":"E","Content":"max_word_length or stop_words to control what terms should be considered as interesting. "}
    }, {
      "_index" : "import",
      "_type" : "D",
      "_id" : "5",
      "_score" : 1.0, "_source" : {"Id":"5","IndexType":"D","Content":"The number of interesting terms is controlled by max_query_terms. "}
    } ]
  }
}


Cleanup processes

In the absence of transactions, business processes have to design the processes for failure. BeeHive promotes an approach that every process is broken down to its smallest elements and each implemented in an actor.

Sometimes it is necessary to design processes that look for the highly unlikely (yet possible) event of a failure when actor has done its work but the events returned never make it back to the service bus. In the case of inserting the new index, this is not a problem since we use PUT and the process is idempotent. However, this could be a problem in case of processing file where a status file is created but NewFileArrived never makes it back to the service bus. In this case, a crash unlocker process that checks the timestamp of the status file and deletes the file if older than e.g. 1 day, is all that is needed.

Conclusion

We can use pulsers to solve the inherent concurrency problem of multiple folder watcher actors watching the same folder. The fan-out process of breaking a file down to its record and parallilising the processing is one of the key benefits of cloud actors.



No comments:

Post a Comment

Note: only a member of this blog may post a comment.