Running Map Reduce Program using Eclipse
Hi,
This post describes my initial experience setting up, coding, and running a MapReduce program using Eclipse and Hadoop 2.5.0.
The easiest way is to create a Maven project and use an appropriate pom.xml to set up the MapReduce project in Eclipse.
Below I list what is needed if you instead create a plain Java project and add all the required libraries (jars) to the build path.
Dependency Jars
org-apache-commons-logging.jar
org.apache-commons-collections.jar
apache-commons-lang.jar
hadoop-common-2.5.0.jar
hadoop-common-2.5.0-tests.jar
hadoop-nfs-2.5.0.jar
commons-configuration-1.7.jar
hadoop-auth-2.5.0.jar
hadoop-hdfs-2.5.0.jar
hadoop-hdfs-2.5.0-tests.jar
hadoop-hdfs-nfs-2.5.0.jar
hadoop-mapreduce-client-app-2.5.0.jar
hadoop-mapreduce-client-common-2.5.0.jar
hadoop-mapreduce-client-core-2.5.0.jar
hadoop-mapreduce-client-hs-2.5.0.jar
hadoop-mapreduce-client-hs-plugins-2.5.0.jar
hadoop-mapreduce-client-jobclient-2.5.0.jar
hadoop-mapreduce-client-jobclient-2.5.0-tests.jar
hadoop-mapreduce-client-shuffle-2.5.0.jar
aopalliance-1.0.jar
asm-3.2.jar
avro-1.7.4.jar
commons-compress-1.4.1.jar
commons-io-1.4.1.jar
guice-3.0.jar
guice-servlet-3.0.jar
hadoop-annotations-2.5.0.jar
hamcrest-core-1.3.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
javax.inject-1.jar
jersey-core-1.9.jar
jersey-guice-1.9.jar
jersey-server-1.9.jar
junit-4.11.jar
leveldbjni-all-1.8.jar
log4j-1.2.17.jar
netty-3.6.2.Final.jar
paranamer-2.3.jar
protobuf-java-2.5.0.jar
snappy-java-1.0.4.1.jar
xz-1.0.jar
slf4j-api-1.6.1.jar
guava-r438.jar
hadoop-yarn-api-2.5.0.jar
hadoop-yarn-applications-distributedshell-2.5.0.jar
hadoop-yarn-applications-unmanaged-am-launcher-2.5.0.jar
hadoop-yarn-client-2.5.0.jar
hadoop-yarn-common-2.5.0.jar
hadoop-yarn-server-applicationhistoryservice-2.5.0.jar
hadoop-yarn-server-common-2.5.0.jar
hadoop-yarn-server-nodemanager-2.5.0.jar
hadoop-yarn-server-resourcemanager-2.5.0.jar
hadoop-yarn-server-tests-2.5.0.jar
hadoop-yarn-server-web-proxy-2.5.0.jar
org.apache.commons.httpclient.jar
guava-11.0.2.jar
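Collecting these jars by hand is tedious. In a Hadoop 2.x binary distribution they normally sit under share/hadoop (the common, hdfs, mapreduce and yarn folders and their lib subfolders), so a small program can list them for you. The following is only a sketch: the class name JarLister is made up for this example, and the directory you pass in is an assumption about where Hadoop is unpacked on your machine.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper (not part of Hadoop): lists every .jar file below a directory.
class JarLister {

    // Walks the directory tree under root and returns all regular files ending in ".jar", sorted.
    static List<Path> findJars(Path root) throws IOException {
        List<Path> jars = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            paths.filter(Files::isRegularFile)
                 .filter(p -> p.getFileName().toString().endsWith(".jar"))
                 .forEach(jars::add);
        }
        Collections.sort(jars);
        return jars;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            // The path below is an assumed layout; adjust it to your installation.
            System.err.println("Usage: JarLister <hadoop-home>/share/hadoop");
            System.exit(1);
        }
        for (Path jar : findJars(Paths.get(args[0]))) {
            System.out.println(jar);
        }
    }
}
```

The printed paths can then be added in Eclipse through the project's build path settings ("Add External JARs").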
Problem to solve using Map Reduce:
Input file has data like
1,studentA,80
2,studentB,160
3,studentC,120
4,studentD,90
5,studentE,90
6,studentF,140
7,studentG,180
The task is to write a MapReduce program equivalent to this query:
select id,name,mark where mark > 100;
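Before involving Hadoop, the selection rule itself can be checked in plain Java. The sketch below is only an illustration: the class name MarkFilterSketch and the method selectAbove are invented for this example, and it assumes each valid line has exactly three comma-separated fields with an integer mark in the third one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Hadoop job: applies "mark > threshold" to CSV lines.
class MarkFilterSketch {

    // Returns the lines whose third field (the mark) is strictly greater than threshold.
    static List<String> selectAbove(List<String> lines, int threshold) {
        List<String> selected = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            // Skip lines that do not look like id,name,mark
            if (fields.length != 3) {
                continue;
            }
            int mark;
            try {
                mark = Integer.parseInt(fields[2].trim());
            } catch (NumberFormatException e) {
                continue; // non-numeric mark: ignore the record
            }
            if (mark > threshold) {
                selected.add(line);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "1,studentA,80", "2,studentB,160", "3,studentC,120", "4,studentD,90",
                "5,studentE,90", "6,studentF,140", "7,studentG,180");
        // Prints the records for students B, C, F and G, in input order
        for (String line : selectAbove(input, 100)) {
            System.out.println(line);
        }
    }
}
```

The MapReduce version applies the same test once per input line inside the mapper.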
Solution code in Java:
A Mapper, a Reducer and the main program (Job).
Mapper Code:
package org.myorg.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MarkMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    // Constant key: every selected record ends up in the same reduce group
    private static final IntWritable count = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException {
        String[] line = value.toString().split(",");
        // Keep only records whose mark (third field) is greater than 100
        if (Integer.parseInt(line[2]) > 100) {
            ctx.write(count, new Text(value));
        }
    }
}
Reduce Code:
package org.myorg.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MarkReducer extends Reducer<IntWritable, Text, Text, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context ctx)
            throws IOException, InterruptedException {
        // Write each selected record unchanged; the key carries no information
        for (Text value : values) {
            ctx.write(value, NullWritable.get());
            System.out.println("In Reducer...wrote value" + value);
        }
    }
}
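Because the mapper emits every selected record under the same key (1), the framework groups them all into a single reduce call, which is why the counters further down report one reduce input group. The plain-Java sketch below imitates that map, group-by-key, reduce flow without any Hadoop classes. It is a simplified model under stated assumptions: the class and method names are invented, a TreeMap stands in for the shuffle and sort, and values come out in insertion order here, whereas the real framework gives no such ordering guarantee for values within a key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, Hadoop-free model of the job: map -> group by key -> reduce.
class LocalFlowSketch {

    // "Map" phase plus grouping: emit (1, line) for each line whose mark exceeds the threshold.
    static Map<Integer, List<String>> mapAndGroup(List<String> lines, int threshold) {
        Map<Integer, List<String>> grouped = new TreeMap<>(); // stands in for shuffle/sort by key
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length == 3 && Integer.parseInt(fields[2]) > threshold) {
                // Same constant key for every record, as in MarkMapper
                grouped.computeIfAbsent(1, k -> new ArrayList<>()).add(line);
            }
        }
        return grouped;
    }

    // "Reduce" phase: visited once per key; passes each value through unchanged.
    static List<String> reduceAll(Map<Integer, List<String>> grouped) {
        List<String> output = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : grouped.entrySet()) {
            output.addAll(group.getValue());
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "1,studentA,80", "2,studentB,160", "3,studentC,120", "4,studentD,90",
                "5,studentE,90", "6,studentF,140", "7,studentG,180");
        Map<Integer, List<String>> grouped = mapAndGroup(input, 100);
        System.out.println("groups: " + grouped.size());
        for (String record : reduceAll(grouped)) {
            System.out.println(record);
        }
    }
}
```

With a single constant key the reducer adds nothing beyond passing records through, so the grouping step is pure overhead for this particular query.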
Main Job Code:
package org.myorg.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MarkMain {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MarkMain <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        // The second argument is the job name shown in logs and the tracking UI
        Job job = Job.getInstance(conf, "Mark Main");
        job.setJarByClass(MarkMain.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MarkMapper.class);
        job.setReducerClass(MarkReducer.class);
        // Final (reducer) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Intermediate (mapper) output types differ from the final ones, so set them explicitly
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Run MarkMain as a Java application in Eclipse, passing two arguments: the path of the input file and the output directory.
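One practical point when re-running from Eclipse: Hadoop refuses to start a job whose output directory already exists, so the directory from a previous run has to be removed first. For local-mode runs like this one, where the output is on the local file system, that can be done with a few lines of plain Java. This is just a sketch; the class name OutputDirCleaner is hypothetical, and it deletes the given directory recursively, so point it only at a job output folder.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper for local-mode runs: removes an old job output directory.
class OutputDirCleaner {

    // Deletes dir and everything below it. Returns false if dir did not exist.
    static boolean deleteIfExists(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file); // files first
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.delete(d); // then the directory itself, once it is empty
                return FileVisitResult.CONTINUE;
            }
        });
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: OutputDirCleaner <output dir>");
            System.exit(1);
        }
        boolean removed = deleteIfExists(Paths.get(args[0]));
        System.out.println(removed ? "Removed old output directory" : "Nothing to remove");
    }
}
```

Run it with the same output path you pass to MarkMain, then start the job again.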
Output snapshot:
[hadoop@MYSERVER nov20_01]$ cat part-r-00000
7,studentG,180
6,studentF,140
3,studentC,120
2,studentB,160
[hadoop@MYSERVER nov20_01]$
The sample log below shows the internal steps performed by the MapReduce framework:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2015-11-20 19:00:58,789 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-11-20 19:00:59,081 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2015-11-20 19:00:59,083 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2015-11-20 19:00:59,368 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2015-11-20 19:00:59,376 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(259)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2015-11-20 19:00:59,388 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 1
2015-11-20 19:00:59,456 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:1
2015-11-20 19:00:59,608 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local694807713_0001
2015-11-20 19:00:59,671 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-hadoop/mapred/staging/hadoop694807713/.staging/job_local694807713_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2015-11-20 19:00:59,676 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-hadoop/mapred/staging/hadoop694807713/.staging/job_local694807713_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2015-11-20 19:00:59,846 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-hadoop/mapred/local/localRunner/hadoop/job_local694807713_0001/job_local694807713_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2015-11-20 19:00:59,850 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-hadoop/mapred/local/localRunner/hadoop/job_local694807713_0001/job_local694807713_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2015-11-20 19:00:59,860 INFO [main] mapreduce.Job (Job.java:submit(1289)) - The url to track the job: http://localhost:8080/
2015-11-20 19:00:59,861 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1334)) - Running job: job_local694807713_0001
2015-11-20 19:00:59,862 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null
2015-11-20 19:00:59,871 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2015-11-20 19:00:59,942 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks
2015-11-20 19:00:59,943 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local694807713_0001_m_000000_0
2015-11-20 19:00:59,995 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ]
2015-11-20 19:01:00,008 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(733)) - Processing split: file:/u01/hadoop/Work/MapR/inputdata/sample00.txt:0+91
2015-11-20 19:01:00,027 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(388)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2015-11-20 19:01:00,299 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1182)) - (EQUATOR) 0 kvi 26214396(104857584)
2015-11-20 19:01:00,299 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(975)) - mapreduce.task.io.sort.mb: 100
2015-11-20 19:01:00,300 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(976)) - soft limit at 83886080
2015-11-20 19:01:00,300 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(977)) - bufstart = 0; bufvoid = 104857600
2015-11-20 19:01:00,300 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(978)) - kvstart = 26214396; length = 6553600
2015-11-20 19:01:00,318 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) -
2015-11-20 19:01:00,319 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1437)) - Starting flush of map output
2015-11-20 19:01:00,319 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1455)) - Spilling map output
2015-11-20 19:01:00,319 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1456)) - bufstart = 0; bufend = 65; bufvoid = 104857600
2015-11-20 19:01:00,319 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1458)) - kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
2015-11-20 19:01:00,328 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1641)) - Finished spill 0
2015-11-20 19:01:00,342 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1001)) - Task:attempt_local694807713_0001_m_000000_0 is done. And is in the process of committing
2015-11-20 19:01:00,352 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map
2015-11-20 19:01:00,352 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local694807713_0001_m_000000_0' done.
2015-11-20 19:01:00,352 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local694807713_0001_m_000000_0
2015-11-20 19:01:00,353 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2015-11-20 19:01:00,355 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks
2015-11-20 19:01:00,363 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local694807713_0001_r_000000_0
2015-11-20 19:01:00,372 INFO [pool-3-thread-1] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ]
2015-11-20 19:01:00,375 INFO [pool-3-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@282d239b
2015-11-20 19:01:00,396 INFO [pool-3-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(193)) - MergerManager: memoryLimit=1503238528, maxSingleShuffleLimit=375809632, mergeThreshold=992137472, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2015-11-20 19:01:00,400 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local694807713_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2015-11-20 19:01:00,450 INFO [localfetcher#1] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(140)) - localfetcher#1 about to shuffle output of map attempt_local694807713_0001_m_000000_0 decomp: 75 len: 79 to MEMORY
2015-11-20 19:01:00,460 INFO [localfetcher#1] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 75 bytes from map-output for attempt_local694807713_0001_m_000000_0
2015-11-20 19:01:00,508 INFO [localfetcher#1] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(307)) - closeInMemoryFile -> map-output of size: 75, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->75
2015-11-20 19:01:00,509 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning
2015-11-20 19:01:00,511 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2015-11-20 19:01:00,511 INFO [pool-3-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(667)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2015-11-20 19:01:00,568 INFO [pool-3-thread-1] mapred.Merger (Merger.java:merge(591)) - Merging 1 sorted segments
2015-11-20 19:01:00,568 INFO [pool-3-thread-1] mapred.Merger (Merger.java:merge(690)) - Down to the last merge-pass, with 1 segments left of total size: 69 bytes
2015-11-20 19:01:00,574 INFO [pool-3-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(742)) - Merged 1 segments, 75 bytes to disk to satisfy reduce memory limit
2015-11-20 19:01:00,580 INFO [pool-3-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(772)) - Merging 1 files, 79 bytes from disk
2015-11-20 19:01:00,581 INFO [pool-3-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(787)) - Merging 0 segments, 0 bytes from memory into reduce
2015-11-20 19:01:00,581 INFO [pool-3-thread-1] mapred.Merger (Merger.java:merge(591)) - Merging 1 sorted segments
2015-11-20 19:01:00,585 INFO [pool-3-thread-1] mapred.Merger (Merger.java:merge(690)) - Down to the last merge-pass, with 1 segments left of total size: 69 bytes
2015-11-20 19:01:00,586 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2015-11-20 19:01:00,712 INFO [pool-3-thread-1] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
In Reducer...wrote value7,studentG,180
In Reducer...wrote value6,studentF,140
In Reducer...wrote value3,studentC,120
In Reducer...wrote value2,studentB,160
2015-11-20 19:01:00,733 INFO [pool-3-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local694807713_0001_r_000000_0 is done. And is in the process of committing
2015-11-20 19:01:00,735 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2015-11-20 19:01:00,736 INFO [pool-3-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local694807713_0001_r_000000_0 is allowed to commit now
2015-11-20 19:01:00,737 INFO [pool-3-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local694807713_0001_r_000000_0' to file:/u01/hadoop/Work/MapR/outputdata/nov20_01/_temporary/0/task_local694807713_0001_r_000000
2015-11-20 19:01:00,738 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce
2015-11-20 19:01:00,738 INFO [pool-3-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local694807713_0001_r_000000_0' done.
2015-11-20 19:01:00,739 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local694807713_0001_r_000000_0
2015-11-20 19:01:00,739 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2015-11-20 19:01:00,864 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local694807713_0001 running in uber mode : false
2015-11-20 19:01:00,866 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2015-11-20 19:01:00,868 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local694807713_0001 completed successfully
2015-11-20 19:01:00,893 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 33
File System Counters
FILE: Number of bytes read=708
FILE: Number of bytes written=460542
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=7
Map output records=4
Map output bytes=65
Map output materialized bytes=79
Input split bytes=114
Combine input records=0
Combine output records=0
Reduce input groups=1
Reduce shuffle bytes=79
Reduce input records=4
Reduce output records=4
Spilled Records=8
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=106
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=756023296
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=91
File Output Format Counters
Bytes Written=61
--------------------------------------
Output snapshot:
[hadoop@MYSERVER nov20_01]$ cat part-r-00000
7,green,799
6,yellow,299
3,yellow,220
2,blue,300
[hadoop@MYSERVER nov20_01]$
2015-11-20 19:01:00,738 INFO [pool-3-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local694807713_0001_r_000000_0' done.
2015-11-20 19:01:00,739 INFO [pool-3-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local694807713_0001_r_000000_0
2015-11-20 19:01:00,739 INFO [Thread-11] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2015-11-20 19:01:00,864 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local694807713_0001 running in uber mode : false
2015-11-20 19:01:00,866 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2015-11-20 19:01:00,868 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local694807713_0001 completed successfully
2015-11-20 19:01:00,893 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 33
File System Counters
    FILE: Number of bytes read=708
    FILE: Number of bytes written=460542
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=7
    Map output records=4
    Map output bytes=65
    Map output materialized bytes=79
    Input split bytes=114
    Combine input records=0
    Combine output records=0
    Reduce input groups=1
    Reduce shuffle bytes=79
    Reduce input records=4
    Reduce output records=4
    Spilled Records=8
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=106
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=756023296
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=91
File Output Format Counters
    Bytes Written=61
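
The counters above can be sanity-checked by hand: 7 records went into the mapper, 4 came out, and the same 4 reached the reducer (no combiner ran, since Combine input records=0). Below is a minimal sketch in plain Java that does the same check on a pasted counter dump. The class name CounterCheck and its methods are hypothetical helpers for illustration only; they are not part of Hadoop and not part of the job code in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CounterCheck {

    // Parse "name=value" lines from a counter dump.
    // Lines without '=' (group headings such as "Map-Reduce Framework") are skipped.
    static Map<String, Long> parse(String dump) {
        Map<String, Long> counters = new LinkedHashMap<>();
        for (String line : dump.split("\\R")) {
            int eq = line.lastIndexOf('=');
            if (eq < 0) {
                continue;
            }
            String name = line.substring(0, eq).trim();   // "Shuffled Maps =1" -> "Shuffled Maps"
            String value = line.substring(eq + 1).trim();
            counters.put(name, Long.parseLong(value));
        }
        return counters;
    }

    // Assumption: when no combiner runs, every record emitted by the mappers
    // should arrive at the reducers, so the two counters should match.
    static boolean mapOutputReachesReducer(Map<String, Long> c) {
        return c.get("Combine input records") == 0L
                && c.get("Map output records").equals(c.get("Reduce input records"));
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
                "Map-Reduce Framework",
                "Map input records=7",
                "Map output records=4",
                "Combine input records=0",
                "Reduce input records=4",
                "Reduce output records=4",
                "Shuffled Maps =1");
        Map<String, Long> c = parse(dump);
        System.out.println("Map output records = " + c.get("Map output records"));
        System.out.println("consistent = " + mapOutputReachesReducer(c));
    }
}
```

If the two record counts differ on a run without a combiner, that is usually a hint to look at the job log for failed or re-run tasks.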
--------------------------------------
Output snapshot:
[hadoop@MYSERVER nov20_01]$ cat part-r-00000
7,green,799
6,yellow,299
3,yellow,220
2,blue,300
[hadoop@MYSERVER nov20_01]$
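
The same check as the cat command above can be done from Java, which is handy when running the job from Eclipse. This is only a sketch using java.nio on the local file system (the job here ran with the LocalJobRunner and wrote to a file:/ path); the class name ReadJobOutput is a hypothetical helper, and the default directory is the output path taken from the log above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReadJobOutput {

    // Collect the lines of every part-r-* file in the output directory, in file-name order.
    // The glob does not match _SUCCESS or hidden .crc checksum files.
    static List<String> readParts(Path outputDir) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "part-r-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        Collections.sort(parts);
        List<String> lines = new ArrayList<>();
        for (Path p : parts) {
            lines.addAll(Files.readAllLines(p));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Pass the output directory as the first argument, or fall back to the path from the log.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/u01/hadoop/Work/MapR/outputdata/nov20_01");
        for (String line : readParts(dir)) {
            System.out.println(line);
        }
    }
}
```

For output stored in HDFS rather than on local disk, this sketch would not apply as written; the Hadoop FileSystem API would be needed instead.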