Hey Steve,
Want to contribute it as an example to MR? Would love to help.
thanks,
Arun
On Jul 11, 2011, at 12:11 PM, Steve Lewis wrote:
> Look at this sample
> =============================================
> package org.systemsbiology.hadoop;
>
>
>
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.io.compress.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
>
> import java.io.*;
> import java.util.*;
>
> /**
> * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
> * Splitter that reads scan tags from an XML file
> * No assumption is made about lines, but start tags and end tags MUST look like
> <MyTag </MyTag> with no embedded spaces
> * usually you will subclass and hard code the tag you want to split on
> */
> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
> public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>
>
> private static final double SPLIT_SLOP = 1.1; // 10% slop
>
>
> public static final int BUFFER_SIZE = 4096;
>
> private final String m_BaseTag;
> private final String m_StartTag;
> private final String m_EndTag;
> private String m_Extension;
>
> public XMLTagInputFormat(final String pBaseTag) {
> m_BaseTag = pBaseTag;
> m_StartTag = "<" + pBaseTag;
> m_EndTag = "</" + pBaseTag + ">";
>
> }
>
> public String getExtension() {
> return m_Extension;
> }
>
> public void setExtension(final String pExtension) {
> m_Extension = pExtension;
> }
>
> public boolean isSplitReadable(InputSplit split) {
> if (!(split instanceof FileSplit))
> return true;
> FileSplit fsplit = (FileSplit) split;
> Path path1 = fsplit.getPath();
> return isPathAcceptable(path1);
> }
>
> protected boolean isPathAcceptable(final Path pPath1) {
> String path = pPath1.toString().toLowerCase();
> if(path.startsWith("part-r-"))
> return true;
> String extension = getExtension();
> if (extension != null && path.endsWith(extension.toLowerCase()))
> return true;
> if (extension != null && path.endsWith(extension.toLowerCase() +
> ".gz"))
> return true;
> if (extension == null )
> return true;
> return false;
> }
>
> public String getStartTag() {
> return m_StartTag;
> }
>
> public String getBaseTag() {
> return m_BaseTag;
> }
>
> public String getEndTag() {
> return m_EndTag;
> }
>
> @Override
> public RecordReader<Text, Text> createRecordReader(InputSplit split,
> TaskAttemptContext
> context) {
> if (isSplitReadable(split))
> return new MyXMLFileReader();
> else
> return NullRecordReader.INSTANCE; // do not read
> }
>
> @Override
> protected boolean isSplitable(JobContext context, Path file) {
> String fname = file.getName().toLowerCase();
> if(fname.endsWith(".gz"))
> return false;
> return true;
> }
>
> /**
> * Generate the list of files and make them into FileSplits.
> * This needs to be copied to insert a filter on acceptable data
> */
> @Override
> public List<InputSplit> getSplits(JobContext job
> ) throws IOException {
> long minSize = Math.max(getFormatMinSplitSize(),
> getMinSplitSize(job));
> long maxSize = getMaxSplitSize(job);
>
> // generate splits
> List<InputSplit> splits = new ArrayList<InputSplit>();
> for (FileStatus file : listStatus(job)) {
> Path path = file.getPath();
> if (!isPathAcceptable(path)) // filter acceptable data
> continue;
> FileSystem fs = path.getFileSystem(job.getConfiguration());
> long length = file.getLen();
> BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
> length);
> if ((length != 0) && isSplitable(job, path)) {
> long blockSize = file.getBlockSize();
> long splitSize = computeSplitSize(blockSize, minSize,
> maxSize);
>
> long bytesRemaining = length;
> while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
> int blkIndex = getBlockIndex(blkLocations, length -
> bytesRemaining);
> splits.add(new FileSplit(path, length - bytesRemaining,
> splitSize,
> blkLocations[blkIndex].getHosts()));
> bytesRemaining -= splitSize;
> }
>
> if (bytesRemaining != 0) {
> splits.add(new FileSplit(path, length - bytesRemaining,
> bytesRemaining,
> blkLocations[blkLocations.length -
> 1].getHosts()));
> }
> }
> else if (length != 0) {
> splits.add(new FileSplit(path, 0, length,
> blkLocations[0].getHosts()));
> }
> else {
> //Create empty hosts array for zero length files
> splits.add(new FileSplit(path, 0, length, new String[0]));
> }
> }
> // LOG.debug("Total # of splits: " + splits.size());
> return splits;
> }
>
> /**
> * Custom RecordReader which returns the entire file as a
> * single m_Value with the name as a m_Key
> * Value is the entire file
> * Key is the file name
> */
> public class MyXMLFileReader extends RecordReader<Text, Text> {
>
> private CompressionCodecFactory compressionCodecs = null;
> private long m_Start;
> private long m_End;
> private long m_Current;
> private BufferedReader m_Input;
> private Text m_Key;
> private Text m_Value = null;
> private char[] m_Buffer = new char[BUFFER_SIZE];
> StringBuilder m_Sb = new StringBuilder();
>
> public void initialize(InputSplit genericSplit,
> TaskAttemptContext context) throws IOException
> {
> FileSplit split = (FileSplit) genericSplit;
> Configuration job = context.getConfiguration();
> m_Sb.setLength(0);
> m_Start = split.getStart();
> m_End = m_Start + split.getLength();
> final Path file = split.getPath();
> compressionCodecs = new CompressionCodecFactory(job);
> final CompressionCodec codec = compressionCodecs.getCodec(file);
>
> // open the file and seek to the m_Start of the split
> FileSystem fs = file.getFileSystem(job);
> FSDataInputStream fileIn = fs.open(split.getPath());
> if (codec != null) {
> CompressionInputStream inputStream =
> codec.createInputStream(fileIn);
> m_Input = new BufferedReader(new
> InputStreamReader(inputStream));
> m_End = Long.MAX_VALUE;
> }
> else {
> m_Input = new BufferedReader(new InputStreamReader(fileIn));
> }
> m_Current = m_Start;
> if (m_Key == null) {
> m_Key = new Text();
> }
> m_Key.set(split.getPath().getName());
> if (m_Value == null) {
> m_Value = new Text();
> }
>
> }
>
> /**
> * look for a <scan tag then read until it closes
> *
> * @return true if there is data
> * @throws java.io.IOException
> */
> public boolean nextKeyValue() throws IOException {
> if(readFromCurrentBuffer())
> return true;
> int newSize = 0;
> String startTag = getStartTag() + " ";
> String startTag2 = getStartTag() + ">";
> newSize = m_Input.read(m_Buffer);
>
> while (newSize > 0) {
> m_Current += newSize;
> m_Sb.append(m_Buffer, 0, newSize);
> if( readFromCurrentBuffer())
> return true;
> newSize = m_Input.read(m_Buffer);
> }
> // exit because we are at the m_End
> if (newSize <= 0) {
> m_Key = null;
> m_Value = null;
> return false;
> }
>
> return true;
> }
>
> protected boolean readFromCurrentBuffer()
> {
> String endTag = getEndTag();
> String startText = m_Sb.toString();
> if(!startText.contains(endTag))
> return false; // need more read
> String startTag = getStartTag() + " ";
> String startTag2 = getStartTag() + ">";
> int index = startText.indexOf(startTag);
> if (index == -1)
> index = startText.indexOf(startTag2);
> if(index == -1)
> return false;
> startText = startText.substring(index);
> m_Sb.setLength(0);
> m_Sb.append(startText);
>
> String s = startText;
> index = s.indexOf(endTag);
> if (index == -1)
> return false; // need more read
> // throw new IllegalStateException("unmatched tag " +
> getBaseTag());
> index += endTag.length();
> String tag = s.substring(0, index).trim();
> m_Value.set(tag);
>
> // keep the remaining text to add to the next tag
> m_Sb.setLength(0);
> String rest = s.substring(index);
> m_Sb.append(rest);
> return true;
> }
>
> @Override
> public Text getCurrentKey() {
> return m_Key;
> }
>
> @Override
> public Text getCurrentValue() {
> return m_Value;
> }
>
> /**
> * Get the progress within the split
> */
> public float getProgress() {
> return ((float) m_Current - m_Start) / (m_Start - m_End);
> }
>
> public synchronized void close() throws IOException {
> if (m_Input != null) {
> m_Input.close();
> }
> }
> }
> }
>
> =============================================
>
> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <[email protected]> wrote:
> Hello everyone,
>
> I'm new to Hadoop and I'm trying to figure out how to design a M/R program to
> parse a file and generate a PMML file as output.
>
> What I would like to do is split a file by a keyword instead of a given number
> of lines, because the location of the split could change from time to time.
>
> I'm looking around and was thinking maybe KeyValueTextInputFormat would be
> the way to go but I'm not finding any clear examples how to use it. So I'm
> not sure if this is the right choice or not.
>
> Here is a basic input example of what I'm working with.
>
> [Input file info]
> more info
> more info
> etc.
> etc.
> Keyword
> different info
> different info
> Keyword
> some more info
>
> For the example above, each section can be generated separately from each
> other. However, within each section, different lines are dependent upon each
> other to generate a valid PMML file.
>
> Can anyone offer a suggestion what type of input format I should use?
>
> Thanks for your time
> Erik
>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>