Revert "ACCUMULO-1880 create mapreduce module" This reverts commit 99baad37e86739c701821e3c09d69434236d302a.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d62caadd Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d62caadd Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d62caadd Branch: refs/heads/master Commit: d62caadd28468ced53125a730fc7b05b94631686 Parents: 5bc1d08 Author: Josh Elser <josh.el...@gmail.com> Authored: Thu Dec 18 14:54:15 2014 -0500 Committer: Josh Elser <josh.el...@gmail.com> Committed: Thu Dec 18 19:28:49 2014 -0500 ---------------------------------------------------------------------- assemble/pom.xml | 4 - .../core/cli/MapReduceClientOnDefaultTable.java | 49 ++ .../cli/MapReduceClientOnRequiredTable.java | 53 ++ .../accumulo/core/cli/MapReduceClientOpts.java | 32 + .../core/client/mapred/AbstractInputFormat.java | 627 +++++++++++++++ .../client/mapred/AccumuloFileOutputFormat.java | 179 +++++ .../core/client/mapred/AccumuloInputFormat.java | 96 +++ .../mapred/AccumuloMultiTableInputFormat.java | 98 +++ .../client/mapred/AccumuloOutputFormat.java | 558 +++++++++++++ .../client/mapred/AccumuloRowInputFormat.java | 87 ++ .../core/client/mapred/InputFormatBase.java | 383 +++++++++ .../core/client/mapred/RangeInputSplit.java | 40 + .../client/mapreduce/AbstractInputFormat.java | 693 ++++++++++++++++ .../mapreduce/AccumuloFileOutputFormat.java | 177 ++++ .../client/mapreduce/AccumuloInputFormat.java | 81 ++ .../AccumuloMultiTableInputFormat.java | 101 +++ .../client/mapreduce/AccumuloOutputFormat.java | 563 +++++++++++++ .../mapreduce/AccumuloRowInputFormat.java | 79 ++ .../core/client/mapreduce/InputFormatBase.java | 384 +++++++++ .../core/client/mapreduce/InputTableConfig.java | 369 +++++++++ .../core/client/mapreduce/RangeInputSplit.java | 523 ++++++++++++ .../mapreduce/lib/impl/ConfiguratorBase.java | 404 ++++++++++ .../lib/impl/DistributedCacheHelper.java | 52 ++ .../lib/impl/FileOutputConfigurator.java | 187 +++++ .../mapreduce/lib/impl/InputConfigurator.java | 800 +++++++++++++++++++ .../mapreduce/lib/impl/OutputConfigurator.java | 204 +++++ .../client/mapreduce/lib/impl/package-info.java | 34 + .../lib/partition/KeyRangePartitioner.java | 60 ++ .../lib/partition/RangePartitioner.java | 136 ++++ .../mapred/AccumuloFileOutputFormatTest.java | 250 ++++++ .../client/mapred/AccumuloInputFormatTest.java | 340 ++++++++ .../AccumuloMultiTableInputFormatTest.java | 188 +++++ .../client/mapred/AccumuloOutputFormatTest.java | 203 +++++ .../mapred/AccumuloRowInputFormatTest.java | 206 +++++ .../core/client/mapred/TokenFileTest.java | 177 ++++ .../mapreduce/AccumuloFileOutputFormatTest.java | 239 ++++++ .../mapreduce/AccumuloInputFormatTest.java | 412 ++++++++++ .../AccumuloMultiTableInputFormatTest.java | 185 +++++ .../mapreduce/AccumuloOutputFormatTest.java | 198 +++++ .../mapreduce/AccumuloRowInputFormatTest.java | 202 +++++ .../BadPasswordSplitsAccumuloInputFormat.java | 42 + .../EmptySplitsAccumuloInputFormat.java | 45 ++ .../client/mapreduce/InputTableConfigTest.java | 119 +++ .../client/mapreduce/RangeInputSplitTest.java | 125 +++ .../core/client/mapreduce/TokenFileTest.java | 171 ++++ .../lib/impl/ConfiguratorBaseTest.java | 137 ++++ .../lib/partition/RangePartitionerTest.java | 82 ++ examples/simple/pom.xml | 4 - mapreduce/pom.xml | 67 -- .../core/cli/MapReduceClientOnDefaultTable.java | 49 -- .../cli/MapReduceClientOnRequiredTable.java | 53 -- .../accumulo/core/cli/MapReduceClientOpts.java | 32 - .../core/client/mapred/AbstractInputFormat.java | 627 --------------- 
.../client/mapred/AccumuloFileOutputFormat.java | 179 ----- .../core/client/mapred/AccumuloInputFormat.java | 96 --- .../mapred/AccumuloMultiTableInputFormat.java | 98 --- .../client/mapred/AccumuloOutputFormat.java | 558 ------------- .../client/mapred/AccumuloRowInputFormat.java | 87 -- .../core/client/mapred/InputFormatBase.java | 383 --------- .../core/client/mapred/RangeInputSplit.java | 40 - .../client/mapreduce/AbstractInputFormat.java | 693 ---------------- .../mapreduce/AccumuloFileOutputFormat.java | 177 ---- .../client/mapreduce/AccumuloInputFormat.java | 81 -- .../AccumuloMultiTableInputFormat.java | 101 --- .../client/mapreduce/AccumuloOutputFormat.java | 563 ------------- .../mapreduce/AccumuloRowInputFormat.java | 79 -- .../core/client/mapreduce/InputFormatBase.java | 384 --------- .../core/client/mapreduce/InputTableConfig.java | 369 --------- .../core/client/mapreduce/RangeInputSplit.java | 523 ------------ .../mapreduce/lib/impl/ConfiguratorBase.java | 404 ---------- .../lib/impl/DistributedCacheHelper.java | 52 -- .../lib/impl/FileOutputConfigurator.java | 187 ----- .../mapreduce/lib/impl/InputConfigurator.java | 800 ------------------- .../mapreduce/lib/impl/OutputConfigurator.java | 204 ----- .../client/mapreduce/lib/impl/package-info.java | 34 - .../lib/partition/KeyRangePartitioner.java | 60 -- .../lib/partition/RangePartitioner.java | 136 ---- mapreduce/src/main/resources/.gitignore | 0 .../mapred/AccumuloFileOutputFormatTest.java | 250 ------ .../client/mapred/AccumuloInputFormatTest.java | 340 -------- .../AccumuloMultiTableInputFormatTest.java | 188 ----- .../client/mapred/AccumuloOutputFormatTest.java | 203 ----- .../mapred/AccumuloRowInputFormatTest.java | 206 ----- .../core/client/mapred/TokenFileTest.java | 177 ---- .../mapreduce/AccumuloFileOutputFormatTest.java | 239 ------ .../mapreduce/AccumuloInputFormatTest.java | 412 ---------- .../AccumuloMultiTableInputFormatTest.java | 185 ----- .../mapreduce/AccumuloOutputFormatTest.java | 198 ----- .../mapreduce/AccumuloRowInputFormatTest.java | 202 ----- .../BadPasswordSplitsAccumuloInputFormat.java | 42 - .../EmptySplitsAccumuloInputFormat.java | 45 -- .../client/mapreduce/InputTableConfigTest.java | 119 --- .../client/mapreduce/RangeInputSplitTest.java | 125 --- .../core/client/mapreduce/TokenFileTest.java | 171 ---- .../lib/impl/ConfiguratorBaseTest.java | 137 ---- .../lib/partition/RangePartitionerTest.java | 82 -- mapreduce/src/test/resources/log4j.properties | 28 - pom.xml | 6 - test/pom.xml | 4 - 99 files changed, 10370 insertions(+), 10483 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/assemble/pom.xml ---------------------------------------------------------------------- diff --git a/assemble/pom.xml b/assemble/pom.xml index c764091..388a956 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -73,10 +73,6 @@ </dependency> <dependency> <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-mapreduce</artifactId> - </dependency> - <dependency> - <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-master</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java 
b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java new file mode 100644 index 0000000..e7a3dd4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.cli; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.hadoop.mapreduce.Job; + +import com.beust.jcommander.Parameter; + +public class MapReduceClientOnDefaultTable extends MapReduceClientOpts { + @Parameter(names = "--table", description = "table to use") + public String tableName; + + public MapReduceClientOnDefaultTable(String table) { + this.tableName = table; + } + + public String getTableName() { + return tableName; + } + + @Override + public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { + super.setAccumuloConfigs(job); + AccumuloInputFormat.setConnectorInfo(job, principal, getToken()); + AccumuloInputFormat.setInputTableName(job, getTableName()); + AccumuloInputFormat.setScanAuthorizations(job, auths); + AccumuloOutputFormat.setConnectorInfo(job, principal, getToken()); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, getTableName()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java new file mode 100644 index 0000000..abfc17d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.cli; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.hadoop.mapreduce.Job; + +import com.beust.jcommander.Parameter; + +public class MapReduceClientOnRequiredTable extends MapReduceClientOpts { + @Parameter(names = {"-t", "--table"}, required = true, description = "table to use") + private String tableName; + + @Parameter(names = {"-tf", "--tokenFile"}, description = "File in hdfs containing the user's authentication token create with \"bin/accumulo create-token\"") + private String tokenFile = ""; + + @Override + public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { + super.setAccumuloConfigs(job); + + if (tokenFile.isEmpty()) { + AccumuloInputFormat.setConnectorInfo(job, principal, getToken()); + AccumuloOutputFormat.setConnectorInfo(job, principal, getToken()); + } else { + AccumuloInputFormat.setConnectorInfo(job, principal, tokenFile); + AccumuloOutputFormat.setConnectorInfo(job, principal, tokenFile); + } + AccumuloInputFormat.setInputTableName(job, getTableName()); + AccumuloInputFormat.setScanAuthorizations(job, auths); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, getTableName()); + } + + public String getTableName() { + return tableName; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java new file mode 100644 index 0000000..4b3b7ed --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.cli; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.hadoop.mapreduce.Job; + +/** + * Adds some MR awareness to the ClientOpts + */ +public class MapReduceClientOpts extends ClientOpts { + public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { + AccumuloInputFormat.setZooKeeperInstance(job, this.getClientConfiguration()); + AccumuloOutputFormat.setZooKeeperInstance(job, this.getClientConfiguration()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java new file mode 100644 index 0000000..b83a024 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java @@ -0,0 +1,627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.OfflineScanner; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * An abstract input format to provide shared methods common to all other input format classes. At the very least, any classes inheriting from this class will + * need to define their own {@link RecordReader}. + */ +public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { + protected static final Class<?> CLASS = AccumuloInputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission) + * @param token + * the user's password + * @since 1.5.0 + */ + public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { + InputConfigurator.setConnectorInfo(CLASS, job, principal, token); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + * <p> + * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission) + * @param tokenFile + * the path to the token file + * @since 1.6.0 + */ + public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { + InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); + } + + /** + * Determines if the connector has been configured. + * + * @param job + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + */ + protected static Boolean isConnectorInfoSet(JobConf job) { + return InputConfigurator.isConnectorInfoSet(CLASS, job); + } + + /** + * Gets the user name from the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + */ + protected static String getPrincipal(JobConf job) { + return InputConfigurator.getPrincipal(CLASS, job); + } + + /** + * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. + * + * @param job + * the Hadoop context for the configured job + * @return the principal's authentication token + * @since 1.6.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + * @see #setConnectorInfo(JobConf, String, String) + */ + protected static AuthenticationToken getAuthenticationToken(JobConf job) { + return InputConfigurator.getAuthenticationToken(CLASS, job); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @param zooKeepers + * a comma-separated list of zookeeper servers + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + */ + @Deprecated + public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { + setZooKeeperInstance(job, new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param clientConfig + * client configuration containing connection options + * @since 1.6.0 + */ + public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { + InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(JobConf job, String instanceName) { + InputConfigurator.setMockInstance(CLASS, job, instanceName); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(JobConf, ClientConfiguration) + * @see #setMockInstance(JobConf, String) + */ + protected static Instance getInstance(JobConf job) { + return InputConfigurator.getInstance(CLASS, job); + } + + /** + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 + */ + public static void setLogLevel(JobConf job, Level level) { + InputConfigurator.setLogLevel(CLASS, job, level); + } + + /** + * Gets the log level from this configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(JobConf, Level) + */ + protected static Level getLogLevel(JobConf job) { + return InputConfigurator.getLogLevel(CLASS, job); + } + + /** + * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set. + * + * @param job + * the Hadoop job instance to be configured + * @param auths + * the user's authorizations + * @since 1.5.0 + */ + public static void setScanAuthorizations(JobConf job, Authorizations auths) { + InputConfigurator.setScanAuthorizations(CLASS, job, auths); + } + + /** + * Gets the authorizations to set for the scans from the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the Accumulo scan authorizations + * @since 1.5.0 + * @see #setScanAuthorizations(JobConf, Authorizations) + */ + protected static Authorizations getScanAuthorizations(JobConf job) { + return InputConfigurator.getScanAuthorizations(CLASS, job); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return an Accumulo tablet locator + * @throws org.apache.accumulo.core.client.TableNotFoundException + * if the table name set on the configuration doesn't exist + * @since 1.6.0 + */ + protected static TabletLocator getTabletLocator(JobConf job, String tableId) throws TableNotFoundException { + return InputConfigurator.getTabletLocator(CLASS, job, tableId); + } + + // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) + /** + * Check whether a configuration is fully configured to be used with an Accumulo {@link InputFormat}. 
+ * + * @param job + * the Hadoop context for the configured job + * @throws java.io.IOException + * if the context is improperly configured + * @since 1.5.0 + */ + protected static void validateOptions(JobConf job) throws IOException { + InputConfigurator.validateOptions(CLASS, job); + } + + /** + * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job. + * + * @param job + * the Hadoop job instance to be configured + * @return the {@link InputTableConfig} objects set on the job + * @since 1.6.0 + */ + public static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) { + return InputConfigurator.getInputTableConfigs(CLASS, job); + } + + /** + * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table. + * + * <p> + * null is returned in the event that the table doesn't exist. + * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table name for which to grab the config object + * @return the {@link InputTableConfig} for the given table + * @since 1.6.0 + */ + public static InputTableConfig getInputTableConfig(JobConf job, String tableName) { + return InputConfigurator.getInputTableConfig(CLASS, job, tableName); + } + + /** + * An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader} instances that convert from Accumulo + * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to the user's K/V types. + * + * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to update the following variables: + * <ul> + * <li>Key {@link #currentKey} (used for progress reporting)</li> + * <li>int {@link #numKeysRead} (used for progress reporting)</li> + * </ul> + */ + protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> { + protected long numKeysRead; + protected Iterator<Map.Entry<Key,Value>> scannerIterator; + protected RangeInputSplit split; + + /** + * Configures the iterators on a scanner for the given table name. + * + * @param job + * the Hadoop job configuration + * @param scanner + * the scanner for which to configure the iterators + * @param tableName + * the table name for which the scanner is configured + * @since 1.6.0 + */ + protected abstract void setupIterators(JobConf job, Scanner scanner, String tableName, RangeInputSplit split); + + /** + * Initialize a scanner over the given input split using this task attempt configuration. 
+ */ + public void initialize(InputSplit inSplit, JobConf job) throws IOException { + Scanner scanner; + split = (RangeInputSplit) inSplit; + log.debug("Initializing input split: " + split.getRange()); + + Instance instance = split.getInstance(); + if (null == instance) { + instance = getInstance(job); + } + + String principal = split.getPrincipal(); + if (null == principal) { + principal = getPrincipal(job); + } + + AuthenticationToken token = split.getToken(); + if (null == token) { + token = getAuthenticationToken(job); + } + + Authorizations authorizations = split.getAuths(); + if (null == authorizations) { + authorizations = getScanAuthorizations(job); + } + + String table = split.getTableName(); + + // in case the table name changed, we can still use the previous name for terms of configuration, + // but the scanner will use the table id resolved at job setup time + InputTableConfig tableConfig = getInputTableConfig(job, split.getTableName()); + + Boolean isOffline = split.isOffline(); + if (null == isOffline) { + isOffline = tableConfig.isOfflineScan(); + } + + Boolean isIsolated = split.isIsolatedScan(); + if (null == isIsolated) { + isIsolated = tableConfig.shouldUseIsolatedScanners(); + } + + Boolean usesLocalIterators = split.usesLocalIterators(); + if (null == usesLocalIterators) { + usesLocalIterators = tableConfig.shouldUseLocalIterators(); + } + + List<IteratorSetting> iterators = split.getIterators(); + if (null == iterators) { + iterators = tableConfig.getIterators(); + } + + Collection<Pair<Text,Text>> columns = split.getFetchedColumns(); + if (null == columns) { + columns = tableConfig.getFetchedColumns(); + } + + try { + log.debug("Creating connector with user: " + principal); + log.debug("Creating scanner for table: " + table); + log.debug("Authorizations are: " + authorizations); + if (isOffline) { + scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations); + } else if (instance instanceof MockInstance) { + scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations); + } else { + ClientContext context = new ClientContext(instance, new Credentials(principal, token), ClientConfiguration.loadDefault()); + scanner = new ScannerImpl(context, split.getTableId(), authorizations); + } + if (isIsolated) { + log.info("Creating isolated scanner"); + scanner = new IsolatedScanner(scanner); + } + if (usesLocalIterators) { + log.info("Using local iterators"); + scanner = new ClientSideIteratorScanner(scanner); + } + setupIterators(job, scanner, split.getTableName(), split); + } catch (Exception e) { + throw new IOException(e); + } + + // setup a scanner within the bounds of this split + for (Pair<Text,Text> c : columns) { + if (c.getSecond() != null) { + log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); + scanner.fetchColumn(c.getFirst(), c.getSecond()); + } else { + log.debug("Fetching column family " + c.getFirst()); + scanner.fetchColumnFamily(c.getFirst()); + } + } + + scanner.setRange(split.getRange()); + + numKeysRead = 0; + + // do this last after setting all scanner options + scannerIterator = scanner.iterator(); + } + + @Override + public void close() {} + + @Override + public long getPos() throws IOException { + return numKeysRead; + } + + @Override + public float getProgress() throws IOException { + if (numKeysRead > 0 && currentKey == null) + return 1.0f; + return split.getProgress(currentKey); + } + + protected Key currentKey = null; + + } + + 
Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableId, List<Range> ranges) throws TableNotFoundException, AccumuloException, + AccumuloSecurityException { + + Instance instance = getInstance(job); + Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job)); + + return InputConfigurator.binOffline(tableId, ranges, instance, conn); + } + + /** + * Read the metadata table to get tablets and match up ranges to them. + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + Level logLevel = getLogLevel(job); + log.setLevel(logLevel); + validateOptions(job); + + Random random = new Random(); + LinkedList<InputSplit> splits = new LinkedList<InputSplit>(); + Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job); + for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { + String tableName = tableConfigEntry.getKey(); + InputTableConfig tableConfig = tableConfigEntry.getValue(); + + Instance instance = getInstance(job); + boolean mockInstance; + String tableId; + // resolve table name to id once, and use id from this point forward + if (instance instanceof MockInstance) { + tableId = ""; + mockInstance = true; + } else { + try { + tableId = Tables.getTableId(instance, tableName); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + mockInstance = false; + } + + Authorizations auths = getScanAuthorizations(job); + String principal = getPrincipal(job); + AuthenticationToken token = getAuthenticationToken(job); + + boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); + List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); + if (ranges.isEmpty()) { + ranges = new ArrayList<Range>(1); + ranges.add(new Range()); + } + + // get the metadata information for these ranges + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); + TabletLocator tl; + try { + if (tableConfig.isOfflineScan()) { + binnedRanges = binOfflineTable(job, tableId, ranges); + while (binnedRanges == null) { + // Some tablets were still online, try again + UtilWaitThread.sleep(100 + random.nextInt(100)); // sleep randomly between 100 and 200 ms + binnedRanges = binOfflineTable(job, tableId, ranges); + } + } else { + tl = getTabletLocator(job, tableId); + // its possible that the cache could contain complete, but old information about a tables tablets... so clear it + tl.invalidateCache(); + + ClientContext context = new ClientContext(getInstance(job), new Credentials(getPrincipal(job), getAuthenticationToken(job)), + ClientConfiguration.loadDefault()); + while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + if (!(instance instanceof MockInstance)) { + if (!Tables.exists(instance, tableId)) + throw new TableDeletedException(tableId); + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); + } + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. 
Retrying."); + UtilWaitThread.sleep(100 + random.nextInt(100)); // sleep randomly between 100 and 200 ms + tl.invalidateCache(); + } + } + } catch (Exception e) { + throw new IOException(e); + } + + HashMap<Range,ArrayList<String>> splitsToAdd = null; + + if (!autoAdjust) + splitsToAdd = new HashMap<Range,ArrayList<String>>(); + + HashMap<String,String> hostNameCache = new HashMap<String,String>(); + for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { + String ip = tserverBin.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getCanonicalHostName(); + hostNameCache.put(ip, location); + } + for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { + Range ke = extentRanges.getKey().toDataRange(); + for (Range r : extentRanges.getValue()) { + if (autoAdjust) { + // divide ranges into smaller ranges, based on the tablets + RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); + + split.setOffline(tableConfig.isOfflineScan()); + split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); + split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); + split.setMockInstance(mockInstance); + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + + splits.add(split); + } else { + // don't divide ranges + ArrayList<String> locations = splitsToAdd.get(r); + if (locations == null) + locations = new ArrayList<String>(1); + locations.add(location); + splitsToAdd.put(r, locations); + } + } + } + } + + if (!autoAdjust) + for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { + RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); + + split.setOffline(tableConfig.isOfflineScan()); + split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); + split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); + split.setMockInstance(mockInstance); + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + + splits.add(split); + } + } + + return splits.toArray(new InputSplit[splits.size()]); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java new file mode 100644 index 0000000..cfaaa58 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; +import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Logger; + +/** + * This class allows MapReduce jobs to write output in the Accumulo data file format.<br /> + * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files. + * + * <p> + * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(JobConf, Path)}. This is inherited from + * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. + * Using other Hadoop configuration options that affect the behavior of the underlying files directly in the Job's configuration may work, but are not directly + * supported at this time. + */ +public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { + + private static final Class<?> CLASS = AccumuloFileOutputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); + + /** + * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been + * stored in the Job's configuration. + * + * @param job + * the Hadoop context for the configured job + * @since 1.5.0 + */ + protected static AccumuloConfiguration getAccumuloConfiguration(JobConf job) { + return FileOutputConfigurator.getAccumuloConfiguration(CLASS, job); + } + + /** + * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param compressionType + * one of "none", "gz", "lzo", or "snappy" + * @since 1.5.0 + */ + public static void setCompressionType(JobConf job, String compressionType) { + FileOutputConfigurator.setCompressionType(CLASS, job, compressionType); + } + + /** + * Sets the size for data blocks within each file.<br /> + * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group. + * + * <p> + * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance). + * + * @param job + * the Hadoop job instance to be configured + * @param dataBlockSize + * the block size, in bytes + * @since 1.5.0 + */ + public static void setDataBlockSize(JobConf job, long dataBlockSize) { + FileOutputConfigurator.setDataBlockSize(CLASS, job, dataBlockSize); + } + + /** + * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system. + * + * @param job + * the Hadoop job instance to be configured + * @param fileBlockSize + * the block size, in bytes + * @since 1.5.0 + */ + public static void setFileBlockSize(JobConf job, long fileBlockSize) { + FileOutputConfigurator.setFileBlockSize(CLASS, job, fileBlockSize); + } + + /** + * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow + * index hierarchy within the file. This can affect the performance of queries. + * + * @param job + * the Hadoop job instance to be configured + * @param indexBlockSize + * the block size, in bytes + * @since 1.5.0 + */ + public static void setIndexBlockSize(JobConf job, long indexBlockSize) { + FileOutputConfigurator.setIndexBlockSize(CLASS, job, indexBlockSize); + } + + /** + * Sets the file system replication factor for the resulting file, overriding the file system default. + * + * @param job + * the Hadoop job instance to be configured + * @param replication + * the number of replicas for produced files + * @since 1.5.0 + */ + public static void setReplication(JobConf job, int replication) { + FileOutputConfigurator.setReplication(CLASS, job, replication); + } + + @Override + public RecordWriter<Key,Value> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { + // get the path of the temporary output file + final Configuration conf = job; + final AccumuloConfiguration acuConf = getAccumuloConfiguration(job); + + final String extension = acuConf.get(Property.TABLE_FILE_TYPE); + final Path file = new Path(getWorkOutputPath(job), getUniqueName(job, "part") + "." 
+ extension); + + final LRUMap validVisibilities = new LRUMap(ConfiguratorBase.getVisibilityCacheSize(conf)); + + return new RecordWriter<Key,Value>() { + FileSKVWriter out = null; + + @Override + public void close(Reporter reporter) throws IOException { + if (out != null) + out.close(); + } + + @Override + public void write(Key key, Value value) throws IOException { + + Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData()); + if (wasChecked == null) { + byte[] cv = key.getColumnVisibilityData().toArray(); + new ColumnVisibility(cv); + validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); + } + + if (out == null) { + out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf); + out.startDefaultLocalityGroup(); + } + out.append(key, value); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java new file mode 100644 index 0000000..18e286a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Level; + +/** + * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and + * {@link Value} to the Map function. 
+ * + * The user must specify the following via static configurator methods: + * + * <ul> + * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} + * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} + * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)} + * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR + * {@link AccumuloInputFormat#setMockInstance(JobConf, String)} + * </ul> + * + * Other static methods are optional. + */ +public class AccumuloInputFormat extends InputFormatBase<Key,Value> { + + @Override + public RecordReader<Key,Value> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + log.setLevel(getLogLevel(job)); + + // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use. + if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) { + org.apache.accumulo.core.client.mapreduce.RangeInputSplit risplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split; + Level level = risplit.getLogLevel(); + if (null != level) { + log.setLevel(level); + } + } + + RecordReaderBase<Key,Value> recordReader = new RecordReaderBase<Key,Value>() { + + @Override + public boolean next(Key key, Value value) throws IOException { + if (scannerIterator.hasNext()) { + ++numKeysRead; + Entry<Key,Value> entry = scannerIterator.next(); + key.set(currentKey = entry.getKey()); + value.set(entry.getValue().get()); + if (log.isTraceEnabled()) + log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true)); + return true; + } + return false; + } + + @Override + public Key createKey() { + return new Key(); + } + + @Override + public Value createValue() { + return new Value(); + } + + }; + recordReader.initialize(split, job); + return recordReader; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java new file mode 100644 index 0000000..bbafef5 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * This class allows MapReduce jobs to use multiple Accumulo tables as the source of data. This {@link org.apache.hadoop.mapred.InputFormat} provides keys and + * values of type {@link Key} and {@link Value} to the Map function. + * + * The user must specify the following via static configurator methods: + * + * <ul> + * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)} + * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} + * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)} + * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)} + * <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(org.apache.hadoop.mapred.JobConf, java.util.Map)} + * </ul> + * + * Other static methods are optional. + */ + +public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value> { + + /** + * Sets the {@link InputTableConfig} objects on the given Hadoop configuration + * + * @param job + * the Hadoop job instance to be configured + * @param configs + * the table query configs to be set on the configuration. 
+ * @since 1.6.0 + */ + public static void setInputTableConfigs(JobConf job, Map<String,InputTableConfig> configs) { + InputConfigurator.setInputTableConfigs(CLASS, job, configs); + } + + @Override + public RecordReader<Key,Value> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + log.setLevel(getLogLevel(job)); + InputFormatBase.RecordReaderBase<Key,Value> recordReader = new InputFormatBase.RecordReaderBase<Key,Value>() { + + @Override + public boolean next(Key key, Value value) throws IOException { + if (scannerIterator.hasNext()) { + ++numKeysRead; + Map.Entry<Key,Value> entry = scannerIterator.next(); + key.set(currentKey = entry.getKey()); + value.set(entry.getValue().get()); + if (log.isTraceEnabled()) + log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true)); + return true; + } + return false; + } + + @Override + public Key createKey() { + return new Key(); + } + + @Override + public Value createValue() { + return new Value(); + } + + }; + recordReader.initialize(split, job); + return recordReader; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java new file mode 100644 index 0000000..a32a8b8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} accepts keys and values of type {@link Text} (for a table + * name) and {@link Mutation} from the Map and Reduce functions. + * + * The user must specify the following via static configurator methods: + * + * <ul> + * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} + * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, String)} + * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloOutputFormat#setMockInstance(JobConf, String)} + * </ul> + * + * Other static methods are optional. + */ +public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { + + private static final Class<?> CLASS = AccumuloOutputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(JobConf, boolean)} is set to true) + * @param token + * the user's password + * @since 1.5.0 + */ + public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { + OutputConfigurator.setConnectorInfo(CLASS, job, principal, token); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + * <p> + * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(JobConf, boolean)} is set to true) + * @param tokenFile + * the path to the password file + * @since 1.6.0 + */ + public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { + OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); + } + + /** + * Determines if the connector has been configured. + * + * @param job + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + */ + protected static Boolean isConnectorInfoSet(JobConf job) { + return OutputConfigurator.isConnectorInfoSet(CLASS, job); + } + + /** + * Gets the principal from the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + */ + protected static String getPrincipal(JobConf job) { + return OutputConfigurator.getPrincipal(CLASS, job); + } + + /** + * Gets the serialized token class from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead. + */ + @Deprecated + protected static String getTokenClass(JobConf job) { + return getAuthenticationToken(job).getClass().getName(); + } + + /** + * Gets the serialized token from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead. + */ + @Deprecated + protected static byte[] getToken(JobConf job) { + return AuthenticationTokenSerializer.serialize(getAuthenticationToken(job)); + } + + /** + * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. + * + * @param job + * the Hadoop job instance to be configured + * @return the principal's authentication token + * @since 1.6.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + * @see #setConnectorInfo(JobConf, String, String) + */ + protected static AuthenticationToken getAuthenticationToken(JobConf job) { + return OutputConfigurator.getAuthenticationToken(CLASS, job); + } + + /** + * Configures a {@link ZooKeeperInstance} for this job. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @param zooKeepers + * a comma-separated list of zookeeper servers + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + */ + + @Deprecated + public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { + setZooKeeperInstance(job, new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)); + } + + /** + * Configures a {@link ZooKeeperInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * + * @param clientConfig + * client configuration for specifying connection timeouts, SSL connection options, etc. + * @since 1.6.0 + */ + public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { + OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); + } + + /** + * Configures a {@link MockInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(JobConf job, String instanceName) { + OutputConfigurator.setMockInstance(CLASS, job, instanceName); + } + + /** + * Initializes an Accumulo {@link Instance} based on the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(JobConf, ClientConfiguration) + * @see #setMockInstance(JobConf, String) + */ + protected static Instance getInstance(JobConf job) { + return OutputConfigurator.getInstance(CLASS, job); + } + + /** + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 + */ + public static void setLogLevel(JobConf job, Level level) { + OutputConfigurator.setLogLevel(CLASS, job, level); + } + + /** + * Gets the log level from this configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(JobConf, Level) + */ + protected static Level getLogLevel(JobConf job) { + return OutputConfigurator.getLogLevel(CLASS, job); + } + + /** + * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only contain alphanumerics and + * underscores. + * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table to use when the table name is null in the write call + * @since 1.5.0 + */ + public static void setDefaultTableName(JobConf job, String tableName) { + OutputConfigurator.setDefaultTableName(CLASS, job, tableName); + } + + /** + * Gets the default table name from the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the default table name + * @since 1.5.0 + * @see #setDefaultTableName(JobConf, String) + */ + protected static String getDefaultTableName(JobConf job) { + return OutputConfigurator.getDefaultTableName(CLASS, job); + } + + /** + * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig} with sensible built-in defaults is + * used. Setting the configuration multiple times overwrites any previous configuration.
+ * + * @param job + * the Hadoop job instance to be configured + * @param bwConfig + * the configuration for the {@link BatchWriter} + * @since 1.5.0 + */ + public static void setBatchWriterOptions(JobConf job, BatchWriterConfig bwConfig) { + OutputConfigurator.setBatchWriterOptions(CLASS, job, bwConfig); + } + + /** + * Gets the {@link BatchWriterConfig} settings. + * + * @param job + * the Hadoop context for the configured job + * @return the configuration object + * @since 1.5.0 + * @see #setBatchWriterOptions(JobConf, BatchWriterConfig) + */ + protected static BatchWriterConfig getBatchWriterOptions(JobConf job) { + return OutputConfigurator.getBatchWriterOptions(CLASS, job); + } + + /** + * Sets the directive to create new tables as necessary. Table names can only contain alphanumerics and underscores. + * + * <p> + * By default, this feature is <b>disabled</b>. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setCreateTables(JobConf job, boolean enableFeature) { + OutputConfigurator.setCreateTables(CLASS, job, enableFeature); + } + + /** + * Determines whether tables are permitted to be created as needed. + * + * @param job + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setCreateTables(JobConf, boolean) + */ + protected static Boolean canCreateTables(JobConf job) { + return OutputConfigurator.canCreateTables(CLASS, job); + } + + /** + * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. + * + * <p> + * By default, this feature is <b>disabled</b>. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setSimulationMode(JobConf job, boolean enableFeature) { + OutputConfigurator.setSimulationMode(CLASS, job, enableFeature); + } + + /** + * Determines whether this feature is enabled. + * + * @param job + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setSimulationMode(JobConf, boolean) + */ + protected static Boolean getSimulationMode(JobConf job) { + return OutputConfigurator.getSimulationMode(CLASS, job); + } + + /** + * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. + */ + protected static class AccumuloRecordWriter implements RecordWriter<Text,Mutation> { + private MultiTableBatchWriter mtbw = null; + private HashMap<Text,BatchWriter> bws = null; + private Text defaultTableName = null; + + private boolean simulate = false; + private boolean createTables = false; + + private long mutCount = 0; + private long valCount = 0; + + private Connector conn; + + protected AccumuloRecordWriter(JobConf job) throws AccumuloException, AccumuloSecurityException, IOException { + Level l = getLogLevel(job); + if (l != null) + log.setLevel(getLogLevel(job)); + this.simulate = getSimulationMode(job); + this.createTables = canCreateTables(job); + + if (simulate) + log.info("Simulating output only. No writes to tables will occur"); + + this.bws = new HashMap<Text,BatchWriter>(); + + String tname = getDefaultTableName(job); + this.defaultTableName = (tname == null) ?
null : new Text(tname); + + if (!simulate) { + this.conn = getInstance(job).getConnector(getPrincipal(job), getAuthenticationToken(job)); + mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(job)); + } + } + + /** + * Push a mutation into a table. If table is null, the defaultTable will be used. If canCreateTable is set, the table will be created if it does not exist. + * The table name must only contain alphanumerics and underscore. + */ + @Override + public void write(Text table, Mutation mutation) throws IOException { + if (table == null || table.toString().isEmpty()) + table = this.defaultTableName; + + if (!simulate && table == null) + throw new IOException("No table or default table specified. Try simulation mode next time"); + + ++mutCount; + valCount += mutation.size(); + printMutation(table, mutation); + + if (simulate) + return; + + if (!bws.containsKey(table)) + try { + addTable(table); + } catch (final Exception e) { + log.error("Could not add table '" + table + "'", e); + throw new IOException(e); + } + + try { + bws.get(table).addMutation(mutation); + } catch (MutationsRejectedException e) { + throw new IOException(e); + } + } + + public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { + if (simulate) { + log.info("Simulating adding table: " + tableName); + return; + } + + log.debug("Adding table: " + tableName); + BatchWriter bw = null; + String table = tableName.toString(); + + if (createTables && !conn.tableOperations().exists(table)) { + try { + conn.tableOperations().create(table); + } catch (AccumuloSecurityException e) { + log.error("Accumulo security violation creating " + table, e); + throw e; + } catch (TableExistsException e) { + // Shouldn't happen + } + } + + try { + bw = mtbw.getBatchWriter(table); + } catch (TableNotFoundException e) { + log.error("Accumulo table " + table + " doesn't exist and cannot be created.", e); + throw new AccumuloException(e); + } catch (AccumuloException e) { + throw e; + } catch (AccumuloSecurityException e) { + throw e; + } + + if (bw != null) + bws.put(tableName, bw); + } + + private int printMutation(Text table, Mutation m) { + if (log.isTraceEnabled()) { + log.trace(String.format("Table %s row key: %s", table, hexDump(m.getRow()))); + for (ColumnUpdate cu : m.getUpdates()) { + log.trace(String.format("Table %s column: %s:%s", table, hexDump(cu.getColumnFamily()), hexDump(cu.getColumnQualifier()))); + log.trace(String.format("Table %s security: %s", table, new ColumnVisibility(cu.getColumnVisibility()).toString())); + log.trace(String.format("Table %s value: %s", table, hexDump(cu.getValue()))); + } + } + return m.getUpdates().size(); + } + + private String hexDump(byte[] ba) { + StringBuilder sb = new StringBuilder(); + for (byte b : ba) { + if ((b > 0x20) && (b < 0x7e)) + sb.append((char) b); + else + sb.append(String.format("x%02x", b)); + } + return sb.toString(); + } + + @Override + public void close(Reporter reporter) throws IOException { + log.debug("mutations written: " + mutCount + ", values written: " + valCount); + if (simulate) + return; + + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + if (e.getAuthorizationFailuresMap().size() > 0) { + HashMap<String,Set<SecurityErrorCode>> tables = new HashMap<String,Set<SecurityErrorCode>>(); + for (Entry<KeyExtent,Set<SecurityErrorCode>> ke : e.getAuthorizationFailuresMap().entrySet()) { + Set<SecurityErrorCode> secCodes = tables.get(ke.getKey().getTableId().toString()); + if (secCodes == null) { + secCodes
= new HashSet<SecurityErrorCode>(); + tables.put(ke.getKey().getTableId().toString(), secCodes); + } + secCodes.addAll(ke.getValue()); + } + + log.error("Not authorized to write to tables : " + tables); + } + + if (e.getConstraintViolationSummaries().size() > 0) { + log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); + } + } + } + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + if (!isConnectorInfoSet(job)) + throw new IOException("Connector info has not been set."); + try { + // if the instance isn't configured, it will complain here + String principal = getPrincipal(job); + AuthenticationToken token = getAuthenticationToken(job); + Connector c = getInstance(job).getConnector(principal, token); + if (!c.securityOperations().authenticateUser(principal, token)) + throw new IOException("Unable to authenticate user"); + } catch (AccumuloException e) { + throw new IOException(e); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + @Override + public RecordWriter<Text,Mutation> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { + try { + return new AccumuloRecordWriter(job); + } catch (Exception e) { + throw new IOException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d62caadd/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java new file mode 100644 index 0000000..673c5b8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * This class allows MapReduce jobs to use Accumulo as the source of data. 
This {@link InputFormat} provides row names as {@link Text} as keys, and a + * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function. + * + * The user must specify the following via static configurator methods: + * + * <ul> + * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} + * <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)} + * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)} + * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloRowInputFormat#setMockInstance(JobConf, String)} + * </ul> + * + * Other static methods are optional. + */ +public class AccumuloRowInputFormat extends InputFormatBase<Text,PeekingIterator<Entry<Key,Value>>> { + @Override + public RecordReader<Text,PeekingIterator<Entry<Key,Value>>> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + log.setLevel(getLogLevel(job)); + RecordReaderBase<Text,PeekingIterator<Entry<Key,Value>>> recordReader = new RecordReaderBase<Text,PeekingIterator<Entry<Key,Value>>>() { + RowIterator rowIterator; + + @Override + public void initialize(InputSplit inSplit, JobConf job) throws IOException { + super.initialize(inSplit, job); + rowIterator = new RowIterator(scannerIterator); + } + + @Override + public boolean next(Text key, PeekingIterator<Entry<Key,Value>> value) throws IOException { + if (!rowIterator.hasNext()) + return false; + value.initialize(rowIterator.next()); + numKeysRead = rowIterator.getKVCount(); + key.set((currentKey = value.peek().getKey()).getRow()); + return true; + } + + @Override + public Text createKey() { + return new Text(); + } + + @Override + public PeekingIterator<Entry<Key,Value>> createValue() { + return new PeekingIterator<Entry<Key,Value>>(); + } + }; + recordReader.initialize(split, job); + return recordReader; + } +}
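For illustration only (not part of this commit), a minimal mapper against the old mapred API restored above might count the cells in each row as sketched below; the class name and output types are invented, and the job would still need the configurator calls named in the javadoc (connector info, input table, scan authorizations, an instance) plus job.setInputFormat(AccumuloRowInputFormat.class).

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Consumes the Text/PeekingIterator pairs produced by AccumuloRowInputFormat
// and emits one cell count per row. Illustrative sketch only.
public class RowCellCountMapper extends MapReduceBase implements
    Mapper<Text,PeekingIterator<Entry<Key,Value>>,Text,LongWritable> {

  @Override
  public void map(Text row, PeekingIterator<Entry<Key,Value>> cells,
      OutputCollector<Text,LongWritable> output, Reporter reporter) throws IOException {
    long count = 0;
    while (cells.hasNext()) { // iterate the Key/Value pairs of the single row handed to this call
      cells.next();
      count++;
    }
    output.collect(row, new LongWritable(count));
  }
}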
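Similarly, the required AccumuloOutputFormat configurator methods restored earlier in this diff are normally called from the job driver. The following sketch is only illustrative: the principal, token, instance name, ZooKeeper hosts, and table name are placeholders a real driver would take from its own configuration.

import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class OutputFormatDriverSketch {
  static void configureOutput(JobConf job) throws AccumuloSecurityException {
    // Required: credentials (serialized into the job configuration) and an instance.
    AccumuloOutputFormat.setConnectorInfo(job, "writer", new PasswordToken("secret"));
    AccumuloOutputFormat.setZooKeeperInstance(job,
        new ClientConfiguration().withInstance("instance").withZkHosts("zkhost1:2181,zkhost2:2181"));

    // Optional: fall-back table for mutations emitted with a null table name,
    // and permission to create missing tables on the fly.
    AccumuloOutputFormat.setDefaultTableName(job, "output_table");
    AccumuloOutputFormat.setCreateTables(job, true);

    // Optional: tuning for the BatchWriter behind each RecordWriter.
    AccumuloOutputFormat.setBatchWriterOptions(job, new BatchWriterConfig().setMaxMemory(50 * 1024 * 1024));

    job.setOutputFormat(AccumuloOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Mutation.class);
  }
}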
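Finally, the setInputTableConfigs method at the top of this diff (on the mapred AccumuloMultiTableInputFormat) takes a map of table name to InputTableConfig, so each table can carry its own ranges and scan options. A small sketch, with invented table names and row range, and with the usual connector/instance/authorizations calls omitted:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.mapred.AccumuloMultiTableInputFormat;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.mapred.JobConf;

public class MultiTableInputSketch {
  static void configureTables(JobConf job) {
    // Scan all of one table, and only a row range of another.
    InputTableConfig wholeTable = new InputTableConfig();
    InputTableConfig rowSlice = new InputTableConfig();
    rowSlice.setRanges(Collections.singletonList(new Range("row_0100", "row_0200")));

    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
    configs.put("table_a", wholeTable);
    configs.put("table_b", rowSlice);
    AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
  }
}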