Spark Kafka Integration

1. Download and configure JDK 1.8
2. Download and configure Hadoop 2.6
3. Download and configure Spark for Hadoop 2.6
    Start the master and open its web UI at http://localhost:8080.
    Start a slave, passing it the spark://localhost:<port> URL shown in the master web UI.
4. Download and configure Kafka

5. Start the Kafka broker and a producer, send a test message, and check that Spark receives it.

a) Kafka uses ZooKeeper, so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
> bin/zookeeper-server-start.sh config/zookeeper.properties
b) Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2016-07-13 04:54:07,557] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-07-13 04:54:07,598] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

c) Create Topic
Let's create a topic named "test" with a single partition and only one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list topic command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
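d) Run the producer and send messages to the server. Here the contents of a log file are sent to the topic testlogs; create that topic the same way as test if the broker does not create topics automatically.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testlogs < /filepath/access_log.txt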
6. Eclipse Side

Make sure you have spark-examples-1.6.1-hadoop2.6.0.jar on the build path.

Sample consumer programs are available that create a direct stream using KafkaUtils and read from the topic:

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])
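A minimal sketch of how the last two arguments in the template above might be built from plain strings. The object and method names (KafkaParamsSketch, topicSet, kafkaParams) and the fromBeginning flag are hypothetical helpers for illustration, not part of Spark. The "metadata.broker.list" key is what the direct stream in the Kafka 0.8 integration reads; setting "auto.offset.reset" to "smallest" is an optional assumption for reading a topic from its earliest offset.

```scala
object KafkaParamsSketch {
  // Split a comma-separated topic list into the Set that createDirectStream expects.
  def topicSet(topics: String): Set[String] =
    topics.split(",").map(_.trim).filter(_.nonEmpty).toSet

  // Build the Kafka parameter map for the direct stream.
  // "metadata.broker.list" takes broker host:port pairs (not the ZooKeeper address).
  // Assumption: fromBeginning is a hypothetical switch that adds
  // "auto.offset.reset" -> "smallest" so consumption starts at the earliest offset.
  def kafkaParams(brokers: String, fromBeginning: Boolean = false): Map[String, String] = {
    val base = Map("metadata.broker.list" -> brokers)
    if (fromBeginning) base + ("auto.offset.reset" -> "smallest") else base
  }

  def main(args: Array[String]): Unit = {
    println(kafkaParams("localhost:9092", fromBeginning = true))
    println(topicSet("test, testlogs"))
  }
}
```

The two results can then be passed as the [map of Kafka parameters] and [set of topics to consume] arguments.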
Once you have the DStream, you can extract the fields from each record and save or process them.
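As an illustration of field extraction, the sketch below parses one access-log line into named fields. It assumes the log uses the Apache Common Log Format, which these notes do not state, and the names AccessLogFields, LogEntry and parse are hypothetical. It is plain Scala with no Spark dependency, so it can be tried on its own first.

```scala
object AccessLogFields {
  // Fields kept from one log line (assumption: Common Log Format).
  final case class LogEntry(host: String, timestamp: String, request: String,
                            status: Int, bytes: Long)

  // host ident user [timestamp] "request" status size, optionally followed by more fields
  private val LogLine =
    """^(\S+) \S+ \S+ \[([^\]]+)\] "([^"]*)" (\d{3}) (\d+|-)(?: .*)?$""".r

  // Returns None for lines that do not match, so bad records can be skipped.
  def parse(line: String): Option[LogEntry] = line match {
    case LogLine(host, ts, request, status, size) =>
      val bytes = if (size == "-") 0L else size.toLong
      Some(LogEntry(host, ts, request, status.toInt, bytes))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val sample =
      "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326"
    println(parse(sample))
  }
}
```

On the stream, the message values could then be passed through something like flatMap(line => AccessLogFields.parse(line).toList) so that unparseable lines are dropped.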

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Both arguments are required; print usage and exit otherwise
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    // Each record is a (key, value) pair; the message text is the value
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
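
To see what the word count step computes for a single batch, here is a rough sketch that applies the same chain of transformations to an ordinary Scala collection instead of a DStream. The object name LocalWordCountSketch and the sample records are made up for illustration, and groupBy plus sum stands in for reduceByKey, which exists only on Spark's pair collections.

```scala
object LocalWordCountSketch {
  // One batch is modelled as a sequence of (key, value) Kafka records.
  def countWords(batch: Seq[(String, String)]): Map[String, Long] = {
    val lines = batch.map(_._2)                  // keep only the message value
    val words = lines.flatMap(_.split(" "))      // split each line on single spaces
    words.map(word => (word, 1L))                // pair every word with a count of 1
      .groupBy(_._1)                             // local stand-in for reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(("k1", "a b a"), ("k2", "b c"))
    countWords(batch).foreach(println)
  }
}
```

In the streaming job the same counting is done independently for every 2 second batch, so the printed counts are per batch rather than running totals.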



