Apache Storm: How to configure KafkaBolt with Flux

Flux is a mini framework that can help us define and deploy a Storm topology.

Flux provides various wrappers that help you define the required stream(s) and initialize your Bolts and Spouts (using constructors with or without arguments, and calling custom configuration methods automatically via reflection).

All you need in order to use Flux is to add it as a dependency in your “pom.xml”, configure it via a single YAML file (check the Flux examples) and then use it as the main class to deploy your topology to a Storm cluster (or run it as a local test).
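For reference, the dependency looks roughly like this (a sketch assuming Storm 1.x, where Flux is published as the flux-core artifact under the org.apache.storm group; match the version to your Storm version):

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>flux-core</artifactId>
    <version>${storm.version}</version>
</dependency>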

In order to initialize a KafkaBolt the following steps are needed:

  1. Define a “topicSelector” via “withTopicSelector” method
  2. Define a “kafkaMapper” via “withTupleToKafkaMapper” method
  3. Define a “kafkaProducerProps” via “withProducerProperties” method
  4. Initialize “org.apache.storm.kafka.bolt.KafkaBolt” with the above configuration
  5. Include the above KafkaBolt as part of a stream

Minimal Flux configuration example for KafkaBolt:

components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"

  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "myTopicName"

  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"

  - id: "kafkaProducerProps"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "localhost:9092"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer" 

bolts:    
  - id: "bolt-kafka"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 1
    configMethods:
      - name: "withProducerProperties"
        args: [ref: "kafkaProducerProps"]
      - name: "withTopicSelector"
        args: [ref: "topicSelector"]
      - name: "withTupleToKafkaMapper"
        args: [ref: "kafkaMapper"]

streams:
  - name: "spout --> kafkaBolt"
    from: "spout-1"
    to: "bolt-kafka"
    grouping:
      type: LOCAL_OR_SHUFFLE
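
For readers who prefer plain Java, the equivalent wiring without Flux looks roughly like this (a minimal sketch against the storm-kafka 1.x API; the spout named “spout-1” is assumed to be defined elsewhere):

import java.util.Properties;

import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaBoltWiring {
    public static void main(String[] args) {
        // Same producer properties as the "kafkaProducerProps" component above
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("acks", "1");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Steps 1-4: topic selector, tuple mapper and producer properties on the KafkaBolt
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
                .withTopicSelector(new DefaultTopicSelector("myTopicName"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>())
                .withProducerProperties(producerProps);

        // Step 5: include the KafkaBolt as part of a stream
        TopologyBuilder builder = new TopologyBuilder();
        builder.setBolt("bolt-kafka", kafkaBolt, 1).localOrShuffleGrouping("spout-1");
    }
}

Note that FieldNameBasedTupleToKafkaMapper by default expects the incoming tuples to contain “key” and “message” fields.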

For a full working configuration example, check the complete topology_kafka.yaml of the demo project, which can be deployed as shown below.

Example command to deploy your topology on Storm:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml
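
To try the same topology in-process before deploying it to a cluster, Flux also supports local mode:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --local src/test/resources/flux/topology_kafka.yaml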

Flux configuration for a KafkaSpout is already described in the official Flux examples.
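For reference, a KafkaSpout definition along the lines of that example might look like this (a sketch reusing the “zkHosts” and “stringMultiScheme” components defined above; the zkRoot and client id values are illustrative):

components:
  # ... stringScheme, stringMultiScheme and zkHosts as defined above ...

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"     # ZooKeeper hosts
      - "myTopicName"      # topic to consume from
      - "/kafka_storm"     # zkRoot (illustrative)
      - "spout-1"          # client id (illustrative)
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

spouts:
  - id: "spout-1"
    className: "org.apache.storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "spoutConfig"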

Flux is a really helpful framework that eliminates the custom code otherwise required to define and initialize a topology 🙂

Regards,
Adrianos Dadis
—
Real Democracy requires Free Software


FOSSCOMM 2016 – Big Data with Apache Storm – Presentation and Demo code

FOSSCOMM (2016) conference is over!!!

Presenting Big Data and Apache Storm at FOSSCOMM was a really amazing experience 🙂

It was a great conference with really interesting presentations and people.
Many thanks to the FOSSCOMM team and all the volunteers.

Demo code is available on github: storm-demo-fosscomm2016
Our presentation is available on Slideshare.


Regards,
Adrianos Dadis.
—
Real Democracy requires Free Software


Speaking about Big Data and Apache Storm at FOSSCOMM conference 16-17 April

Hello everyone,

this weekend the FOSSCOMM conference (Free and Open Source Software Communities Meeting) will be held at the University of Piraeus.

Together with Patroclos Christou, I will speak about Big Data stream processing using Apache Storm and Apache Kafka (event info) on Sunday 17 April at 18:00.

There are many interesting presentations; check the full schedule: Saturday & Sunday

I will be glad to meet you there 🙂

Adrianos Dadis.
—
Real Democracy requires Free Software


Configurable ETL processing using Apache Storm and Kite SDK Morphlines

From my first days working as a software engineer, I have always heard the same request from many sides:

We want to have everything configurable, we want to change everything at runtime, and we want a visual tool to apply all this logic so that non-developers can use and configure our application.

I like this generic goal too, but, as we all know, software systems are not so adaptable and customer requests are not stable.

In previous years, we built such configurable applications (not 100% configurable) using traditional frameworks/techniques (JMX, distributed caches, Spring or JEE and more).

In recent years, there is an additional concept that has to be included in our architecture: the concept of Big Data (or 3V or 4V or whatever words fit better). This new concept deprecates various solutions and workarounds that we were familiar with and applied in old 3-tier applications.

The funny thing is that many times I find myself in the same position as 10 years back. This is the rule in software development: it never ends, and so personal excellence and new adventures never end either 🙂

The main problem remains the same: how to build a configurable, distributed ETL application.

For this reason, I have built a mini adaptable solution that might be helpful in many use cases.
I have used 3 common tools in the big data world: Java, Apache Storm and Kite SDK Morphlines.
Java as the main programming language, Apache Storm as the distributed stream processing engine and Kite SDK Morphlines as the configurable ETL engine.

Kite SDK Morphlines

Copied from its description: Morphlines is an open source framework that reduces the time and efforts necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.

In addition to the built-in commands, you can easily implement your own Command and use it in your morphline configuration file, as shown in the sketch below.
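For example, a minimal custom command might look like the following sketch (the command name, class and field handling are illustrative; the builder/command pattern follows the Kite SDK documentation):

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import com.typesafe.config.Config;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;

public final class ToUpperCaseBuilder implements CommandBuilder {

    @Override
    public Collection<String> getNames() {
        // Name used to reference this command inside a morphline configuration file
        return Collections.singletonList("toUpperCase");
    }

    @Override
    public Command build(Config config, Command parent, Command child, MorphlineContext context) {
        return new ToUpperCase(this, config, parent, child, context);
    }

    private static final class ToUpperCase extends AbstractCommand {
        private final String fieldName;

        ToUpperCase(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
            super(builder, config, parent, child, context);
            this.fieldName = getConfigs().getString(config, "field");
            validateArguments();
        }

        @Override
        protected boolean doProcess(Record record) {
            // Upper-case every value of the configured field, then pass the record downstream
            List values = record.get(fieldName);
            for (int i = 0; i < values.size(); i++) {
                values.set(i, values.get(i).toString().toUpperCase());
            }
            return super.doProcess(record);
        }
    }
}

It could then be referenced in a commands list as { toUpperCase { field : name } }, provided its package is covered by importCommands.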

Sample Morphline configuration that reads a JSON string, parses it and then just logs particular JSON elements:

morphlines : [{
  id : json_terminal_log
  importCommands : ["org.kitesdk.**"]

  commands : [
    # read the JSON blob
    { readJson: {} }

    # extract JSON objects into head fields
    { extractJsonPaths {
        flatten: true
        paths: {
          name: /name
          age: /age
        }
    } }

    # log data
    { logInfo {
        format : "name: {}, record: {}"
        args : ["@{name}", "@{}"]
    } }
  ]
}]

Storm Morphlines Bolt

In order to use Morphlines inside Storm, I have implemented a custom MorphlinesBolt. The main responsibilities of this Bolt are (a simplified sketch follows the list):

  • Initialize Morphlines handler via a configuration file
  • Initialize mapping instructions:
    a) from Tuple to Morphline input and
    b) from Morphline output to new output Tuple
  • Process each incoming event using the already initialized Morphlines context
  • If the Bolt is not terminal, then, using the provided Mapper (type “b” above), emit a new Tuple with the output of the Morphline execution
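
To make the flow concrete, here is a simplified sketch of what such a bolt could look like for the terminal case (illustrative only, not the actual MorphlinesBolt source; Storm 1.x package names and the “msg” tuple field are assumptions):

import java.io.File;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;

public class MorphlineExecBolt extends BaseRichBolt {

    private final String morphlineFile;
    private final String morphlineId;
    private transient Command morphline;
    private transient OutputCollector collector;

    public MorphlineExecBolt(String morphlineFile, String morphlineId) {
        this.morphlineFile = morphlineFile;
        this.morphlineId = morphlineId;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Final child marking the end of the pipeline (records are simply accepted)
        Command devNull = new Command() {
            @Override public void notify(Record notification) {}
            @Override public boolean process(Record record) { return true; }
            @Override public Command getParent() { return null; }
        };
        // Compile the morphline once per bolt instance
        MorphlineContext morphlineContext = new MorphlineContext.Builder().build();
        this.morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, devNull);
    }

    @Override
    public void execute(Tuple input) {
        // Map the incoming Tuple to a Morphline Record (mapping "a")
        Record record = new Record();
        record.put(Fields.ATTACHMENT_BODY, input.getBinaryByField("msg"));
        // Run the already compiled morphline on the record
        if (morphline.process(record)) {
            collector.ack(input);
        } else {
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal variant: no new Tuple is emitted
    }
}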

Simple Configurable ETL topologies

In order to test the custom MorphlinesBolt, I have written 2 simple tests. In these tests you can see how MorphlinesBolt is initialized and the result of each execution. As input, I have used a custom Spout (RandomJsonTestSpout) that just emits new JSON strings every 100ms (configurable).

DummyJsonTerminalLogTopology

A simple topology that configures the Morphline context via a configuration file and then executes the Morphline handler for each incoming Tuple. In this topology, MorphlinesBolt is configured as a terminal bolt, which means that it does not emit a new Tuple for each input Tuple.

public class DummyJsonTerminalLogTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();

        RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);

        String2ByteArrayTupleMapper tupleMapper = new String2ByteArrayTupleMapper();
        tupleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);

        MorphlinesBolt morphBolt = new MorphlinesBolt()
                .withTupleMapper(tupleMapper)
                .withMorphlineId("json_terminal_log")
                .withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WORD_SPOUT", spout, 1);
        builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("MyDummyJsonTerminalLogTopology");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 1) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");
        }
    }
}

 

DummyJson2StringTopology

A simple topology that configures the Morphline context via a configuration file and then executes the Morphline handler for each incoming Tuple. In this topology, MorphlinesBolt is configured as a normal bolt, which means that it emits a new Tuple for each input Tuple.

public class DummyJson2StringTopology {

    public static void main(String[] args) throws Exception {
        Config config = new Config();

        RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);

        String2ByteArrayTupleMapper tupleMapper = new String2ByteArrayTupleMapper();
        tupleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);

        MorphlinesBolt morphBolt = new MorphlinesBolt()
                .withTupleMapper(tupleMapper)
                .withMorphlineId("json2string")
                .withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")
                //.withOutputProcessors(Arrays.asList(resultRecordHandlers));
                .withOutputFields(CmnStormCons.TUPLE_FIELD_MSG)
                .withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));

        LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WORD_SPOUT", spout, 1);
        builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");
        builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("MyDummyJson2StringTopology");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 1) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            System.out.println("Usage: DummyJson2StringTopology <topology_name>");
        }
    }
}

 

Final thoughts

MorphlinesBolt can be used as part of any configurable ETL “solution” (as a single processing Bolt, as a terminal Bolt, as part of a complex pipeline, etc.).

[Figure: morphlines_storm_topology_examples]

Source code is provided as a Maven module (sv-etl-storm-morphlines) within my collection of sample projects on github.

A great combination would be to use MorphlinesBolt together with Flux (see the sketch below). This might give you a fully configurable ETL topology!!!
I have not added this option yet, in order to keep the dependencies minimal (I may add it with scope “test”).
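
To illustrate, the Flux wiring could look roughly like the following sketch (the package names are hypothetical, the tuple field value is assumed, and the config methods mirror the Java builder calls shown earlier):

components:
  - id: "tupleMapper"
    # hypothetical package; the real class lives in the sv-etl-storm-morphlines module
    className: "com.example.etl.String2ByteArrayTupleMapper"
    configMethods:
      - name: "configure"
        args: ["msg"]    # assumed value of CmnStormCons.TUPLE_FIELD_MSG

bolts:
  - id: "MORPH_BOLT"
    # hypothetical package
    className: "com.example.etl.MorphlinesBolt"
    parallelism: 1
    configMethods:
      - name: "withTupleMapper"
        args: [ref: "tupleMapper"]
      - name: "withMorphlineId"
        args: ["json_terminal_log"]
      - name: "withMorphlineConfFile"
        args: ["morphline_confs/json_terminal_log.conf"]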

This module is not final and I will try to improve it, so you may find various bugs in this first implementation.

For any additional thoughts or clarifications, please write a comment 🙂

This is my first post in 2016! I wish you good health and better thoughts and actions. The first virtues/values of everything are the human being and respect for the environment we all live in (society, earth, animals, plants, etc.). Everything else is a secondary priority and should not ruin what is implied by the first priorities. Keep your most important virtues always in your mind and consider them in every action or thought.

Regards,
Adrianos Dadis.
—
Real Democracy requires Free Software


New github repository as collection of my sample projects

I have just created a new github repository to act as a collection for my sample projects (sourcevirtues-samples).

This repository consists of various sample projects and code examples, mostly related to big data. Further details for each new sample project (module) will be explained on this blog.

Most of these projects are random thoughts and proofs-of-concept that I am interested in and play with.

Please feel free to collaborate, share, ask for help or report issues.

All module names will start with the “sv-” prefix, which is just a helper prefix to avoid naming collisions in my/your workspace with other projects.

Regards,
Adrianos Dadis.
—
Real Democracy requires Free Software
