Apache Storm: How to configure KafkaBolt with Flux

Flux is a mini framework that can help us define and deploy a Storm topology.

Flux provides wrappers that help you define the required stream(s) and initialize your Bolts and Spouts, using constructors with or without arguments and calling custom configuration methods automatically via reflection.
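To make the reflection idea concrete, here is a minimal sketch in plain Java of what "construct with arguments, then call a configuration method" looks like. It is not Flux's actual implementation: the class "DemoSelector" and the helper "build" are hypothetical names invented for this illustration, and only String arguments are handled.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

final class ReflectiveInitSketch {

    /** Hypothetical component, standing in for a real helper class such as a topic selector. */
    public static final class DemoSelector {
        private final String topic;
        private String prefix = "";

        public DemoSelector(String topic) {
            this.topic = topic;
        }

        public void withPrefix(String prefix) {
            this.prefix = prefix;
        }

        public String resolve() {
            return prefix + topic;
        }
    }

    /**
     * Creates an instance of className through its single-String constructor
     * (compare "constructorArgs") and then invokes one single-String method on it
     * (compare "configMethods"). Hypothetical helper, not part of Flux.
     */
    static Object build(String className, String ctorArg, String methodName, String methodArg) {
        try {
            Class<?> type = Class.forName(className);
            Constructor<?> ctor = type.getConstructor(String.class);
            Object instance = ctor.newInstance(ctorArg);
            Method method = type.getMethod(methodName, String.class);
            method.invoke(instance, methodArg);
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not build " + className, e);
        }
    }

    public static void main(String[] args) {
        Object built = build(DemoSelector.class.getName(), "myTopicName", "withPrefix", "dev-");
        System.out.println(((DemoSelector) built).resolve()); // prints "dev-myTopicName"
    }
}
```

The point of the sketch is only that class names, constructor arguments and method names can all come from text (in Flux's case, from the YAML file) instead of being hard-coded.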

To use Flux, you only need to add it as a dependency in your “pom.xml”, configure your topology in a single YAML file (check the Flux examples), and then use Flux as the main class to deploy your topology to a Storm cluster (or run it locally as a test).

To initialize a KafkaBolt, the following steps are needed:

  1. Define a “topicSelector” component, to be passed via the “withTopicSelector” method
  2. Define a “kafkaMapper” component, to be passed via the “withTupleToKafkaMapper” method
  3. Define a “kafkaProducerProps” component, to be passed via the “withProducerProperties” method
  4. Initialize “org.apache.storm.kafka.bolt.KafkaBolt” with the above configuration
  5. Include the above KafkaBolt as part of a stream

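Minimal Flux configuration example for KafkaBolt (the spout “spout-1” referenced in the stream is not shown here and is assumed to be defined in the same file):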

components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"

  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "myTopicName"

  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"

  - id: "kafkaProducerProps"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "localhost:9092"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer" 

bolts:    
  - id: "bolt-kafka"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 1
    configMethods:
      - name: "withProducerProperties"
        args: [ref: "kafkaProducerProps"]
      - name: "withTopicSelector"
        args: [ref: "topicSelector"]
      - name: "withTupleToKafkaMapper"
        args: [ref: "kafkaMapper"]

streams:
  - name: "spout --> kafkaBolt"
    from: "spout-1"
    to: "bolt-kafka"
    grouping:
      type: LOCAL_OR_SHUFFLE
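
For comparison, the “kafkaProducerProps” component above corresponds roughly to the following plain Java, which is the kind of code Flux saves you from writing. This is only a sketch: the class name “ProducerPropsSketch” is hypothetical, and the KafkaBolt itself is left out because it needs the Storm Kafka dependency on the classpath.

```java
import java.util.Properties;

final class ProducerPropsSketch {

    /** Builds the same four producer settings as the "kafkaProducerProps" component in the YAML. */
    static Properties kafkaProducerProps() {
        Properties props = new Properties();
        // Each "put" below mirrors one entry under "configMethods" in the YAML.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = kafkaProducerProps();
        // In a hand-written topology this object would then be handed to the bolt
        // through "withProducerProperties", as the YAML does with a "ref".
        System.out.println(props.getProperty("bootstrap.servers")); // prints "localhost:9092"
    }
}
```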

For a full working configuration example check this, which can be used like this.

Example command to deploy your topology on Storm:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml

The Flux configuration for KafkaSpout is already covered by an official Flux example.

Flux is a really helpful framework that eliminates the custom code otherwise required to define and initialize a topology 🙂

Regards,
Adrianos Dadis

Real Democracy requires Free Software

About Adrianos Dadis

Building Big Data & Streaming processing solutions in telcos business domain. Interested in distributed systems and enterprise integration.