MapReduce Programming in Java

In this article we will discuss MapReduce programming in Java. MapReduce is a programming framework generally used for the parallel processing of huge amounts of data.

Introduction

MapReduce can be defined as a programming model specifically designed to process huge amounts of data in parallel.
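Before the full example, a minimal word-count sketch (the classic illustration of the model) may help make the map/reduce contract concrete. This is not part of the sentiment-analysis example developed below; the class names here are purely illustrative.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative only: emits (word, 1) for every token of every input line
class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
	private final static IntWritable ONE = new IntWritable(1);
	private final Text word = new Text();

	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		for (String token : value.toString().split("\\s+")) {
			if (!token.isEmpty()) {
				word.set(token);
				context.write(word, ONE);
			}
		}
	}
}

// Illustrative only: sums the 1s emitted for each word by all the mappers
class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		context.write(key, new IntWritable(sum));
	}
}

The framework handles everything between the two phases: it groups the (word, 1) pairs by key and hands each word, together with all its counts, to a single reduce() call.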

Description

The MapReduce framework is a lightweight framework for processing huge amounts of data. Keep in mind that the model is efficient only when we have multiple commodity servers and the processing runs in a distributed manner across all of them. The framework is scalable, fault tolerant and easily extendable. We will now write the following classes to create a complete example:

  • Write a driver class
  • Write a Map class
  • Write a Reducer class

Listing 1: Following is the driver class that starts the MapReduce job

package io.com.mrbool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MrBoolDriver {
	public static void main(String[] args) throws Exception {

		// Create the configuration and load the cluster config files
		Configuration config = new Configuration();
		config.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
		config.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));

		// Create the MapReduce job
		Job mapjob = new Job(config, "Mrbool MapReduce");
		mapjob.setJarByClass(MrBoolDriver.class);

		// Set the output key and value classes
		mapjob.setOutputKeyClass(Text.class);
		mapjob.setOutputValueClass(Text.class);

		// Set the Map class and its output key/value classes
		mapjob.setMapperClass(MrBoolMap.class);
		mapjob.setMapOutputKeyClass(Text.class);
		mapjob.setMapOutputValueClass(Text.class);

		// Set the reducer class; the same class is also used as a combiner
		// to pre-aggregate map output locally before the shuffle
		mapjob.setCombinerClass(MrBoolReducer.class);
		mapjob.setReducerClass(MrBoolReducer.class);

		// Set the number of reduce tasks
		mapjob.setNumReduceTasks(10);

		mapjob.setInputFormatClass(TextInputFormat.class);
		mapjob.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(mapjob, new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/input/"));
		FileOutputFormat.setOutputPath(mapjob, new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/output"));

		// Start the MapReduce job and wait for it to finish
		System.exit(mapjob.waitForCompletion(true) ? 0 : 1);
	}
}
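Assuming the three classes are compiled and packaged into a jar (the jar name below is illustrative), the job is typically launched with the standard hadoop jar command, for example: hadoop jar mrbool-mr.jar io.com.mrbool.MrBoolDriver. The driver then blocks in waitForCompletion(true) and prints the job's progress to the console until it finishes.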

The map() function

The map() function is where the processing starts. The framework forms key-value pairs from the received dataset, which is split across the distributed nodes of the commodity servers. The map() function knows where each block of data resides on the distributed file system and executes right there. The advantage is that the data is processed at the location where it resides. In the normal programming paradigm the data is fetched to the location where the application is running before processing starts, which is a costly operation; in the map-reduce programming model the execution happens where the data resides, and hence it is fast and efficient.
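To make this concrete with the Map class in Listing 2 below: suppose keywords.txt contains the single keyword hadoop, and a (hypothetical) input line reads 12345,Learning hadoop is fun,Just tried Pig. The map() skips the first comma-separated field, normalizes each remaining field, and emits the pair (hadoop, learning hadoop is fun) because that field contains the keyword; the other field produces no output.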

Listing 2: This is a sample Map function

package io.com.mrbool;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MrBoolMap extends Mapper<LongWritable, Text, Text,Text> {
	
	// Fields shared across map() calls
	Path keyfilepath;
	BufferedReader key_buff_reader;
	Text tweet_val = new Text();

	public void map(LongWritable key, Text value, Context context) {
		try {
			// Create the configuration and load the cluster config files
			Configuration config = new Configuration();
			config.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
			config.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));

			// Open the keyword file on HDFS using the loaded configuration
			keyfilepath = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/keywords.txt");
			FileSystem file_sys = FileSystem.get(config);

			// Load it in a buffered reader
			key_buff_reader = new BufferedReader(new InputStreamReader(file_sys.open(keyfilepath)));

			// Read the keyword; if the file has several lines, only the last one is kept
			String key_word = "";
			while (key_buff_reader.ready()) {
				key_word = key_buff_reader.readLine().trim();
			}
			key_buff_reader.close();

			final Text ers_key = new Text(key_word);

			if (value == null) {
				return;
			}

			// Split the input line on commas
			StringTokenizer str_tokens = new StringTokenizer(value.toString(), ",");
			int cnt = 0;
			while (str_tokens.hasMoreTokens()) {
				cnt++;
				// Skip the first comma-separated field (e.g. an id column)
				if (cnt <= 1) {
					str_tokens.nextToken();
					continue;
				}

				String new_tweet = str_tokens.nextToken().toLowerCase().trim().replaceAll("\\*", "");

				// Emit (keyword, tweet) for every field containing the keyword
				if (new_tweet.contains(key_word.toLowerCase().trim())) {
					tweet_val.set(new_tweet);
					context.write(ers_key, tweet_val);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

The reduce() function

The reduce() function is the second phase in the map-reduce programming model. After the mapping is complete, the reduce() function operates on the intermediate data set, retrieving it from disk, memory or wherever it is stored. The final output of the reduce() function consolidates the data from all the map tasks.
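Concretely, for the keyword key (e.g. hadoop) the reducer in Listing 3 below receives all the tweets the mappers emitted for that keyword, matches each tweet against a positive and a negative word dictionary, counts positive, negative and neutral tweets, and finally writes a single comma-separated summary line of the form keyword, total, negative %, positive %, neutral %.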

Listing 3: Following is a sample Reduce function

package io.com.mrbool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MrBoolReducer extends Reducer<Text,Text,Text,Text>
{
	 	
        // Paths and readers for the keyword and sentiment dictionary files
        Path pos_file_path;
        Path neg_file_path;
        Path out_file_path;
        Path kword_file_path;

        BufferedReader pos_buff_reader;
        BufferedReader neg_buff_reader;
        BufferedReader key_buff_reader;

        // Counters for the sentiment statistics
        static double tot_rec = 0;

        static double neg_cnt = 0;
        static double pos_cnt = 0;
        static double neu_cnt = 0;

        static double neg_percent = 0;
        static double pos_percent = 0;
        static double neu_percent = 0;

        Pattern pattrn;
        Matcher matcher_one;
        static int row_new = 0;
        FSDataOutputStream out_one, out_two;

    public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException
    {          	
    	Configuration conf_one = new Configuration();
        conf_one.addResource(new Path("/usr/etc/hadoop/core-site.xml"));
        conf_one.addResource(new Path("/usr/etc/hadoop/hdfs-site.xml"));
    	
    	
    	// Read the keyword file using the loaded configuration
    	kword_file_path = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/keywords.txt");
        FileSystem fskeyread = FileSystem.get(conf_one);
        key_buff_reader = new BufferedReader(new InputStreamReader(fskeyread.open(kword_file_path)));
        
        String keywrd = "";
        while(key_buff_reader.ready())
        {
        	keywrd=key_buff_reader.readLine().trim();
        
        }
  
    	String check_one=keywrd;        
              
		FileSystem file_sys = FileSystem.get(conf_one);
		FileSystem fileSys_PosNeg = FileSystem.get(conf_one);
        
        Path path_one = new Path("/user/sentimentoutput.txt");
        Path path_Pos_Neg = new Path("/user/posnegetiveoutput.txt");
              
        // Create the output streams once (overwriting any previous run's files);
        // on later reduce() calls the already-open streams are reused
        if (out_one == null) {
            out_one = file_sys.create(path_one, true);
            out_two = fileSys_PosNeg.create(path_Pos_Neg, true);
        }
                  
        if(check_one.equals(key.toString().toLowerCase()))
        {     	
        	for(Text twit_new:values)
            {	
            	// Load positive dictionary file
				pos_file_path=new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/positive-words.txt");
                FileSystem fs_one = FileSystem.get(URI.create("files/positive-words.txt"),new Configuration());
                pos_buff_reader=new BufferedReader(new InputStreamReader(fs_one.open(pos_file_path)));
              
                // Load negative dictionary file
                neg_file_path = new Path("/usr/inputs/workspace/Sent_Analysis_Twitter/files/negative-words.txt");
                FileSystem fs_two = FileSystem.get(URI.create("files/negative-words.txt"),new Configuration());
                neg_buff_reader =new BufferedReader(new InputStreamReader(fs_two.open(neg_file_path)));

                ++tot_rec;

                boolean flag_one=false;
                boolean flag_two=false;

                String mytwit_all=twit_new.toString();              
                String regex_one = "";
                String regex_two = "";

                while(pos_buff_reader.ready())
                {
                    regex_one=pos_buff_reader.readLine().trim();
                    row_new++;           
                    pattrn = Pattern.compile(regex_one, Pattern.CASE_INSENSITIVE);
                    matcher_one = pattrn.matcher(mytwit_all);
                    flag_one=matcher_one.find();

                    if(flag_one)
                    {  
                    	out_two.writeBytes(mytwit_all);
                        context.write(new Text(regex_one),new Text(mytwit_all));                  
                        break;
                    }       
                }
                while(neg_buff_reader.ready())
                {
                    row_new++;                    
                    regex_two=neg_buff_reader.readLine().trim();
                    pattrn = Pattern.compile(regex_two, Pattern.CASE_INSENSITIVE);
                    matcher_one = pattrn.matcher(mytwit_all);
                    flag_two=matcher_one.find();
                    if(flag_two)
                    {                    	
                    	out_two.writeBytes(mytwit_all);
                        context.write(new Text(regex_two),new Text(mytwit_all));
                        break;
                    }

                }
                // A tweet matching both dictionaries is counted as neutral
                if (flag_one && flag_two)
                {
                    ++neu_cnt;
                }
                else
                {
                    if (flag_one)
                    {
                        ++pos_cnt;
                    }
                    if (flag_two)
                    {
                        ++neg_cnt;
                    }
                    // A tweet matching neither dictionary is also neutral
                    if (!flag_one && !flag_two)
                    {
                        ++neu_cnt;
                    }
                }
                neg_buff_reader.close();
                pos_buff_reader.close();

            }//for
            pos_percent=pos_cnt/tot_rec*100;
            neg_percent=neg_cnt/tot_rec*100;
            neu_percent=neu_cnt/tot_rec*100;

          try {
				out_one.writeBytes("\n" + keywrd);
				out_one.writeBytes("," + tot_rec);
				out_one.writeBytes("," + neg_percent);
				out_one.writeBytes("," + pos_percent);
				out_one.writeBytes("," + neu_percent);

				// Close the output streams and the file system handle
				out_one.close();
				out_two.close();
				file_sys.close();
          } catch (Exception e) {
        	  e.printStackTrace();
          }

        }        
    }
}
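With this implementation the results land in two files on HDFS: /user/sentimentoutput.txt holds the per-keyword summary line (keyword, total tweets, negative, positive and neutral percentages), and /user/posnegetiveoutput.txt collects the raw tweets that matched a dictionary word.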

Conclusion

In this article we have discussed a MapReduce implementation in Java. The framework is used to process large amounts of data across distributed nodes. Scalability and extensibility are the main advantages of the MapReduce programming model. The other differentiator is that processing happens at the location where the data resides, which makes it fast and efficient.



Website: www.techalpine.com. The author has 16 years of experience as a technical architect and software consultant in enterprise application and product development, with an interest in new technologies and innovation...
