Your First Real Time Big Data Analytics Application



Introduction

We live almost every aspect of our lives in a real-time world. Think about our social communications; we update our friends online via social networks and micro-blogging, we text from our cellphones, or message from our laptops. But it's not just our social lives; we shop online whenever we want, we search the web for immediate answers to our questions, we trade stocks online, we pay our bills, and do our banking. All online and all in real time.

Real time doesn't just affect our personal lives. Enterprises and government agencies need real-time insights to be successful, whether they are investment firms that need fast access to market views and risk analysis, or retailers that need to adjust their online campaigns and recommendations to their customers. Even homeland security has come to increasingly rely on real-time monitoring.
The amount of data that flows through these systems is huge. Twitter, for example, handles some 250 million Tweets per day, which is nearly 3,000 Tweets per second on average. At various peak moments throughout 2011, Twitter saw rates as high as 8,000+ Tweets per second, with at least one instance of over 25,000 Tweets per second. Facebook gets 100 billion hits per day, and Google handles 2 billion searches a day. These numbers keep growing as more and more users join these services.

This tutorial explains the challenges of a real-time (RT) analytics system, using Twitter as an example, and shows in detail how these challenges can be met using GigaSpaces XAP.

The Challenge

Twitter users aren't just interested in reading tweets of the people they follow; they are also interested in finding new people and topics to follow based on popularity. This poses several challenges to the Twitter architecture due to the vast volume of tweets. In this example, we focus on the challenges involved in the word count use case. The requirements here are straightforward:

  1. Tens of thousands of tweets need to be stored and parsed every second.
  2. Word counters need to be aggregated continuously. Since tweets are limited to 140 characters, we are dealing with hundreds of thousands of words per second.
  3. Finally, the system needs to scale linearly so the stream can grow as the business grows.

These challenges are not simple to deal with, because the volume and analysis of the data have knock-on effects, as follows:

  • Tens of thousands of tweets to tokenize every second, meaning hundreds of thousands of words to filter -> CPU bottleneck
  • Tens/hundreds of thousands of counters to update -> Counters contention
  • Tens/hundreds of thousands of counters to persist -> Database bottleneck
  • Tens of thousands of tweets to store in the database every second -> Database bottleneck

Solution Architecture

In designing a solution, we need to consider the various challenges we must address.

The first challenge is providing unlimited scalability - therefore, we are talking about dynamically increasing resources to meet demand, and hence implementing a distributed solution using a parallelized processing approach.

The second challenge is providing low latency - we can't afford to use a distributed file system such as Hadoop HDFS, a relational database, or a distributed disk-based structured data store such as a NoSQL database. All of these rely on physical I/O, which becomes a bottleneck when dealing with massive writes. Furthermore, we want the business logic collocated with the data on a single platform for faster processing, with minimal network hops and integration issues.

To overcome the latency challenge, we use an in-memory system of record. GigaSpaces XAP is built just for that. Its core component is an in-memory data grid (IMDG, a.k.a. the Space) that partitions the data based on a specified attribute of the data object. The data grid follows a shared-nothing policy, and each primary node has a consistent backup. In addition, the grid maintains its SLA by self-healing crashed nodes, so it is completely consistent and highly available.
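To make the partitioning concrete, here is a minimal, self-contained sketch of writing a tweet document to a partitioned Space. It is an illustration only: the Space URL and the choice of "Id" as the routing property are assumptions of this sketch; in the actual application the Space is wired through Spring, as shown later in this tutorial.

import com.gigaspaces.document.DocumentProperties;
import com.gigaspaces.document.SpaceDocument;
import com.gigaspaces.metadata.SpaceTypeDescriptorBuilder;
import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.UrlSpaceConfigurer;

public class SpaceWriteSketch {
    public static void main(String[] args) {
        // Connect to an already-deployed Space (the space name "space" is an assumption)
        GigaSpace gigaSpace = new GigaSpaceConfigurer(
                new UrlSpaceConfigurer("jini://*/*/space").space()).gigaSpace();

        // Register the document type; routing by "Id" spreads tweets across the partitions
        gigaSpace.getTypeManager().registerTypeDescriptor(
                new SpaceTypeDescriptorBuilder("Tweet")
                        .idProperty("Id")
                        .routingProperty("Id")
                        .create());

        // Write one tweet; the grid hashes the routing property to select a partition
        gigaSpace.write(new SpaceDocument("Tweet", new DocumentProperties()
                .setProperty("Id", 1L)
                .setProperty("Text", "hello real-time world")
                .setProperty("Processed", Boolean.FALSE)));
    }
}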

The third challenge is the efficient processing of the data in a distributed system. To achieve this, we use the Map / Reduce algorithm for distributed computing on large data sets on clusters of computers. In the Map step, we normalize the data so we can create local counters. In the Reduce step, we aggregate the entire set of interim results into a single set of results.
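Before looking at the XAP implementation, the following plain-Java sketch illustrates the word-count Map / Reduce idea in its simplest form. It uses no XAP APIs; class and method names are illustrative only.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Map step: tokenize a single tweet into a local map of token -> count
    static Map<String, Integer> map(String tweetText) {
        Map<String, Integer> local = new HashMap<String, Integer>();
        for (String token : tweetText.toLowerCase().split("\\W+")) {
            if (token.isEmpty()) {
                continue;
            }
            Integer count = local.get(token);
            local.put(token, count == null ? 1 : count + 1);
        }
        return local;
    }

    // Reduce step: aggregate the per-tweet interim results into a single set of global counters
    static Map<String, Integer> reduce(List<Map<String, Integer>> interimResults) {
        Map<String, Integer> global = new HashMap<String, Integer>();
        for (Map<String, Integer> local : interimResults) {
            for (Map.Entry<String, Integer> entry : local.entrySet()) {
                Integer count = global.get(entry.getKey());
                global.put(entry.getKey(), count == null ? entry.getValue() : count + entry.getValue());
            }
        }
        return global;
    }
}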

In our Twitter example, we need to run this Map / Reduce flow in real time. For this, we use XAP's processing and messaging features, collocated with the corresponding data.

Our solution therefore uses 2 modules for persisting and processing data, as follows:

  • The feeder module persists raw tweets (the data) in the Space (IMDG). The feeder partitions the tweets by their ID (assumed to be globally unique) to achieve a scalable solution with rapid insertion of tweets into the Space.
  • The processor module implements the Map / Reduce algorithm that processes the tweets in the Space, producing real-time word counts. The tweets are then moved from the Space to the historical data store, an Apache Cassandra Big Data store. The processor implements a workflow of events, using the IMDG's ability to store transient data in memory and trigger processing activity in memory.

The processor's Map phase has the following logical steps:

  1. Tokenizes tweets into maps of tokens and writes them to the Space (triggered by the writing of raw tweets to the Space).
  2. Filters unwanted words from the maps of tokens and writes the filtered maps to the Space (triggered by the writing of maps of tokens to the Space).
  3. Generates a token counter per word, distributing the counters across the grid partitions for scalability and performance (triggered by the writing of filtered maps of tokens to the Space).

The processor's Reduce phase aggregates the local results into global word counters.

The following diagram shows the Map / Reduce flow.

Implementing the Solution as a XAP Application

To implement our solution, we use Cassandra as the historical data tier and build a XAP application that processes and persists the data in real time, using the following modules:

  • The processor module is a XAP processing unit that contains the Space and performs the real-time workflow of processing the incoming tweets. The processing of data objects is performed using event containers.
  • The feeder module is implemented as a processing unit, enabling the dynamic deployment of multiple feeder instances across multiple nodes. This increases the load it can manage and thus its ability to handle larger tweet streams. The processing unit contains the following feeder strategies:
    • The TwitterHomeTimelineFeederTask class feeds in tweets from Twitter's public timeline using Spring Social, converts them to a canonical Space document representation, and writes them to the Space, which in turn invokes the relevant event processors of the processor module.
    • The ListBasedFeederTask class is a simulation feeder for testing purposes, which simulates tweets locally, avoiding the need to connect to the Twitter API over the Internet.
  • Optionally, the common module contains items that are shared between the feeder and the processor modules (e.g. common interfaces, a shared data model). Currently, this module is empty.
  • The rt_app directory contains the recipes and other scripts required to automatically deploy, monitor and manage the entire application together with the Cassandra back-end using Cloudify.

Building the Application

The following are step-by-step instructions for building the application.

1. Download and install XAP. XAP 9.0 comes with a built-in license that's good for 3 months after the download.

2. Getting the application
The source files for this application are maintained in the GigaSpaces rt-analytics repository on GitHub. If you're a GitHub user and have already set up the GitHub client, you can fork the repository and clone it to your local machine, as follows:

cd <project root directory> 
git clone <your new repository URL>

We welcome your contributions and suggestions for improvements, and invite you to submit them by performing a pull request. We will review your recommendations and merge relevant changes. Alternatively, you can download the source files in zip format from the repository home on GitHub.

3. Installing Maven and the GigaSpaces Maven plug-in
The application uses Apache Maven. If you don't have Apache Maven installed, please install it. Once installed, make sure that you set the MVN_HOME environment variable and add $MVN_HOME/bin to your path.

Maven as Part of the XAP Distribution
Maven is included in the distribution, so if you've already downloaded XAP you can find it under <XAP installation root>/tools/maven/apache-maven-3.0.x.

4. Building the Application
Go to the <applicationRoot> folder (which contains the application's project files), and then at the command (Windows) or shell (*nix) prompt, type:

mvn package

If you get a "No gslicense.xml license file was found in current directory" error, run the following:

mvn package -DargLine="-Dcom.gs.home=<XapInstallationRoot>"

where <XapInstallationRoot> is the XAP installation root folder, for example:

mvn package -DargLine="-Dcom.gs.home=c:\gigaspaces-xap-premium-9.0.0-ga"

The Maven build will download the required dependencies, compile the source files, run the unit tests, and build the required jar files. In our example, the following processing unit jar files are built:

  • <project root>/feeder/target/rt-feeder-XAP-9.0.jar
  • <project root>/processor/target/rt-processor-XAP-9.0.jar

Once the build is complete, a summary message similar to the following is displayed:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] rt-analytics ...................................... SUCCESS [0.001s]
[INFO] rt-common ......................................... SUCCESS [2.196s]
[INFO] rt-processor ...................................... SUCCESS [11.301s]
[INFO] rt-feeder ......................................... SUCCESS [3.102s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.768s
[INFO] Finished at: Sun May 13 13:38:06 IDT 2012
[INFO] Final Memory: 14M/81M
[INFO] ------------------------------------------------------------------------

Running and Debugging the Application within an IDE

Since the application is a Maven project, you can load it in your Java IDE and have all module and classpath settings configured automatically. With IntelliJ, simply click "File -> Open Project" and point to <applicationRoot>/pom.xml; IntelliJ will load the project and present its modules. If you're using Eclipse, go to the application root folder, and then at the command (Windows) or shell (*nix) prompt, type mvn eclipse:eclipse to generate an Eclipse project.

Once the project is loaded in your IDE, you can run the application, as follows:

rt-processor project run configuration:

rt-feeder project run configuration:

For more information about the IntegratedProcessingUnitContainer class (runs the processing units within your IDE), see Running and Debugging Within Your IDE.

You should set the GigaSpaces home folder using the com.gs.home system property in the rt-processor run configuration, for example:
-Dcom.gs.home=c:\gigaspaces-xap-premium-9.0.0-ga
Make sure you have updated gslicense.xml, located under the GigaSpaces XAP root folder, with the license key provided in the email sent to you after downloading GigaSpaces XAP.

To run the application, run the processor configuration, and then the feeder configuration. An output similar to the following is displayed:

2012-05-13 13:44:11,877  INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:12,777  INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:13,777  INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:14,777  INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:15,778  INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:15,804  INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 4
2012-05-13 13:44:15,805  INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 1
2012-05-13 13:44:15,805  INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 3
2012-05-13 13:44:15,806  INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 5
2012-05-13 13:44:15,806  INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 2
2012-05-13 13:44:15,893  INFO [org.openspaces.bigdata.processor.LocalTokenCounter] - local counting of a bulk of 5 tweets
2012-05-13 13:44:15,894  INFO [org.openspaces.bigdata.processor.LocalTokenCounter] - writing 22 TokenCounters across the cluster
2012-05-13 13:44:15,921  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token Report by 1
2012-05-13 13:44:15,925  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token never by 1
2012-05-13 13:44:15,925  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token find by 3
2012-05-13 13:44:15,925  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token the by 1
2012-05-13 13:44:15,925  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token pee by 1
2012-05-13 13:44:15,926  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token have by 1
2012-05-13 13:44:15,926  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token close by 1
2012-05-13 13:44:15,926  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token didn by 1
2012-05-13 13:44:15,927  INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token door by 1
Switching between TwitterHomeTimelineFeederTask and ListBasedFeederTask
For testing purposes, you can always switch between the TwitterHomeTimelineFeederTask class and the ListBasedFeederTask class. The former uses real-time Twitter timeline data, while the latter uses simulated tweet data. By default, TwitterHomeTimelineFeederTask is enabled. To switch to ListBasedFeederTask, comment out the @Component annotation at the top of the TwitterHomeTimelineFeederTask source file and uncomment the same annotation in the ListBasedFeederTask source file (see the sketch below), then rebuild the project using the following command: mvn package.
Since the Twitter API permits only 150 requests per hour for unauthenticated calls, your feeder might stop if you run it for an extended period of time.
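Schematically, the toggle is a one-line change in each class, along these lines (class bodies omitted; whether ListBasedFeederTask also implements Runnable is an assumption of this sketch):

// TwitterHomeTimelineFeederTask.java -- comment out @Component to disable the live Twitter feeder
//@Component
public class TwitterHomeTimelineFeederTask implements Runnable {
    ...
}

// ListBasedFeederTask.java -- uncomment @Component to enable the simulated feeder
@Component
public class ListBasedFeederTask {
    ...
}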

Running the Project with the XAP Runtime Environment

The following are step-by-step instructions for running the application in XAP:

  1. Download and install XAP.
  2. Edit <XapInstallationRoot>/gslicense.xml and place the license key provided in the email sent to you after downloading GigaSpaces XAP as the <licensekey> value.
  3. Go to the <XapInstallationRoot>/bin folder, and start a Grid Service Agent by running the gs-agent.sh/bat script. This starts two GSCs (GSCs are the container JVMs for your processing units) and a GSM.
  4. At the *nix shell prompt (for Windows use the equivalent batch files), type:
    ./gs.sh deploy <applicationRoot>/processor/target/rt-processor-XAP-9.0

    You should see the following output:

    Uploading [rt-processor-XAP-9.0] to [http://127.0.0.1:58313/]
    Waiting indefinitely for [4] processing unit instances to be deployed...
    [rt-processor-XAP-9.0.2] [1] deployed successfully on [127.0.0.1]
    [rt-processor-XAP-9.0.2] [1] deployed successfully on [127.0.0.1]
    [rt-processor-XAP-9.0.2] [2] deployed successfully on [127.0.0.1]
    [rt-processor-XAP-9.0.2] [2] deployed successfully on [127.0.0.1]
    Finished deploying [4] processing unit instances

Next, deploy the feeder:

./gs.sh deploy <applicationRoot>/feeder/target/rt-feeder-XAP-9.0

You should see the following output:

Uploading [rt-feeder-XAP-9.0] to [http://127.0.0.1:58313/]
SLA Not Found in PU.  Using Default SLA.
Waiting indefinitely for [1] processing unit instances to be deployed...
[rt-feeder-XAP-9.0] [1] deployed successfully on [127.0.0.1]
Finished deploying [1] processing unit instances

Once the application is running, you can use the XAP UI tools to view your application, access the data and the counters, and manage the application:

  • For the web-based UI, run gs-webui.bat/sh and point your browser to localhost:8099
  • For the rich desktop UI (GS-UI), run gs-ui.bat/sh
More Deployment Options
To learn about additional options for deploying your XAP processing units, please see Deploying onto the Service Grid

Viewing the Most Popular Words on Twitter

To view the most popular words on Twitter, start the GS-UI using gs-ui.bat/sh, click the Query icon as demonstrated below, and execute the following SQL query by clicking the button:

select uid,* from com.j_spaces.map.Envelope order by value DESC

You should see the most popular words on Twitter, ordered by their popularity:

You can re-execute the query by clicking the button again. This gives you a real-time view of the most popular words on Twitter.

Persisting to Cassandra

Once raw tweets are processed, they are moved from the Space to the historical data backend store. By default, this is a simple flat-file store. The example application also includes a Cassandra driver, CassandraExternalPersistence, which implements the ExternalPersistence interface.
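The ExternalPersistence interface itself is not listed in this tutorial. Based on how CassandraExternalPersistence uses it (see the snippet at the end of this page), a minimal sketch of the abstraction might look as follows; the exact method set in the project is an assumption here:

// Minimal sketch of the persistence abstraction used by the processor module.
// The project's actual interface may declare additional methods (e.g. bulk writes);
// only write(Object) appears in the snippets shown in this tutorial.
public interface ExternalPersistence {

    /** Persist a single processed tweet (passed as a SpaceDocument) to the backend store. */
    void write(Object data);
}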

The following are step-by-step instructions for configuring the application to persist to Cassandra.
1. Edit the <applicationRoot>/processor/src/main/resources/META-INF/spring/pu.xml file: comment out the fileExternalPersistence bean and uncomment the cassandraExternalPersister bean, as follows:

<!-- fileExternalPersistence is meant for testing purposes, and persists to a file in the local file system -->
<!--bean id="fileExternalPersistence" class="org.openspaces.bigdata.processor.FileExternalPersistence">
    <constructor-arg index="0" value="tweetRepo.txt"/>
</bean-->

<!-- cassandraExternalPersister persists to a cassandra DB -->
<bean id="cassandraExternalPersister" class="org.openspaces.bigdata.processor.CassandraExternalPersistence"/>

2. Download, install, and start the Cassandra database. For more information, see Cassandra's Getting Started page.
3. Run the cassandra_schema.txt script to define the keyspace and column family. At the *nix shell prompt, type:

<cassandra home>/bin/cassandra-cli --host <cassandra host name> --file <project home>/processor/cassandra-schema.txt

4. Run the application within XAP, as described in the previous section.

Running the Example on any Cloud using the Cloudify component

To run the application together with the Cassandra DB as a single application on any cloud, we use the new set of cloud features introduced with XAP 9.0.0. A key concept in this approach is deploying and managing the entire application life cycle using a recipe. This approach provides total application life-cycle automation without any code or architecture changes. Furthermore, it is cloud neutral, so you don't get locked in to a specific cloud vendor.

The following snippet shows our example application's recipe:

application {
	name="rt_app"

	service {
		name = "feeder"
		dependsOn = ["processor"]
	}
	service {
		name = "processor"
		dependsOn = ["rt_cassandra"]		
	}
	service {
		name = "rt_cassandra"	
	}
}

The following snippet shows the life-cycle events described in the Cassandra service recipe:

service {
  name "rt_cassandra"
  icon "Apache-cassandra-icon.png"
  numInstances 1
  type "NOSQL_DB"
  lifecycle{
		init 		"cassandra_install.groovy"
		preStart 	"cassandra_prestart.groovy"
		start 		"cassandra_start.groovy"
		postStart 	"cassandra_poststart.groovy"

	}
...
}

The following snippet shows the processing unit described in the processor recipe:

service {
  icon "icon.png"
  name "processor"
  numInstances 2
  statefulProcessingUnit {
    binaries "rt-analytics-processor.jar"   		
    sla {
      memoryCapacity 32
      maxMemoryCapacity 32
      highlyAvailable true
      memoryCapacityPerContainer 16 
    }
  }	
}

The application recipe is packaged, as follows:

Testing the application on a Local Cloud

XAP 9.0 comes with a cloud emulator called localcloud that allows you to test the recipe execution on your local machine.
The following are step-by-step instructions for installing our application on localcloud.

  1. Go to the <XapInstallationRoot>/tools/cli/ folder, and at the command prompt (Windows), type cloudify.bat (or at the *nix shell prompt, type cloudify.sh).
  2. To start the localcloud services, at the prompt, type bootstrap-localcloud. This may take a few minutes.
  3. To deploy the application, at the prompt, type:
    install-application c:/rt-app
    Tracking installation progress: you can track the progress in the shell and in the web management console (localhost:8099).

For more information, see the Deploying Applications page.

Running on clouds

To run the application on one of the supported clouds, follow these steps:

  1. Configure the cloud driver configuration file and get the cloud certificate. For more information, see the Post-Installation Configuration page.
  2. Bootstrap the cloud. For more information, see The Bootstrapping Process page.
  3. To install the application, use the install-application command, as described in the previous section.
    Running XAP on the Cloud

    In order to use your license in the cloud environment, perform the following:

    • cd to <XAP installation root>/tools/cli/plugins/ecs/<cloud name>/upload
    • create a directory named cloudify-overrides
    • copy your license (<XAP installation root>/gslicense.xml) to the above directory.

The Design in Detail

Now let's take a closer look at the components of the solution. Our solution is designed to efficiently cope with receiving and processing the large volume of tweets. First, we partition the tweets so that we can process them in parallel, but we have to decide how to partition them efficiently. Partitioning by user might not be sufficiently balanced, so we decided to partition by tweet ID, which we assume to be globally unique. We also need to persist and process the data with low latency, and for this we store the tweets in memory.

This section describes the following components of the solution that implement these design decisions:

  • Getting the Tweets
  • Parsing the Tweets
  • Filtering the Tweets
  • Generating the Local Counters
  • Generating Global Counters
  • Persisting the Tweets to the Big Data Store

Getting the Tweets

First, we need to get the tweets and store them in the Space (IMDG). In this example, we use Spring Social to provide a Java interface to the Twitter API and get the tweets, and XAP's SpaceDocument API to store them. Using a SpaceDocument allows for a more flexible data model, a SpaceDocument being essentially a map of properties. Partitioning uses the default 'Id' attribute.

The following snippet shows the relevant TwitterHomeTimelineFeederTask sections.

public class TwitterHomeTimelineFeederTask implements Runnable {

    ...
    public SpaceDocument buildTweet(Tweet tweet) {
     return new SpaceDocument("Tweet", new DocumentProperties() 
         .setProperty("Id", tweet.getId()) 
         .setProperty("Text", tweet.getText()) 
         .setProperty("CreatedAt", tweet.getCreatedAt()) 
         .setProperty("FromUserId", tweet.getFromUserId()) 
         .setProperty("ToUserId", tweet.getToUserId()) 
         .setProperty("Processed", Boolean.FALSE));
    }

    /**
     * Return all the tweets from the Twitter API
     */
    private List<Tweet> getPublicTimeline() {
        return new TwitterTemplate() //
                .timelineOperations() //
                .getPublicTimeline();
    }

    ...
}

Parsing the Tweets

Now the real fun begins. We have the raw data, but we need to tokenize and filter it, and then update the local counters. These are the tasks performed by the Map phase of the Map / Reduce algorithm.

To generate this real-time flow, XAP uses the event-driven architecture of the event container. Specifically, we use a Polling Container to listen for events relating to the writing of raw tweets to the Space. These events are configured using the SQLQuery returned by the unprocessedTweet method, which is marked with @EventTemplate. We then tokenize and filter the tweet in the event handling method, marked with @SpaceDataEvent. The result is an object of type TokenizedTweet, which is written to the Space.
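The @EventTemplate method is not included in the snippet below; a minimal sketch of what such a template might look like follows (the exact query text is an assumption of this sketch):

import com.gigaspaces.document.SpaceDocument;
import com.j_spaces.core.client.SQLQuery;
import org.openspaces.events.EventTemplate;

// Sketch only: in the project this method lives inside the TweetParser polling container.
public class UnprocessedTweetTemplateSketch {

    @EventTemplate
    SQLQuery<SpaceDocument> unprocessedTweet() {
        // Select raw tweets that have not been processed yet (query text is an assumption)
        return new SQLQuery<SpaceDocument>("Tweet", "Processed = false");
    }
}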

The following snippet shows the relevant TweetParser sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", concurrentConsumers = 2, maxConcurrentConsumers = 2, receiveTimeout = 60)
@TransactionalEvent(timeout = 100)
public class TweetParser {
    ...

    /**
     * Event handler that receives a Tweet instance, processes its text and generates a listing of the tokens appearing in the text 
     * and their respective count of appearance in the text, instantiates an instance of {@link TokenizedTweet} with this data, 
     * and writes it to the space.
     * 
     * @param tweet
     * @return {@link TokenizedTweet} containing a mapping of {token->count}
     */
    @SpaceDataEvent
    public SpaceDocument eventListener(SpaceDocument tweet) {
        log.info("parsing tweet " + tweet);

        Long id = (Long) tweet.getProperty("Id");
        String text = tweet.getProperty("Text");
        if (text != null) {
            gigaSpace.write(new TokenizedTweet(id, tokenize(text)));
        }

        tweet.setProperty("Processed", true);
        return tweet;
    }

    protected Map<String, Integer> tokenize(String text) {
        Map<String, Integer> tokenMap = newHashMap();
        StringTokenizer st = new StringTokenizer(text, "\"{}[]:;|<>?`'.,/~!@#$%^&*()_-+= \t\n\r\f\\");

        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (token.length() < MIN_TOKEN_LENGTH) {
                continue;
            }
            Integer count = tokenMap.containsKey(token) ? tokenMap.get(token) + 1 : 1;
            tokenMap.put(token, count);
        }
        return tokenMap;
    }
}

Filtering the Tweets

The TokenFilter event handler is triggered by the writing of maps of tokens (TokenizedTweet objects marked as non-filtered) to the Space. It's implemented as a batch polling container with a batch size of 100 entries. This class is responsible for filtering out a default list of irrelevant words such as prepositions, and can be extended with additional word lists stored in the Space as "black lists". The filter updates the TokenizedTweet objects, removing the irrelevant words, and writes them back to the Space.

The following snippet shows the relevant TokenFilter sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", concurrentConsumers = 2, maxConcurrentConsumers = 2, receiveTimeout = 5000)
@TransactionalEvent
public class TokenFilter {
    ...
  
    /**
     * Event handler that receives a {@link TokenizedTweet} and filters out non-informative tokens. Filtering is performed using
     * {@link #isTokenRequireFilter(String)}
     * 
     * @param tokenizedTweet
     * @return the input tokenizedTweet after modifications
     */
    @SpaceDataEvent
    public TokenizedTweet eventListener(TokenizedTweet tokenizedTweet) {
        log.info("filtering tweet " + tokenizedTweet.getId());
        Map<String, Integer> tokenMap = newHashMap(tokenizedTweet.getTokenMap());
        int numTokensBefore = tokenMap.size();
        Iterator<Entry<String, Integer>> it = tokenMap.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Integer> entry = it.next();
            if (isTokenRequireFilter(entry.getKey())) {
                it.remove();
            }
        }
        int numTokensAfter = tokenMap.size();
        tokenizedTweet.setTokenMap(tokenMap);
        tokenizedTweet.setFiltered(true);
        log.fine("filtered out " + (numTokensBefore - numTokensAfter) + " tokens from tweet " + tokenizedTweet.getId());
        return tokenizedTweet;
    }

    private boolean isTokenRequireFilter(final String token) {
        return filterTokensSet.contains(token);
    }

    private static final Set<String> filterTokensSet = newHashSet("aboard", "about", "above", "across", "after", "against", 
            "along", "amid", "among", "anti", "around", "as", "at", "before", "behind", "below", "beneath", "beside", "besides", 
            "between", "beyond", "but", "by", "concerning", "considering", "despite", "down", "during", "except", "excepting", 
            "excluding", "following", "for", "from", "in", "inside", "into", "like", "minus", "near", "of", "off", "on", "onto", 
            "opposite", "outside", "over", "past", "per", "plus", "regarding", "round", "save", "since", "than", "through", "to",
            "toward", "under", "underneath", "unlike", "until", "up", "upon", "versus", "via", "with", "within", "without");
}

Generating the Local Counters

This step completes the Map phase by taking the filtered maps of tokens, normalizing them, and counting the occurrences of relevant words per batch of tweets. This is achieved using a polling container named LocalTokenCounter that reads batches of filtered TokenizedTweet objects and updates the counters, which are TokenCounter objects in the Space. Note that TokenCounter objects are partitioned by the token whose count they aggregate.
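The TokenCounter class itself is not listed in this tutorial. Partitioning counters by token can be expressed by declaring the token as the routing property, roughly as in the following sketch; the field names and the use of POJO annotations here are assumptions, not the project's actual source:

import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceRouting;

@SpaceClass
public class TokenCounter {

    private String token;
    private Integer count;

    public TokenCounter() {
    }

    public TokenCounter(String token, Integer count) {
        this.token = token;
        this.count = count;
    }

    // Using the token as both the id and the routing value keeps all counts for the
    // same word in the same partition, so increments never cross partition boundaries.
    @SpaceId(autoGenerate = false)
    @SpaceRouting
    public String getToken() {
        return token;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }
}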

The following snippet shows the relevant LocalTokenCounter sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", passArrayAsIs = true, concurrentConsumers = 1, maxConcurrentConsumers = 1, receiveTimeout = 1000)
@TransactionalEvent
public class LocalTokenCounter {
    ...
   
    /**
     * Event handler that takes a bulk of {@link TokenizedTweet}, counts appearances of tokens in the bulk, and generates a 
     * corresponding {@link TokenCounter} for each token.
     * 
     * @param tokenizedTweets
     *            array of {@link TokenizedTweet} matching the event template
     */
    @SpaceDataEvent
    public void eventListener(TokenizedTweet[] tokenizedTweets) {
        log.info("local counting of a bulk of " + tokenizedTweets.length + " tweets");
        Map<String, Integer> tokenMap = newHashMap();
        for (TokenizedTweet tokenizedTweet : tokenizedTweets) {
            log.fine("--processing " + tokenizedTweet);
            for (Entry<String, Integer> entry : tokenizedTweet.getTokenMap().entrySet()) {
                String token = entry.getKey();
                Integer count = entry.getValue();
                int newCount = tokenMap.containsKey(token) ? tokenMap.get(token) + count : count;
                log.finest("put token " + token + " with count " + newCount);
                tokenMap.put(token, newCount);
            }
        }

        log.info("writing " + tokenMap.size() + " TokenCounters across the cluster");
        for (Entry<String, Integer> entry : tokenMap.entrySet()) {
            String token = entry.getKey();
            Integer count = entry.getValue();
            log.fine("writing new TokenCounter: token=" + token + ", count=" + count);
            clusteredGigaSpace.write(new TokenCounter(token, count), LEASE_TTL, WRITE_TIMEOUT, UPDATE_OR_WRITE);
        }
    }
}

Generating Global Counters

Now, the Reduce phase aggregates the local counters into global counters. This is achieved using another polling container, named GlobalTokenCounter, that listens for TokenCounter objects. The container reads a batch of TokenCounter objects and updates the global count for each word. The global counter is an entry in the Space where the key is the token and the value is the aggregated count.

The following snippet shows the relevant GlobalTokenCounter sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", concurrentConsumers = 2, maxConcurrentConsumers = 2, receiveTimeout = 1000)
@TransactionalEvent
public class GlobalTokenCounter {    
    ...
    
    @SpaceDataEvent
    public void eventListener(TokenCounter counter) {
        incrementLocalToken(counter.getToken(), counter.getCount());
    }

    @SuppressWarnings("unchecked")
    @Transactional(readOnly = false, propagation = REQUIRED, isolation = READ_COMMITTED)
    private void incrementLocalToken(String token, Integer count) {
        log.info("incrementing local token " + token + " by " + count);
        Integer globalCount = gigaMap.containsKey(token) ? (Integer) gigaMap.get(token) + count : count;
        gigaMap.put(token, globalCount);
        log.fine("+++ token=" + token + " count=" + globalCount);
    }

}

Persisting the Tweets to the Big Data Store

In this example, we use Apache Cassandra as a historical Big Data store, enabling future slicing and dicing of the raw tweet data. Similarly, we could use any database to persist the data, for example, another NoSQL data store or even Hadoop HDFS.

TweetPersister is a batch polling container that uses the ExternalPersistence interface. TweetPersister writes batches of 100 parsed tweets to the NoSQL data store. In this case, we use the CassandraExternalPersistence implementation, which uses the Hector Cassandra client for Java.
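TweetPersister's source is not listed in this tutorial; a simplified sketch of such a batch polling container, assuming it simply hands each processed tweet to the ExternalPersistence bean, might look like this (the event template, injection style, and batch handling details are assumptions):

import java.util.logging.Logger;

import javax.annotation.Resource;

import org.openspaces.events.EventDriven;
import org.openspaces.events.EventTemplate;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.events.polling.Polling;

import com.gigaspaces.document.SpaceDocument;
import com.j_spaces.core.client.SQLQuery;

// Sketch only, not the project's actual source: a polling container that takes
// processed tweets out of the Space and delegates them to the persistence backend.
@EventDriven
@Polling(gigaSpace = "gigaSpace", passArrayAsIs = true, receiveTimeout = 1000)
public class TweetPersisterSketch {

    private static final Logger log = Logger.getLogger(TweetPersisterSketch.class.getName());

    @Resource
    private ExternalPersistence persistence;

    @EventTemplate
    SQLQuery<SpaceDocument> processedTweet() {
        // Only tweets that have already been parsed are persisted (query text is an assumption)
        return new SQLQuery<SpaceDocument>("Tweet", "Processed = true");
    }

    @SpaceDataEvent
    public void eventListener(SpaceDocument[] tweets) {
        log.info("persisting a batch of " + tweets.length + " tweets");
        for (SpaceDocument tweet : tweets) {
            persistence.write(tweet); // delegates to Cassandra (or the flat-file fallback)
        }
    }
}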

public class CassandraExternalPersistence implements ExternalPersistence {
    ...
    @PostConstruct
    public void init() throws Exception {
        log.info(format("initializing connection to Cassandra DB: host=%s port=%d keyspace=%s column-family=%s\n" //
                , host, port, keyspaceName, columnFamily));
        cluster = getOrCreateCluster(keyspaceName, host + ":" + port);
        keyspace = createKeyspace(keyspaceName, cluster);
    }

    @Override
    public void write(Object data) {
        if (!(data instanceof SpaceDocument)) {
            log.log(Level.WARNING, "Received non document event");
            return;
        }
        SpaceDocument document = (SpaceDocument) data;
        Long id = document.getProperty("Id");
        log.info("persisting data with id=" + id);
        Mutator<String> mutator = createMutator(keyspace, stringSerializer);
        for (String key : document.getProperties().keySet()) {
            Object value = document.getProperty(key);
            if (value != null) {
                mutator.addInsertion(String.valueOf(id), //
                        columnFamily, //
                        createColumn(key, value.toString(), stringSerializer, stringSerializer));
            }
        }
        mutator.execute();
    }
    ...
}
This documentation refers to product version 9.0
