Introduction
MapReduce can be defined as a programming model designed specifically to process very large amounts of data in parallel.
Description
The MapReduce framework is a lightweight framework for processing very large amounts of data. Keep in mind that the model is efficient only when multiple commodity servers are available and the processing runs in a distributed manner across all of them. The framework is scalable, fault tolerant and easily extendable. We will now write the following classes to create a complete example:
- Write a driver class
- Write a Map class
- Write a Reducer class
Listing 1: Following is the driver class for starting the MapReduce job
package io.com.mrbool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MrBoolDriver {

    public static void main(String[] args) throws Exception {
        // Create the configuration and load the Hadoop XML files
        Configuration config = new Configuration();
        config.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
        config.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));

        // Create the MapReduce job
        Job mapjob = new Job(config, "MrBoolDriver.class");
        mapjob.setJarByClass(MrBoolDriver.class);
        mapjob.setJobName("Mrbool MapReduce");

        // Set the output key and value classes
        mapjob.setOutputKeyClass(Text.class);
        mapjob.setOutputValueClass(Text.class);

        // Set the Mapper class
        mapjob.setMapperClass(MrBoolMap.class);

        // Set the Combiner and Reducer classes
        mapjob.setCombinerClass(MrBoolReducer.class);
        mapjob.setReducerClass(MrBoolReducer.class);
        mapjob.setMapOutputKeyClass(Text.class);
        mapjob.setMapOutputValueClass(Text.class);

        // Set the number of reducer tasks
        mapjob.setNumReduceTasks(10);

        // Set the input and output formats and paths
        mapjob.setInputFormatClass(TextInputFormat.class);
        mapjob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(mapjob,
                new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/input/"));
        FileOutputFormat.setOutputPath(mapjob,
                new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/output"));

        // Start the MapReduce job and wait for it to finish
        mapjob.waitForCompletion(true);
    }
}
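The Job(Configuration, String) constructor used above is deprecated in newer Hadoop releases (Hadoop 2.x), where the Job.getInstance factory method is preferred. The following is a minimal sketch of the same driver setup written that way; the class name MrBoolDriverV2 is purely illustrative, and the mapper, reducer and paths are assumed to be the ones from Listing 1.

package io.com.mrbool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// A minimal sketch, assuming Hadoop 2.x; class and path names match Listing 1.
public class MrBoolDriverV2 {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
        config.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));

        // Job.getInstance replaces the deprecated Job constructor
        Job mapjob = Job.getInstance(config, "Mrbool MapReduce");
        mapjob.setJarByClass(MrBoolDriverV2.class);
        mapjob.setMapperClass(MrBoolMap.class);
        mapjob.setReducerClass(MrBoolReducer.class);
        mapjob.setOutputKeyClass(Text.class);
        mapjob.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(mapjob,
                new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/input/"));
        FileOutputFormat.setOutputPath(mapjob,
                new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/output"));

        // Exit with a non-zero code if the job fails
        System.exit(mapjob.waitForCompletion(true) ? 0 : 1);
    }
}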
The map() function
The map() function is where the processing starts. It forms key-value pairs from the input dataset, which is split across the distributed nodes of the commodity servers. The framework knows where each split of the data resides on the distributed file system and runs the map() function there. The advantage is that the data is processed at the location where it resides. In the normal programming paradigm the data is fetched to the location where the application is running before processing starts, which is a costly operation. In the MapReduce programming model the execution happens where the data resides, and hence it is fast and efficient.
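Before looking at the full sentiment-analysis mapper in Listing 2, the following minimal sketch shows the basic Mapper contract: read one line of input, break it into tokens and emit a key-value pair for each token. The class name WordTokenMapper is purely illustrative and is not part of the example that follows.

package io.com.mrbool;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: a classic word-count style mapper showing how
// map() turns each input line into (key, value) pairs.
public class WordTokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The key is the byte offset of the line; the value is the line itself
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken().toLowerCase());
            // Emit (word, 1); the framework groups these by word for the reducer
            context.write(word, ONE);
        }
    }
}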
Listing 2: This is a sample Map function
package io.com.mrbool;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MrBoolMap extends Mapper<LongWritable, Text, Text, Text> {

    // Variables used across the mapper
    Path keyfilepath;
    BufferedReader key_buff_reader;
    Text tweet_val = new Text();

    public void map(LongWritable key, Text value, Context context) {
        try {
            // Create the configuration and load the Hadoop core files
            Configuration config = new Configuration();
            config.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
            config.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));

            // Open the keyword file on the distributed file system
            keyfilepath = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/keywords.txt");
            FileSystem file_sys = FileSystem.get(URI.create("files/keywords.txt"), new Configuration());

            // Load the keyword into a buffered reader (keeps the last line of the file)
            key_buff_reader = new BufferedReader(new InputStreamReader(file_sys.open(keyfilepath)));
            String key_word = "";
            while (key_buff_reader.ready()) {
                key_word = key_buff_reader.readLine().trim();
            }
            final Text ers_key = new Text(key_word);

            if (value == null) {
                return;
            } else {
                // Split the input line on commas and check each tweet for the keyword
                StringTokenizer str_tokens = new StringTokenizer(value.toString(), ",");
                int cnt = 0;
                while (str_tokens.hasMoreTokens()) {
                    cnt++;
                    if (cnt <= 1) {
                        continue;
                    }
                    String new_tweet = str_tokens.nextToken().toLowerCase().trim().replaceAll("\\*", "");
                    if (new_tweet.contains(key_word.toLowerCase().trim())) {
                        // Emit (keyword, tweet) pairs for the reducer
                        tweet_val.set(new_tweet);
                        context.write(ers_key, tweet_val);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
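One point worth noting in the mapper above is that the keyword file is opened and read again for every input record. In practice you may prefer to load it once per task by overriding the mapper's setup() method. The following is a minimal sketch of that variation, meant to be added inside the MrBoolMap class from Listing 2; it is a suggestion rather than part of the original example, and the field name key_word mirrors the local variable used in the listing.

    // Sketch only: load the keyword once per map task instead of once per record.
    private String key_word = "";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path keyfilepath = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/keywords.txt");
        FileSystem file_sys = FileSystem.get(context.getConfiguration());
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(file_sys.open(keyfilepath)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                key_word = line.trim();   // keep the last line read, mirroring Listing 2
            }
        }
    }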
The reduce() function
The reduce() function is the second phase in the MapReduce programming model. After mapping is complete, the reduce() function operates on the intermediate data set, retrieving it from disk, memory or wherever else it is stored. The final result of the reduce() function is the consolidated data from all the map tasks.
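As with the mapper, the following minimal sketch shows the general Reducer contract before the full sentiment-analysis reducer in Listing 3: for each key, iterate over all the values the framework has grouped together and emit one consolidated result. The class name WordCountReducer is illustrative only and pairs with the WordTokenMapper sketch above.

package io.com.mrbool;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative only: sums the counts emitted by a word-count style mapper.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // All values for the same key arrive together in one reduce() call
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        // Emit one consolidated (word, total count) pair per key
        context.write(key, total);
    }
}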
Listing 3: Following is a sample Reduce function
package io.com.mrbool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MrBoolReducer extends Reducer<Text, Text, Text, Text> {

    Path pos_file_path;
    Path neg_file_path;
    Path out_file_path;
    Path kword_file_path;
    BufferedReader pos_buff_reader;
    BufferedReader neg_buff_reader;
    BufferedReader key_buff_reader;

    static Double tot_rec = new Double("0");
    static Double neg_cnt = new Double("0");
    static Double pos_cnt = new Double("0");
    static Double neu_cnt = new Double("0");
    static Double neg_percent = new Double("0");
    static Double pos_percent = new Double("0");
    static Double neu_percent = new Double("0");

    Pattern pattrn;
    Matcher matcher_one;
    static int row_new = 0;
    FSDataOutputStream out_one, out_two;

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Load the Hadoop configuration files
        Configuration conf_one = new Configuration();
        conf_one.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
        conf_one.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));

        // Read the keyword that the mapper filtered on
        kword_file_path = new Path("/usr/inputs/workspace/SentimentAnalysis_Twitter/files/keywords.txt");
        FileSystem fskeyread = FileSystem.get(URI.create("files/keywords.txt"), new Configuration());
        key_buff_reader = new BufferedReader(new InputStreamReader(fskeyread.open(kword_file_path)));
        String keywrd = "";
        while (key_buff_reader.ready()) {
            keywrd = key_buff_reader.readLine().trim();
        }
        String check_one = keywrd;

        // Create the output files if they do not exist yet
        FileSystem file_sys = FileSystem.get(conf_one);
        FileSystem fileSys_PosNeg = FileSystem.get(conf_one);
        Path path_one = new Path("/user/sentimentoutput.txt");
        Path path_Pos_Neg = new Path("/user/posnegetiveoutput.txt");
        if (!file_sys.exists(path_one)) {
            out_one = file_sys.create(path_one);
            out_two = fileSys_PosNeg.create(path_Pos_Neg);
        }

        if (check_one.equals(key.toString().toLowerCase())) {
            for (Text twit_new : values) {
                // Load the positive dictionary file
                pos_file_path = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/positive-words.txt");
                FileSystem fs_one = FileSystem.get(URI.create("files/positive-words.txt"), new Configuration());
                pos_buff_reader = new BufferedReader(new InputStreamReader(fs_one.open(pos_file_path)));

                // Load the negative dictionary file
                neg_file_path = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/negative-words.txt");
                FileSystem fs_two = FileSystem.get(URI.create("files/negative-words.txt"), new Configuration());
                neg_buff_reader = new BufferedReader(new InputStreamReader(fs_two.open(neg_file_path)));

                ++tot_rec;
                boolean flag_one = false;
                boolean flag_two = false;
                String mytwit_all = twit_new.toString();
                String regex_one = "";
                String regex_two = "";

                // Check the tweet against every word in the positive dictionary
                while (pos_buff_reader.ready()) {
                    regex_one = pos_buff_reader.readLine().trim();
                    row_new++;
                    pattrn = Pattern.compile(regex_one, Pattern.CASE_INSENSITIVE);
                    matcher_one = pattrn.matcher(mytwit_all);
                    flag_one = matcher_one.find();
                    if (flag_one) {
                        out_two.writeBytes(mytwit_all);
                        context.write(new Text(regex_one), new Text(mytwit_all));
                        break;
                    }
                }

                // Check the tweet against every word in the negative dictionary
                while (neg_buff_reader.ready()) {
                    row_new++;
                    regex_two = neg_buff_reader.readLine().trim();
                    pattrn = Pattern.compile(regex_two, Pattern.CASE_INSENSITIVE);
                    matcher_one = pattrn.matcher(mytwit_all);
                    flag_two = matcher_one.find();
                    if (flag_two) {
                        out_two.writeBytes(mytwit_all);
                        context.write(new Text(regex_two), new Text(mytwit_all));
                        break;
                    }
                }

                // Count the tweet as positive, negative or neutral
                if (flag_one && flag_two) {
                    ++neu_cnt;
                } else {
                    if (flag_one) {
                        ++pos_cnt;
                    }
                    if (flag_two) {
                        ++neg_cnt;
                    }
                    if (!flag_one && !flag_two) {
                        ++neu_cnt;
                    }
                }
                neg_buff_reader.close();
                pos_buff_reader.close();
            } // for

            // Calculate the percentages and write the summary line
            pos_percent = pos_cnt / tot_rec * 100;
            neg_percent = neg_cnt / tot_rec * 100;
            neu_percent = neu_cnt / tot_rec * 100;
            try {
                out_one.writeBytes("\n" + keywrd);
                out_one.writeBytes("," + tot_rec);
                out_one.writeBytes("," + neg_percent);
                out_one.writeBytes("," + pos_percent);
                out_one.writeBytes("," + neu_percent);
                out_one.close();
                file_sys.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
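One design point worth noting: the reducer above accumulates its totals in static Double fields, which only works here because all the values for a single keyword arrive at one reducer. If you extend the example, Hadoop's built-in counters are a safer way to aggregate numbers across tasks. The following minimal sketch shows the idea inside reduce(); the counter group "SENTIMENT" and the counter names are made up for illustration and are not part of Listing 3.

                // Sketch only: aggregate with Hadoop counters instead of static fields.
                // The group and counter names below are illustrative.
                context.getCounter("SENTIMENT", "TOTAL_TWEETS").increment(1);
                if (flag_one) {
                    context.getCounter("SENTIMENT", "POSITIVE").increment(1);
                }
                if (flag_two) {
                    context.getCounter("SENTIMENT", "NEGATIVE").increment(1);
                }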
Conclusion
In this article we have discussed a MapReduce implementation using Java. The framework is used to process large amounts of data across distributed nodes. Scalability and extensibility are the main advantages of the MapReduce programming model. The other differentiator is that the processing happens at the location where the data resides, and hence it is fast and efficient.