Look at this sample
=============================================
package org.systemsbiology.hadoop;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import java.io.*;
import java.util.*;
/**
* org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
* Splitter that reads scan tags from an XML file
* No assumption is made about lines but tage and end tags MUST look like
<MyTag </MyTag> with no embedded spaces
* usually you will subclass and hard code the tag you want to split on
*/
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
private static final double SPLIT_SLOP = 1.1; // 10% slop
public static final int BUFFER_SIZE = 4096;
private final String m_BaseTag;
private final String m_StartTag;
private final String m_EndTag;
private String m_Extension;
public XMLTagInputFormat(final String pBaseTag) {
m_BaseTag = pBaseTag;
m_StartTag = "<" + pBaseTag;
m_EndTag = "</" + pBaseTag + ">";
}
public String getExtension() {
return m_Extension;
}
public void setExtension(final String pExtension) {
m_Extension = pExtension;
}
public boolean isSplitReadable(InputSplit split) {
if (!(split instanceof FileSplit))
return true;
FileSplit fsplit = (FileSplit) split;
Path path1 = fsplit.getPath();
return isPathAcceptable(path1);
}
protected boolean isPathAcceptable(final Path pPath1) {
String path = pPath1.toString().toLowerCase();
if(path.startsWith("part-r-"))
return true;
String extension = getExtension();
if (extension != null && path.endsWith(extension.toLowerCase()))
return true;
if (extension != null && path.endsWith(extension.toLowerCase() +
".gz"))
return true;
if (extension == null )
return true;
return false;
}
public String getStartTag() {
return m_StartTag;
}
public String getBaseTag() {
return m_BaseTag;
}
public String getEndTag() {
return m_EndTag;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext
context) {
if (isSplitReadable(split))
return new MyXMLFileReader();
else
return NullRecordReader.INSTANCE; // do not read
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
String fname = file.getName().toLowerCase();
if(fname.endsWith(".gz"))
return false;
return true;
}
/**
* Generate the list of files and make them into FileSplits.
* This needs to be copied to insert a filter on acceptable data
*/
@Override
public List<InputSplit> getSplits(JobContext job
) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(),
getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
for (FileStatus file : listStatus(job)) {
Path path = file.getPath();
if (!isPathAcceptable(path)) // filter acceptable data
continue;
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize,
maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length -
bytesRemaining);
splits.add(new FileSplit(path, length - bytesRemaining,
splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
blkLocations[blkLocations.length -
1].getHosts()));
}
}
else if (length != 0) {
splits.add(new FileSplit(path, 0, length,
blkLocations[0].getHosts()));
}
else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
// LOG.debug("Total # of splits: " + splits.size());
return splits;
}
/**
* Custom RecordReader which returns the entire file as a
* single m_Value with the name as a m_Key
* Value is the entire file
* Key is the file name
*/
public class MyXMLFileReader extends RecordReader<Text, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long m_Start;
private long m_End;
private long m_Current;
private BufferedReader m_Input;
private Text m_Key;
private Text m_Value = null;
private char[] m_Buffer = new char[BUFFER_SIZE];
StringBuilder m_Sb = new StringBuilder();
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws
IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
m_Sb.setLength(0);
m_Start = split.getStart();
m_End = m_Start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the m_Start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (codec != null) {
CompressionInputStream inputStream =
codec.createInputStream(fileIn);
m_Input = new BufferedReader(new
InputStreamReader(inputStream));
m_End = Long.MAX_VALUE;
}
else {
m_Input = new BufferedReader(new InputStreamReader(fileIn));
}
m_Current = m_Start;
if (m_Key == null) {
m_Key = new Text();
}
m_Key.set(split.getPath().getName());
if (m_Value == null) {
m_Value = new Text();
}
}
/**
* look for a <scan tag then read until it closes
*
* @return true if there is data
* @throws java.io.IOException
*/
public boolean nextKeyValue() throws IOException {
if(readFromCurrentBuffer())
return true;
int newSize = 0;
String startTag = getStartTag() + " ";
String startTag2 = getStartTag() + ">";
newSize = m_Input.read(m_Buffer);
while (newSize > 0) {
m_Current += newSize;
m_Sb.append(m_Buffer, 0, newSize);
if( readFromCurrentBuffer())
return true;
newSize = m_Input.read(m_Buffer);
}
// exit because we are at the m_End
if (newSize <= 0) {
m_Key = null;
m_Value = null;
return false;
}
return true;
}
protected boolean readFromCurrentBuffer()
{
String endTag = getEndTag();
String startText = m_Sb.toString();
if(!startText.contains(endTag))
return false; // need more read
String startTag = getStartTag() + " ";
String startTag2 = getStartTag() + ">";
int index = startText.indexOf(startTag);
if (index == -1)
index = startText.indexOf(startTag2);
if(index == -1)
return false;
startText = startText.substring(index);
m_Sb.setLength(0);
m_Sb.append(startText);
String s = startText;
index = s.indexOf(endTag);
if (index == -1)
return false; // need more read
// throw new IllegalStateException("unmatched tag " +
getBaseTag());
index += endTag.length();
String tag = s.substring(0, index).trim();
m_Value.set(tag);
// keep the remaining text to add to the next tag
m_Sb.setLength(0);
String rest = s.substring(index);
m_Sb.append(rest);
return true;
}
@Override
public Text getCurrentKey() {
return m_Key;
}
@Override
public Text getCurrentValue() {
return m_Value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
return ((float) m_Current - m_Start) / (m_Start - m_End);
}
public synchronized void close() throws IOException {
if (m_Input != null) {
m_Input.close();
}
}
}
}
=============================================
On Mon, Jul 11, 2011 at 11:57 AM, Erik T <[email protected]> wrote:
> Hello everyone,
>
> I'm new to Hadoop and I'm trying to figure out how to design a M/R program
> to parse a file and generate a PMML file as output.
>
> What I would like to do is split a file by a keyword instead of a given number
> of lines because the location of the split could change from time to time.
>
> I'm looking around and was thinking maybe KeyValueTextInputFormat would be
> the way to go but I'm not finding any clear examples how to use it. So I'm
> not sure if this is the right choice or not.
>
> Here is a basic input example of what I'm working with.
>
> [Input file info]
> more info
> more info
> etc.
> etc.
> *Keyword*
> different info
> different info
> *Keyword*
> some more info
>
> For the example above, each section can be generated separately from each
> other. However, within each section, different lines are dependent upon each
> other to generate a valid PMML file.
>
> Can anyone offer a suggestion what type of input format I should use?
>
> Thanks for your time
> Erik
>
--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com