Hi,
I wrote a custom InputFormat for parsing the Enron e-mail corpus; it is
attached in the file named EmailInputFormat.
I have attached the code in a text file, along with a sample input mail
as a text document.
EmailClass implements Writable and all of its required methods, and also
contains an initiate method that initializes the fields of the class.
The initiate method is defined in EmailClass.java.
It is called by the nextKeyValue method, which is defined in
EmailRecordReader.txt.
------------------------------------
Question:
1. Is it feasible to build large custom objects within nextKeyValue() when
running in Hadoop?
2. A MapReduce program that does the simple task of emitting the message-id
and the From e-mail address for the Enron corpus of 6 lakh (600,000) e-mails,
merged into one file (174 MB), takes around 50 minutes on a pseudo-distributed
(single-node) cluster. This is very, very slow.
Please help me with this too.
3. Would making the value field in EmailRecordReader static help in this situation?
Thanks in advance,
Varad.
------------------------------------
Varad Meru| Software Engineer
[email protected]
Persistent Systems and Solution Ltd. | Partners in Innovation |
www.persistentsys.com
DISCLAIMER
==========
This e-mail may contain privileged and confidential information which is the
property of Persistent Systems Ltd. It is intended only for the use of the
individual or entity to which it is addressed. If you are not the intended
recipient, you are not authorized to read, retain, copy, print, distribute or
use this message. If you have received this communication in error, please
notify the sender and delete all copies of this message. Persistent Systems
Ltd. does not accept any liability for virus infected mails.
package EnronAssignment.src.enronassignment3.input;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Writable;
/**
* @author Varad Meru
* @organisation Persistent Systems and Solutions Ltd.
* @see EnronCorpus
* @category Email Analysis using Hadoop
* @performs This class specifies our e-mail object. Thus becoming a standard
* format which will be used in our programs future extensions and
* with other kind of data that we get with our
*
*/
public class EmailClass implements Writable {
private String originalSeparator = "Original Message";
private String forwardedSeparator = "Forwarded by";
private String startTag = "<message-id>";
private String endTag = "</message-id>";
private String noInput = "NO_INPUT_DATA";
public String messageID = noInput;
public String date = noInput;
public String fromEmailID = noInput;
public List<String> toEmailID;
public String subject = noInput;
public List<String> ccEmailID;
public double MIMEVersion = 1.0;
public String content_Type = noInput;
public String content_Transfer_Encoding = noInput;
public List<String> bccEmailID;
public String xfromEmailID = noInput;
public List<String> xtoEmailID;
public List<String> xccEmailID;
public List<String> xbccEmailID;
public String xFolder = noInput;
public String xOrigin = noInput;
public String xFileName = noInput;
public String messageContent = "";
public List<String> attachedFiles;
public String forwardedAttachement = noInput;
public String originalMessage = noInput;
/**
* Default Constructor
*/
public EmailClass() {
}
// A constructor with Fields
/**
* @param messageID
* @param date
* @param fromEmailID
* @param toEmailID
* @param subject
* @param ccEmailID
* @param contentType
* @param contentTransferEncoding
* @param bccEmailID
* @param xfromEmailID
* @param xtoEmailID
* @param xccEmailID
* @param xbccEmailID
* @param xFolder
* @param xOrgin
* @param xFileName
* @param messageContent
* @param attachedFiles
*/
public EmailClass(String messageID, String date, String fromEmailID,
List<String> toEmailID, String subject, List<String>
ccEmailID,
String contentType, String contentTransferEncoding,
List<String> bccEmailID, String xfromEmailID,
List<String> xtoEmailID, List<String> xccEmailID,
List<String> xbccEmailID, String xFolder, String xOrgin,
String xFileName, String messageContent, List<String>
attachedFiles) {
this.messageID = messageID;
this.date = date;
this.fromEmailID = fromEmailID;
this.toEmailID = toEmailID;
this.subject = subject;
this.ccEmailID = ccEmailID;
content_Type = contentType;
content_Transfer_Encoding = contentTransferEncoding;
this.bccEmailID = bccEmailID;
this.xfromEmailID = xfromEmailID;
this.xtoEmailID = xtoEmailID;
this.xccEmailID = xccEmailID;
this.xbccEmailID = xbccEmailID;
this.xFolder = xFolder;
this.xOrigin = xOrgin;
this.xFileName = xFileName;
this.messageContent = messageContent;
this.attachedFiles = attachedFiles;
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
String tempString;
String[] splits = null;
if ((tempString = in.readLine()) == startTag) {
// Message ID
tempString = in.readLine();
splits = tempString.split(":");
this.messageID = splits[1];
// Date of email
tempString = in.readLine();
splits = tempString.split(":");
this.date = splits[1];
// From email id
tempString = in.readLine();
splits = tempString.split(":");
this.fromEmailID = splits[1];
// for getting the "To" list
{
tempString = in.readLine();
splits = tempString.split(":");
String toList = splits[1];
while (!(tempString =
in.readLine()).contains("Subject:")) {
toList += tempString;
// got a whole line of To's in one line
}
StringTokenizer stringTokenizer = new
StringTokenizer(toList,
",");
this.toEmailID = new ArrayList<String>();
while (stringTokenizer.hasMoreTokens()) {
this.toEmailID.add(stringTokenizer.nextToken());
// for inserting all the emails in the
"To" string into the
}
}
// For getting the subject
splits = tempString.split(":");
this.subject = splits[1];
// for getting the Cc list
if ((tempString = in.readLine()).contains("Cc:")) {
splits = tempString.split(":");
String ccList = splits[1];
while (!(tempString =
in.readLine()).contains("Mime-Version:")) {
ccList += tempString;
// got a whole line of To's in one line
}
StringTokenizer stringTokenizer = new
StringTokenizer(ccList,
",");
this.ccEmailID = new ArrayList<String>();
while (stringTokenizer.hasMoreTokens()) {
this.ccEmailID.add(stringTokenizer.nextToken());
// for inserting all the emails in the
"To" string into the
}
}
// Content-Type of the Email
tempString = in.readLine(); // skipped MIME-Version
splits = tempString.split(":");
this.content_Type = splits[1];
// Content-Transfer-Encoding of the Email
tempString = in.readLine();
splits = tempString.split(":");
this.content_Transfer_Encoding = splits[1];
// get the Bcc list
if ((tempString = in.readLine()).contains("Bcc:")) {
splits = tempString.split(":");
String bccList = splits[1];
while (!(tempString =
in.readLine()).contains("X-From:")) {
bccList += tempString;
// got a whole line of To's in one line
}
StringTokenizer stringTokenizer = new
StringTokenizer(bccList,
",");
this.bccEmailID = new ArrayList<String>();
while (stringTokenizer.hasMoreTokens()) {
this.bccEmailID.add(stringTokenizer.nextToken());
}
}
// for getting the xfrom
splits = tempString.split(":");
this.xfromEmailID = splits[1];
// for getting xto
{
tempString = in.readLine();
splits = tempString.split(":");
String xtoList = splits[1];
while (!(tempString =
in.readLine()).contains("X-cc")) {
xtoList += tempString;
// got a whole line of To's in one line
}
StringTokenizer stringTokenizer = new
StringTokenizer(xtoList,
",");
this.xtoEmailID = new ArrayList<String>();
while (stringTokenizer.hasMoreTokens()) {
this.xtoEmailID.add(stringTokenizer.nextToken());
}
}
{
splits = tempString.split(":");
String xccList = splits[1];
while (!(tempString =
in.readLine()).contains("X-bcc")) {
xccList += tempString;
// got a whole line of To's in one line
}
StringTokenizer stringTokenizer = new
StringTokenizer(xccList,
",");
this.xccEmailID = new ArrayList<String>();
while (stringTokenizer.hasMoreTokens()) {
this.xccEmailID.add(stringTokenizer.nextToken());
}
}
{
splits = tempString.split(":");
String xbccList = splits[1];
while (!(tempString =
in.readLine()).contains("X-Folder")) {
xbccList += tempString;
// got a whole line of To's in one line
}
StringTokenizer stringTokenizer = new
StringTokenizer(xbccList,
",");
this.xbccEmailID = new ArrayList<String>();
while (stringTokenizer.hasMoreTokens()) {
this.xbccEmailID.add(stringTokenizer.nextToken());
}
}
// for getting the xFolder
splits = tempString.split(":");
this.xFolder = splits[1];
// xOrigin
tempString = in.readLine();
splits = tempString.split(":");
this.xOrigin = splits[1];
// xFileName
tempString = in.readLine();
splits = tempString.split(":");
this.xFileName = splits[1];
// get MessageContent
tempString = in.readLine();
String tempMessageContent = null;
while (tempString != originalSeparator
|| tempString != forwardedSeparator ||
tempString != endTag) {
tempMessageContent += tempString.trim();
tempString = in.readLine();
}
this.messageContent = tempMessageContent;
/*
* insert into the original attachment file - in the
case of reply
* to an original email
*/
if (tempString.contains(originalSeparator)) {
String tempRepliedMail = null;
while (!((tempString = in.readLine()) !=
endTag)) {
tempRepliedMail += tempString;
// got a whole line of To's in one line
}
this.originalMessage = tempRepliedMail;
}
if (tempString.contains(forwardedSeparator)) {
String tempForwardedMail = null;
while (!((tempString = in.readLine()) !=
endTag)) {
tempForwardedMail += tempString;
// got a whole line of To's in one line
}
this.forwardedAttachement = tempForwardedMail;
}
if (tempString.contains(endTag)) {
// Do Nothing ...
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(messageID);
out.writeUTF(date);
out.writeUTF(fromEmailID);
out.writeUTF(writeList(toEmailID));
out.writeUTF(subject);
// Not in all the mails
if (ccEmailID != null)
out.writeUTF(writeList(ccEmailID));
out.writeDouble(MIMEVersion);
out.writeUTF(content_Type);
out.writeUTF(content_Transfer_Encoding);
// Not in all the mails
if (bccEmailID != null)
out.writeUTF(writeList(bccEmailID));
out.writeUTF(xfromEmailID);
out.writeUTF(writeList(xtoEmailID));
out.writeUTF(writeList(xccEmailID));
out.writeUTF(writeList(xbccEmailID));
out.writeUTF(xFolder);
out.writeUTF(xOrigin);
out.writeUTF(xFileName);
out.writeUTF(messageContent);
out.writeUTF(writeList(attachedFiles));
out.writeUTF(forwardedAttachement);
out.writeUTF(originalMessage);
}
/**
* @param inputStringList
* @return tempString
*/
private String writeList(List<String> inputStringList) {
String tempString = "";
for (String string : inputStringList) {
tempString += string + ",";
}
tempString = tempString.substring(0, tempString.length() - 2);
return tempString;
}
/**
* @param inputData
* @param i
* @param length
* @throws IOException
*/
public void initiate(byte[] inputData, int i, int length)
throws IOException {
String oneEmail = new String(inputData);
// One whole mail put into one string containing \n as its
linebreaker
StringTokenizer stringTokenizer = new StringTokenizer(oneEmail,
"\n");
// tokenizer is used to fetch the mail contents and store into
our
// object
String tempString;
String[] splits;
String[] splits1;
// It'll give us separate lines
/*
* ***************************************************
* Only the message id is being read ... try to build the loop
inside
* the if rather than the other way round
* ***************************************************
*/
tempString = stringTokenizer.nextToken();
// tempString holds the start tag now inside it ...
while (stringTokenizer.hasMoreTokens()) {
// skipped the start tag for the first time and then
read on
tempString = stringTokenizer.nextToken();
// get line after line in the mail between the
<message-id> and
// the </message-id> and parse through it to make sense
of the
// mail. The following parser parser through each tag
and stores
// in the member variables of the class
// Finding if the line contains a Message field
if (tempString.contains("Message-ID:")) {
splits = tempString.split(":");
this.messageID = splits[1];
}
if (tempString.contains("Date:")) {
splits = tempString.split(":");
this.date = splits[1];
}
if (tempString.contains("From:") &&
!tempString.contains("-From")) {
splits = tempString.split(":");
this.fromEmailID = splits[1];
}
if (tempString.contains("To:") &&
!tempString.contains("-To:")) {
splits = tempString.split(":");
String toList = splits[1];
whileLoop: while (true) {
if ((tempString.contains("Subject:"))
||
(tempString.contains("Mime-Version:"))
||
(tempString.contains("Cc:"))
||
(tempString.contains("File:"))
&&
!tempString.contains("-cc:")) {
break whileLoop;
}
tempString =
stringTokenizer.nextToken();
toList += tempString;
}
StringTokenizer stringTokenizer1 = new
StringTokenizer(toList,
",");
this.toEmailID = new ArrayList<String>();
while (stringTokenizer1.hasMoreTokens()) {
this.toEmailID.add(stringTokenizer1.nextToken());
}
}
if (tempString.contains("Subject:")) {
splits1 = tempString.split(":");
this.subject = splits1[1];
}
if (tempString.contains("Cc:") &&
!tempString.contains("-cc:")) {
splits1 = tempString.split(":");
String ccList = splits1[1];
whileLoop: while (true) {
if (tempString.contains("Mime-Version:")
||
tempString.contains("Subject:")) {
break whileLoop;
}
tempString =
stringTokenizer.nextToken();
ccList += tempString;
}
String[] splitsString = ccList.split(",");
this.ccEmailID = new ArrayList<String>();
for (int j = 0; j < splitsString.length; j++) {
this.ccEmailID.add(splitsString[j]);
}
for (int j = 0; j < splitsString.length; j++) {
}
}
if (tempString.contains("Content-Type:")) {
splits = tempString.split(":");
this.content_Type = splits[1];
}
if (tempString.contains("Content-Transfer-Encoding:")) {
splits = tempString.split(":");
this.content_Transfer_Encoding = splits[1];
}
if (tempString.contains("Bcc:") &&
!tempString.contains("-bcc:")) {
splits1 = tempString.split(":");
String bccList = splits1[1];
whileLoop: while (true) {
if (tempString.contains("X-From:")) {
break whileLoop;
}
tempString =
stringTokenizer.nextToken();
bccList += tempString;
}
StringTokenizer stringTokenizer3 = new
StringTokenizer(bccList,
",");
this.bccEmailID = new ArrayList<String>();
while (stringTokenizer3.hasMoreTokens()) {
this.bccEmailID.add(stringTokenizer3.nextToken());
}
}
if (tempString.contains("X-From:")) {
splits1 = tempString.split(":");
this.xfromEmailID = splits1[1];
}
if (tempString.contains("X-To:")) {
splits1 = tempString.split(":");
String xToList = splits1[1];
whileLoop: while (true) {
if (tempString.contains("X-cc:")) {
break whileLoop;
}
tempString =
stringTokenizer.nextToken();
xToList += tempString;
}
StringTokenizer stringTokenizer3 = new
StringTokenizer(xToList,
">,");
this.xtoEmailID = new ArrayList<String>();
while (stringTokenizer3.hasMoreTokens()) {
this.xtoEmailID.add(stringTokenizer3.nextToken());
}
}
if (tempString.contains("X-cc:")) {
splits1 = tempString.split(":");
String xCcList = splits1[1];
whileLoop: while (true) {
if (tempString.contains("X-bcc:")) {
break whileLoop;
}
tempString =
stringTokenizer.nextToken();
xCcList += tempString;
}
StringTokenizer stringTokenizer3 = new
StringTokenizer(xCcList,
",");
this.xccEmailID = new ArrayList<String>();
while (stringTokenizer3.hasMoreTokens()) {
this.xccEmailID.add(stringTokenizer3.nextToken());
}
}
if (tempString.contains("X-bcc:")) {
splits1 = tempString.split(":");
String xBccList = splits1[1];
whileLoop: while (true) {
if (tempString.contains("X-Folder:")) {
break whileLoop;
}
tempString =
stringTokenizer.nextToken();
xBccList += tempString;
}
StringTokenizer stringTokenizer3 = new
StringTokenizer(
xBccList, ",");
this.xbccEmailID = new ArrayList<String>();
while (stringTokenizer3.hasMoreTokens()) {
this.xbccEmailID.add(stringTokenizer3.nextToken());
}
}
if (tempString.contains("X-Folder:")) {
splits1 = tempString.split(":");
this.xFolder = splits1[1];
}
if (tempString.contains("X-Origin:")) {
splits = tempString.split(":");
this.xOrigin = splits[1];
}
if (tempString.contains("X-FileName:")) {
splits = tempString.split(":");
this.xFileName = splits[1];
}
if (tempString.contains(forwardedSeparator)) {
while (stringTokenizer.hasMoreTokens()) {
tempString =
stringTokenizer.nextToken();
this.forwardedAttachement += tempString;
}
}
if (tempString.contains(originalSeparator)) {
while (stringTokenizer.hasMoreTokens()) {
tempString =
stringTokenizer.nextToken();
this.originalMessage += tempString;
}
}
// if we dont get any hit .. then add it as message
content
this.messageContent += tempString;
}
/*
* insert into the original attachment file - in the case of
reply to an
* original email
*/
if (tempString.contains(originalSeparator)) {
String tempRepliedMail = null;
while (!((tempString = stringTokenizer.nextToken()) !=
endTag)) {
tempRepliedMail += tempString;
// got a whole line of To's in one line
}
this.originalMessage = tempRepliedMail;
}
if (tempString.contains(forwardedSeparator)) {
String tempForwardedMail = null;
while (!((tempString = stringTokenizer.nextToken()) !=
endTag)) {
tempForwardedMail += tempString;
// got a whole line of To's in one line
}
this.forwardedAttachement = tempForwardedMail;
}
}
}
package EnronAssignment.src.enronassignment3.input;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class EmailInputFormat extends FileInputFormat<LongWritable, EmailClass>
{
@Override
public RecordReader<LongWritable, EmailClass> createRecordReader(
InputSplit split, TaskAttemptContext context) throws
IOException,
InterruptedException {
context.setStatus(split.toString());
return new EmailRecordReader();
// It returns the object of the EmailRecordReader when called
by the
// hadoop JVM for reading each record.
}
}
package EnronAssignment.src.enronassignment3.input;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class EmailRecordReader extends RecordReader<LongWritable, EmailClass> {
private byte[] startTag;
private byte[] endTag;
public static final String stTag = "<message-id>";
public static final String enTag = "</message-id>";
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LongWritable key = new LongWritable();
static private EmailClass value = new EmailClass();
@Override
public void close() throws IOException {
fsin.close();
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return key;
}
@Override
public EmailClass getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (fsin.getPos() - start) / (float) (end - start);
}
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws
IOException,
InterruptedException {
FileSplit fileSplit = (FileSplit) inputSplit;
startTag = stTag.getBytes("utf-8");
endTag = enTag.getBytes("utf-8");
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(taskAttemptContext
.getConfiguration());
fsin = fs.open(fileSplit.getPath());
fsin.seek(start);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
key.set(fsin.getPos());
System.out.println("Call to
nextKeyValue");
value.initiate(buffer.getData(), 0, buffer.getLength());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}
private boolean readUntilMatch(byte[] match, boolean withinBlock)
throws IOException {
int i = 0;
while (true) {
int b = fsin.read();
// end of file:
if (b == -1)
return false;
// save to buffer:
if (withinBlock)
buffer.write(b);
// check if we're matching:
if (b == match[i]) {
i++;
if (i >= match.length)
return true;
} else
i = 0;
// see if we've passed the stop point:
if (!withinBlock && i == 0 && fsin.getPos() >= end)
return false;
}
}
}
package EnronAssignment.src.enronassignment3.reduce;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Pass-through reducer: every value received for a key is written out under
 * that same key, in the order the framework supplies it.
 */
public class EmailReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text fromAddress : values) {
            context.write(key, fromAddress);
        }
    }
}
package EnronAssignment.src.enronassignment3.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import EnronAssignment.src.enronassignment3.input.EmailClass;
public class EmailMapper extends Mapper<LongWritable, EmailClass, Text, Text> {
@Override
protected void map(
LongWritable key,
EmailClass value,
org.apache.hadoop.mapreduce.Mapper<LongWritable,
EmailClass, Text, Text>.Context context)
throws IOException, InterruptedException {
context.write(new Text(value.messageID), new
Text(value.fromEmailID));
// System.out.println(value.messageID + " " +
value.fromEmailID);
}
}
package EnronAssignment.src.enronassignment3.driver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import EnronAssignment.src.enronassignment3.input.EmailInputFormat;
import EnronAssignment.src.enronassignment3.mapper.EmailMapper;
import EnronAssignment.src.enronassignment3.reduce.EmailReducer;
public class DriverClassForEmailObj {
// //////////////
// MAIN METHOD //
// //////////////
/**
* @param args
* @throws InterruptedException
* @throws ClassNotFoundException
* @throws IOException
*/
public static void main(String[] args) throws InterruptedException,
ClassNotFoundException, IOException {
/*
* Setting the Configurations for the job
*/
Configuration configuration = new Configuration();
String[] otherArgs = null;
try {
otherArgs = new GenericOptionsParser(configuration,
args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: lineitem <in1>
<out>");
System.exit(2);
}
// Just a user message
System.out.println("Starting Job: enron processing");
// configuring the threshold when reducer should start
collecting
// the intermediate <K,V> pairs
configuration.set("mapred.reduce.slowstart.completed.maps", "0.10");
// initialising the job
Job job = new Job(configuration, "enron");
/*
* Setting all the classes required by the job
*/
job.setJarByClass(DriverClassForEmailObj.class);
// Setting the input format for the MR job
job.setInputFormatClass(EmailInputFormat.class);
//job.setReducerClass(EmailReducer.class);
job.setMapperClass(EmailMapper.class);
job.setOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
// setting the number of reduce tasks
job.setNumReduceTasks(1);
/*
* Setting the input and output paths used for the
input directories
* and the output directory
*/
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submitting the job and waiting for the job to get
completed by
// pinging the master-node
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IOException e) {
System.err.println("error in File IO");
e.printStackTrace();
} catch (Exception e) {
System.err.println("error in Code");
e.printStackTrace();
}
}
}