Configurable ETL processing using Apache Storm and Kite SDK Morphlines

From my first days working as a software engineer, I have kept hearing the same request from many sides:

We want everything to be configurable, we want to be able to change everything at runtime, and we want a visual tool to apply all this logic, so that non-developers can use and configure our application.

I like this general goal too, but, as we all know, software systems are not that adaptable and customer requirements are not stable.

In previous years, we built such configurable applications (never 100% configurable) using traditional frameworks/techniques (JMX, distributed caches, Spring or JEE, and more).

In recent years, there is an additional concept that has to be included in our architectures: Big Data (or 3V, or 4V, or whatever term fits best). This new concept deprecates various solutions and workarounds that we were familiar with and applied in old 3-tier applications.

The funny thing is that I often find myself in the same position as 10 years ago. This is the rule of software development: it never ends, and so personal excellence and new adventures never end either :-)

The main problem remains the same: how to build a configurable, distributed ETL application.

For this reason, I have built a mini adaptable solution that might be helpful in many use cases.
I have used 3 common tools of the big data world: Java, Apache Storm and Kite SDK Morphlines.
Java is the main programming language, Apache Storm is the distributed stream processing engine and Kite SDK Morphlines is the configurable ETL engine.

Kite SDK Morphlines

Copied from its description: Morphlines is an open source framework that reduces the time and efforts necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.

In addition to the built-in commands, you can easily implement your own Command and use it in your morphline configuration file.
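For instance, a custom command could look like the following minimal sketch, which follows the Kite SDK CommandBuilder/AbstractCommand pattern (the command name toUpperCase and its field parameter are invented for this illustration):

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;

import com.typesafe.config.Config;

public final class ToUpperCaseBuilder implements CommandBuilder {

    @Override
    public Collection<String> getNames() {
        // name used to reference this command inside a morphline configuration file
        return Collections.singletonList("toUpperCase");
    }

    @Override
    public Command build(Config config, Command parent, Command child, MorphlineContext context) {
        return new ToUpperCase(this, config, parent, child, context);
    }

    private static final class ToUpperCase extends AbstractCommand {

        private final String fieldName;

        private ToUpperCase(CommandBuilder builder, Config config, Command parent,
                            Command child, MorphlineContext context) {
            super(builder, config, parent, child, context);
            // hypothetical "field" parameter with a default value
            this.fieldName = getConfigs().getString(config, "field", "message");
            validateArguments();
        }

        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        protected boolean doProcess(Record record) {
            // upper-case every value of the configured field, in place
            List values = record.get(fieldName);
            for (int i = 0; i < values.size(); i++) {
                values.set(i, values.get(i).toString().toUpperCase(Locale.ROOT));
            }
            // pass the record to the next command in the chain
            return super.doProcess(record);
        }
    }
}

Once this class is on the classpath and its package is listed in importCommands, it can be used like any built-in command, e.g. { toUpperCase { field : name } }.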

Sample morphline configuration that reads a JSON string, parses it and then just logs a particular JSON element:

morphlines : [{
	id : json_terminal_log
	importCommands : ["org.kitesdk.**"]
	
	commands : [
			# read the JSON blob
			{ readJson: {} }

			# extract JSON objects into head fields
			{ extractJsonPaths {
			  flatten: true
			  paths: {
				name: /name
				age: /age
			  }
			} }

			# log data
			{ logInfo {
				format : "name: {}, record: {}"
  				args : ["@{name}", "@{}"]
			}}
	]
}]
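If you want to play with such a configuration outside Storm, a minimal sketch with the plain Kite SDK API could look like the following (the sample JSON values and the no-op final collector are assumptions of this example):

import java.io.ByteArrayInputStream;
import java.io.File;
import java.nio.charset.StandardCharsets;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Notifications;

public class MorphlineRunnerExample {

    public static void main(String[] args) {
        MorphlineContext context = new MorphlineContext.Builder().build();

        // final child of the chain: it simply receives whatever reaches the end
        Command collector = new Command() {
            @Override
            public Command getParent() {
                return null;
            }

            @Override
            public void notify(Record notification) {
                // lifecycle notifications are ignored in this sketch
            }

            @Override
            public boolean process(Record record) {
                System.out.println("Final record: " + record);
                return true;
            }
        };

        // compile the morphline with id "json_terminal_log" from the configuration file
        Command morphline = new Compiler().compile(
                new File("target/test-classes/morphline_confs/json_terminal_log.conf"),
                "json_terminal_log", context, collector);

        // feed one JSON document as the attachment body, which is what readJson expects
        Record record = new Record();
        String json = "{\"name\":\"Adrianos\",\"age\":40}";
        record.put(Fields.ATTACHMENT_BODY,
                new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)));

        Notifications.notifyStartSession(morphline);
        boolean success = morphline.process(record);
        System.out.println("Morphline execution success: " + success);
    }
}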

Storm Morphlines Bolt

In order to use Morphlines inside Storm, I have implemented a custom MorphlinesBolt. The main responsibilities of this Bolt are:

  • Initialize Morphlines handler via a configuration file
  • Initialize mapping instructions:
    a) from Tuple to Morphline input and
    b) from Morphline output to new output Tuple
  • Process each incoming event using the already initialized Morphlines context
  • If the Bolt is not terminal, then, using the provided Mapper (type “b”), emit a new Tuple based on the output of the Morphline execution (a minimal sketch of such a bolt follows this list)
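The actual implementation lives in the sv-etl-storm-morphlines module; just to make the above responsibilities concrete, here is a minimal, hypothetical sketch of such a bolt. It assumes the backtype.storm API of that era, a raw JSON String in a "msg" tuple field and a "message" output field of the morphline record, none of which are guaranteed to match the real MorphlinesBolt:

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SimpleMorphlineBolt extends BaseRichBolt {

    private final String confFile;
    private final String morphlineId;
    private final boolean terminal;

    private transient OutputCollector collector;
    private transient Command morphline;

    public SimpleMorphlineBolt(String confFile, String morphlineId, boolean terminal) {
        this.confFile = confFile;
        this.morphlineId = morphlineId;
        this.terminal = terminal;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        this.collector = outputCollector;
        MorphlineContext morphlineContext = new MorphlineContext.Builder().build();
        // final child of the chain: emit whatever the morphline produces (unless the bolt is terminal)
        Command emitter = new Command() {
            public Command getParent() { return null; }
            public void notify(Record notification) { }
            public boolean process(Record record) {
                if (!terminal) {
                    // no anchoring here, kept simple for the sketch
                    collector.emit(new Values(record.getFirstValue("message")));
                }
                return true;
            }
        };
        // initialize the Morphline handler once, via the configuration file
        this.morphline = new Compiler().compile(new File(confFile), morphlineId, morphlineContext, emitter);
    }

    @Override
    public void execute(Tuple input) {
        // map the incoming Tuple to a Morphline Record (the "msg" field name is an assumption)
        Record record = new Record();
        record.put(Fields.ATTACHMENT_BODY,
                input.getStringByField("msg").getBytes(StandardCharsets.UTF_8));
        morphline.process(record);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        if (!terminal) {
            declarer.declare(new backtype.storm.tuple.Fields("msg"));
        }
    }
}

The real MorphlinesBolt is more flexible, since the Tuple-to-Morphline and Morphline-to-Tuple mappings are pluggable (see withTupleMapper and withRecordMapper in the topologies below).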

Simple Configurable ETL topologies

In order to test the custom MorphlinesBolt, I have written 2 simple tests. In these tests you can see how MorphlinesBolt is initialized and the result of each execution. As input, I have used a custom Spout (RandomJsonTestSpout) that just emits new JSON strings every 100ms (configurable).
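A spout like that can be as simple as the following sketch (this is not the actual RandomJsonTestSpout, just an illustration of the idea; the "msg" field name and the JSON content are assumptions):

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SimpleJsonTestSpout extends BaseRichSpout {

    private static final String[] NAMES = {"Adrianos", "Maria", "Nikos"};

    private final long sleepMillis;
    private transient SpoutOutputCollector collector;
    private transient Random random;

    public SimpleJsonTestSpout(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // throttle the emission rate (e.g. 100ms)
        Utils.sleep(sleepMillis);
        String json = String.format("{\"name\":\"%s\",\"age\":%d}",
                NAMES[random.nextInt(NAMES.length)], 20 + random.nextInt(50));
        // emit the JSON string as a single-field Tuple (the "msg" field name is an assumption)
        collector.emit(new Values(json));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msg"));
    }
}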

DummyJsonTerminalLogTopology

A simple topology that configures the Morphline context via a configuration file and then executes the Morphline handler for each incoming Tuple. In this topology, MorphlinesBolt is configured as a terminal bolt, which means that it does not emit a new Tuple for each input Tuple.

public class DummyJsonTerminalLogTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();

        RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);

        String2ByteArrayTupleMapper tupleMapper = new String2ByteArrayTupleMapper();
        tupleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);

        MorphlinesBolt morphBolt = new MorphlinesBolt()
                .withTupleMapper(tupleMapper)
                .withMorphlineId("json_terminal_log")
                .withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WORD_SPOUT", spout, 1);
        builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("MyDummyJsonTerminalLogTopology");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 1) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");
        }
    }
}

 

DummyJson2StringTopology

A simple topology that configures the Morphline context via a configuration file and then executes the Morphline handler for each incoming Tuple. In this topology, MorphlinesBolt is configured as a normal bolt, which means that it emits a new Tuple for each input Tuple.

public class DummyJson2StringTopology {

    public static void main(String[] args) throws Exception {
        Config config = new Config();

        RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);

        String2ByteArrayTupleMapper tupleMapper = new String2ByteArrayTupleMapper();
        tupleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);

        MorphlinesBolt morphBolt = new MorphlinesBolt()
                .withTupleMapper(tupleMapper)
                .withMorphlineId("json2string")
                .withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")
                //.withOutputProcessors(Arrays.asList(resultRecordHandlers));
                .withOutputFields(CmnStormCons.TUPLE_FIELD_MSG)
                .withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));

        LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WORD_SPOUT", spout, 1);
        builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");
        builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("MyDummyJson2StringTopology");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 1) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            System.out.println("Usage: DummyJson2StringTopology <topology_name>");
        }
    }
}

 

Final thoughts

MorphlinesBolt can be used as part of any configurable ETL “solution” (as a single processing Bolt, as a terminal Bolt, as part of a complex pipeline, etc.).

[Image: Morphlines Storm topology examples]

The source code is provided as a Maven module (sv-etl-storm-morphlines) within my collection of sample projects on GitHub.

A great combination would be to use MorphlinesBolt together with Flux. This might give you a fully configurable ETL topology!!!
I have not added this as an option yet, in order to keep the dependencies to a minimum (I may add it with scope “test”).

This module is not final and I will try to improve it, so you may find various bugs in this first implementation.

For any additional thoughts or clarifications, please write a comment :)

This is my first post in 2016! I wish you good health and better thoughts and actions. The first virtues/values of everything are the human being and respect for the environment in which we all live (society, earth, animals, plants, etc.). All the rest are secondary priorities and should not undermine what is implied by the first. Keep your most important virtues always in mind and consider them in every action or thought.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


New GitHub repository as a collection of my sample projects

I have just created a new GitHub repository to act as a collection for my sample projects (sourcevirtues-samples).

This repository consists of various sample projects and code examples, mostly related to big data. Further details of each new sample project (module) will be explained in this blog.

Most of these projects are random thoughts and proofs of concept that I am interested in and play with.

Please feel free to collaborate, share, ask for help or report issues.

All module names will start with the “sv-” prefix, which is just a helper prefix to avoid naming collisions with other projects in my/your workspace.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Real time sentiment analysis example with Apache Storm

Real-time sentiment analysis refers to processing streams of natural language text (or voice) in order to extract subjective information. Typical use cases are building a recommendation engine or detecting social media trends.

I have selected Apache Storm as the real-time processing engine. Storm is very robust (we are using it in production) and it is very easy to implement custom logic on top of it.

I have written a very simple project (source code) that performs sentiment analysis in real time (using random sentences as input data). The scope is to take random sentences as input, perform some sentiment analysis, and finally decide whether the current sentence has a positive or negative score and persist the results.

[Image: Sentiment analysis Storm pipeline]

The implementation logic is the following:

  1. (Dummy Spout) Feed the pipeline with random sentences.
  2. (Stemming Bolt) Stem out any word that is useless for scoring. Create a new sentence that does not contain the useless words (e.g. articles) and pass it to the next component.
  3. (Positive Scoring Bolt) Get the stemmed (modified) sentence and provide a positive score.
  4. (Negative Scoring Bolt) Get the stemmed sentence and provide a negative score.
  5. (Final Scoring Bolt) Compare the positive and negative scores and decide whether the sentence is positive or negative.
  6. (Persistence Bolt) Persist the processed data:
    the original sentence, the modified sentence and the final, positive and negative scores.
    As the persistent store, I have selected Apache HBase (just for reference), which stores the events in batch mode. Batch persistence is triggered every 1 second (configurable), using an internal triggering mechanism of Storm (the Tick Tuple); a minimal sketch of such a tick-driven persistence bolt follows this list. Besides HBase, we could easily use Redis, Cassandra, MongoDB or Elasticsearch (all of them are valid for this use case).
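To make the Tick Tuple mechanism mentioned in step 6 concrete, here is a minimal, hypothetical sketch of a bolt that buffers incoming tuples and flushes them to HBase in a single batch on every tick. The table/column names, tuple field names and row-key scheme are invented for this example, and this is not the project's actual persistence bolt:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class BatchPersistenceBolt extends BaseRichBolt {

    private transient OutputCollector collector;
    private transient Connection connection;
    private transient List<Put> buffer;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.buffer = new ArrayList<Put>();
        try {
            // assumes a local standalone HBase (ZooKeeper on localhost:2181)
            this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        } catch (IOException e) {
            throw new RuntimeException("Cannot connect to HBase", e);
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // ask Storm to send a tick tuple to this bolt every 1 second
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
        return conf;
    }

    @Override
    public void execute(Tuple input) {
        if (isTickTuple(input)) {
            flush();
        } else {
            // field names ("sentence", "score") are assumptions for this sketch;
            // a nanoTime row key is only a simplification for the example
            Put put = new Put(Bytes.toBytes(System.nanoTime()));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sentence"),
                    Bytes.toBytes(input.getStringByField("sentence")));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("score"),
                    Bytes.toBytes(input.getStringByField("score")));
            buffer.add(put);
        }
        collector.ack(input);
    }

    private boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        try (Table table = connection.getTable(TableName.valueOf("sentiment"))) {
            table.put(buffer); // single batch write
        } catch (IOException e) {
            throw new RuntimeException("HBase batch write failed", e);
        }
        buffer.clear();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to declare
    }
}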

The Stemming and Scoring Bolts each use a dummy in-memory database that contains all the relevant words that can be used to score/stem each sentence.
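Such a “database” can be as trivial as a static map of words to weights. A compact, hypothetical sketch of a positive-scoring bolt built around such a map (the tuple field names and word weights are assumptions):

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class PositiveScoringBolt extends BaseBasicBolt {

    // dummy in-memory "database" of positive words and their weights
    private static final Map<String, Integer> POSITIVE_WORDS = new HashMap<String, Integer>();
    static {
        POSITIVE_WORDS.put("good", 1);
        POSITIVE_WORDS.put("great", 2);
        POSITIVE_WORDS.put("excellent", 3);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // the "sentence" field name is an assumption for this sketch
        String sentence = input.getStringByField("sentence");
        int score = 0;
        for (String word : sentence.toLowerCase().split("\\s+")) {
            Integer weight = POSITIVE_WORDS.get(word);
            if (weight != null) {
                score += weight;
            }
        }
        collector.emit(new Values(sentence, score));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence", "positive_score"));
    }
}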

In order to run this example on a single node or on a cluster, you can use the Storm Flux project. The whole topology pipeline is defined in a single configuration file (topology.yaml).

Example run:

Local execution:
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --local src/test/resources/flux/topology.yaml -s 10000

Cluster execution:
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote src/test/resources/flux/topology.yaml --c nimbus.host=localhost

Alternatively, there is a simple JUnit test (SentimentAnalysisTopologyTest) that executes the same topology locally.

You can check the README for details. As for prerequisites, you can check my previous post in order to install a single local HBase instance and a local Storm cluster with 2 Workers.

This is a very simplistic approach to using Apache Storm for sentiment analysis. I hope to have more free time to write a new post with a more realistic solution for sentiment analysis.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Local installation of standalone HBase and Apache Storm simple cluster

We mainly use Apache Storm for stream processing and Apache HBase as a NoSQL wide-column database.

Even though Apache Cassandra is a great NoSQL database, we mostly prefer HBase because it ships with the Cloudera distribution and because it favors consistency (check the CAP theorem) more than Cassandra.

HBase is based on HDFS, but it can be easily installed in standalone mode for testing purposes. You just need to download the latest version, extract the compressed file, start the standalone node and then start an HBase shell and play.

$> tar zxvf hbase-1.1.2-bin.tar.gz
$> cd hbase-1.1.2/bin/
$> ./start-hbase.sh
$> ./hbase shell
hbase(main):001:0> create 'DummyTable', 'cf'
hbase(main):001:0> scan 'DummyTable'
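If you prefer to talk to the standalone instance from Java instead of the shell, a minimal client sketch looks like the following (it assumes the hbase-client 1.1.x dependency on the classpath and the default localhost ZooKeeper of standalone mode):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DummyTableClient {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // standalone HBase runs its own ZooKeeper on localhost:2181
        conf.set("hbase.zookeeper.quorum", "localhost");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("DummyTable"))) {

            // write one row into the 'cf' column family created via the shell
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("greeting"), Bytes.toBytes("hello hbase"));
            table.put(put);

            // scan the table and print every row's 'greeting' value
            try (ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result result : scanner) {
                    System.out.println(Bytes.toString(result.getRow()) + " -> "
                            + Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("greeting"))));
                }
            }
        }
    }
}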

When you start HBase in standalone mode, it automatically starts a local ZooKeeper node too (running on the default port 2181).

$> netstat -anp|grep 2181

ZooKeeper is used by both HBase and Storm as a distributed coordination mechanism. Now that you already have a local ZooKeeper node running, you are ready to configure and run a local Storm cluster.

  • Download the latest Storm release
  • Extract
  • Configure “STORM_HOME/conf/storm.yaml” (check below)
  • Start local cluster:
    • $> cd STORM_HOME/bin
    • $> ./storm nimbus
    • $> ./storm supervisor
    • $> ./storm ui
  • Logs are located in the “STORM_HOME/logs/” directory
  • Check local Storm UI at: localhost:8080

Contents of the new “storm.yaml” configuration file:

storm.zookeeper.servers:
- "localhost"

nimbus.host: "localhost"

supervisor.slots.ports:
- 6701
- 6702

You can also set the “worker.childopts” parameter to define JVM options for each Worker (the processing JVMs). Here is a simple example for my local JVMs, where I set the min/max heap size and the garbage collection strategy, and enable JMX and GC logs.

worker.childopts: "-server -Xms512m -Xmx2560m -XX:PermSize=128m -XX:MaxPermSize=512m -XX:+UseParallelOldGC -XX:ParallelGCThreads=3 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/tmp/gc-storm-worker-%ID%.log -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1%ID% -XX:+PrintFlagsFinal -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true"

The “worker.childopts” parameter is loaded by all Worker JVMs. The “%ID%” variable corresponds to the port (6701 or 6702) assigned to each Worker. As you can see, I have used it to enable a different JMX port and a different GC log file for each Worker.

We are running Storm on JDK 7, but JDK 8 seems to be compatible too. The latest Storm has switched from Logback to Log4j 2 (check the full release notes here and here).

Using the above instructions, you will be able to run a mini HBase and Storm cluster on your laptop without any problem.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Install Skype on Debian Jessie amd64

I have just installed Debian Jessie (testing) amd64 on my new laptop (it is really stable and the only problem I have is with dual screens).

One of the few pieces of closed-source software I use is Skype (only for my day job).
I downloaded Skype for “Ubuntu 12.04 (multiarch)” and tried to install it on my Jessie.
Skype is built only for the i386 architecture, so I needed to add (as root) the i386 architecture to my system.

dpkg --add-architecture i386

View the dependencies of the Skype package:

dpkg -I skype-ubuntu-precise_4.3.0.37-1_i386.deb
 Architecture: i386
 Depends: libc6 (>= 2.3.6-6~), libc6 (>= 2.7), libgcc1 (>= 1:4.1.1), libqt4-dbus (>= 4:4.5.3), libqt4-network (>= 4:4.8.0), libqt4-xml (>= 4:4.5.3), libqtcore4 (>= 4:4.7.0~beta1), libqtgui4 (>= 4:4.8.0), libqtwebkit4 (>= 2.2~2011week36), libstdc++6 (>= 4.2.1), libx11-6, libxext6, libxss1, libxv1, libssl1.0.0, libpulse0, libasound2-plugins

I tried to install the dependencies, but got an error:

apt-get -s install libc6:i386 libggc1:i386  libqt4-dbus:i386 libqt4-network:i386 libqt4-xml:i386 libqtcore4:i386 libqtgui4:i386 libqtwebkit4:i386 libstdc++6:i386 libx11-6:i386 libxext6:i386 libxss1:i386 libxv1:i386 libssl1.0.0:i386 libpulse0:i386 libasound2-plugins:i386

E: Unable to locate package libggc1

I excluded libggc1 and tried again, but got another error, this time for libaudio2:

apt-get -s install libc6:i386 libqt4-dbus:i386 libqt4-network:i386 libqt4-xml:i386 libqtcore4:i386 libqtgui4:i386 libqtwebkit4:i386 libstdc++6:i386 libx11-6:i386 libxext6:i386 libxss1:i386 libxv1:i386 libssl1.0.0:i386 libpulse0:i386 libasound2-plugins:i386

Unpacking libaudio2:i386 (1.9.4-1+b1) ...
dpkg: error processing archive /var/cache/apt/archives/libaudio2_1.9.4-1+b1_i386.deb (--unpack):
 trying to overwrite shared '/usr/share/doc/libaudio2/changelog.Debian.gz', which is different from other instances of package libaudio2:i386

Errors were encountered while processing:
 /var/cache/apt/archives/libaudio2_1.9.4-1+b1_i386.deb
E: Sub-process /usr/bin/dpkg returned an error code (1)

Library libaudio2 is an indirect dependency of the libqtgui4 package. The problem was the packaging of libaudio2. I had already installed libaudio2:amd64 and then tried to install it for the i386 architecture too (as Skype needs it). I got this error because the i386 package tried to modify an existing file (‘/usr/share/doc/libaudio2/changelog.Debian.gz’) that belongs to another package (libaudio2:amd64). I overcame this problem by renaming the problematic file and letting the libaudio2:i386 package finish its job.

mv /usr/share/doc/libaudio2/changelog.Debian.gz /usr/share/doc/libaudio2/changelog.Debian.amd64.gz

Then I tried to install the Skype dependencies again:

apt-get install libc6:i386 libqt4-dbus:i386 libqt4-network:i386 libqt4-xml:i386 libqtcore4:i386 libqtgui4:i386 libqtwebkit4:i386 libstdc++6:i386 libx11-6:i386 libxext6:i386 libxss1:i386 libxv1:i386 libssl1.0.0:i386 libpulse0:i386

After successfully installing the dependencies, I installed the Skype package:

dpkg -i skype-ubuntu-precise_4.3.0.37-1_i386.deb 

The Skype installation completed successfully. Now, my only problem with Skype is that it does not work well with the system proxy. As a workaround, I start it from the command line.

Please avoid Skype, as it is a proprietary Voice-over-IP program that uses a proprietary protocol. Using proprietary phone software means that we can’t be sure who is listening in, because we can’t see the code (check the corresponding FSF project).
Outside of my day job I use Jitsi (and, less frequently, Ekiga). I guess WebRTC is the future and we must work in that direction.

Finally, Debian Jessie (testing) amd64 looks really stable.

My only problem is with dual screens (Dell Latitude E5540 laptop). When both monitors are active (not mirrored) the refresh rate of the external monitor is 60Hz, but when only the external monitor is active, its refresh rate is 75Hz!!! Any ideas???

Regards,
Adrianos Dadis


Build home mini PC with Cubietruck and Linux

Four months ago, I decided to build a home mini PC. Its main purpose is to act as a NAS server, but it should support additional light services too.

Home mini PC requirements:

  • ARM board (low power consumption and very good performance)
  • SATA 3.5″ hard disk (much more stable than 2.5″)
  • GNU/Linux (as always)

Home mini PC services:

  1. NAS server (1st level requirement)
  2. Light server-side music player (2nd level requirement)
  3. Light BitTorrent client (2nd level requirement)
  4. No need for X window (not a requirement)
  5. Apache HTTP server (3rd level requirement)
  6. Light wiki server (3rd level requirement)
  7. Light database (3rd level requirement)

Candidates:

  • Raspberry Pi: Unbelievable community support and hundreds of applied examples, but it has limited RAM and, mainly, it does not support SATA.
  • BeagleBone Black: Stable and much more open hardware than the Raspberry Pi, but with far fewer features (CPU, RAM, …).
  • Cubietruck (aka Cubieboard 3): Many more features than both of the above, but with a much smaller community. Specifications summary here.

As you can see, my requirements are not very light for a small board :)
So, I decided to use the Cubietruck (CT), which is the strongest board, accepting the risk of its smaller community (as I am not a hardware or Linux expert).

I received my Cubietruck a few weeks ago!!!

[Image: Cubietruck details]

Final hardware:

  • Cubietruck
  • Cubieboard HDD Addon (to support SATA 3.5″, which needs external power input 12V)
  • Power Adaptor 5V 3A (for CT)
  • Power Adaptor 12V 4A (for HDD addon)
  • Western Digital Red 3TB
  • SD Card 16 GB

ATTENTION: You have to use different power adapters to power the Cubietruck and the HDD Addon. The output USB connection of the HDD Addon (which works as a power adapter) cannot supply enough amperes to your Cubietruck when they are needed. Extra details here.

Final Software:

Final view…

[Image: Cubietruck with HDD SATA and all cables]

Details…

[Image: Cubietruck and HDD Addon base]

In the next blog post, I will try to write about Debian and any extra hints I have found.
If you need extra details, please leave a comment (I will either answer it or include the info in the next post).

UPDATE (20/12/2014)
I just did a quick read/write test on the SATA disk of the CT via NFS (requested via a comment).
The network topology is as follows:
laptop_wireless --- simple_router --- CT_wired

Copy file (256MB) from laptop to CT:
$> time dd if=/dev/zero of=/mynfs/store/test_dd_128k_2048 bs=128k count=2048
2048+0 records in
2048+0 records out
268435456 bytes (268 MB) copied, 128.897 s, 2.1 MB/s
real 2m10.458s
user 0m0.000s
sys 0m0.188s

Copy this file (256MB) back to laptop:
$> time cp /mynfs/store/test_dd_128k_2048 /tmp/fff
real 1m45.158s
user 0m0.008s
sys 0m0.384s

There are two important factors that affect the performance of the above test:
a) If the laptop were connected via cable instead of wireless, the speed would be much higher.
b) The CT was running other things concurrently and was not idle.

Regards,
Adrianos Dadis.

Real Democracy requires Free Software


Storm event processor – GC log file per worker

For the last three months, I have been working with a new team, building a product for Big Data analytics in the telecom domain.

The Storm event processor is one of the main frameworks we use, and it is really great. You can read more details in its official documentation (which has been improved).

Storm uses Workers to do your job; each of them is a single JVM and is administered internally by Storm (started, restarted if unresponsive, moved to another node of the cluster, etc.). For a single job you can run many Workers on your cluster (Storm decides how to distribute your Workers among the cluster nodes). By “node” I mean a running OS, either on a VM or on a physical machine.

The tricky point here is that all Workers on a node read the same configuration file (STORM_HOME/conf/storm.yaml), even if they are running/processing different kinds of jobs. Additionally, there is a single parameter (worker.childopts) in this file, which is used by all Workers (of the same node) to initialize their JVMs (how to set JVM options).

As we want to know how GC performs in each Worker, we need to monitor the GC log of each Worker/JVM.

As I said, the problem is that all Workers on a node read the same parameter from the same configuration file in order to initialize their JVMs, so it is not trivial to use a different GC log file for each Worker/JVM.

Fortunately, the Storm developers have exposed a “variable” that solves this problem. This variable is named “ID” and it is unique for each Worker on each node (the same Worker ID could exist on different nodes).

For the Workers’ JVM options, we use this entry in our “storm.yaml” file:

worker.childopts: "-Xmx1024m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/opt/storm/logs/gc-storm-worker-%ID%.log"

Be aware that you have to add “%” before and after the “ID” string (in order for it to be identified as an internal Storm variable).

Additionally, for the Supervisor JVM options (one such process on each node), we use this entry in our “storm.yaml” file:

supervisor.childopts: "-Xmx512m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/opt/storm/logs/gc-storm-supervisor.log"

I have also included some memory settings (“-Xmx” and “-XX:MaxPermSize”), but they are just an example.

Please keep in mind that Storm requires Oracle Hotspot JDK 6 (JDK 7/8 is not yet supported). This is a strong drawback, but we hope it will be fixed soon.

Hope it helps,
Adrianos Dadis.

Democracy Requires Free Software
