Implementing Top 10 Most Popular Articles in Real Time with Storm and MongoDB

In this post I will show how to build a list of the 10 most popular articles using a rolling count algorithm and Storm, an exciting distributed computation platform.
This differs from the traditional offline BI approach, where data is aggregated over some period of time. We will do the analytics in real time,
telling our users what is hot and popular on the site right now, in the last 5 minutes. It is similar to what Twitter does with “trending topics”.

We will be using open-source technologies: Storm and MongoDB.
Every time a user interacts with the content of the site (for example, clicks, likes it on Facebook or leaves a comment), our system captures the event. We save the clickstream data in MongoDB and process it with Storm in real time.
Why MongoDB? MongoDB is a free, open-source, scalable database system, and it has some features that we will be using in our implementation. Another alternative could be Redis.

I will not cover how to capture events with a REST API, but any modern web framework provides this functionality. My favorites are Spring and Vert.x: Spring is a robust enterprise framework, and Vert.x is a little Node.js clone in the Java world. The choice is yours.

We want to get a stream of events from some source. One such source is a MongoDB capped collection, which can be used as a message queue: we write events to the end of the collection and then read them back in the order they arrived. You can use a real messaging product as well.
Capped collections are fixed-size collections that support high-throughput operations that insert and retrieve documents based on insertion order; when a capped collection is full, the oldest documents are removed to make room for new ones. You can also open a so-called tailable cursor against a capped collection: a cursor that remains open after the client exhausts the results in the initial cursor. Tailable cursors are conceptually equivalent to the Unix tail command with the -f (“follow”) option: after clients insert additional documents into the capped collection, the tailable cursor continues to retrieve them.
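To make the capped collection plus tailable cursor idea concrete, here is a toy in-memory Java model of it. It is not MongoDB code and not how MongoDB implements capped collections; the names CappedLog and Tail are hypothetical. It only shows the two properties the spout relies on: a fixed capacity where the oldest documents are evicted first, and a reader that remembers its position and returns only documents inserted since its last read.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a capped collection: fixed capacity, insertion order kept, oldest evicted first. */
class CappedLog<T> {
    private static final class Entry<E> {
        final long seq;   // insertion sequence number, like a position in natural order
        final E doc;

        Entry(long seq, E doc) {
            this.seq = seq;
            this.doc = doc;
        }
    }

    private final int capacity;
    private final Deque<Entry<T>> entries = new ArrayDeque<>();
    private long nextSeq = 0;

    CappedLog(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Appends a document, evicting the oldest one when the log is full. */
    synchronized void insert(T doc) {
        if (entries.size() == capacity) {
            entries.removeFirst();
        }
        entries.addLast(new Entry<>(nextSeq++, doc));
    }

    /** Opens a reader positioned at the start of the log. */
    Tail tail() {
        return new Tail();
    }

    /** Behaves like a tailable cursor: each poll returns only documents it has not seen yet. */
    final class Tail {
        private long position = 0;   // sequence number of the next document to read

        List<T> poll() {
            synchronized (CappedLog.this) {
                List<T> out = new ArrayList<>();
                for (Entry<T> e : entries) {
                    if (e.seq >= position) {
                        out.add(e.doc);
                        position = e.seq + 1;
                    }
                }
                return out;
            }
        }
    }
}
```

The model also shows the main caveat of this design: a reader that falls behind by more than the collection's capacity misses the evicted events, so the capped collection should be sized for the worst expected lag.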

What is Storm:
Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, is used by many companies, and is a lot of fun to use!
For more information about Storm, see the Resources section.

What are the “Top 10 most popular articles now”? In other words, it is a measure of “What’s hot?” in a user community. We are interested in what is most popular over a given time span, for example, the most popular articles in the last 5 minutes or the last hour.
We consider article A more popular than article B for a given time span if the clickstream contains more events for article A than for article B. For example, if more users view, comment on, like or share article A than article B in the last hour, then article A is more popular and will rank higher in our list.
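The definition above can be written down directly as a brute-force Java sketch: count the events per article whose timestamp falls inside the window, then sort by count. The class and field names are hypothetical. Recounting every event in the window on each request is exactly the cost the rolling count approach avoids, but the sketch is a handy reference for what the topology should compute.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Brute-force reference: the n articles with the most events inside a time window. */
final class NaivePopularity {
    /** One clickstream event (view, like, comment, share...) for an article. */
    static final class Event {
        final String articleId;
        final long timestampMillis;

        Event(String articleId, long timestampMillis) {
            this.articleId = articleId;
            this.timestampMillis = timestampMillis;
        }
    }

    private NaivePopularity() {}

    /** Counts events in (nowMillis - windowMillis, nowMillis] and returns the n most frequent article IDs. */
    static List<String> topN(List<Event> events, long nowMillis, long windowMillis, int n) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : events) {
            if (e.timestampMillis > nowMillis - windowMillis && e.timestampMillis <= nowMillis) {
                counts.merge(e.articleId, 1L, Long::sum);
            }
        }
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        // Highest count first; ties broken by article ID so the order is deterministic
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```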
Eventually, we want our Storm topology to periodically produce the top N articles, similar to the example below:

Time 1:
[Figure: most popular articles at time 1]
Time 2 (one minute later):
[Figure: most popular articles at time 2]
Time 3 (another minute later):
[Figure: most popular articles at time 3]

Our Storm topology will collect data for each individual article over a specified period of time and then, every N seconds, emit updated counters to the MongoDB database.

We will be using storm-starter, a project on GitHub that helps you get started with Storm development. Luckily, this project provides many of the building blocks we need for our task. We are interested in its “RollingTopWords” topology, and our data structures are SlotBasedCounter and SlidingWindowCounter from storm-starter. A detailed description of these data structures can be found here: Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm
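The core idea behind SlotBasedCounter and SlidingWindowCounter can be sketched in plain Java. This is a simplified stand-in written for illustration, not the storm-starter code, and the class and method names are hypothetical. The window is split into a fixed number of slots, new events increment the current slot, and every time the window advances (in the topology, once per emit interval) the totals are reported, then the oldest slot is cleared and reused.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Simplified slot-based rolling counter (illustration only, not storm-starter's code).
 * The window is divided into numSlots slots; only the slot at index head receives increments.
 */
class SlidingWindowSketch {
    private final Map<String, long[]> slotsByKey = new HashMap<>();
    private final int numSlots;
    private int head = 0;

    SlidingWindowSketch(int numSlots) {
        if (numSlots < 2) {
            throw new IllegalArgumentException("a sliding window needs at least 2 slots");
        }
        this.numSlots = numSlots;
    }

    /** Counts one event for the given key in the current slot. */
    void increment(String key) {
        slotsByKey.computeIfAbsent(key, k -> new long[numSlots])[head]++;
    }

    /** Returns per-key totals over all slots, then advances the window by one slot. */
    Map<String, Long> getCountsThenAdvanceWindow() {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, long[]> e : slotsByKey.entrySet()) {
            long sum = 0;
            for (long c : e.getValue()) {
                sum += c;
            }
            totals.put(e.getKey(), sum);
        }
        // The next slot holds the oldest counts; clearing it drops them out of the window
        head = (head + 1) % numSlots;
        Iterator<long[]> it = slotsByKey.values().iterator();
        while (it.hasNext()) {
            long[] slots = it.next();
            slots[head] = 0;
            if (isAllZero(slots)) {
                it.remove(); // forget keys with no events left in the window
            }
        }
        return totals;
    }

    private static boolean isAllZero(long[] slots) {
        for (long c : slots) {
            if (c != 0) {
                return false;
            }
        }
        return true;
    }
}
```

In the same spirit, a window of 60 seconds emitted every 10 seconds corresponds to 60 / 10 = 6 slots: each slot covers 10 seconds of events, and the six slots together cover the whole window.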

Topology.
Our topology consists of the following elements: MongoCappedCollectionSpout, ArticleExtractorBolt, RollingCountBolt, IntermediateRankingsBolt, TotalRankingsBolt and MongoWriterBolt.
[Figure: trending topic topology]
MongoCappedCollectionSpout.
This spout reads from the capped collection using a MongoDB tailable cursor.
We also have to keep track of the timestamp of the last processed event in case we need to restart the topology; that way, we can resume from the last processed record.
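Keeping track of the last processed event can be as simple as persisting one number. A minimal sketch, assuming each event document carries a numeric timestamp; the ResumeCheckpoint class and the local file it writes to are hypothetical, and in practice the checkpoint could just as well be stored in MongoDB itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper that persists the timestamp of the last processed event. */
class ResumeCheckpoint {
    private final Path file;

    ResumeCheckpoint(Path file) {
        this.file = file;
    }

    /** Returns the last saved timestamp, or 0 when no checkpoint exists yet (start from the beginning). */
    long load() {
        try {
            if (!Files.exists(file)) {
                return 0L;
            }
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return text.isEmpty() ? 0L : Long.parseLong(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a temporary file and renames it over the old one, so readers never see a half-written value. */
    void save(long timestampMillis) {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try {
            Files.write(tmp, Long.toString(timestampMillis).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** An event still needs processing if it is strictly later than the checkpoint. */
    static boolean isNew(long eventTimestampMillis, long checkpointMillis) {
        return eventTimestampMillis > checkpointMillis;
    }
}
```

On restart, the spout could load the checkpoint and use it in the cursor query, for example `{ "ts": { "$gt": <checkpoint> } }` if the timestamp field were called ts (a hypothetical field name). With a strict comparison, events sharing the checkpoint's exact timestamp would be skipped, so a unique, ordered key is safer when such ties are possible.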
In this code I read from the MongoDB tailable cursor and put the documents in a local queue, so they become available to another thread.

this.collection = this.db.getCollection(collectionName);
// Open a tailable cursor over the capped collection, in natural (insertion) order
this.cursor = this.collection.find(query)
        .sort(new BasicDBObject("$natural", 1))
        .addOption(Bytes.QUERYOPTION_TAILABLE)
        .addOption(Bytes.QUERYOPTION_AWAITDATA)
        .addOption(Bytes.QUERYOPTION_NOTIMEOUT);

// Keep reading while the spout is active
while (running.get()) {
    try {
        // Check if we have a next item in the collection (AWAITDATA makes the server wait briefly for new data)
        if (this.cursor.hasNext()) {
            if (LOG.isDebugEnabled()) LOG.debug("Fetching a new item from MongoDB cursor");
            // Hand the document over to the spout thread through the blocking queue
            this.queue.put(this.cursor.next());
        } else {
            // Nothing new yet: sleep for 50 ms and then poll again
            Thread.sleep(50);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop reading
        Thread.currentThread().interrupt();
        running.set(false);
    }
}

The snippet below emits each document to the next element in the topology. To keep the spout lightweight, I am not adding any filtering here.
@Override
  protected void processNextTuple() {
    DBObject object = this.queue.poll();
    // If we have an object, let's process it, map and emit it
    if (object != null) {
      // Map the object to a tuple
      List<Object> tuples = this.mapper.map(object);

      // Fetch the object Id
      ObjectId objectId = (ObjectId) object.get("_id");

      // Emit the tuple collection
      this.collector.emit(tuples, objectId);
    }
  }
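Emitting with the document's ObjectId as the message ID is what lets Storm report back through the spout's ack and fail callbacks. A minimal, hypothetical sketch of the bookkeeping a reliable spout might keep for that, with generic types standing in for ObjectId and DBObject: remember every in-flight document by its ID, forget it on ack, and queue it for re-emission on fail.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-flight tracker for a reliable spout: message id -> document awaiting ack. */
class PendingTracker<I, D> {
    private final Map<I, D> pending = new HashMap<>();
    private final Deque<D> retryQueue = new ArrayDeque<>();

    /** Call right after emitting a document with this message id. */
    void emitted(I messageId, D document) {
        pending.put(messageId, document);
    }

    /** The tuple tree was fully processed: nothing left to do for this document. */
    void ack(I messageId) {
        pending.remove(messageId);
    }

    /** Processing failed or timed out: schedule the document for re-emission. */
    void fail(I messageId) {
        D document = pending.remove(messageId);
        if (document != null) {
            retryQueue.addLast(document);
        }
    }

    /** Next document to re-emit, or null if nothing failed; check this before taking new documents. */
    D nextRetry() {
        return retryQueue.pollFirst();
    }

    /** Number of documents emitted but not yet acked or failed. */
    int inFlight() {
        return pending.size();
    }
}
```

Storm calls a spout's nextTuple, ack and fail from the same thread, so a plain HashMap without locking is enough here.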

ArticleExtractorBolt emits the articleId from each event document.
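    @Override
    public void execute(Tuple tuple) {
        // With TOPOLOGY_TICK_TUPLE_FREQ_SECS set for the topology, every bolt receives tick tuples;
        // this bolt has nothing to do on a tick, so it must not forward it as an article ID
        if (TupleHelpers.isTickTuple(tuple)) {
            return;
        }
        DBObject document = (DBObject) tuple.getValueByField("document");
        String articleId = (String) document.get("articleId");
        // Anchor the new tuple to the input so a downstream failure replays the original event
        outputCollector.emit(tuple, new Values(articleId));
        outputCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // RollingCountBolt is fields-grouped on "articleId"
        declarer.declare(new Fields("articleId"));
    }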

RollingCountBolt, AbstractRankerBolt, IntermediateRankingsBolt and TotalRankingsBolt are described in great detail here.
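Why a two-stage ranking gives the right answer is worth spelling out. Because IntermediateRankingsBolt tasks receive counts through a fields grouping on "obj", each article is ranked by exactly one intermediate task, so the partial top-N lists are disjoint, and any article in the global top N must already be in its own task's top N. Merging the partial lists and trimming to N is therefore enough. A hypothetical, simplified sketch of that logic (not the storm-starter Rankings class):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical top-N ranking: keeps the latest count per object, and only the n highest. */
class TopNRankings {
    private final int n;
    private final Map<String, Long> counts = new HashMap<>();

    TopNRankings(int n) {
        this.n = n;
    }

    /** Records the latest count for an object (replacing an older one) and trims to the top n. */
    void update(String obj, long count) {
        counts.put(obj, count);
        while (counts.size() > n) {
            List<String> ranked = rankedObjects();
            counts.remove(ranked.get(ranked.size() - 1)); // drop the lowest-ranked object
        }
    }

    /** Folds a partial ranking into this one, as a final ranker does with intermediate results. */
    void merge(TopNRankings partial) {
        for (Map.Entry<String, Long> e : partial.counts.entrySet()) {
            update(e.getKey(), e.getValue());
        }
    }

    /** Objects ordered by count, highest first; ties broken by name for a stable order. */
    List<String> rankedObjects() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : entries) {
            result.add(e.getKey());
        }
        return result;
    }
}
```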
MongoWriterBolt simply receives the rankings object as a tuple from TotalRankingsBolt and inserts it into MongoDB.
I use a so-called tick tuple, which allows us to save data to MongoDB at a predefined interval, for example every 10 seconds.
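The tick tuple pattern boils down to buffering and flushing. A minimal sketch, assuming the writer only needs to persist the newest ranking: TickFlushingWriter and its Consumer sink are hypothetical stand-ins for the bolt and the MongoDB insert, and in a real bolt onTick would be called from the TupleHelpers.isTickTuple(tuple) branch of execute.

```java
import java.util.function.Consumer;

/** Hypothetical sketch: remember only the newest ranking and write it out when a tick arrives. */
class TickFlushingWriter<T> {
    private final Consumer<T> sink;   // stands in for the MongoDB insert
    private T latest;                 // newest ranking not yet written, or null

    TickFlushingWriter(Consumer<T> sink) {
        this.sink = sink;
    }

    /** Called for every ranking tuple: just replace the buffered value. */
    void onRanking(T ranking) {
        latest = ranking;
    }

    /** Called on every tick: write the buffered ranking once, if there is one. */
    void onTick() {
        if (latest != null) {
            sink.accept(latest);
            latest = null;
        }
    }
}
```

Writing only on ticks caps the write rate at one document per interval no matter how often the rankings change; the trade-off is that the stored ranking can be up to one tick interval old.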

Configuring the topology:

The next step is to configure and run the topology. Here is a sample configuration that creates a local topology:

// url and collectionName identify the MongoDB instance and the capped collection holding the events
MongoCappedCollectionSpout mongoSpout = new MongoCappedCollectionSpout(url, collectionName);
final int TOP_N = 10;

// Build a topology
String spoutId = "eventReader";
String articleExtractorId = "articleIdReader";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
String saveToMongoId = "savetoMongoDBBolt";

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(spoutId, mongoSpout);
builder.setBolt(articleExtractorId, new ArticleExtractorBolt()).shuffleGrouping(spoutId);
// 60-second window, counts emitted every 10 seconds; all events for one article go to the same task
builder.setBolt(counterId, new RollingCountBolt(60, 10), 2).fieldsGrouping(articleExtractorId, new Fields("articleId"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
        new Fields("obj"));
// A single task merges the partial rankings into the global top N
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
builder.setBolt(saveToMongoId, new MongoWriterBolt(url, "cp", "ranking")).shuffleGrouping(totalRankerId);

// Debug output, plus a tick tuple every 10 seconds for every bolt
Config conf = new Config();
conf.setDebug(true);
int tickFrequencyInSeconds = 10;
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);

// Run the topology in-process for local testing (the topology name is arbitrary)
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trending-articles", conf, builder.createTopology());
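The fieldsGrouping on "articleId" is what keeps the rolling counts correct with two RollingCountBolt tasks: every event for a given article is routed to the same task, so no article's count is split between tasks. Conceptually, a fields grouping hashes the grouping values and takes the result modulo the number of tasks. The sketch below illustrates that idea only; it is not Storm's source, and Storm's exact hash may differ.

```java
import java.util.List;

/** Conceptual illustration of a fields grouping: hash the grouping values, pick a task by modulo. */
final class FieldsGroupingSketch {
    private FieldsGroupingSketch() {}

    /** Equal grouping values always produce the same task index in [0, numTasks). */
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }
}
```

By contrast, the shuffleGrouping used between the spout and ArticleExtractorBolt spreads tuples randomly, which is fine there because extracting an ID needs no per-article state.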

Visualization.
As soon as the data is available in MongoDB, it is a trivial task to expose it as JSON to client code.
I use Google Chart Tools to display the information in a nice-looking form.

 <script type='text/javascript'>
        google.load('visualization', '1', {packages:['table']});
        // drawTable needs the ranking JSON, so call it from a wrapper once the library has loaded.
        // rankingJson is a placeholder for the JSON string served by your backend, e.g. '{"article-1": 42}'.
        google.setOnLoadCallback(function () {
            drawTable(rankingJson);
        });

        // Renders a ranking object of the form {articleId: count, ...} as a table
        function drawTable(ranking) {
            var obj = JSON.parse(ranking);
            var articles = [];
            for (var key in obj) {
                // Each row is [articleId, count]
                articles.push([key, obj[key]]);
            }
            var data = new google.visualization.DataTable();
            data.addColumn('string', '10 Most Popular Articles at this moment');
            data.addColumn('number', 'views');
            data.addRows(articles);

            var table = new google.visualization.Table(document.getElementById('table_div'));
            table.draw(data, {showRowNumber: true});
        }
    </script>

This will produce a nice-looking table. I recommend exploring Google Chart Tools for other ways to visualize your data.
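On the server side, exposing the rankings as JSON mostly means producing the flat object that drawTable's loop expects, mapping each article ID to its count. A minimal sketch with no JSON library, assuming the ranking arrives as a map already sorted by count; RankingJson is a hypothetical name, and in a real service a JSON library such as Jackson or Gson would normally do this.

```java
import java.util.Map;

/** Hypothetical serializer: ranking map (article ID -> count) to a flat JSON object. */
final class RankingJson {
    private RankingJson() {}

    /** Keeps the map's iteration order, so pass a LinkedHashMap sorted by count. */
    static String toJson(Map<String, Long> ranking) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Long> e : ranking.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":").append(e.getValue());
        }
        return sb.append('}').toString();
    }

    /** Escapes quotes, backslashes and control characters as JSON requires. */
    private static String escape(String s) {
        StringBuilder out = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }
}
```

One caveat on the client side: JavaScript iterates integer-like keys such as "42" in ascending numeric order rather than insertion order, so if article IDs are plain numbers, an array of [id, count] pairs preserves the ranking order more reliably than an object.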
Deployment.
The last step is to deploy the project. Using the storm-deploy project, it should be easy to spin up instances on Amazon AWS and deploy a Storm cluster with our topology (make sure to reconfigure it for clustered deployment, for example by submitting it with StormSubmitter instead of LocalCluster!).
In this short exercise we were able to quickly assemble a scalable, fault-tolerant application that answers “What’s hot now?” in real time.
Resources:
Nathan Marz explains Storm, a distributed, fault-tolerant, real-time computation system currently used by Twitter to keep statistics on user clicks for every URL and domain.
Awesome article “Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm” by Michael G. Noll.
Good documentation written by the creator of Storm, Nathan Marz
Getting Started with Storm Book

MongoDB capped collections
Google Chart Tools
MongoDB Pub/Sub with Capped Collections
Source Code:
Trending Topic Storm Topology on GitHub
Visualization with Google Graph on GitHub
