Hi All,

I am trying to read an ORC file and perform a groupBy operation on it, but
when I run it on a large data set we get the following error
message.

Format of the input data:

|178111256|  107125374|
|178111256|  107148618|
|178111256|  107175361|
|178111256|  107189910|

Schema:

int
int

and we are trying to group by the first column.

As far as the logic and syntax go, the code looks correct, and it works
well on a small data set. I have attached the code as a text file.

Thank you for your time.

ERROR MESSAGE:
Error: java.lang.ArrayIndexOutOfBoundsException at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
at java.io.DataOutputStream.writeByte(DataOutputStream.java:153) at
org.apache.hadoop.io.WritableUtils.writeVLong(WritableUtils.java:273) at
org.apache.hadoop.io.WritableUtils.writeVInt(WritableUtils.java:253) at
org.apache.hadoop.io.Text.write(Text.java:330) at
org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
at
org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1149)
at
org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:73) at
orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:39) at
org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) at
org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453) at
org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at
org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.Subject.doAs(Subject.java:422) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)

-- 
REGARDS
BALAKUMAR SEETHARAMAN
package orc_groupby.orc_groupby;


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Orc_groupBy extends Configured  implements Tool {
        
    /** Index of the first column in the row struct; the mapper emits it as the output key. */
    public static final int A_ID = 0;
    /** Index of the second column in the row struct; the mapper emits it as the output value. */
    public static final int B_ID = 1;

    public static class MyMapper<K , V extends Writable>
    extends MapReduceBase implements Mapper<K, OrcStruct, Text, Text> {

        private StructObjectInspector oip;
        private final OrcSerde serde = new OrcSerde();

        public void configure(JobConf job) {
            Properties table = new Properties();
            table.setProperty("columns", "viewedid,viewerid");
            table.setProperty("columns.types", "int,int");

            serde.initialize(job, table);

            try {
                oip = (StructObjectInspector) serde.getObjectInspector();
            } catch (SerDeException e) {
                e.printStackTrace();
            }
        }
        public void map(K key, OrcStruct val,
                OutputCollector<Text, Text> output, Reporter reporter)
                        throws IOException {
            List<? extends StructField> fields =oip.getAllStructFieldRefs();
            WritableIntObjectInspector bInspector =  
(WritableIntObjectInspector) fields.get(B_ID).getFieldObjectInspector();
            String a = "";
            String b = "";
            try {
                a =  bInspector.getPrimitiveJavaObject( 
oip.getStructFieldData(serde.deserialize(val), fields.get(A_ID))).toString();
                b =  
bInspector.getPrimitiveJavaObject(oip.getStructFieldData(serde.deserialize(val),
 fields.get(B_ID))).toString();
                //System.out.print("A="+a+" B="+b);
                //System.exit(0);
            } catch (SerDeException e1) {
                e1.printStackTrace();
            }
            output.collect(new Text(a), new Text(b));
           
        }
    }


    public static class MyReducer<K, V extends Writable>
    extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter)
                        throws IOException {

            String toout = "";
            while (values.hasNext()) {
                String value = values.next().toString();
                toout += ","+value.toString();
            }
            output.collect(key, new Text(toout));
        }
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} and
     * terminates the JVM with the exit code the tool returns.
     *
     * @param args command-line arguments forwarded to {@link ToolRunner#run}
     * @throws Exception if the tool run throws
     */
    public static void main(String[] args) throws Exception {
        Orc_groupBy tool = new Orc_groupBy();
        System.exit(ToolRunner.run(tool, args));
    }

    /**
     * Configures and submits the group-by job, blocking until it finishes.
     *
     * @param args {@code args[0]} is the ORC input path, {@code args[1]} the output path
     * @return 0 when the job was run, 2 when the arguments are missing
     * @throws Exception if job submission or execution fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: Orc_groupBy <input path> <output path>");
            return 2;
        }

        // Start from the configuration injected by ToolRunner so that generic
        // options (-D key=value, -conf, ...) given on the command line take effect.
        // Fall back to a fresh Configuration when run() is called without ToolRunner.
        Configuration base = getConf() != null ? getConf() : new Configuration();
        JobConf job = new JobConf(base, Orc_groupBy.class);

        // Specify various job-specific parameters
        job.setJobName("myjob");
        job.set("log4j.logger.org.apache.hadoop","INFO");

        //push down projection columns
        //job.set("hive.io.file.readcolumn.ids","0,1");
        //job.set("hive.io.file.read.all.columns","false");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(Orc_groupBy.MyMapper.class);
        job.setReducerClass(Orc_groupBy.MyReducer.class);

        job.setInputFormat(OrcInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // The reducer emits Text/Text as well; declare it for the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        JobClient.runJob(job);
        return 0;
    }

}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to