Hi Friends,
I have to sort a huge amount of data in the minimum possible time, probably
using partitioning. The key is composed of 3 fields (partition, text and number).
This is how the partitions are defined:
- Partition "1" for range 1-10
- Partition "2" for range 11-20
- Partition "3" for range 21-30
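The rule above can be sketched as a small function. This is only an illustration: the name partition_for and the width parameter are hypothetical, and it assumes that the pattern of 10 values per partition continues past 30, which the list above does not actually state.

```python
def partition_for(range_start, width=10):
    """Return the 1-based partition number for a range start value.

    Assumption: every partition covers `width` consecutive values,
    so 1-10 -> 1, 11-20 -> 2, 21-30 -> 3, and so on.
    """
    if range_start < 1:
        raise ValueError("range start must be >= 1")
    return (range_start - 1) // width + 1
```

For example, a record whose range starts at 11 would get partition 2, and one whose range starts at 30 would get partition 3.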
Input file format: partition[tab]text[tab]range-start[tab]range-end
[cloudera@localhost kMer2]$ cat input1
- 1 chr1 1 10
- 1 chr1 2 8
- 2 chr1 11 18
[cloudera@localhost kMer2]$ cat input2
- 1 chr1 3 7
- 2 chr1 12 19
[cloudera@localhost kMer2]$ cat input3
- 3 chr1 22 30
[cloudera@localhost kMer2]$ cat input4
- 3 chr1 22 30
- 1 chr1 9 10
- 2 chr1 15 16
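For reference, one record in this format could be parsed roughly as follows. This is a sketch, not code from the job: the names Record and parse_record are hypothetical, and it assumes every line has exactly four tab-separated fields with integer range bounds.

```python
from collections import namedtuple

# Hypothetical container for one input record.
Record = namedtuple("Record", ["partition", "text", "range_start", "range_end"])

def parse_record(line):
    """Split one tab-separated input line into its four fields."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 4:
        raise ValueError("expected 4 tab-separated fields, got %d" % len(fields))
    partition, text, start, end = fields
    # The partition is kept as a string because it is only used as a key.
    return Record(partition, text, int(start), int(end))
```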
Then I ran the following command:
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
-D stream.map.output.field.separator='\t' \
-D stream.num.map.output.key.fields=3 \
-D map.output.key.field.separator='\t' \
-D mapred.text.key.partitioner.options=-k1 \
-D mapred.reduce.tasks=3 \
-input /usr/pkansal/kMer2/ip \
-output /usr/pkansal/kMer2/op \
-mapper /home/cloudera/kMer2/kMer2Map.py \
-file /home/cloudera/kMer2/kMer2Map.py \
-reducer /home/cloudera/kMer2/kMer2Red.py \
-file /home/cloudera/kMer2/kMer2Red.py
Both the mapper and reducer scripts just contain the following few lines of code:
import sys
# Echo every input line unchanged, minus surrounding whitespace.
for line in sys.stdin:
    line = line.strip()
    print("%s" % line)
The following is the output:
[cloudera@localhost kMer2]$ hadoop dfs -cat /usr/pkansal/kMer2/op/part-00000
- 2 chr1 12 19
- 2 chr1 15 16
- 3 chr1 22 30
- 3 chr1 22 30
[cloudera@localhost kMer2]$ hadoop dfs -cat /usr/pkansal/kMer2/op/part-00001
- 1 chr1 2 8
- 1 chr1 3 7
- 1 chr1 9 10
- 2 chr1 11 18
[cloudera@localhost kMer2]$ hadoop dfs -cat /usr/pkansal/kMer2/op/part-00002
- 1 chr1 1 10
- 3 chr1 22 29
This is not the output I expected. I expected all records with:
- partition 1 in one single file eg part-m-00000
- partition 2 in one single file eg part-m-00001
- partition 3 in one single file eg part-m-00002
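That expectation can be written down as a quick check over the output files. This is a minimal sketch with hypothetical names (files_per_partition, is_cleanly_partitioned); it assumes the contents of each part file have already been read into a dict of file name to list of lines, and that the partition is the first tab-separated field.

```python
from collections import defaultdict

def files_per_partition(outputs):
    """Map each partition value to the set of output files that contain it."""
    seen = defaultdict(set)
    for name, lines in outputs.items():
        for line in lines:
            line = line.strip()
            if not line:
                continue  # ignore blank lines
            partition = line.split("\t")[0]
            seen[partition].add(name)
    return dict(seen)

def is_cleanly_partitioned(outputs):
    """True only if every partition value appears in exactly one file."""
    return all(len(files) == 1 for files in files_per_partition(outputs).values())
```

Run against the three part files shown above, this check fails, because each partition value shows up in two different files.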
Can you please tell me whether I am doing this the right way?
--
Regards,
Piyush Kansal