Hi All,
I am trying to read an ORC file and perform a groupBy operation on it, but
when I run it on a large data set I get the following error message.
Sample of the input data:
|178111256| 107125374|
|178111256| 107148618|
|178111256| 107175361|
|178111256| 107189910|
Schema:
int
int
We are trying to group by the first column.
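For the sample rows above, I would expect the job to emit one line per
distinct key, roughly like this (the leading comma comes from my reducer,
and the value order is not guaranteed):

178111256	,107125374,107148618,107175361,107189910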
As far as I can tell the logic and syntax of the code are correct, and it
works fine on a small data set. I have pasted the code below.
Thank you for your time.
ERROR MESSAGE:
Error: java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
    at java.io.DataOutputStream.writeByte(DataOutputStream.java:153)
    at org.apache.hadoop.io.WritableUtils.writeVLong(WritableUtils.java:273)
    at org.apache.hadoop.io.WritableUtils.writeVInt(WritableUtils.java:253)
    at org.apache.hadoop.io.Text.write(Text.java:330)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1149)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
    at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:73)
    at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:39)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
--
REGARDS
BALAKUMAR SEETHARAMAN
package orc_groupby.orc_groupby;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Orc_groupBy extends Configured implements Tool {

    // Positions of the two columns in the ORC schema.
    public static final int A_ID = 0;
    public static final int B_ID = 1;

    public static class MyMapper<K, V extends Writable>
            extends MapReduceBase implements Mapper<K, OrcStruct, Text, Text> {

        private StructObjectInspector oip;
        private final OrcSerde serde = new OrcSerde();

        public void configure(JobConf job) {
            // Tell the SerDe about the two int columns.
            Properties table = new Properties();
            table.setProperty("columns", "viewedid,viewerid");
            table.setProperty("columns.types", "int,int");
            serde.initialize(job, table);
            try {
                oip = (StructObjectInspector) serde.getObjectInspector();
            } catch (SerDeException e) {
                e.printStackTrace();
            }
        }
        public void map(K key, OrcStruct val,
                        OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            List<? extends StructField> fields = oip.getAllStructFieldRefs();
            String a = "";
            String b = "";
            try {
                // Deserialize the ORC row once, then read both int columns
                // through their own field object inspectors.
                Object row = serde.deserialize(val);
                WritableIntObjectInspector aInspector =
                        (WritableIntObjectInspector) fields.get(A_ID).getFieldObjectInspector();
                WritableIntObjectInspector bInspector =
                        (WritableIntObjectInspector) fields.get(B_ID).getFieldObjectInspector();
                a = aInspector.getPrimitiveJavaObject(
                        oip.getStructFieldData(row, fields.get(A_ID))).toString();
                b = bInspector.getPrimitiveJavaObject(
                        oip.getStructFieldData(row, fields.get(B_ID))).toString();
                // System.out.print("A=" + a + " B=" + b);
                // System.exit(0);
            } catch (SerDeException e1) {
                e1.printStackTrace();
            }
            // Emit (first column -> second column); the shuffle groups by key.
            output.collect(new Text(a), new Text(b));
        }
    }
    public static class MyReducer<K, V extends Writable>
            extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter)
                throws IOException {
            // Concatenate every value for this key into one comma-prefixed string.
            StringBuilder toout = new StringBuilder();
            while (values.hasNext()) {
                toout.append(',').append(values.next().toString());
            }
            output.collect(key, new Text(toout.toString()));
        }
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Orc_groupBy(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        /*
        // Remove the output directory before running.
        Runtime r = Runtime.getRuntime();
        Process p = r.exec("rm -rf " + args[1]);
        p.waitFor();
        BufferedReader b = new BufferedReader(
                new InputStreamReader(p.getInputStream()));
        String line = "";
        while ((line = b.readLine()) != null) {
            System.out.println(line);
        }
        b.close();
        */
        JobConf job = new JobConf(new Configuration(), Orc_groupBy.class);

        // Specify various job-specific parameters.
        job.setJobName("myjob");
        // job.set("mapreduce.framework.name", "local");
        // job.set("fs.default.name", "hdfs:///");
        job.set("log4j.logger.org.apache.hadoop", "INFO");

        /*
        // Map-side sort buffer tuning (the MapOutputBuffer in the stack trace).
        job.set("mapreduce.map.sort.spill.percent", "0.8");
        job.set("mapreduce.task.io.sort.factor", "10");
        job.set("mapreduce.task.io.sort.mb", "100");
        job.set("mapred.map.multithreadedrunner.threads", "1");
        job.set("mapreduce.mapper.multithreadedmapper.threads", "1");
        */

        // Push down projection columns.
        // job.set("hive.io.file.readcolumn.ids", "0,1");
        // job.set("hive.io.file.read.all.columns", "false");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(Orc_groupBy.MyMapper.class);
        job.setReducerClass(Orc_groupBy.MyReducer.class);
        job.setInputFormat(OrcInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        JobClient.runJob(job);
        return 0;
    }
}
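For completeness, I run the job roughly like this (the jar name and paths
are placeholders for my actual ones):

hadoop jar orc_groupby.jar orc_groupby.orc_groupby.Orc_groupBy /path/to/orc/input /path/to/output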