ACCUMULO-375 added code to split Wikipedia parsing and Accumulo ingest into separate MapReduce jobs
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1241579 13f79535-47bb-0310-9956-ffa450edef68

Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/a673727d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/a673727d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/a673727d

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: a673727d6c09551d5a22b76bdbcb0dcc098573db
Parents: 686895f
Author: Adam Fuchs <afu...@apache.org>
Authored: Tue Feb 7 19:48:39 2012 +0000
Committer: Adam Fuchs <afu...@apache.org>
Committed: Tue Feb 7 19:48:39 2012 +0000

----------------------------------------------------------------------
 README.parallel                                  |   65 +++++
 ingest/bin/ingest_parallel.sh                    |   74 ++++++
 ingest/conf/wikipedia_parallel.xml.example       |   59 +++++
 .../wikisearch/ingest/ArticleExtractor.java      |   26 +-
 .../ingest/WikipediaConfiguration.java           |   17 ++
 .../ingest/WikipediaPartitionedIngester.java     |  247 +++++++++++++++++++
 .../ingest/WikipediaPartitionedMapper.java       |  192 ++++++++++++++
 .../wikisearch/ingest/WikipediaPartitioner.java  |  108 ++++++++
 8 files changed, 787 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/README.parallel
----------------------------------------------------------------------
diff --git a/README.parallel b/README.parallel
new file mode 100644
index 0000000..18cab71
--- /dev/null
+++ b/README.parallel
@@ -0,0 +1,65 @@
+
+ This project contains a sample application for ingesting and querying Wikipedia data.
+
+
+ Ingest
+ ------
+
+ Prerequisites
+ -------------
+ 1. Accumulo, Hadoop, and ZooKeeper must be installed and running
+ 2. ACCUMULO_HOME and ZOOKEEPER_HOME must be defined in the environment
+ 3. One or more Wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
+    You will want to grab the files with the link name of pages-articles.xml.bz2
+
+
+ INSTRUCTIONS
+ ------------
+ 1. Copy ingest/conf/wikipedia_parallel.xml.example to ingest/conf/wikipedia.xml and edit it to specify your Accumulo connection information.
+ 2. Copy the ingest/lib/wikisearch-*.jar and ingest/lib/protobuf*.jar to $ACCUMULO_HOME/lib/ext
+ 3. Run ingest/bin/ingest_parallel.sh with one argument (the name of the directory in HDFS where the Wikipedia XML
+    files reside); this will kick off a MapReduce job to ingest the data into Accumulo.
+
+ Query
+ -----
+
+ Prerequisites
+ -------------
+ 1. The query software was tested using JBoss AS 6. Install that version unless you want to adapt the installation yourself.
+
+    NOTE: A bug (https://issues.jboss.org/browse/RESTEASY-531) prevents deploying an EJB 3.1 WAR file. The
+    workaround is to separate the RESTEasy servlet from the EJBs by creating an EJB jar and a WAR file.
+
+ INSTRUCTIONS
+ -------------
+ 1. Copy the query/src/main/resources/META-INF/ejb-jar.xml.example file to
+    query/src/main/resources/META-INF/ejb-jar.xml. Modify the file to contain the same
+    information that you put into the wikipedia.xml file from the Ingest step above.
+ 2. Re-build the query distribution by running 'mvn package assembly:single' in the top-level directory.
+ 3. Untar the resulting file in the $JBOSS_HOME/server/default directory.
+
+      $ cd $JBOSS_HOME/server/default
+      $ tar -xzf $ACCUMULO_HOME/src/examples/wikisearch/query/target/wikisearch-query*.tar.gz
+
+    This will place the dependent jars in the lib directory and the EJB jar into the deploy directory.
+ 4. Next, copy the wikisearch*.war file in the query-war/target directory to $JBOSS_HOME/server/default/deploy.
+ 5. Start JBoss ($JBOSS_HOME/bin/run.sh)
+ 6. Use the Accumulo shell to give the user permissions for the wikis that you loaded, for example:
+      setauths -u <user> -s all,enwiki,eswiki,frwiki,fawiki
+ 7. Copy the following jars to the $ACCUMULO_HOME/lib/ext directory from the $JBOSS_HOME/server/default/lib directory:
+
+      commons-lang*.jar
+      kryo*.jar
+      minlog*.jar
+      commons-jexl*.jar
+      google-collections*.jar
+
+ 8. Copy the $JBOSS_HOME/server/default/deploy/wikisearch-query*.jar to $ACCUMULO_HOME/lib/ext.
+
+
+ 9. At this point you should be able to open a browser and view the page: http://localhost:8080/accumulo-wikisearch/ui/ui.jsp.
+    You can issue queries through this user interface or via the following REST URLs: <host>/accumulo-wikisearch/rest/Query/xml,
+    <host>/accumulo-wikisearch/rest/Query/html, <host>/accumulo-wikisearch/rest/Query/yaml, or <host>/accumulo-wikisearch/rest/Query/json.
+    The REST service takes two parameters, query and auths. The query parameter is the same string that you would type
+    into the search box at ui.jsp, and the auths parameter is a comma-separated list of the wikis that you want to search
+    (e.g. enwiki,frwiki,dewiki, or all).

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/bin/ingest_parallel.sh
----------------------------------------------------------------------
diff --git a/ingest/bin/ingest_parallel.sh b/ingest/bin/ingest_parallel.sh
new file mode 100755
index 0000000..e921494
--- /dev/null
+++ b/ingest/bin/ingest_parallel.sh
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ + + +THIS_SCRIPT="$0" +SCRIPT_DIR="${THIS_SCRIPT%/*}" +SCRIPT_DIR=`cd $SCRIPT_DIR ; pwd` +echo $SCRIPT_DIR + +ACCUMULO_HOME=${ACCUMULO_HOME} +ZOOKEEPER_HOME=${ZOOKEEPER_HOME} + +# +# Check ZOOKEEPER_HOME +# +if [[ -z $ZOOKEEPER_HOME ]]; then + echo "You must set ZOOKEEPER_HOME environment variable" + exit -1; +else + for f in $ZOOKEEPER_HOME/zookeeper-*.jar; do + CLASSPATH=$f + break + done +fi + +# +# Check ACCUMULO_HOME +# +if [[ -z $ACCUMULO_HOME ]]; then + echo "You must set ACCUMULO_HOME environment variable" + exit -1; +else + for f in $ACCUMULO_HOME/lib/*.jar; do + CLASSPATH=${CLASSPATH}:$f + done +fi + +# +# Add our jars +# +for f in $SCRIPT_DIR/../lib/*.jar; do + CLASSPATH=${CLASSPATH}:$f +done + +# +# Transform the classpath into a comma-separated list also +# +LIBJARS=`echo $CLASSPATH | sed 's/:/,/g'` + + +# +# Map/Reduce job +# +JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.4.0-incubating-SNAPSHOT.jar +CONF=$SCRIPT_DIR/../conf/wikipedia.xml +HDFS_DATA_DIR=$1 +export HADOOP_CLASSPATH=$CLASSPATH +echo "hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}" +hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR} http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/conf/wikipedia_parallel.xml.example ---------------------------------------------------------------------- diff --git a/ingest/conf/wikipedia_parallel.xml.example b/ingest/conf/wikipedia_parallel.xml.example new file mode 100644 index 0000000..cf20f01 --- /dev/null +++ b/ingest/conf/wikipedia_parallel.xml.example @@ -0,0 +1,59 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<configuration> + <property> + <name>wikipedia.accumulo.zookeepers</name> + <value><!--zookeeper servers --></value> + </property> + <property> + <name>wikipedia.accumulo.instance_name</name> + <value><!--instance name --></value> + </property> + <property> + <name>wikipedia.accumulo.user</name> + <value><!--user name --></value> + </property> + <property> + <name>wikipedia.accumulo.password</name> + <value><!-- password --></value> + </property> + <property> + <name>wikipedia.accumulo.table</name> + <value><!--table name --></value> + </property> + <property> + <name>wikipedia.ingest.partitions</name> + <value><!--number of partitions --></value> + </property> + <property> + <name>wikipedia.partitioned.directory</name> + <value><!--hdfs directory for intemediate partitioned storage --></value> + </property> + <property> + <name>wikipedia.ingest.groups</name> + <value><!--the number of intermediate partition groups to generate --></value> + </property> + <property> + <name>wikipedia.run.partitioner</name> + <value><!--whether to run the partitioner step --></value> + </property> + <property> + <name>wikipedia.run.ingest</name> + <value><!--whether to run the ingest step --></value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java index 54e47b6..06d1670 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java @@ -16,6 +16,9 @@ */ package org.apache.accumulo.examples.wikisearch.ingest; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.io.Reader; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -29,6 +32,7 @@ import javax.xml.stream.XMLStreamReader; import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.normalizer.NumberNormalizer; +import org.apache.hadoop.io.Writable; public class ArticleExtractor { @@ -37,13 +41,15 @@ public class ArticleExtractor { private static NumberNormalizer nn = new NumberNormalizer(); private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer(); - public static class Article { + public static class Article implements Writable { int id; String title; long timestamp; String comments; String text; + public Article(){} + private Article(int id, String title, long timestamp, String comments, String text) { super(); this.id = id; @@ -90,6 +96,24 @@ public class ArticleExtractor { fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments)); return fields; } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + title = in.readUTF(); + timestamp = in.readLong(); + comments = in.readUTF(); + text = in.readUTF(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + out.writeUTF(title); + out.writeLong(timestamp); + out.writeUTF(comments); + out.writeUTF(text); + } } 
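
[Editor's note] The Writable methods added to Article above are what let the partitioner job hand articles to the ingest job through intermediate SequenceFiles. The following is a minimal round-trip sketch, not part of this commit: the class name and the pageXml input are hypothetical, and it assumes ArticleExtractor.extract(...) accepts the same Reader-style input that WikipediaPartitioner passes it.

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Illustration only (not part of this commit). Round-trips an Article
// through the new write()/readFields() methods, mirroring how the
// partitioner job's SequenceFile output is later re-read by the ingest job.
public class ArticleRoundTrip {
  private static final Charset UTF8 = Charset.forName("UTF-8");

  // pageXml is a hypothetical single <page>...</page> element from a Wikipedia dump.
  public static Article roundTrip(String pageXml) throws Exception {
    // Parse one article, the same way WikipediaPartitioner does.
    // (extract returns null for pages it cannot parse; assume a valid page here)
    Article original = new ArticleExtractor().extract(
        new InputStreamReader(new ByteArrayInputStream(pageXml.getBytes(UTF8)), UTF8));

    // Serialize with the new write(DataOutput) implementation.
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    // Deserialize into a fresh instance created via the new no-arg constructor.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    Article copy = new Article();
    copy.readFields(in);
    return copy;
  }
}

The public no-arg constructor added in this change is what allows Hadoop to instantiate Article reflectively before calling readFields(); the field-setting constructor remains private, so real articles still come only from ArticleExtractor.
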
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java index d76d713..5a0aad4 100644 --- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java @@ -48,6 +48,11 @@ public class WikipediaConfiguration { public final static String NUM_GROUPS = "wikipedia.ingest.groups"; + public final static String PARTITIONED_ARTICLES_DIRECTORY = "wikipedia.partitioned.directory"; + + public final static String RUN_PARTITIONER = "wikipedia.run.partitioner"; + public final static String RUN_INGEST = "wikipedia.run.ingest"; + public static String getUser(Configuration conf) { return conf.get(USER); @@ -117,6 +122,18 @@ public class WikipediaConfiguration { return conf.getInt(NUM_GROUPS, 1); } + public static Path getPartitionedArticlesPath(Configuration conf) { + return new Path(conf.get(PARTITIONED_ARTICLES_DIRECTORY)); + } + + public static boolean runPartitioner(Configuration conf) { + return conf.getBoolean(RUN_PARTITIONER, false); + } + + public static boolean runIngest(Configuration conf) { + return conf.getBoolean(RUN_INGEST, true); + } + /** * Helper method to get properties from Hadoop configuration * http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java new file mode 100644 index 0000000..e7493dc --- /dev/null +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.examples.wikisearch.ingest; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.IteratorSetting.Column; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner; +import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner; +import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class WikipediaPartitionedIngester extends Configured implements Tool { + + public final static String INGEST_LANGUAGE = "wikipedia.ingest_language"; + public final static String SPLIT_FILE = "wikipedia.split_file"; + public final static String TABLE_NAME = "wikipedia.table"; + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new WikipediaPartitionedIngester(), args); + System.exit(res); + } + + private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + TableExistsException { + // Create the shard table + String indexTableName = tableName + "Index"; + String reverseIndexTableName = tableName + "ReverseIndex"; + String metadataTableName = tableName + "Metadata"; + + // create the shard table + if (!tops.exists(tableName)) { + // Set a text index combiner on the given field names. 
No combiner is set if the option is not supplied + String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME; + + tops.create(tableName); + if (textIndexFamilies.length() > 0) { + System.out.println("Adding content combiner on the fields: " + textIndexFamilies); + + IteratorSetting setting = new IteratorSetting(10, TextIndexCombiner.class); + List<Column> columns = new ArrayList<Column>(); + for (String family : StringUtils.split(textIndexFamilies, ',')) { + columns.add(new Column("fi\0" + family)); + } + TextIndexCombiner.setColumns(setting, columns); + TextIndexCombiner.setLossyness(setting, true); + + tops.attachIterator(tableName, setting, EnumSet.allOf(IteratorScope.class)); + } + + // Set the locality group for the full content column family + tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY)))); + + } + + if (!tops.exists(indexTableName)) { + tops.create(indexTableName); + // Add the UID combiner + IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class); + GlobalIndexUidCombiner.setCombineAllColumns(setting, true); + GlobalIndexUidCombiner.setLossyness(setting, true); + tops.attachIterator(indexTableName, setting, EnumSet.allOf(IteratorScope.class)); + } + + if (!tops.exists(reverseIndexTableName)) { + tops.create(reverseIndexTableName); + // Add the UID combiner + IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class); + GlobalIndexUidCombiner.setCombineAllColumns(setting, true); + GlobalIndexUidCombiner.setLossyness(setting, true); + tops.attachIterator(reverseIndexTableName, setting, EnumSet.allOf(IteratorScope.class)); + } + + if (!tops.exists(metadataTableName)) { + // Add the SummingCombiner with VARLEN encoding for the frequency column + tops.create(metadataTableName); + IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class); + SummingCombiner.setColumns(setting, Collections.singletonList(new Column("f"))); + SummingCombiner.setEncodingType(setting, SummingCombiner.Type.VARLEN); + tops.attachIterator(metadataTableName, setting, EnumSet.allOf(IteratorScope.class)); + } + } + + @Override + public int run(String[] args) throws Exception { + Configuration conf = getConf(); + if(WikipediaConfiguration.runPartitioner(conf)) + { + int result = runPartitionerJob(); + if(result != 0) + return result; + } + if(WikipediaConfiguration.runIngest(conf)) + return runIngestJob(); + return 0; + } + + public int runPartitionerJob() throws Exception + { + Job partitionerJob = new Job(getConf(), "Partition Wikipedia"); + Configuration partitionerConf = partitionerJob.getConfiguration(); + partitionerConf.set("mapred.map.tasks.speculative.execution", "false"); + + configurePartitionerJob(partitionerJob); + + List<Path> inputPaths = new ArrayList<Path>(); + SortedSet<String> languages = new TreeSet<String>(); + FileSystem fs = FileSystem.get(partitionerConf); + Path parent = new Path(partitionerConf.get("wikipedia.input")); + listFiles(parent, fs, inputPaths, languages); + + System.out.println("Input files in " + parent + ":" + inputPaths.size()); + Path[] inputPathsArray = new Path[inputPaths.size()]; + inputPaths.toArray(inputPathsArray); + + System.out.println("Languages:" + languages.size()); + + // setup input format + + WikipediaInputFormat.setInputPaths(partitionerJob, inputPathsArray); + + partitionerJob.setMapperClass(WikipediaPartitioner.class); + partitionerJob.setNumReduceTasks(0); + 
+ // setup output format + partitionerJob.setMapOutputKeyClass(Text.class); + partitionerJob.setMapOutputValueClass(Article.class); + partitionerJob.setOutputFormatClass(SequenceFileOutputFormat.class); + Path outputDir = WikipediaConfiguration.getPartitionedArticlesPath(partitionerConf); + SequenceFileOutputFormat.setOutputPath(partitionerJob, outputDir); + + return partitionerJob.waitForCompletion(true) ? 0 : 1; + } + + public int runIngestJob() throws Exception + { + Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia"); + Configuration ingestConf = ingestJob.getConfiguration(); + ingestConf.set("mapred.map.tasks.speculative.execution", "false"); + + String tablename = WikipediaConfiguration.getTableName(ingestConf); + + String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf); + String instanceName = WikipediaConfiguration.getInstanceName(ingestConf); + + String user = WikipediaConfiguration.getUser(ingestConf); + byte[] password = WikipediaConfiguration.getPassword(ingestConf); + Connector connector = WikipediaConfiguration.getConnector(ingestConf); + + TableOperations tops = connector.tableOperations(); + + createTables(tops, tablename); + + // setup input format + ingestJob.setInputFormatClass(SequenceFileInputFormat.class); + SequenceFileInputFormat.setInputPaths(ingestJob, WikipediaConfiguration.getPartitionedArticlesPath(ingestConf)); + + // setup output format + ingestJob.setMapOutputKeyClass(Text.class); + ingestJob.setMapOutputValueClass(Mutation.class); + ingestJob.setOutputFormatClass(AccumuloOutputFormat.class); + AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename); + AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers); + + return ingestJob.waitForCompletion(true) ? 
0 : 1; + } + + public final static PathFilter partFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part"); + }; + }; + + protected void configurePartitionerJob(Job job) { + Configuration conf = job.getConfiguration(); + job.setJarByClass(WikipediaPartitionedIngester.class); + job.setInputFormatClass(WikipediaInputFormat.class); + conf.set(AggregatingRecordReader.START_TOKEN, "<page>"); + conf.set(AggregatingRecordReader.END_TOKEN, "</page>"); + } + + protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); + + protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException { + for (FileStatus status : fs.listStatus(path)) { + if (status.isDir()) { + listFiles(status.getPath(), fs, files, languages); + } else { + Path p = status.getPath(); + Matcher matcher = filePattern.matcher(p.getName()); + if (matcher.matches()) { + languages.add(matcher.group(1)); + files.add(p); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java new file mode 100644 index 0000000..4d94c24 --- /dev/null +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * + */ +package org.apache.accumulo.examples.wikisearch.ingest; + + +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; +import org.apache.accumulo.examples.wikisearch.protobuf.Uid; +import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.log4j.Logger; +import org.apache.lucene.analysis.tokenattributes.TermAttribute; +import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> { + + private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class); + + public final static Charset UTF8 = Charset.forName("UTF-8"); + public static final String DOCUMENT_COLUMN_FAMILY = "d"; + public static final String METADATA_EVENT_COLUMN_FAMILY = "e"; + public static final String METADATA_INDEX_COLUMN_FAMILY = "i"; + public static final String TOKENS_FIELD_NAME = "TEXT"; + + private static final Value NULL_VALUE = new Value(new byte[0]); + private static final String cvPrefix = "all|"; + + private int numPartitions = 0; + + private Text tablename = null; + private Text indexTableName = null; + private Text reverseIndexTableName = null; + private Text metadataTableName = null; + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + tablename = new Text(WikipediaConfiguration.getTableName(conf)); + indexTableName = new Text(tablename + "Index"); + reverseIndexTableName = new Text(tablename + "ReverseIndex"); + metadataTableName = new Text(tablename + "Metadata"); + + numPartitions = WikipediaConfiguration.getNumPartitions(conf); + } + + @Override + protected void map(Text language, Article article, Context context) throws IOException, InterruptedException { + String NULL_BYTE = "\u0000"; + String colfPrefix = language.toString() + NULL_BYTE; + String indexPrefix = "fi" + NULL_BYTE; + ColumnVisibility cv = new ColumnVisibility(cvPrefix + language); + + if (article != null) { + Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); + + // Create the mutations for the document. + // Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue + Mutation m = new Mutation(partitionId); + for (Entry<String,Object> entry : article.getFieldValues().entrySet()) { + m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE); + // Create mutations for the metadata table. 
+ Mutation mm = new Mutation(entry.getKey()); + mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE); + context.write(metadataTableName, mm); + } + + // Tokenize the content + Set<String> tokens = getTokens(article); + + // We are going to put the fields to be indexed into a multimap. This allows us to iterate + // over the entire set once. + Multimap<String,String> indexFields = HashMultimap.create(); + // Add the normalized field values + LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer(); + for (Entry<String,String> index : article.getNormalizedFieldValues().entrySet()) + indexFields.put(index.getKey(), index.getValue()); + // Add the tokens + for (String token : tokens) + indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token)); + + for (Entry<String,String> index : indexFields.entries()) { + // Create mutations for the in partition index + // Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id + m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE); + + // Create mutations for the global index + // Create a UID object for the Value + Builder uidBuilder = Uid.List.newBuilder(); + uidBuilder.setIGNORE(false); + uidBuilder.setCOUNT(1); + uidBuilder.addUID(Integer.toString(article.getId())); + Uid.List uidList = uidBuilder.build(); + Value val = new Value(uidList.toByteArray()); + + // Create mutations for the global index + // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object + Mutation gm = new Mutation(index.getValue()); + gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val); + context.write(indexTableName, gm); + + // Create mutations for the global reverse index + Mutation grm = new Mutation(StringUtils.reverse(index.getValue())); + grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val); + context.write(reverseIndexTableName, grm); + + // Create mutations for the metadata table. + Mutation mm = new Mutation(index.getKey()); + mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE); + context.write(metadataTableName, mm); + + } + // Add the entire text to the document section of the table. 
+ // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document + m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(), new Value(Base64.encodeBase64(article.getText().getBytes()))); + context.write(tablename, m); + + } else { + context.getCounter("wikipedia", "invalid articles").increment(1); + } + context.progress(); + } + + /** + * Tokenize the wikipedia content + * + * @param article + * @return + * @throws IOException + */ + private Set<String> getTokens(Article article) throws IOException { + Set<String> tokenList = new HashSet<String>(); + WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText())); + TermAttribute term = tok.addAttribute(TermAttribute.class); + try { + while (tok.incrementToken()) { + String token = term.term(); + if (!StringUtils.isEmpty(token)) + tokenList.add(token); + } + } catch (IOException e) { + log.error("Error tokenizing text", e); + } finally { + try { + tok.end(); + } catch (IOException e) { + log.error("Error calling end()", e); + } finally { + try { + tok.close(); + } catch (IOException e) { + log.error("Error closing tokenizer", e); + } + } + } + return tokenList; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/a673727d/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java ---------------------------------------------------------------------- diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java new file mode 100644 index 0000000..82af9fd --- /dev/null +++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * + */ +package org.apache.accumulo.examples.wikisearch.ingest; + + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.util.HashSet; +import java.util.IllegalFormatException; +import java.util.Map.Entry; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; +import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; +import org.apache.accumulo.examples.wikisearch.protobuf.Uid; +import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.log4j.Logger; +import org.apache.lucene.analysis.tokenattributes.TermAttribute; +import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +public class WikipediaPartitioner extends Mapper<LongWritable,Text,Text,Article> { + + private static final Logger log = Logger.getLogger(WikipediaPartitioner.class); + + public final static Charset UTF8 = Charset.forName("UTF-8"); + public static final String DOCUMENT_COLUMN_FAMILY = "d"; + public static final String METADATA_EVENT_COLUMN_FAMILY = "e"; + public static final String METADATA_INDEX_COLUMN_FAMILY = "i"; + public static final String TOKENS_FIELD_NAME = "TEXT"; + + private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); + + private ArticleExtractor extractor; + private String language; + + private int myGroup = -1; + private int numGroups = -1; + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + + WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit(); + myGroup = wiSplit.getPartition(); + numGroups = WikipediaConfiguration.getNumGroups(conf); + + FileSplit split = wiSplit.getFileSplit(); + String fileName = split.getPath().getName(); + Matcher matcher = languagePattern.matcher(fileName); + if (matcher.matches()) { + language = matcher.group(1).replace('_', '-').toLowerCase(); + } else { + throw new RuntimeException("Unknown ingest language! " + fileName); + } + extractor = new ArticleExtractor(); + } + + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8)); + if (article != null) { + int groupId = WikipediaMapper.getPartitionId(article, numGroups); + if(groupId != myGroup) + return; + context.write(new Text(language), article); + } else { + context.getCounter("wikipedia", "invalid articles").increment(1); + context.progress(); + } + } + +}