
Hadoop Map Reduce Programs for Word Count with Steps

Introduction:

Hadoop is an open-source software framework designed for storing and processing large volumes of varied data on clusters of commodity hardware. The Apache Hadoop software library allows distributed processing of data across clusters using a simple programming model called MapReduce. It is designed to scale from a single server to a cluster of machines, with each node offering local computation and storage. Hadoop deployments typically involve clusters that are hard to manage and maintain, and in many scenarios they must be integrated with other tools such as MySQL and Mahout. Work proceeds as a series of MapReduce jobs; each job is high-latency and depends on its predecessor, so no job can start until the previous job has finished successfully. Map Reduce: MapReduce is Hadoop's programming model for processing data stored in HDFS. Apache Hadoop can run MapReduce programs written in different languages, such as Java, Ruby, and Python. MapReduce programs execute efficiently in parallel across the cluster. A job works in the following phases:

1. Map phase
2. Reduce phase
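The two phases can be sketched in plain Java as a single-process simulation (an illustration only, not the Hadoop API): the map phase emits a (word, 1) pair per token, and the reduce phase sums the counts per word.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PhasesSketch {
    public static void main(String[] args) {
        String[] lines = {"bat cat", "cat bat abcd"};

        // Map phase: emit a (word, 1) pair for every token
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // Shuffle + Reduce phase: group the pairs by key and sum the values
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        System.out.println(counts); // {abcd=1, bat=2, cat=2}
    }
}
```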

Tools in Hadoop:
HDFS (Hadoop Distributed File System) is the basic storage layer of Hadoop.
Apache Pig is an ETL (Extract, Transform and Load) tool.
MapReduce is the programming-model engine that executes MR jobs.
Apache Hive is a data-warehouse tool used to work on historical data using HQL.
Apache Sqoop is a tool for importing and exporting data between an RDBMS and HDFS, and vice versa.
Apache Oozie is a job-scheduling tool to control applications over the cluster.
Apache HBase is a NoSQL database designed around the CAP (Consistency, Availability, Partition tolerance) theorem.
Apache Spark is a framework that does in-memory computation and works with Hadoop; it is written in Scala and Java. In Hadoop, the input to each phase is a set of key-value pairs.

We are going to walk through some programs step by step. We will see how to create the mapper and reducer classes to achieve the objective, and also how to submit a Hadoop job to the cluster from the terminal.

Writing Word Count Program:

For Free Demo classes Call: 8605110150

Registration Link: Click Here!

Step 1: Add the following reference libraries.
Copy the following jars from /usr/lib/hadoop/, /usr/lib/hadoop/lib,
and /usr/lib/hadoop-0.20-mapreduce/:
commons-cli-1.2.jar
hadoop-common-2.6.0-cdh5.13.0.jar
hadoop-common.jar
hadoop-core-2.6.0-mr1-cdh5.13.0.jar
hadoop-core-mr1.jar

Step 2: Type the following program in a WordCount.java file:
————————————————————————
package PackageDemo;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception
{
Configuration c = new Configuration();
String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
Path input = new Path(files[0]);
Path output = new Path(files[1]);
Job j = new Job(c, "wordcount");
j.setJarByClass(WordCount.class);
j.setMapperClass(MapForWordCount.class);
j.setReducerClass(ReduceForWordCount.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(j, input);
FileOutputFormat.setOutputPath(j, output);
System.exit(j.waitForCompletion(true) ? 0 : 1);
}
public static class MapForWordCount extends
Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while(tokenizer.hasMoreTokens()){
value.set(tokenizer.nextToken());
con.write(value, new IntWritable(1));
}
}
}
public static class ReduceForWordCount extends
Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text word, Iterable<IntWritable> values, Context con)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable value : values)
{
sum += value.get();
}
con.write(word, new IntWritable(sum));
}
}
}
————————————————————————

Explanation:
The program consists of 3 classes:
Driver class (public static void main, the entry point).
Map class, which extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> and implements the map function.
Reduce class, which extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> and implements the reduce function.

WordCount is the main class / driver class. It contains the main method. With the old (mapred) API it must have a JobConf and a JobClient; the program above uses the newer Job API instead.
Map – mapper logic, processing.
Reduce – reducer logic, aggregation logic.
StringTokenizer class – converts a line into tokens (words).
JobConf – configures the job: job name, mapper class name, reducer class name, input path, output path.
It also has methods like:
setJobName()
setMapperClass()
setReducerClass()
setInputFormat()
setOutputFormat()
setOutputKeyClass(Text.class); // words
setOutputValueClass(IntWritable.class); // count per word
FileInputFormat.setInputPaths(conf, new Path(args[0])); // input file path from the command line
FileOutputFormat.setOutputPath(conf, new Path(args[1])); // output directory path from the command line

JobClient – submits the job to the MR framework (Job Tracker).

Data types (Java type → MapReduce writable type):
String → Text
int → IntWritable
long → LongWritable
float → FloatWritable

Mapper<LongWritable, Text, Text, IntWritable>
(keyIn, valueIn, keyOut, valueOut)

StringTokenizer class – converts a line into tokens (words) based on whitespace. In the word count program above, the mapper produces one (word, 1) pair per token as input for the shuffle phase (the job of the OutputCollector in the old API).
So there is always an OutputCollector (or Context, in the new API) to pass key-value pairs to the next phase.
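The tokenize-and-emit step of the mapper can be tried on its own with plain StringTokenizer, outside Hadoop (a minimal sketch; the tab-separated output mimics the (key, value) pairs the mapper would write):

```java
import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        String line = "bat cat cat";
        StringTokenizer tokenizer = new StringTokenizer(line); // splits on whitespace
        while (tokenizer.hasMoreTokens()) {
            // In the real mapper this pair goes to con.write(word, new IntWritable(1))
            System.out.println(tokenizer.nextToken() + "\t1");
        }
    }
}
```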

Reducer<Text, IntWritable, Text, IntWritable> (word, 1, word, count)
The shuffle phase groups identical words into buckets of 1s, and the reducer then sums the values according to the logic we have written.
The Reporter class gives you statistics about the job, such as the percentage completed.
* In a single-node cluster, the number of mappers is 1.
In a multi-node cluster, the number of mappers depends on the number of input splits (in this setup the default is 3, running as different threads).
In Sqoop we can increase the number of mappers with: -m 3
The default number of reducer tasks is 1, and it can be increased. One instance may run many threads for parallel processing.
Or, in the main method of the word count file (old API):
conf.setNumMapTasks(5);
conf.setNumReduceTasks(5);

These tasks share a common JVM with multithreading.
Partitioner: customizes the shuffle (a default partitioner is provided).
Combiner: a local reducer for the mapper that runs before the shuffle. It works on an individual mapper's output to reduce the amount of data transmitted over the network.

Each slave machine runs a separate JVM.
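The effect of a combiner can be illustrated with a small plain-Java simulation (hypothetical data): the mapper's raw (word, 1) pairs are pre-summed locally, so fewer pairs cross the network.

```java
import java.util.Map;
import java.util.TreeMap;

public class CombinerSketch {
    public static void main(String[] args) {
        // Raw mapper output on one node: one (word, 1) pair per occurrence
        String[] mapperOutput = {"bat", "cat", "bat", "bat", "cat"};

        // Combiner = local reduce: sum the counts before network transfer
        Map<String, Integer> combined = new TreeMap<>();
        for (String word : mapperOutput) {
            combined.merge(word, 1, Integer::sum);
        }

        System.out.println("pairs before combiner: " + mapperOutput.length);
        System.out.println("pairs after combiner: " + combined.size());
        System.out.println(combined);
    }
}
```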
————————————————————————-
Step 3: Finally, right-click on the project > Export > select the export destination as a Jar file > Next > Finish.
Step 4: Take a text file with word data and move it into HDFS as wordcountFile.

Sample data (whitespace-separated, since StringTokenizer splits on whitespace by default): abcd ade dfd sdsds acd bat cat cat bat abcd

Step 5: To move this file into Hadoop, open the terminal and enter the following command:

> sudo hadoop fs -put wordcountFile wordCountFile

Step 6: Run the jar file.
* There is no need to create the output directory mentioned below: MRDir1 will be created automatically on HDFS.
* There is also no need to move the jar to HDFS; you can run it from the Desktop with the path given.
Syntax: hadoop jar jarfilename.jar packageName.ClassName PathToInputTextFile PathToOutputDirectory

> sudo hadoop jar /home/cloudera/Desktop/MRProgramsDemo.jar PackageDemo.WordCount wordCountFile MRDir1

Step 7: Check output
> sudo hadoop fs -ls MRDir1
> sudo hadoop fs -cat MRDir1/part-r-00000
abcd 2
acd 1
ade 1
bat 2
cat 2
dfd 1
sdsds 1

Step 8: Put a local file into HDFS:
> hadoop fs -put Desktop/demo.txt /user/cloudera

Step 9: Run the jar and check the output:
> sudo hadoop jar Desktop/MRProgram.jar com.sevenmentor.WordCount /user/cloudera/demo.txt /user/cloudera/MROutput
> hadoop fs -cat /user/cloudera/MROutput/part*
Mapper control:
No. of Mapper = No. of input Splits
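As a rough illustration (assuming a 128 MB block size and a hypothetical 300 MB input file; the real split computation also honors the configured min/max split sizes), the number of input splits can be estimated as ceil(fileSize / splitSize):

```java
public class SplitCount {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed HDFS block size: 128 MB
        long fileSize  = 300L * 1024 * 1024; // hypothetical 300 MB input file

        // Number of input splits (and hence mappers) = ceil(fileSize / blockSize)
        long splits = (fileSize + blockSize - 1) / blockSize;
        System.out.println(splits); // 3
    }
}
```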

Step 10: Control the split size (and hence the number of mappers) from the terminal:
> sudo hadoop jar Desktop/MRProgram.jar
com.sevenmentor.WordCount -Dmapred.max.split.size=1048576
/user/cloudera/a.txt /user/cloudera/MROutput
Note: mapred.max.split.size is specified in bytes (1048576 bytes = 1 MB).
————————————————————————————-
The following program counts only the words matching "spark" and "hadoop".
package com.sevenmentor;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception
{
Configuration c = new Configuration();
String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
Path input = new Path(files[0]);
Path output = new Path(files[1]);
Job j = new Job(c, "wordcount");
j.setJarByClass(WordCount.class);
j.setMapperClass(MapForWordCount.class);
j.setReducerClass(ReduceForWordCount.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(j, input);
FileOutputFormat.setOutputPath(j, output);
System.exit(j.waitForCompletion(true) ? 0 : 1);
}
public static class MapForWordCount extends
Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String str = tokenizer.nextToken().toLowerCase();
// Use equals() for string comparison; == compares references in Java
if (str.equals("spark") || str.equals("hadoop")) {
value.set(str);
con.write(value, new IntWritable(1));
}
}
}
}
public static class ReduceForWordCount extends
Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text word, Iterable<IntWritable> values, Context con)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable value : values)
{
sum += value.get();
} con.write(word, new IntWritable(sum));
}
}
}
————————————————————————————-
The following program counts the words that contain vowels and have a count > 3.
package com.sevenmentor;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception
{
Configuration c = new Configuration();
String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
Path input = new Path(files[0]);
Path output = new Path(files[1]);
Job j = new Job(c, "wordcount");
j.setJarByClass(WordCount.class);
j.setMapperClass(MapForWordCount.class);
j.setReducerClass(ReduceForWordCount.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(j, input);
FileOutputFormat.setOutputPath(j, output);
System.exit(j.waitForCompletion(true) ? 0 : 1);
}
public static class MapForWordCount extends
Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String str = tokenizer.nextToken().toLowerCase();
int flag = 0; // reset the flag for every token, not once per line
for (int i = 0; i < str.length(); i++) {
if ((str.charAt(i) == 'a') ||
(str.charAt(i) == 'e') ||
(str.charAt(i) == 'i') ||
(str.charAt(i) == 'o') ||
(str.charAt(i) == 'u')) {
flag = 1;
}
}
if(flag==1){
value.set(str);
con.write(value,new IntWritable(1));
}
}
}
}
public static class ReduceForWordCount extends
Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text word, Iterable<IntWritable> values, Context con)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable value : values)
{
sum += value.get();
}
if (sum > 3) {
con.write(word, new IntWritable(sum));
}
}
}
}
—————————————————————————————


In Apache Pig style:
Apache Pig is an ETL tool that provides an abstraction over MapReduce. We have written the following code; it is ultimately executed by the same MR engine, phase by phase.

Program 1:
With Pig Only (Without external .jar):
>lines = load 'wordcount' as (line:chararray);
>words = foreach lines generate FLATTEN(TOKENIZE(line)) as word;
>grp = GROUP words by word;
>wordcount = foreach grp generate group, COUNT(words);
OR
>wordcount = foreach grp generate group, SIZE(words);

> DUMP wordcount;
Output:
(abcd,2)
(acd,1)
(ade,1)
(bat,2)
(cat,2)
(dfd,1)
(sdsds,1)

Program 2: Counting specific words.
>lines = load 'wordcount' as (line:chararray);
>wordss = foreach lines generate FLATTEN(TOKENIZE(line)) as word;
>words = FILTER wordss BY word=='spark' OR word=='hadoop';
>grp = GROUP words by word;
>wordcount = foreach grp generate group, COUNT(words);
OR
>wordcount = foreach grp generate group, SIZE(words);
> DUMP wordcount;

Program 3:
Counting words have counter > 3 and contains vowels – Through UDF
>lines = load 'wordcount' as (line:chararray);
>wordss = foreach lines generate FLATTEN(TOKENIZE(line)) as word;
>words = FILTER wordss BY MyUDF.CheckVowels(word)==1;
>grp = GROUP words by word;
>wordcount = foreach grp generate group, COUNT(words) as cnt;
>wordcnt =FILTER wordcount BY cnt>3;
> DUMP wordcnt;

Creating a UDF using Java:
Implementing MyUDF.CheckVowels(). (Before using the UDF in a Pig script, the compiled jar must be registered, e.g. REGISTER myudf.jar;)

package MyUDF; // writing the package name you created is mandatory
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class CheckVowels extends EvalFunc<Integer>{
public Integer exec(Tuple input) throws IOException {
int flag=0;
if (input == null || input.size() == 0) return flag;
String str = ((String)input.get(0)).toLowerCase();
for (int i = 0; i < str.length(); i++) {
if ((str.charAt(i) == 'a') ||
(str.charAt(i) == 'e') ||
(str.charAt(i) == 'i') ||
(str.charAt(i) == 'o') ||
(str.charAt(i) == 'u')) {
flag = 1;
}
}
if (flag == 1) return 1;
else return 0;
}
}
————————————————————————————-
Word Count Program with Partitioner

What is a Partitioner:
A Partitioner controls the partitioning of the intermediate keys of the mapper output. The key is used to derive the partition, typically via a hash function. The total number of partitions equals the total number of reduce tasks.
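Hadoop's default HashPartitioner follows exactly this idea: it derives the partition from the key's hash code, modulo the number of reduce tasks. A plain-Java sketch of the formula:

```java
public class HashPartitionDemo {
    // Same formula as Hadoop's default HashPartitioner:
    // mask off the sign bit of hashCode(), then take the remainder.
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3;
        boolean allInRange = true;
        for (String w : new String[]{"spark", "hadoop", "bat", "cat"}) {
            int p = getPartition(w, reducers);
            allInRange &= (p >= 0 && p < reducers);
            System.out.println(w + " -> partition " + p);
        }
        // Every key lands in a partition between 0 and numReduceTasks - 1
        System.out.println(allInRange);
    }
}
```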

Program:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class Test {
public static class MyMap extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String ln = value.toString();
StringTokenizer tokens = new StringTokenizer(ln);
while (tokens.hasMoreTokens()) {
value.set(tokens.nextToken()); // was tokenizer.nextToken(), which does not compile
output.collect(value, new IntWritable(1));
}
}
}
public static class Partitioner1 implements Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
String mKey = key.toString().toLowerCase();
if (mKey.equals("spark")) {
return 0;
} else if (mKey.equals("hadoop")) {
return 1;
} else {
return 2;
}
}
@Override
public void configure(JobConf arg0) {
// New instance of JobConf to change Job Configurations
}
}
public static class MyReduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter report)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
// sum = sum + 1;
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] arg) throws Exception {
JobConf conf = new JobConf(Test.class);
conf.setJobName("wordcount_Part");
// Applying program to run 3 reducers
conf.setNumReduceTasks(3);
conf.setMapperClass(MyMap.class);
conf.setCombinerClass(MyReduce.class);
conf.setReducerClass(MyReduce.class);
conf.setPartitionerClass(Partitioner1.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(arg[0]));
FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
JobClient.runJob(conf);
}
}


© Copyright 2019 | Sevenmentor Pvt Ltd.

Author:

Mr. Sachin Patil (Hadoop Trainer and Coordinator Exp: 12+ years)

At Sevenmentor Pvt. Ltd.





