Real-time sentiment analysis example with Apache Storm

Real-time sentiment analysis refers to processing streams of natural-language text (or voice) in order to extract subjective information. The typical use case is building a recommendation engine or spotting social media trends.

I have selected Apache Storm as the real-time processing engine. Storm is very robust (we are using it in production), and it is very easy to implement custom logic on top of it.

I have written a very simple project (source code) that performs sentiment analysis in real time (using random sentences as input data). The scope is to take random sentences as input, perform some sentiment analysis, decide whether each sentence has a positive or negative score, and persist the results.

[Figure: sentiment analysis Storm pipeline]

The implementation logic is the following:

  1. (Dummy Spout) Feeds the pipeline with random sentences.
  2. (Stemming Bolt) Removes any word that is useless for scoring, creates a new sentence that does not contain the useless words (e.g. articles) and passes it to the next component.
  3. (Positive Scoring Bolt) Gets the stemmed (modified) sentence and provides a positive score.
  4. (Negative Scoring Bolt) Gets the stemmed sentence and provides a negative score.
  5. (Final Scoring Bolt) Compares the positive and negative scores and decides whether the sentence is positive or negative.
  6. (Persistence Bolt) Persists the processed data:
    original sentence, modified sentence, and the final, positive and negative scores.
    As the persistent store, I have selected Apache HBase (just for reference), where events are stored in batch mode. Batch persistence is triggered every 1 second (configurable), using an internal triggering mechanism of Storm (the Tick Tuple); see the sketch after this list. Instead of HBase, we could just as easily use Redis, Cassandra, MongoDB or Elasticsearch (all of them are valid for this use case).
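To make the batch trigger concrete, here is a minimal sketch of a persistence bolt that buffers tuples and flushes them as one batch on every Tick Tuple. It is written against the classic backtype.storm API of that era; the class name, the "sentence" field and the flushToStore() helper are hypothetical, not the project's actual code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/** Buffers scored sentences and flushes them as one batch on every Tick Tuple. */
public class PersistenceBolt extends BaseBasicBolt {

    private final List<String> buffer = new ArrayList<String>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to send this bolt a Tick Tuple every 1 second (the batch trigger).
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
        return conf;
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (isTickTuple(tuple)) {
            flushToStore();
        } else {
            buffer.add(tuple.getStringByField("sentence")); // field name is an assumption
        }
    }

    // A Tick Tuple arrives from Storm's system component on the system tick stream.
    private static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    // Placeholder for the real batch write (HBase, Redis, Cassandra, ...).
    private void flushToStore() {
        buffer.clear();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing to declare.
    }
}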

Each of the Stemming and Scoring Bolts uses a dummy in-memory database that contains all the relevant words it needs in order to stem/score each sentence.
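For illustration, here is a minimal sketch of what such a bolt can look like: a stemming bolt backed by a hard-coded in-memory word set. The word list, the field names and the class body are placeholders for the example, not the project's actual ones.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/** Drops words that are useless for scoring and emits the shortened sentence. */
public class StemmingBolt extends BaseBasicBolt {

    // Dummy in-memory "database"; a realistic word list would be much larger.
    private static final Set<String> USELESS_WORDS =
            new HashSet<String>(Arrays.asList("a", "an", "the", "of", "to", "and"));

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getStringByField("sentence"); // field name is an assumption
        StringBuilder stemmed = new StringBuilder();
        for (String word : sentence.toLowerCase().split("\\s+")) {
            if (!USELESS_WORDS.contains(word)) {
                stemmed.append(word).append(' ');
            }
        }
        // Emit both the original and the modified sentence for the scoring bolts.
        collector.emit(new Values(sentence, stemmed.toString().trim()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence", "stemmed-sentence"));
    }
}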

In order to run this example on a single node or on a cluster, you can use the Storm Flux project. The whole topology pipeline is defined in a single configuration file (topology.yaml).

Example run:

Local execution:
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --local src/test/resources/flux/topology.yaml -s 10000

Cluster execution:
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote src/test/resources/flux/topology.yaml -c nimbus.host=localhost

Alternatively, there is a simple JUnit test (SentimentAnalysisTopologyTest) that executes the same topology locally.

You can check the README for details. As for prerequisites, you can check my previous post in order to install a single local HBase instance and a local Storm cluster with 2 Workers.

This is a very simplistic approach to using Apache Storm for sentiment analysis. I hope to have more free time to write a new post with a more realistic solution for sentiment analysis.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Local installation of standalone HBase and a simple Apache Storm cluster

We mainly use Apache Storm for stream processing and Apache HBase as our NoSQL wide-column database.

Even though Apache Cassandra is a great NoSQL database, we mostly prefer HBase because it comes with the Cloudera distribution and because it favors consistency (check the CAP theorem) more than Cassandra does.

HBase is based on HDFS, but it can easily be installed standalone for testing purposes. You just need to download the latest version, extract the compressed file, start the standalone node, and then start an HBase shell and play.

$> tar zxvf hbase-1.1.2-bin.tar.gz
$> cd hbase-1.1.2/bin/
$> ./start-hbase.sh
$> ./hbase shell
hbase(main):001:0> create 'DummyTable', 'cf'
hbase(main):002:0> scan 'DummyTable'
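
If you prefer to exercise the standalone instance from Java instead of the shell, a minimal smoke test could look like the sketch below, assuming the HBase 1.1.x client libraries are on the classpath and the 'DummyTable' table created above exists; the class, row and qualifier names are made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSmokeTest {
    public static void main(String[] args) throws Exception {
        // The default configuration points to Zookeeper on localhost:2181,
        // which is exactly what standalone HBase starts for you.
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("DummyTable"))) {
            // Write one cell into column family 'cf'...
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("greeting"), Bytes.toBytes("hello"));
            table.put(put);
            // ...and read it back.
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            System.out.println(Bytes.toString(
                    result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("greeting"))));
        }
    }
}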

When you start HBase in standalone mode, it automatically starts a local Zookeeper node too (listening on the default port 2181).

$> netstat -anp|grep 2181

Zookeeper is used by both HBase and Storm as a distributed coordination mechanism. Now that you have a local Zookeeper node already running, you are ready to configure and run a local Storm cluster.

  • Download the latest Storm release
  • Extract it
  • Configure “STORM_HOME/conf/storm.yaml” (check below)
  • Start the local cluster:
    • $> cd STORM_HOME/bin
    • $> ./storm nimbus
    • $> ./storm supervisor
    • $> ./storm ui
  • Logs are located in the “STORM_HOME/logs/” directory
  • Check the local Storm UI at: localhost:8080

Contents of the new “storm.yaml” configuration file:

storm.zookeeper.servers:
- "localhost"

nimbus.host: "localhost"

supervisor.slots.ports:
- 6701
- 6702

You can also set the “worker.childopts” parameter to define the JVM options for each Worker (processing node). Here is a simple example for my local JVMs, where I set the min/max heap size and the garbage collection strategy, and enable JMX and GC logs.

worker.childopts: "-server -Xms512m -Xmx2560m -XX:PermSize=128m -XX:MaxPermSize=512m -XX:+UseParallelOldGC -XX:ParallelGCThreads=3 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/tmp/gc-storm-worker-%ID%.log -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1%ID% -XX:+PrintFlagsFinal -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true"

The “worker.childopts” parameter is loaded by all Worker JVMs. The “%ID%” variable corresponds to the port (6701 or 6702) assigned to each Worker. As you can see, I have used it to enable a different JMX port for each Worker (16701 and 16702) and a different GC log file.

We are running Storm on JDK 7, but JDK 8 seems to be compatible too. The latest Storm has switched from Logback to Log4j2 (check the full release notes here and here).

Using the above instructions, you will be able to run an HBase and Storm mini cluster on your laptop without any problem.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Install Skype on Debian Jessie amd64

I have just installed Debian Jessie (testing) amd64 on my new laptop (it is really stable; the only problem I have is with dual screen).

One of the rare pieces of closed-source software I use is Skype (only for my day job).
I downloaded Skype for “Ubuntu 12.04 (multiarch)” and tried to install it on my Jessie.
Skype is built only for the i386 architecture, so I needed to add (as root) the i386 architecture to my system.

dpkg --add-architecture i386

View the dependencies of the Skype package:

dpkg -I skype-ubuntu-precise_4.3.0.37-1_i386.deb
 Architecture: i386
 Depends: libc6 (>= 2.3.6-6~), libc6 (>= 2.7), libgcc1 (>= 1:4.1.1), libqt4-dbus (>= 4:4.5.3), libqt4-network (>= 4:4.8.0), libqt4-xml (>= 4:4.5.3), libqtcore4 (>= 4:4.7.0~beta1), libqtgui4 (>= 4:4.8.0), libqtwebkit4 (>= 2.2~2011week36), libstdc++6 (>= 4.2.1), libx11-6, libxext6, libxss1, libxv1, libssl1.0.0, libpulse0, libasound2-plugins

I tried to install the dependencies, but got an error:

apt-get -s install libc6:i386 libggc1:i386  libqt4-dbus:i386 libqt4-network:i386 libqt4-xml:i386 libqtcore4:i386 libqtgui4:i386 libqtwebkit4:i386 libstdc++6:i386 libx11-6:i386 libxext6:i386 libxss1:i386 libxv1:i386 libssl1.0.0:i386 libpulse0:i386 libasound2-plugins:i386

E: Unable to locate package libggc1

I excluded libggc1 (a typo of libgcc1) and tried again, but got another error, this time for libaudio2:

apt-get -s install libc6:i386 libqt4-dbus:i386 libqt4-network:i386 libqt4-xml:i386 libqtcore4:i386 libqtgui4:i386 libqtwebkit4:i386 libstdc++6:i386 libx11-6:i386 libxext6:i386 libxss1:i386 libxv1:i386 libssl1.0.0:i386 libpulse0:i386 libasound2-plugins:i386

Unpacking libaudio2:i386 (1.9.4-1+b1) ...
dpkg: error processing archive /var/cache/apt/archives/libaudio2_1.9.4-1+b1_i386.deb (--unpack):
 trying to overwrite shared '/usr/share/doc/libaudio2/changelog.Debian.gz', which is different from other instances of package libaudio2:i386

Errors were encountered while processing:
 /var/cache/apt/archives/libaudio2_1.9.4-1+b1_i386.deb
E: Sub-process /usr/bin/dpkg returned an error code (1)

Library libaudio2 is an indirect dependency, pulled in by the libqtgui4 package. The problem was the packaging of libaudio2: I already had libaudio2:amd64 installed, and then tried to install it for the i386 architecture too (as Skype needs it). I got this error because the i386 package tried to overwrite an existing file (‘/usr/share/doc/libaudio2/changelog.Debian.gz’) that belongs to the other package (libaudio2:amd64). I overcame this problem by renaming the problematic file and letting libaudio2:i386 finish its job.

mv /usr/share/doc/libaudio2/changelog.Debian.gz /usr/share/doc/libaudio2/changelog.Debian.amd64.gz

Then I tried to install the Skype dependencies again:

apt-get install libc6:i386 libqt4-dbus:i386 libqt4-network:i386 libqt4-xml:i386 libqtcore4:i386 libqtgui4:i386 libqtwebkit4:i386 libstdc++6:i386 libx11-6:i386 libxext6:i386 libxss1:i386 libxv1:i386 libssl1.0.0:i386 libpulse0:i386

After the dependencies were successfully installed, I installed the Skype package:

dpkg -i skype-ubuntu-precise_4.3.0.37-1_i386.deb 

The Skype installation completed. Now, my only problem with Skype is that it does not work well with the system proxy. As a workaround, I start it from the command line.

Please avoid Skype, as it is a proprietary Voice-over-IP program that uses a proprietary protocol. Using proprietary phone software means that we can’t be sure who is listening in, because we can’t see the code (check the corresponding FSF project).
Outside my day job I use Jitsi (and, less frequently, Ekiga). I guess WebRTC is the future and we should work in that direction.

Finally, Debian Jessie (testing) amd64 looks really stable.

My only problem is with dual screen (laptop: Dell Latitude E5540). When both monitors are active (not mirrored), the refresh rate of the external monitor is 60 Hz, but when only the external monitor is active, its refresh rate is 75 Hz! Any ideas?

Regards,
Adrianos Dadis


Build home mini PC with Cubietruck and Linux

Four months ago, I decided to build a home mini PC. Its main purpose is to act as a NAS server, but it should support additional light services too.

Home mini PC requirements:

  • ARM board (low power consumption and very good performance)
  • SATA 3.5″ hard disk (much more stable than 2.5″)
  • GNU/Linux (as always)

Home mini PC services:

  1. NAS server (1st level requirement)
  2. Light server-side music player (2nd level requirement)
  3. Light BitTorrent client (2nd level requirement)
  4. No need for X window (not a requirement)
  5. Apache HTTP server (3rd level requirement)
  6. Light wiki server (3rd level requirement)
  7. Light database (3rd level requirement)

Candidates:

  • Raspberry Pi: Unbelievable community support and hundreds of applied examples, but it has limited RAM and, most importantly, does not support SATA.
  • BeagleBone Black: Stable and much more Open Hardware than the Raspberry Pi, but with far fewer features (CPU, RAM, …).
  • Cubietruck (aka Cubieboard 3): Many more features than all of the above, but a much smaller community. Specifications summary here.

As you can see, my requirements are not very light for a small board 🙂
So, I decided to use the Cubietruck (CT), which is the strongest board, accepting the risk of its small community (as I am not a hardware or Linux expert).

I received my Cubietruck a few weeks ago!!!

[Figure: Cubietruck details]

Final hardware:

  • Cubietruck
  • Cubieboard HDD Addon (to support SATA 3.5″; the addon needs an external 12 V power input)
  • Power Adaptor 5V 3A (for CT)
  • Power Adaptor 12V 4A (for HDD addon)
  • Western Digital Red 3TB
  • SD Card 16 GB

ATTENTION: You have to use separate power adapters to power the Cubietruck and the HDD Addon. The USB power output of the HDD Addon cannot supply enough amperes to the Cubietruck when they are needed. Extra details here.

Final Software:

Final view…

[Figure: Cubietruck with HDD SATA and all cables]

Details…

[Figure: Cubietruck and HDD Addon base]

In the next blog post, I will try to write about Debian and any extra hints I have found.
If you need extra details, please leave a comment and I will answer (or include the info in the next post).

UPDATE (20/12/2014)
I just did a quick read/write test on the SATA disk of the CT via NFS (requested in a comment).
The network topology is as follows:
laptop_wireless --- simple_router --- CT_wired

Copy a file (256 MB) from the laptop to the CT:
$> time dd if=/dev/zero of=/mynfs/store/test_dd_128k_2048 bs=128k count=2048
2048+0 records in
2048+0 records out
268435456 bytes (268 MB) copied, 128.897 s, 2.1 MB/s
real 2m10.458s
user 0m0.000s
sys 0m0.188s

Copy the file (256 MB) back to the laptop:
$> time cp /mynfs/store/test_dd_128k_2048 /tmp/fff
real 1m45.158s
user 0m0.008s
sys 0m0.384s

There are two important factors that affect the performance of the above test:
a) If the laptop were connected by wire instead of wireless, the speed would be much higher.
b) The CT was running other things concurrently and was not idle.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Storm event processor – GC log file per worker

For the last three months, I have been working with a new team, building a product for Big Data analytics in the Telecom domain.

The Storm event processor is one of the main frameworks we use, and it is really great. You can read more details in its official documentation (which has been improved).

Storm uses Workers to do your job; each Worker is a single JVM that is administered internally by Storm (started, restarted if unresponsive, moved to another node of the cluster, etc.). For a single job you can run many Workers on your cluster (Storm decides how to distribute them across the cluster nodes). By “node” I mean a running OS, either on a VM or on a physical machine.

The tricky point here is that all Workers on a node read the same configuration file (STORM_HOME/conf/storm.yaml), even if they are running/processing different kinds of jobs. Additionally, there is a single parameter (worker.childopts) in this file, which is used by all Workers (of the same node) to initialize their JVMs (how to set JVM options).

As we want to know how GC performs in each Worker, we need to monitor the GC log of each Worker/JVM.

As I said, the problem is that all Workers on a node read the same parameter from the same configuration file in order to initialize their JVMs, so it is not trivial to use a different GC log file for each Worker/JVM.

Fortunately, the Storm developers have exposed a “variable” that solves this problem. This variable is named “ID” and is unique for each Worker on a node (the same Worker ID may exist on different nodes).

For the Workers' JVM options, we use this entry in our “storm.yaml” file:

worker.childopts: "-Xmx1024m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/opt/storm/logs/gc-storm-worker-%ID%.log"

Be aware that you have to add “%” before and after the “ID” string (so that it is identified as an internal Storm variable).

Additionally, for the Supervisor JVM options (one such process runs on each node), we use this entry in our “storm.yaml” file:

supervisor.childopts: "-Xmx512m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/opt/storm/logs/gc-storm-supervisor.log"

I have also included some memory settings (“-Xmx” and “-XX:MaxPermSize”), but these are just an example.

Please keep in mind that Storm requires Oracle HotSpot JDK 6 (JDK 7/8 are not yet supported). This is a serious drawback, but we hope it will be fixed soon.

Hope it helps,
Adrianos Dadis.

Democracy Requires Free Software
