Spark Kafka Integration
1. Download and configure JDK1.8
2. Download and configure Hadoop 2.6
3. Download and configure Spark for Hadoop 2.6
Start the master and open its web UI at http://localhost:8080.
Start a slave, passing it the spark://localhost:<port> URL shown on the master UI.
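With a default Spark layout, the standalone scripts for this look roughly as follows (a sketch: 7077 is Spark's default master port, so substitute whatever URL your master UI actually reports):
> sbin/start-master.sh
> sbin/start-slave.sh spark://localhost:7077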
4. Download and configure Kafka
(http://kafka.apache.org/documentation.html#quickstart)
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
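After downloading, unpack the archive and work from inside the Kafka directory; the bin/... commands below assume you are in it:
> tar -xzf kafka_2.11-0.10.0.0.tgz
> cd kafka_2.11-0.10.0.0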
5. Start the broker and a producer, send a test message, and check that Spark gets the message.
a) Kafka uses ZooKeeper, so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance:
> bin/zookeeper-server-start.sh config/zookeeper.properties
b) Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2016-07-13 04:54:07,557] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-07-13 04:54:07,598] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
c) Create Topic
Let's create a topic named "test" with a single partition and only one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list-topics command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
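If you want to double-check the partition and replica assignment as well, the same script takes a --describe flag:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test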
d) Run the producer and send messages to the server:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testlogs < /filepath/access_log.txt
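Before wiring in Spark, you can confirm that messages are actually flowing with the console consumer bundled with Kafka, reading the same topic from the beginning:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testlogs --from-beginning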
6. Eclipse Side
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Make sure you have spark-examples-1.6.1-hadoop2.6.0.jar in the build path.
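If you manage dependencies with sbt instead of adding the examples jar by hand, the integration guide above points at the spark-streaming-kafka artifact; for Spark 1.6.1 on Scala 2.10 that would be:
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"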
Sample consumer programs are available that create a direct stream using KafkaUtils and read from the topic:
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
Once you have the DStream, you can extract the fields and save or process them.
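For example, here is a minimal sketch of pulling fields out of the access-log lines fed in at step 5d; the whitespace split and the field positions are assumptions about the log format, not something prescribed by the API:
// Each record from the direct stream is a (key, value) pair; the value is one log line.
val lines = directKafkaStream.map(_._2)
// Assumed layout (e.g. Apache common/combined log format):
// field 0 = client IP, field 6 = request path.
val ipAndPath = lines
  .map(_.split(" "))
  .filter(_.length > 6)
  .map(f => (f(0), f(6)))
ipAndPath.print() // or ipAndPath.saveAsTextFiles("hdfs:///logs/ip-path")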
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *     topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
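With everything above running, the example can be launched against the broker and topic from step 5 (names taken from this walkthrough), following the usage shown in the example's own header comment:
> bin/run-example streaming.DirectKafkaWordCount localhost:9092 testlogs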