ACCUMULO-375 added code to separate map jobs for parsing wikipedia and for ingesting into Accumulo

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1241579 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/a673727d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/a673727d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/a673727d

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: a673727d6c09551d5a22b76bdbcb0dcc098573db
Parents: 686895f
Author: Adam Fuchs <afu...@apache.org>
Authored: Tue Feb 7 19:48:39 2012 +0000
Committer: Adam Fuchs <afu...@apache.org>
Committed: Tue Feb 7 19:48:39 2012 +0000

----------------------------------------------------------------------
 README.parallel                                 |  65 +++++
 ingest/bin/ingest_parallel.sh                   |  74 ++++++
 ingest/conf/wikipedia_parallel.xml.example      |  59 +++++
 .../wikisearch/ingest/ArticleExtractor.java     |  26 +-
 .../ingest/WikipediaConfiguration.java          |  17 ++
 .../ingest/WikipediaPartitionedIngester.java    | 247 +++++++++++++++++++
 .../ingest/WikipediaPartitionedMapper.java      | 192 ++++++++++++++
 .../wikisearch/ingest/WikipediaPartitioner.java | 108 ++++++++
 8 files changed, 787 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
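
The change splits the old single-pass ingest into two MapReduce jobs: a partitioner job that parses the wikipedia XML dumps into SequenceFiles of Article records, and an ingest job that reads those records and writes mutations into Accumulo. Below is a minimal sketch of launching the new driver directly from Java instead of through ingest/bin/ingest_parallel.sh (which passes the same settings via -conf and -D); the configuration file location and HDFS input path are assumptions, and -libjars handling is omitted:

import org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;

public class LaunchPartitionedIngest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Equivalent of "-conf ingest/conf/wikipedia.xml" (path assumed).
    conf.addResource(new Path("ingest/conf/wikipedia.xml"));
    // Equivalent of "-Dwikipedia.input=..." (HDFS path assumed).
    conf.set("wikipedia.input", "/wikipedia/dumps");
    // New switches added by this commit: run only the partition pass here;
    // a later run with wikipedia.run.ingest=true picks up the partitioned files.
    conf.setBoolean("wikipedia.run.partitioner", true);
    conf.setBoolean("wikipedia.run.ingest", false);
    System.exit(ToolRunner.run(conf, new WikipediaPartitionedIngester(), args));
  }
}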


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/README.parallel
----------------------------------------------------------------------
diff --git a/README.parallel b/README.parallel
new file mode 100644
index 0000000..18cab71
--- /dev/null
+++ b/README.parallel
@@ -0,0 +1,65 @@
+
+ This project contains a sample application for ingesting and querying wikipedia data.
+ 
+  
+ Ingest
+ ------
+ 
+       Prerequisites
+       -------------
+       1. Accumulo, Hadoop, and ZooKeeper must be installed and running
+       2. ACCUMULO_HOME and ZOOKEEPER_HOME must be defined in the environment
+       3. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
+            You will want to grab the files with the link name of pages-articles.xml.bz2
+ 
+ 
+       INSTRUCTIONS
+       ------------
+       1. Copy the ingest/conf/wikipedia_parallel.xml.example to ingest/conf/wikipedia.xml and change it to specify Accumulo information.
+       2. Copy the ingest/lib/wikisearch-*.jar and ingest/lib/protobuf*.jar to $ACCUMULO_HOME/lib/ext
+       3. Then run ingest/bin/ingest_parallel.sh with one argument (the name of the directory in HDFS where the wikipedia XML
+             files reside) and this will kick off a MapReduce job to ingest the data into Accumulo.
+   
+ Query
+ -----
+ 
+       Prerequisites
+       -------------
+       1. The query software was tested using JBoss AS 6. Install this unless you feel like messing with the installation.
+       
+       NOTE: Ran into a bug (https://issues.jboss.org/browse/RESTEASY-531) that did not allow an EJB3.1 war file. The
+       workaround is to separate the RESTEasy servlet from the EJBs by creating an EJB jar and a WAR file.
+       
+       INSTRUCTIONS
+       -------------
+       1. Copy the query/src/main/resources/META-INF/ejb-jar.xml.example file to
+          query/src/main/resources/META-INF/ejb-jar.xml. Modify the file to contain the same
+          information that you put into the wikipedia.xml file from the Ingest step above.
+       2. Re-build the query distribution by running 'mvn package assembly:single' in the top-level directory.
+       3. Untar the resulting file in the $JBOSS_HOME/server/default directory.
+
+              $ cd $JBOSS_HOME/server/default
+              $ tar -xzf $ACCUMULO_HOME/src/examples/wikisearch/query/target/wikisearch-query*.tar.gz
+ 
+           This will place the dependent jars in the lib directory and the EJB jar into the deploy directory.
+       4. Next, copy the wikisearch*.war file in the query-war/target directory to $JBOSS_HOME/server/default/deploy.
+       5. Start JBoss ($JBOSS_HOME/bin/run.sh)
+       6. Use the Accumulo shell and give the user permissions for the wikis that you loaded, for example:
+                       setauths -u <user> -s all,enwiki,eswiki,frwiki,fawiki
+       7. Copy the following jars to the $ACCUMULO_HOME/lib/ext directory from the $JBOSS_HOME/server/default/lib directory:
+       
+               commons-lang*.jar
+               kryo*.jar
+               minlog*.jar
+               commons-jexl*.jar
+               google-collections*.jar
+               
+       8. Copy the $JBOSS_HOME/server/default/deploy/wikisearch-query*.jar to $ACCUMULO_HOME/lib/ext.
+
+
+       9. At this point you should be able to open a browser and view the page: http://localhost:8080/accumulo-wikisearch/ui/ui.jsp.
+       You can issue the queries using this user interface or via the following REST urls: <host>/accumulo-wikisearch/rest/Query/xml,
+       <host>/accumulo-wikisearch/rest/Query/html, <host>/accumulo-wikisearch/rest/Query/yaml, or <host>/accumulo-wikisearch/rest/Query/json.
+       There are two parameters to the REST service, query and auths. The query parameter is the same string that you would type
+       into the search box at ui.jsp, and the auths parameter is a comma-separated list of wikis that you want to search (e.g.
+       enwiki,frwiki,dewiki, or all).
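
For step 9 above, here is a rough sketch of hitting the REST endpoint from Java. The host and port, the use of plain GET query parameters, and the sample JEXL-style expression are all assumptions; the query and auths parameter names come from the README text:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class WikisearchQueryClient {
  public static void main(String[] args) throws Exception {
    // Illustrative query string; auths must be a subset of what was granted with setauths.
    String query = URLEncoder.encode("TEXT == 'accumulo'", "UTF-8");
    String auths = URLEncoder.encode("all,enwiki", "UTF-8");
    URL url = new URL("http://localhost:8080/accumulo-wikisearch/rest/Query/json"
        + "?query=" + query + "&auths=" + auths);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    for (String line; (line = in.readLine()) != null;) {
      System.out.println(line);
    }
    in.close();
    conn.disconnect();
  }
}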

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/bin/ingest_parallel.sh
----------------------------------------------------------------------
diff --git a/ingest/bin/ingest_parallel.sh b/ingest/bin/ingest_parallel.sh
new file mode 100755
index 0000000..e921494
--- /dev/null
+++ b/ingest/bin/ingest_parallel.sh
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+THIS_SCRIPT="$0"
+SCRIPT_DIR="${THIS_SCRIPT%/*}"
+SCRIPT_DIR=`cd $SCRIPT_DIR ; pwd`
+echo $SCRIPT_DIR
+
+ACCUMULO_HOME=${ACCUMULO_HOME}
+ZOOKEEPER_HOME=${ZOOKEEPER_HOME}
+
+#
+# Check ZOOKEEPER_HOME
+#
+if [[ -z $ZOOKEEPER_HOME ]]; then
+       echo "You must set ZOOKEEPER_HOME environment variable"
+       exit -1;
+else
+       for f in $ZOOKEEPER_HOME/zookeeper-*.jar; do
+               CLASSPATH=$f
+               break
+       done    
+fi
+
+#
+# Check ACCUMULO_HOME
+#
+if [[ -z $ACCUMULO_HOME ]]; then
+       echo "You must set ACCUMULO_HOME environment variable"
+       exit -1;
+else
+       for f in $ACCUMULO_HOME/lib/*.jar; do
+               CLASSPATH=${CLASSPATH}:$f
+       done    
+fi
+
+#
+# Add our jars
+#
+for f in $SCRIPT_DIR/../lib/*.jar; do
+       CLASSPATH=${CLASSPATH}:$f  
+done
+
+#
+# Transform the classpath into a comma-separated list also
+#
+LIBJARS=`echo $CLASSPATH | sed 's/:/,/g'`
+
+
+#
+# Map/Reduce job
+#
+JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.4.0-incubating-SNAPSHOT.jar
+CONF=$SCRIPT_DIR/../conf/wikipedia.xml
+HDFS_DATA_DIR=$1
+export HADOOP_CLASSPATH=$CLASSPATH
+echo "hadoop jar $JAR 
org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester 
-libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}"
+hadoop jar $JAR 
org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester 
-libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/conf/wikipedia_parallel.xml.example
----------------------------------------------------------------------
diff --git a/ingest/conf/wikipedia_parallel.xml.example b/ingest/conf/wikipedia_parallel.xml.example
new file mode 100644
index 0000000..cf20f01
--- /dev/null
+++ b/ingest/conf/wikipedia_parallel.xml.example
@@ -0,0 +1,59 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>wikipedia.accumulo.zookeepers</name>
+    <value><!--zookeeper servers --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.instance_name</name>
+    <value><!--instance name --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.user</name>
+    <value><!--user name --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.password</name>
+    <value><!-- password --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.table</name>
+    <value><!--table name --></value>
+  </property>
+  <property>
+    <name>wikipedia.ingest.partitions</name>
+    <value><!--number of partitions --></value>
+  </property>
+  <property>
+    <name>wikipedia.partitioned.directory</name>
+    <value><!--hdfs directory for intermediate partitioned storage --></value>
+  </property>
+  <property>
+    <name>wikipedia.ingest.groups</name>
+    <value><!--the number of intermediate partition groups to generate --></value>
+  </property>
+  <property>
+    <name>wikipedia.run.partitioner</name>
+    <value><!--whether to run the partitioner step --></value>
+  </property>
+  <property>
+    <name>wikipedia.run.ingest</name>
+    <value><!--whether to run the ingest step --></value>
+  </property>
+</configuration>
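
The wikipedia.partitioned.directory, wikipedia.run.partitioner, and wikipedia.run.ingest properties are new in this commit and are read through the accessors added to WikipediaConfiguration (see its diff below). A small sketch of checking them at runtime, assuming the example file has been copied to the path shown and its placeholder values filled in (an empty partitioned directory would make getPartitionedArticlesPath fail):

import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class ShowParallelSettings {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource(new Path("ingest/conf/wikipedia_parallel.xml")); // assumed location of the filled-in copy
    // Accessors added (or already present) in WikipediaConfiguration.
    System.out.println("partitioned dir = " + WikipediaConfiguration.getPartitionedArticlesPath(conf));
    System.out.println("run partitioner = " + WikipediaConfiguration.runPartitioner(conf)); // defaults to false
    System.out.println("run ingest      = " + WikipediaConfiguration.runIngest(conf));      // defaults to true
    System.out.println("groups          = " + WikipediaConfiguration.getNumGroups(conf));   // defaults to 1
  }
}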

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
index 54e47b6..06d1670 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
@@ -16,6 +16,9 @@
  */
 package org.apache.accumulo.examples.wikisearch.ingest;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Reader;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -29,6 +32,7 @@ import javax.xml.stream.XMLStreamReader;
 
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.normalizer.NumberNormalizer;
+import org.apache.hadoop.io.Writable;
 
 
 public class ArticleExtractor {
@@ -37,13 +41,15 @@ public class ArticleExtractor {
   private static NumberNormalizer nn = new NumberNormalizer();
   private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
   
-  public static class Article {
+  public static class Article implements Writable {
     int id;
     String title;
     long timestamp;
     String comments;
     String text;
     
+    public Article(){}
+    
     private Article(int id, String title, long timestamp, String comments, String text) {
       super();
       this.id = id;
@@ -90,6 +96,24 @@ public class ArticleExtractor {
       fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", 
this.comments));
       return fields;
     }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+      title = in.readUTF();
+      timestamp = in.readLong();
+      comments = in.readUTF();
+      text = in.readUTF();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+      out.writeUTF(title);
+      out.writeLong(timestamp);
+      out.writeUTF(comments);
+      out.writeUTF(text);
+    }
     
   }
   

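Article now implements Writable so the partitioner job can hand fully parsed articles to the ingest job through SequenceFiles keyed by language (the two jobs are wired up in WikipediaPartitionedIngester below). A rough sketch of reading one of those intermediate files back for spot-checking; the output path and file name are assumptions, and the class sits in the same package so Article's accessors are visible:

package org.apache.accumulo.examples.wikisearch.ingest;

import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class DumpPartitionedArticles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // One map output of the partitioner job (path and file name assumed).
    Path part = new Path("/wikipedia/partitioned/part-m-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    Text language = new Text();
    Article article = new Article(); // no-arg constructor added in this diff
    while (reader.next(language, article)) {
      System.out.println(language + "\t" + article.getId());
    }
    reader.close();
  }
}
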
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
index d76d713..5a0aad4 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
@@ -48,6 +48,11 @@ public class WikipediaConfiguration {
 
   public final static String NUM_GROUPS = "wikipedia.ingest.groups";
 
+  public final static String PARTITIONED_ARTICLES_DIRECTORY = "wikipedia.partitioned.directory";
+  
+  public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
+  public final static String RUN_INGEST = "wikipedia.run.ingest";
+  
   
   public static String getUser(Configuration conf) {
     return conf.get(USER);
@@ -117,6 +122,18 @@ public class WikipediaConfiguration {
     return conf.getInt(NUM_GROUPS, 1);
   }
   
+  public static Path getPartitionedArticlesPath(Configuration conf) {
+    return new Path(conf.get(PARTITIONED_ARTICLES_DIRECTORY));
+  }
+  
+  public static boolean runPartitioner(Configuration conf) {
+    return conf.getBoolean(RUN_PARTITIONER, false);
+  }
+
+  public static boolean runIngest(Configuration conf) {
+    return conf.getBoolean(RUN_INGEST, true);
+  }
+
   /**
    * Helper method to get properties from Hadoop configuration
    * 

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
new file mode 100644
index 0000000..e7493dc
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
+import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
+import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WikipediaPartitionedIngester extends Configured implements Tool {
+  
+  public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
+  public final static String SPLIT_FILE = "wikipedia.split_file";
+  public final static String TABLE_NAME = "wikipedia.table";
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new WikipediaPartitionedIngester(), args);
+    System.exit(res);
+  }
+  
+  private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+      TableExistsException {
+    // Create the shard table
+    String indexTableName = tableName + "Index";
+    String reverseIndexTableName = tableName + "ReverseIndex";
+    String metadataTableName = tableName + "Metadata";
+    
+    // create the shard table
+    if (!tops.exists(tableName)) {
+      // Set a text index combiner on the given field names. No combiner is set if the option is not supplied
+      String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME;
+      
+      tops.create(tableName);
+      if (textIndexFamilies.length() > 0) {
+        System.out.println("Adding content combiner on the fields: " + textIndexFamilies);
+        
+        IteratorSetting setting = new IteratorSetting(10, TextIndexCombiner.class);
+        List<Column> columns = new ArrayList<Column>();
+        for (String family : StringUtils.split(textIndexFamilies, ',')) {
+          columns.add(new Column("fi\0" + family));
+        }
+        TextIndexCombiner.setColumns(setting, columns);
+        TextIndexCombiner.setLossyness(setting, true);
+        
+        tops.attachIterator(tableName, setting, EnumSet.allOf(IteratorScope.class));
+      }
+      
+      // Set the locality group for the full content column family
+      tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY))));
+      
+    }
+    
+    if (!tops.exists(indexTableName)) {
+      tops.create(indexTableName);
+      // Add the UID combiner
+      IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class);
+      GlobalIndexUidCombiner.setCombineAllColumns(setting, true);
+      GlobalIndexUidCombiner.setLossyness(setting, true);
+      tops.attachIterator(indexTableName, setting, EnumSet.allOf(IteratorScope.class));
+    }
+    
+    if (!tops.exists(reverseIndexTableName)) {
+      tops.create(reverseIndexTableName);
+      // Add the UID combiner
+      IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class);
+      GlobalIndexUidCombiner.setCombineAllColumns(setting, true);
+      GlobalIndexUidCombiner.setLossyness(setting, true);
+      tops.attachIterator(reverseIndexTableName, setting, EnumSet.allOf(IteratorScope.class));
+    }
+    
+    if (!tops.exists(metadataTableName)) {
+      // Add the SummingCombiner with VARLEN encoding for the frequency column
+      tops.create(metadataTableName);
+      IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class);
+      SummingCombiner.setColumns(setting, Collections.singletonList(new Column("f")));
+      SummingCombiner.setEncodingType(setting, SummingCombiner.Type.VARLEN);
+      tops.attachIterator(metadataTableName, setting, EnumSet.allOf(IteratorScope.class));
+    }
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    if(WikipediaConfiguration.runPartitioner(conf))
+    {
+      int result = runPartitionerJob();
+      if(result != 0)
+        return result;
+    }
+    if(WikipediaConfiguration.runIngest(conf))
+      return runIngestJob();
+    return 0;
+  }
+  
+  public int runPartitionerJob() throws Exception
+  {
+    Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
+    Configuration partitionerConf = partitionerJob.getConfiguration();
+    partitionerConf.set("mapred.map.tasks.speculative.execution", "false");
+
+    configurePartitionerJob(partitionerJob);
+    
+    List<Path> inputPaths = new ArrayList<Path>();
+    SortedSet<String> languages = new TreeSet<String>();
+    FileSystem fs = FileSystem.get(partitionerConf);
+    Path parent = new Path(partitionerConf.get("wikipedia.input"));
+    listFiles(parent, fs, inputPaths, languages);
+    
+    System.out.println("Input files in " + parent + ":" + inputPaths.size());
+    Path[] inputPathsArray = new Path[inputPaths.size()];
+    inputPaths.toArray(inputPathsArray);
+    
+    System.out.println("Languages:" + languages.size());
+
+    // setup input format
+    
+    WikipediaInputFormat.setInputPaths(partitionerJob, inputPathsArray);
+    
+    partitionerJob.setMapperClass(WikipediaPartitioner.class);
+    partitionerJob.setNumReduceTasks(0);
+
+    // setup output format
+    partitionerJob.setMapOutputKeyClass(Text.class);
+    partitionerJob.setMapOutputValueClass(Article.class);
+    partitionerJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+    Path outputDir = WikipediaConfiguration.getPartitionedArticlesPath(partitionerConf);
+    SequenceFileOutputFormat.setOutputPath(partitionerJob, outputDir);
+    
+    return partitionerJob.waitForCompletion(true) ? 0 : 1;
+  }
+  
+  public int runIngestJob() throws Exception
+  {
+    Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
+    Configuration ingestConf = ingestJob.getConfiguration();
+    ingestConf.set("mapred.map.tasks.speculative.execution", "false");
+
+    String tablename = WikipediaConfiguration.getTableName(ingestConf);
+    
+    String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
+    String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
+    
+    String user = WikipediaConfiguration.getUser(ingestConf);
+    byte[] password = WikipediaConfiguration.getPassword(ingestConf);
+    Connector connector = WikipediaConfiguration.getConnector(ingestConf);
+    
+    TableOperations tops = connector.tableOperations();
+    
+    createTables(tops, tablename);
+    
+    // setup input format
+    ingestJob.setInputFormatClass(SequenceFileInputFormat.class);
+    SequenceFileInputFormat.setInputPaths(ingestJob, WikipediaConfiguration.getPartitionedArticlesPath(ingestConf));
+
+    // setup output format
+    ingestJob.setMapOutputKeyClass(Text.class);
+    ingestJob.setMapOutputValueClass(Mutation.class);
+    ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
+    AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
+    AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    
+    return ingestJob.waitForCompletion(true) ? 0 : 1;
+  }
+  
+  public final static PathFilter partFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith("part");
+    };
+  };
+  
+  protected void configurePartitionerJob(Job job) {
+    Configuration conf = job.getConfiguration();
+    job.setJarByClass(WikipediaPartitionedIngester.class);
+    job.setInputFormatClass(WikipediaInputFormat.class);
+    conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+  }
+  
+  protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
+  
+  protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException {
+    for (FileStatus status : fs.listStatus(path)) {
+      if (status.isDir()) {
+        listFiles(status.getPath(), fs, files, languages);
+      } else {
+        Path p = status.getPath();
+        Matcher matcher = filePattern.matcher(p.getName());
+        if (matcher.matches()) {
+          languages.add(matcher.group(1));
+          files.add(p);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
new file mode 100644
index 0000000..4d94c24
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
+import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.tokenattributes.TermAttribute;
+import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> {
+  
+  private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);
+  
+  public final static Charset UTF8 = Charset.forName("UTF-8");
+  public static final String DOCUMENT_COLUMN_FAMILY = "d";
+  public static final String METADATA_EVENT_COLUMN_FAMILY = "e";
+  public static final String METADATA_INDEX_COLUMN_FAMILY = "i";
+  public static final String TOKENS_FIELD_NAME = "TEXT";
+  
+  private static final Value NULL_VALUE = new Value(new byte[0]);
+  private static final String cvPrefix = "all|";
+  
+  private int numPartitions = 0;
+
+  private Text tablename = null;
+  private Text indexTableName = null;
+  private Text reverseIndexTableName = null;
+  private Text metadataTableName = null;
+  
+  @Override
+  public void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    tablename = new Text(WikipediaConfiguration.getTableName(conf));
+    indexTableName = new Text(tablename + "Index");
+    reverseIndexTableName = new Text(tablename + "ReverseIndex");
+    metadataTableName = new Text(tablename + "Metadata");
+    
+    numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+  }
+  
+  @Override
+  protected void map(Text language, Article article, Context context) throws IOException, InterruptedException {
+    String NULL_BYTE = "\u0000";
+    String colfPrefix = language.toString() + NULL_BYTE;
+    String indexPrefix = "fi" + NULL_BYTE;
+    ColumnVisibility cv = new ColumnVisibility(cvPrefix + language);
+    
+    if (article != null) {
+      Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
+      
+      // Create the mutations for the document.
+      // Row is partition id, colf is language\0articleid, colq is fieldName\0fieldValue
+      Mutation m = new Mutation(partitionId);
+      for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
+        m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
+        // Create mutations for the metadata table.
+        Mutation mm = new Mutation(entry.getKey());
+        mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE);
+        context.write(metadataTableName, mm);
+      }
+      
+      // Tokenize the content
+      Set<String> tokens = getTokens(article);
+      
+      // We are going to put the fields to be indexed into a multimap. This allows us to iterate
+      // over the entire set once.
+      Multimap<String,String> indexFields = HashMultimap.create();
+      // Add the normalized field values
+      LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer();
+      for (Entry<String,String> index : article.getNormalizedFieldValues().entrySet())
+        indexFields.put(index.getKey(), index.getValue());
+      // Add the tokens
+      for (String token : tokens)
+        indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token));
+      
+      for (Entry<String,String> index : indexFields.entries()) {
+        // Create mutations for the in partition index
+        // Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id
+        m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE);
+        
+        // Create mutations for the global index
+        // Create a UID object for the Value
+        Builder uidBuilder = Uid.List.newBuilder();
+        uidBuilder.setIGNORE(false);
+        uidBuilder.setCOUNT(1);
+        uidBuilder.addUID(Integer.toString(article.getId()));
+        Uid.List uidList = uidBuilder.build();
+        Value val = new Value(uidList.toByteArray());
+        
+        // Create mutations for the global index
+        // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
+        Mutation gm = new Mutation(index.getValue());
+        gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
+        context.write(indexTableName, gm);
+        
+        // Create mutations for the global reverse index
+        Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
+        grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
+        context.write(reverseIndexTableName, grm);
+        
+        // Create mutations for the metadata table.
+        Mutation mm = new Mutation(index.getKey());
+        mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+        context.write(metadataTableName, mm);
+        
+      }
+      // Add the entire text to the document section of the table.
+      // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
+      m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(), new Value(Base64.encodeBase64(article.getText().getBytes())));
+      context.write(tablename, m);
+      
+    } else {
+      context.getCounter("wikipedia", "invalid articles").increment(1);
+    }
+    context.progress();
+  }
+  
+  /**
+   * Tokenize the wikipedia content
+   * 
+   * @param article
+   * @return
+   * @throws IOException
+   */
+  private Set<String> getTokens(Article article) throws IOException {
+    Set<String> tokenList = new HashSet<String>();
+    WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
+    TermAttribute term = tok.addAttribute(TermAttribute.class);
+    try {
+      while (tok.incrementToken()) {
+        String token = term.term();
+        if (!StringUtils.isEmpty(token))
+          tokenList.add(token);
+      }
+    } catch (IOException e) {
+      log.error("Error tokenizing text", e);
+    } finally {
+      try {
+        tok.end();
+      } catch (IOException e) {
+        log.error("Error calling end()", e);
+      } finally {
+        try {
+          tok.close();
+        } catch (IOException e) {
+          log.error("Error closing tokenizer", e);
+        }
+      }
+    }
+    return tokenList;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
new file mode 100644
index 0000000..82af9fd
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.IllegalFormatException;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
+import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
+import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.tokenattributes.TermAttribute;
+import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+public class WikipediaPartitioner extends Mapper<LongWritable,Text,Text,Article> {
+  
+  private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);
+  
+  public final static Charset UTF8 = Charset.forName("UTF-8");
+  public static final String DOCUMENT_COLUMN_FAMILY = "d";
+  public static final String METADATA_EVENT_COLUMN_FAMILY = "e";
+  public static final String METADATA_INDEX_COLUMN_FAMILY = "i";
+  public static final String TOKENS_FIELD_NAME = "TEXT";
+  
+  private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
+  
+  private ArticleExtractor extractor;
+  private String language;
+
+  private int myGroup = -1;
+  private int numGroups = -1;
+  
+  @Override
+  public void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    
+    WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit();
+    myGroup = wiSplit.getPartition();
+    numGroups = WikipediaConfiguration.getNumGroups(conf);
+    
+    FileSplit split = wiSplit.getFileSplit();
+    String fileName = split.getPath().getName();
+    Matcher matcher = languagePattern.matcher(fileName);
+    if (matcher.matches()) {
+      language = matcher.group(1).replace('_', '-').toLowerCase();
+    } else {
+      throw new RuntimeException("Unknown ingest language! " + fileName);
+    }
+    extractor = new ArticleExtractor();
+  }
+  
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
+    if (article != null) {
+      int groupId = WikipediaMapper.getPartitionId(article, numGroups);
+      if(groupId != myGroup)
+        return;
+      context.write(new Text(language), article);
+    } else {
+      context.getCounter("wikipedia", "invalid articles").increment(1);
+      context.progress();
+    }
+  }
+  
+}
