Spark Kafka Integration

1. Download and configure JDK1.8
2. Download and configure Hadoop 2.6
3. Download and configure Spark for Hadoop2.6
    Start master and access the url via http://localhost:8080.
    Start slave pass the spark://localhost:<port> seen in the master
4.Download and configure Kafka
(http://kafka.apache.org/documentation.html#quickstart)

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz

5. Start producer/broker..test using a message and check if spark gets the message.

a) a uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.
> bin/zookeeper-server-start.sh config/zookeeper.properties
 
b) 
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties 
[2016-07-13 04:54:07,557] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-07-13 04:54:07,598] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

c) Create Topic
Let's create a topic named "test" with a single partition and only one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list topic command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test 
 
d)Run the producer and send messages  to the server.
 kafka-console-producer.sh --broker-list localhost:9092 --topic testlogs </filepath/access_log.txt 
 
6. Eclipse Side
http://spark.apache.org/docs/latest/streaming-kafka-integration.html

make sure u have spark-examples-1.6.1-hadoop2.6.0.jar in the build path

sample consumer programs are available that will create a direct stream using kafka utils and read from the topic

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])
 
once you have the DStream , you can extract the fields and save/process.
 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
 







* package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
































































 


Comments

Popular posts from this blog

ScoreCard Model using R

Zeppelin and Anaconda

The auxService:mapreduce_shuffle does not exist