This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e230d8bac0e54ae96f2de69c560f6ff605d7e84c
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Wed Jun 24 20:46:44 2020 +0800

    Fix dependency issue in HDP3 and revert code change to HBase bulk load
---
 core-metrics/pom.xml                               |   1 +
 pom.xml                                            |  19 +-
 server-base/pom.xml                                |   2 +-
 server/pom.xml                                     |   2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  14 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  22 +-
 .../storage/hbase/steps/HFileOutputFormat3.java    | 818 ++++++++++++---------
 stream-receiver/pom.xml                            |   4 +-
 8 files changed, 524 insertions(+), 358 deletions(-)
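For context, the reverted bulk-load path means HFileOutputFormat3 is now configured directly from the live HBase 2 connection via configureIncrementalLoad(Job, Table, RegionLocator) and configureIncrementalLoadMap(Job, TableDescriptor), rather than shipping a serialized HConnection configuration file with the job. A minimal, hypothetical sketch of wiring a job against the patched class follows; the table name, job name and configuration objects are illustrative assumptions, not values taken from this commit.

    // Hypothetical sketch only: wiring a MapReduce job to the patched
    // HFileOutputFormat3 using the HBase 2.x client API. The table name
    // "KYLIN_EXAMPLE_CUBE" and the job name are made-up placeholders.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.storage.hbase.steps.HFileOutputFormat3;

    public class HFileJobSketch {
        public static void main(String[] args) throws IOException {
            Configuration hbaseConf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
                 Table table = conn.getTable(TableName.valueOf("KYLIN_EXAMPLE_CUBE"));
                 RegionLocator locator = conn.getRegionLocator(table.getName())) {
                Job job = Job.getInstance(hbaseConf, "example-hfile-bulkload");
                // Sets the partitioner, reducer count and per-family compression,
                // bloom, block-size and encoding attributes from the live table.
                HFileOutputFormat3.configureIncrementalLoad(job, table, locator);
            }
        }
    }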

diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml
index eca9e5e..90056bb 100644
--- a/core-metrics/pom.xml
+++ b/core-metrics/pom.xml
@@ -48,6 +48,7 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <!-- Env & Test -->
diff --git a/pom.xml b/pom.xml
index 77e2338..7ffe25f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -594,7 +594,7 @@
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
-                <scope>provided</scope>
+                <scope>compile</scope>
             </dependency>
             <dependency>
                 <groupId>com.jcraft</groupId>
@@ -704,11 +704,23 @@
                 <groupId>org.apache.hive</groupId>
                 <artifactId>hive-jdbc</artifactId>
                 <version>${hive.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>jetty-runner</artifactId>
+                        <groupId>org.eclipse.jetty</groupId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.hive.hcatalog</groupId>
                 <artifactId>hive-hcatalog-core</artifactId>
                 <version>${hive-hcatalog.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>jetty-runner</artifactId>
+                        <groupId>org.eclipse.jetty</groupId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <!-- Yarn dependencies -->
             <dependency>
@@ -1084,11 +1096,6 @@
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
-                <artifactId>jetty-runner</artifactId>
-                <version>${jetty.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.eclipse.jetty</groupId>
                 <artifactId>jetty-util</artifactId>
                 <version>${jetty.version}</version>
                 <scope>test</scope>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 09a6858..cf8c380 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -266,7 +266,7 @@
         <dependency>
             <groupId>commons-configuration</groupId>
             <artifactId>commons-configuration</artifactId>
-            <scope>provided</scope>
+            <scope>compile</scope>
         </dependency>
     </dependencies>
 
diff --git a/server/pom.xml b/server/pom.xml
index dc7bb44..19c013c 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -35,7 +35,7 @@
         <dependency>
             <groupId>commons-configuration</groupId>
             <artifactId>commons-configuration</artifactId>
-            <scope>provided</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index b26f336..d961849 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.storage.hbase.steps;
 
@@ -135,7 +135,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
         HTable htable = (HTable) conn.getTable(TableName.valueOf(hbaseTableName));
 
-        HFileOutputFormat3.configureIncrementalLoadMap(job, htable);
+        HFileOutputFormat3.configureIncrementalLoadMap(job, htable.getDescriptor());
 
         logger.info("Saving HBase configuration to {}", hbaseConfPath);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
@@ -160,7 +160,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     }
 
     public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
-            final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+                                                               final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
             throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
@@ -258,7 +258,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     }
 
     protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
-            final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+                                          final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
 
         if (outputFolder == null) {
             logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -346,4 +346,4 @@ public class CreateHTableJob extends AbstractHadoopJob {
         int exitCode = ToolRunner.run(new CreateHTableJob(), args);
         System.exit(exitCode);
     }
-}
+}
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 28752ca..1e0e216 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -6,20 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.storage.hbase.steps;
 
-import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
-
 import java.io.IOException;
 import java.util.Collection;
 
@@ -54,6 +52,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
+
 /**
  * @author George Song (ysong1)
  */
@@ -61,7 +61,6 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class);
 
-    @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
 
@@ -84,7 +83,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
-            // construct configuration for the MR job cluster
+            // use current hbase configuration
             Configuration configuration = new Configuration(HBaseConnection.getCurrentHBaseConfiguration());
             String[] allServices = getAllServices(configuration);
             merge(configuration, getConf());
@@ -110,7 +109,6 @@ public class CubeHFileJob extends AbstractHadoopJob {
             RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName));
             // Automatic config !
             HFileOutputFormat3.configureIncrementalLoad(job, table, regionLocator);
-            HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir());
             reconfigurePartitions(hbaseConf, partitionFilePath);
 
             job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -121,7 +119,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class);
 
             // set block replication to 3 for hfiles
-            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);
 
@@ -173,9 +171,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     private String[] getAllServices(Configuration hbaseConf) {
         Collection<String> hbaseHdfsServices
-            = hbaseConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
+                = hbaseConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
         Collection<String> mainNameServices
-            = getConf().getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
+                = getConf().getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
         mainNameServices.addAll(hbaseHdfsServices);
         return mainNameServices.toArray(new String[0]);
     }
@@ -185,4 +183,4 @@ public class CubeHFileJob extends AbstractHadoopJob {
         System.exit(exitCode);
     }
 
-}
+}
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 2f139b5..8579ded 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -14,49 +14,52 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
+
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.File;
-import java.io.FileOutputStream;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.HStoreFile;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -66,17 +69,21 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
+import org.apache.hadoop.hbase.mapreduce.CellSerialization;
+import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
+import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -89,71 +96,134 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.kylin.shaded.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
- * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788
- *
+ * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-4293|HBASE-22887
+ * <p>
  * Writes HFiles. Passed Cells must arrive in order.
  * Writes current time as the sequence id for the file. Sets the major compacted
  * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
  * all HFiles being written.
  * <p>
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
+ * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
-    static Log LOG = LogFactory.getLog(HFileOutputFormat3.class);
+public class HFileOutputFormat3
+        extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+    private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat3.class);
+
+    static class TableInfo {
+        private TableDescriptor tableDesctiptor;
+        private RegionLocator regionLocator;
+
+        public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) {
+            this.tableDesctiptor = tableDesctiptor;
+            this.regionLocator = regionLocator;
+        }
+
+        /**
+         * The modification for the returned HTD doesn't affect the inner TD.
+         *
+         * @return A clone of inner table descriptor
+         * @deprecated use {@link #getTableDescriptor}
+         */
+        @Deprecated
+        public HTableDescriptor getHTableDescriptor() {
+            return new HTableDescriptor(tableDesctiptor);
+        }
+
+        public TableDescriptor getTableDescriptor() {
+            return tableDesctiptor;
+        }
+
+        public RegionLocator getRegionLocator() {
+            return regionLocator;
+        }
+    }
+
+    protected static final byte[] tableSeparator = Bytes.toBytes(";");
+
+    protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
+        return Bytes.add(tableName, tableSeparator, suffix);
+    }
 
     // The following constants are private since these are used by
     // HFileOutputFormat2 to internally transfer data between job setup and
     // reducer run using conf.
     // These should not be changed by the client.
-    private static final String COMPRESSION_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
-    private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
-    private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+    static final String COMPRESSION_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.compression";
+    static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.bloomtype";
+    static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.blocksize";
+    static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
 
     // This constant is public since the client can modify this when setting
     // up their conf object and thus refer to this symbol.
     // It is present for backwards compatibility reasons. Use it only to
     // override the auto-detection of datablock encoding.
-    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
     /**
      * Keep locality while generating HFiles for bulkload. See HBASE-12596
      */
-    public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled";
+    public static final String LOCALITY_SENSITIVE_CONF_KEY =
+            "hbase.bulkload.locality.sensitive.enabled";
     private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-    private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
+    static final String OUTPUT_TABLE_NAME_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.table.name";
+    static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+            "hbase.mapreduce.use.multi.table.hfileoutputformat";
 
-    private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration";
+    public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
+    public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
 
     @Override
-    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
-            throws IOException, InterruptedException {
+    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+            final TaskAttemptContext context) throws IOException, InterruptedException {
         return createRecordWriter(context, this.getOutputCommitter(context));
     }
 
-    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(final TaskAttemptContext context,
-            final OutputCommitter committer) throws IOException, InterruptedException {
+    protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
+        return combineTableNameSuffix(tableName, family);
+    }
+
+    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+    createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
+            throws IOException {
 
         // Get the path of the temporary output file
-        final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
+        final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
         final Configuration conf = context.getConfiguration();
-        LOG.debug("Task output path: " + outputdir);
-        final FileSystem fs = outputdir.getFileSystem(conf);
+        final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+        final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+        if (writeTableNames == null || writeTableNames.isEmpty()) {
+            throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
+                    + " cannot be empty");
+        }
+        final FileSystem fs = outputDir.getFileSystem(conf);
         // These configs. are from hbase-*.xml
-        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
+        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+                HConstants.DEFAULT_MAX_FILE_SIZE);
         // Invented config.  Add to hbase-*.xml if other than default compression.
-        final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
-        final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
-        final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
-                false);
+        final String defaultCompressionStr = conf.get("hfile.compression",
+                Compression.Algorithm.NONE.getName());
+        final Algorithm defaultCompression = HFileWriterImpl
+                .compressionByName(defaultCompressionStr);
+        final boolean compactionExclude = conf.getBoolean(
+                "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+        final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
+                Bytes.toString(tableSeparator))).collect(Collectors.toSet());
 
         // create a map from column family to the compression algorithm
         final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
@@ -161,7 +231,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
 
         String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-        final Map<byte[], DataBlockEncoding> datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
+        final Map<byte[], DataBlockEncoding> datablockEncodingMap
+                = createFamilyDataBlockEncodingMap(conf);
         final DataBlockEncoding overriddenEncoding;
         if (dataBlockEncodingStr != null) {
             overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
@@ -169,55 +240,83 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             overriddenEncoding = null;
         }
 
-        final Configuration hConnectionConf = getConfigureHConnection(conf);
-        
         return new RecordWriter<ImmutableBytesWritable, V>() {
             // Map of families to writers and how much has been output on the writer.
-            private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
-            private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-            private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
-            private boolean rollRequested = false;
+            private final Map<byte[], WriterLength> writers =
+                    new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            private final Map<byte[], byte[]> previousRows =
+                    new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            private final long now = EnvironmentEdgeManager.currentTime();
 
             @Override
-            public void write(ImmutableBytesWritable row, V cell) throws IOException {
-                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            public void write(ImmutableBytesWritable row, V cell)
+                    throws IOException {
+                Cell kv = cell;
+                // null input == user explicitly wants to flush
                 if (row == null && kv == null) {
-                    rollWriters();
+                    rollWriters(null);
                     return;
                 }
+
                 byte[] rowKey = CellUtil.cloneRow(kv);
-                long length = kv.getLength();
+                int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
                 byte[] family = CellUtil.cloneFamily(kv);
-                WriterLength wl = this.writers.get(family);
-                if (wl == null) {
-                    fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+                byte[] tableNameBytes = null;
+                if (writeMultipleTables) {
+                    tableNameBytes = HFileOutputFormat3.getTableName(row.get());
+                    if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
+                        throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
+                                "' not" + " expected");
+                    }
+                } else {
+                    tableNameBytes = Bytes.toBytes(writeTableNames);
                 }
-                if (wl != null && wl.written + length >= maxsize) {
-                    this.rollRequested = true;
+                byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
+                WriterLength wl = this.writers.get(tableAndFamily);
+
+                // If this is a new column family, verify that the directory exists
+                if (wl == null) {
+                    Path writerPath = null;
+                    if (writeMultipleTables) {
+                        writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
+                                .toString(family)));
+                    } else {
+                        writerPath = new Path(outputDir, Bytes.toString(family));
+                    }
+                    fs.mkdirs(writerPath);
+                    configureStoragePolicy(conf, fs, tableAndFamily, 
writerPath);
                 }
-                if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-                    rollWriters();
+
+                // This can only happen once a row is finished though
+                if (wl != null && wl.written + length >= maxsize
+                        && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
+                    rollWriters(wl);
                 }
+
+                // create a new WAL writer, if necessary
                 if (wl == null || wl.writer == null) {
                     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
                         HRegionLocation loc = null;
-                        String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+
+                        String tableName = Bytes.toString(tableNameBytes);
                         if (tableName != null) {
-                            try (Connection connection = ConnectionFactory.createConnection(hConnectionConf);
-                                    RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
+                            try (Connection connection = ConnectionFactory.createConnection(conf);
+                                 RegionLocator locator =
+                                         connection.getRegionLocator(TableName.valueOf(tableName))) {
                                 loc = locator.getRegionLocation(rowKey);
                             } catch (Throwable e) {
-                                LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e);
+                                LOG.warn("There's something wrong when locating rowkey: " +
+                                        Bytes.toString(rowKey) + " for tablename: " + tableName, e);
                                 loc = null;
                             }
                         }
 
                         if (null == loc) {
                             if (LOG.isTraceEnabled()) {
-                                LOG.trace("failed to get region location, so use default writer: " +
+                                LOG.trace("failed to get region location, so 
use default writer for rowkey: " +
                                         Bytes.toString(rowKey));
                             }
-                            wl = getNewWriter(family, conf, null);
+                            wl = getNewWriter(tableNameBytes, family, conf, 
null);
                         } else {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("first rowkey: [" + 
Bytes.toString(rowKey) + "]");
@@ -229,86 +328,124 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
                                     LOG.trace("failed to resolve bind address: 
" + loc.getHostname() + ":"
                                             + loc.getPort() + ", so use 
default writer");
                                 }
-                                wl = getNewWriter(family, conf, null);
+                                wl = getNewWriter(tableNameBytes, family, conf, null);
                             } else {
-                                if(LOG.isDebugEnabled()) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                                 }
-                                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+                                wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[]{initialIsa
+                                });
                             }
                         }
                     } else {
-                        wl = getNewWriter(family, conf, null);
+                        wl = getNewWriter(tableNameBytes, family, conf, null);
                     }
                 }
-                kv.updateLatestStamp(this.now);
+
+                // we now have the proper WAL writer. full steam ahead
+                PrivateCellUtil.updateLatestStamp(cell, this.now);
                 wl.writer.append(kv);
                 wl.written += length;
-                this.previousRow = rowKey;
+
+                // Copy the row so we know when a row transition.
+                this.previousRows.put(family, rowKey);
             }
 
-            private void rollWriters() throws IOException {
-                for (WriterLength wl : this.writers.values()) {
-                    if (wl.writer != null) {
-                        LOG.info("Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-                        close(wl.writer);
+            private void rollWriters(WriterLength writerLength) throws IOException {
+                if (writerLength != null) {
+                    closeWriter(writerLength);
+                } else {
+                    for (WriterLength wl : this.writers.values()) {
+                        closeWriter(wl);
                     }
-                    wl.writer = null;
-                    wl.written = 0;
                 }
-                this.rollRequested = false;
             }
 
-            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
-            private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes)
-                    throws IOException {
+            private void closeWriter(WriterLength wl) throws IOException {
+                if (wl.writer != null) {
+                    LOG.info(
+                            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
+                    close(wl.writer);
+                }
+                wl.writer = null;
+                wl.written = 0;
+            }
+
+            /*
+             * Create a new StoreFile.Writer.
+             * @param family
+             * @return A WriterLength, containing a new StoreFile.Writer.
+             * @throws IOException
+             */
+            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
+                    justification = "Not important")
+            private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
+                    conf, InetSocketAddress[] favoredNodes) throws IOException {
+                byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
+                Path familydir = new Path(outputDir, Bytes.toString(family));
+                if (writeMultipleTables) {
+                    familydir = new Path(outputDir,
+                            new Path(Bytes.toString(tableName), Bytes.toString(family)));
+                }
                 WriterLength wl = new WriterLength();
-                Path familydir = new Path(outputdir, Bytes.toString(family));
-                Algorithm compression = compressionMap.get(family);
+                Algorithm compression = compressionMap.get(tableAndFamily);
                 compression = compression == null ? defaultCompression : compression;
-                BloomType bloomType = bloomTypeMap.get(family);
+                BloomType bloomType = bloomTypeMap.get(tableAndFamily);
                 bloomType = bloomType == null ? BloomType.NONE : bloomType;
-                Integer blockSize = blockSizeMap.get(family);
+                Integer blockSize = blockSizeMap.get(tableAndFamily);
                 blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
                 DataBlockEncoding encoding = overriddenEncoding;
-                encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+                encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
                 encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
                 Configuration tempConf = new Configuration(conf);
                 tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-                HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
+                HFileContextBuilder contextBuilder = new HFileContextBuilder()
+                        .withCompression(compression)
                         .withChecksumType(HStore.getChecksumType(conf))
-                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
+                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+                        .withBlockSize(blockSize);
+
+                if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+                    contextBuilder.withIncludesTags(true);
+                }
+
                 contextBuilder.withDataBlockEncoding(encoding);
                 HFileContext hFileContext = contextBuilder.build();
-
                 if (null == favoredNodes) {
-                    StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
-                    wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
-                            .withComparator(new CellComparatorImpl.MetaCellComparator()).withFileContext(hFileContext).build();
+                    wl.writer =
+                            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+                                    .withOutputDir(familydir).withBloomType(bloomType)
+                                    .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
                 } else {
-                    StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
-                    wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
-                            .withComparator(new CellComparatorImpl.MetaCellComparator())
-                            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
+                    wl.writer =
+                            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                                    .withOutputDir(familydir).withBloomType(bloomType)
+                                    .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+                                    .withFavoredNodes(favoredNodes).build();
                 }
 
-                this.writers.put(family, wl);
+                this.writers.put(tableAndFamily, wl);
                 return wl;
             }
 
             private void close(final StoreFileWriter w) throws IOException {
                 if (w != null) {
-                    w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
-                    w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
-                    w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
-                    w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
+                    w.appendFileInfo(BULKLOAD_TIME_KEY,
+                            Bytes.toBytes(System.currentTimeMillis()));
+                    w.appendFileInfo(BULKLOAD_TASK_KEY,
+                            
Bytes.toBytes(context.getTaskAttemptID().toString()));
+                    w.appendFileInfo(MAJOR_COMPACTION_KEY,
+                            Bytes.toBytes(true));
+                    w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+                            Bytes.toBytes(compactionExclude));
                     w.appendTrackedTimestampsToMetadata();
                     w.close();
                 }
             }
 
             @Override
-            public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+            public void close(TaskAttemptContext c)
+                    throws IOException, InterruptedException {
                 for (WriterLength wl : this.writers.values()) {
                     close(wl.writer);
                 }
@@ -316,6 +453,21 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         };
     }
 
+    /**
+     * Configure block storage policy for CF after the directory is created.
+     */
+    static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+                                       byte[] tableAndFamily, Path cfPath) {
+        if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
+            return;
+        }
+
+        String policy =
+                conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
+                        conf.get(STORAGE_POLICY_PROPERTY));
+        FSUtils.setStoragePolicy(fs, cfPath, policy);
+    }
+
     /*
      * Data structure to hold a Writer and amount of data written on it.
      */
@@ -328,11 +480,27 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * Return the start keys of all of the regions in this table,
      * as a list of ImmutableBytesWritable.
      */
-    private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException {
-        byte[][] byteKeys = table.getStartKeys();
-        ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(byteKeys.length);
-        for (byte[] byteKey : byteKeys) {
-            ret.add(new ImmutableBytesWritable(byteKey));
+    private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
+                                                                   boolean writeMultipleTables)
+            throws IOException {
+
+        ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
+        for (RegionLocator regionLocator : regionLocators) {
+            TableName tableName = regionLocator.getName();
+            LOG.info("Looking up current regions for table " + tableName);
+            byte[][] byteKeys = regionLocator.getStartKeys();
+            for (byte[] byteKey : byteKeys) {
+                byte[] fullKey = byteKey; //HFileOutputFormat2 use case
+                if (writeMultipleTables) {
+                    //MultiTableHFileOutputFormat use case
+                    fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
+                            + Bytes.toStringBinary(fullKey) + "]");
+                }
+                ret.add(new ImmutableBytesWritable(fullKey));
+            }
         }
         return ret;
     }
@@ -342,8 +510,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * {@link TotalOrderPartitioner} that contains the split points in startKeys.
      */
     @SuppressWarnings("deprecation")
-    private static void writePartitions(Configuration conf, Path partitionsPath, List<ImmutableBytesWritable> startKeys)
-            throws IOException {
+    private static void writePartitions(Configuration conf, Path partitionsPath,
+                                        List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
         LOG.info("Writing partition information to " + partitionsPath);
         if (startKeys.isEmpty()) {
             throw new IllegalArgumentException("No regions passed");
@@ -353,18 +521,22 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         // have keys < the first region (which has an empty start key)
         // so we need to remove it. Otherwise we would end up with an
         // empty reducer with index 0
-        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(startKeys);
-
+        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
         ImmutableBytesWritable first = sorted.first();
-        if (!Arrays.equals(first.get(), HConstants.EMPTY_BYTE_ARRAY)) {
-            throw new IllegalArgumentException("First region of table should have empty start key. Instead has: "
-                    + Bytes.toStringBinary(first.get()));
+        if (writeMultipleTables) {
+            first = new ImmutableBytesWritable(HFileOutputFormat3.getSuffix(sorted.first().get()));
+        }
+        if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+            throw new IllegalArgumentException(
+                    "First region of table should have empty start key. Instead has: "
+                            + Bytes.toStringBinary(first.get()));
         }
-        sorted.remove(first);
+        sorted.remove(sorted.first());
 
         // Write the actual file
         FileSystem fs = partitionsPath.getFileSystem(conf);
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, ImmutableBytesWritable.class,
+        SequenceFile.Writer writer = SequenceFile.createWriter(
+                fs, conf, partitionsPath, ImmutableBytesWritable.class,
                 NullWritable.class);
 
         try {
@@ -376,70 +548,6 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         }
     }
 
-    public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException {
-        File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml");
-        tempFile.deleteOnExit();
-
-        FileOutputStream os = new FileOutputStream(tempFile);
-        hConnectionConf.writeXml(os);
-        os.close();
-
-        String tmpFiles = job.getConfiguration().get("tmpfiles", null);
-        if (tmpFiles == null) {
-            tmpFiles = fixWindowsPath("file://" + tempFile.getAbsolutePath());
-        } else {
-            tmpFiles += "," + fixWindowsPath("file://" + tempFile.getAbsolutePath());
-        }
-        job.getConfiguration().set("tmpfiles", tmpFiles);
-        LOG.info("A temporary file " + tempFile.getAbsolutePath()
-                + " is created for storing hconnection related configuration!!!");
-
-        job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName());
-        return tempFile;
-    }
-
-    public static Configuration getConfigureHConnection(Configuration jobConf) {
-        if (Strings.isNullOrEmpty(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY))) {
-            return jobConf;
-        }
-        File tempFile = new File(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY));
-        Configuration hConnectionConf = new Configuration(false);
-        hConnectionConf.addResource(new Path(tempFile.toURI()));
-        return hConnectionConf;
-    }
-
-    public static String fixWindowsPath(String path) {
-        // fix windows path
-        if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
-            path = path.replace("file://", "file:///");
-        }
-        if (path.startsWith("file:///")) {
-            path = path.replace('\\', '/');
-        }
-        return path;
-    }
-
-    /**
-     * Configure a MapReduce Job to perform an incremental load into the given
-     * table. This
-     * <ul>
-     *   <li>Inspects the table to configure a total order partitioner</li>
-     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-     *     PutSortReducer)</li>
-     * </ul>
-     * The user should be sure to set the map output value class to either KeyValue or Put before
-     * running this function.
-     *
-     * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
-     */
-    @Deprecated
-    public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
-        configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
-    }
-
     /**
      * Configure a MapReduce Job to perform an incremental load into the given
      * table. This
@@ -454,8 +562,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * The user should be sure to set the map output value class to either KeyValue or Put before
      * running this function.
      */
-    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) throws IOException {
-        configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+            throws IOException {
+        configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
     }
 
     /**
@@ -472,23 +581,34 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * The user should be sure to set the map output value class to either KeyValue or Put before
      * running this function.
      */
-    public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator)
-            throws IOException {
-        configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class);
+    public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
+                                                RegionLocator regionLocator) throws IOException {
+        ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
+        singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
+        configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat3.class);
     }
 
-    static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
-            Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
+    static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
+                                         Class<? extends OutputFormat<?, ?>> cls) throws IOException {
         Configuration conf = job.getConfiguration();
         job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(KeyValue.class);
+        job.setOutputValueClass(MapReduceExtendedCell.class);
         job.setOutputFormatClass(cls);
 
+        if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
+            throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
+        }
+        boolean writeMultipleTables = false;
+        if (MultiTableHFileOutputFormat.class.equals(cls)) {
+            writeMultipleTables = true;
+            conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+        }
         // Based on the configured map output class, set the correct reducer to properly
         // sort the incoming values.
         // TODO it would be nice to pick one or the other of these formats.
-        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-            job.setReducerClass(KeyValueSortReducer.class);
+        if (KeyValue.class.equals(job.getMapOutputValueClass())
+                || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(CellSortReducer.class);
         } else if (Put.class.equals(job.getMapOutputValueClass())) {
             job.setReducerClass(PutSortReducer.class);
         } else if (Text.class.equals(job.getMapOutputValueClass())) {
@@ -497,50 +617,75 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
         }
 
-        conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
-                ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
+        conf.setStrings("io.serializations", conf.get("io.serializations"),
+                MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+                CellSerialization.class.getName());
 
         if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            // record this table name for creating writer by favored nodes
             LOG.info("bulkload locality sensitive enabled");
-            conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
         }
-        
+
+        /* Now get the region start keys for every table required */
+        List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
+        List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
+        List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
+
+        for (TableInfo tableInfo : multiTableInfo) {
+            regionLocators.add(tableInfo.getRegionLocator());
+            allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
+            tableDescriptors.add(tableInfo.getTableDescriptor());
+        }
+        // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table
+        conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
+                .toString(tableSeparator)));
+        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
         // Use table's region boundaries for TOP split points.
-        LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
-        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
-        LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
+        LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+                "to match current region count for all tables");
         job.setNumReduceTasks(startKeys.size());
 
-        configurePartitioner(job, startKeys);
+        configurePartitioner(job, startKeys, writeMultipleTables);
         // Set compression algorithms based on column families
-        configureCompression(conf, tableDescriptor);
-        configureBloomType(tableDescriptor, conf);
-        configureBlockSize(tableDescriptor, conf);
-        configureDataBlockEncoding(tableDescriptor, conf);
+
+        conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
+                tableDescriptors));
+        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
+                tableDescriptors));
+        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
+                tableDescriptors));
+        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
 
         TableMapReduceUtil.addDependencyJars(job);
         TableMapReduceUtil.initCredentials(job);
-        LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
+        LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
     }
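
For context, a rough sketch of how a driver could wire a single-table bulk load through this class. This is only a sketch: it assumes HFileOutputFormat3 keeps the HFileOutputFormat2-style overload configureIncrementalLoad(Job, TableDescriptor, RegionLocator); the table name and output path are made up, the input format and mapper are omitted, and the HFileOutputFormat3 import from the Kylin storage-hbase module is left out.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadDriverSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("KYLIN_EXAMPLE"));    // made-up table
                 RegionLocator locator = conn.getRegionLocator(table.getName())) {
                Job job = Job.getInstance(conf, "hfile-bulk-load-sketch");
                // A real job also sets its input format and mapper here.
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(KeyValue.class);
                // One reducer per region; compression, block size, bloom filter and data block
                // encoding are all read from the table descriptor and serialized into the conf.
                HFileOutputFormat3.configureIncrementalLoad(job, table.getDescriptor(), locator);
                FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-sketch"));  // made-up path
                job.waitForCompletion(true);
            }
        }
    }
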
 
-    public static void configureIncrementalLoadMap(Job job, Table table) 
throws IOException {
+    public static void configureIncrementalLoadMap(Job job, TableDescriptor 
tableDescriptor) throws
+            IOException {
         Configuration conf = job.getConfiguration();
 
         job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(KeyValue.class);
+        job.setOutputValueClass(MapReduceExtendedCell.class);
         job.setOutputFormatClass(HFileOutputFormat3.class);
 
+        ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
+        singleTableDescriptor.add(tableDescriptor);
+
+        conf.set(OUTPUT_TABLE_NAME_CONF_KEY, 
tableDescriptor.getTableName().getNameAsString());
         // Set compression algorithms based on column families
-        configureCompression(conf, table.getTableDescriptor());
-        configureBloomType(table.getTableDescriptor(), conf);
-        configureBlockSize(table.getTableDescriptor(), conf);
-        HTableDescriptor tableDescriptor = table.getTableDescriptor();
-        configureDataBlockEncoding(tableDescriptor, conf);
+        conf.set(COMPRESSION_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(compressionDetails, 
singleTableDescriptor));
+        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(blockSizeDetails, 
singleTableDescriptor));
+        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(bloomTypeDetails, 
singleTableDescriptor));
+        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(dataBlockEncodingDetails, 
singleTableDescriptor));
 
         TableMapReduceUtil.addDependencyJars(job);
         TableMapReduceUtil.initCredentials(job);
-        LOG.info("Incremental table " + table.getName() + " output 
configured.");
+        LOG.info("Incremental table " + tableDescriptor.getTableName() + " 
output configured.");
     }
 
     /**
@@ -551,9 +696,11 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
      * @return a map from column family to the configured compression algorithm
      */
     @VisibleForTesting
-    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration 
conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 
COMPRESSION_FAMILIES_CONF_KEY);
-        Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], 
Algorithm>(Bytes.BYTES_COMPARATOR);
+    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
+                                                                     conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                COMPRESSION_FAMILIES_CONF_KEY);
+        Map<byte[], Algorithm> compressionMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             Algorithm algorithm = 
HFileWriterImpl.compressionByName(e.getValue());
             compressionMap.put(e.getKey(), algorithm);
@@ -570,8 +717,9 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
      */
     @VisibleForTesting
     static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) 
{
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 
BLOOM_TYPE_FAMILIES_CONF_KEY);
-        Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], 
BloomType>(Bytes.BYTES_COMPARATOR);
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                BLOOM_TYPE_FAMILIES_CONF_KEY);
+        Map<byte[], BloomType> bloomTypeMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             BloomType bloomType = BloomType.valueOf(e.getValue());
             bloomTypeMap.put(e.getKey(), bloomType);
@@ -588,8 +736,9 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
      */
     @VisibleForTesting
     static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 
BLOCK_SIZE_FAMILIES_CONF_KEY);
-        Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], 
Integer>(Bytes.BYTES_COMPARATOR);
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                BLOCK_SIZE_FAMILIES_CONF_KEY);
+        Map<byte[], Integer> blockSizeMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             Integer blockSize = Integer.parseInt(e.getValue());
             blockSizeMap.put(e.getKey(), blockSize);
@@ -603,27 +752,31 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
      *
      * @param conf to read the serialized values from
      * @return a map from column family to HFileDataBlockEncoder for the
-     *         configured data block type for the family
+     * configured data block type for the family
      */
     @VisibleForTesting
-    static Map<byte[], DataBlockEncoding> 
createFamilyDataBlockEncodingMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, 
DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
-        Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], 
DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
+    static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+            Configuration conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+        Map<byte[], DataBlockEncoding> encoderMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             encoderMap.put(e.getKey(), 
DataBlockEncoding.valueOf((e.getValue())));
         }
         return encoderMap;
     }
 
+
     /**
      * Run inside the task to deserialize column family to given conf value 
map.
      *
-     * @param conf to read the serialized values from
+     * @param conf     to read the serialized values from
      * @param confName conf key to read from the configuration
      * @return a map of column family to the given configuration value
      */
-    private static Map<byte[], String> createFamilyConfValueMap(Configuration 
conf, String confName) {
-        Map<byte[], String> confValMap = new TreeMap<byte[], 
String>(Bytes.BYTES_COMPARATOR);
+    private static Map<byte[], String> createFamilyConfValueMap(
+            Configuration conf, String confName) {
+        Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         String confVal = conf.get(confName, "");
         for (String familyConf : confVal.split("&")) {
             String[] familySplit = familyConf.split("=");
@@ -631,7 +784,7 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
                 continue;
             }
             try {
-                confValMap.put(URLDecoder.decode(familySplit[0], 
"UTF-8").getBytes(StandardCharsets.UTF_8),
+                confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], 
"UTF-8")),
                         URLDecoder.decode(familySplit[1], "UTF-8"));
             } catch (UnsupportedEncodingException e) {
                 // will not happen with UTF-8 encoding
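
The values decoded here are written by serializeColumnFamilyAttribute further down: URL-encoded name=value pairs joined with '&', where the name is the column family, prefixed with the table name when several tables are written at once. A toy version of the encoding side, with made-up families and values (uses java.net.URLEncoder, java.util.Map, java.util.TreeMap and org.apache.hadoop.hbase.util.Bytes, all already imported by this class):

    // Toy illustration of the family -> value wire format parsed above.
    static String serializeToyAttributes() throws UnsupportedEncodingException {
        Map<byte[], String> attrs = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        attrs.put(Bytes.toBytes("F1"), "SNAPPY");   // made-up family/value pairs
        attrs.put(Bytes.toBytes("F2"), "GZ");
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<byte[], String> e : attrs.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(URLEncoder.encode(Bytes.toString(e.getKey()), "UTF-8"))
                    .append('=')
                    .append(URLEncoder.encode(e.getValue(), "UTF-8"));
        }
        // Yields "F1=SNAPPY&F2=GZ"; createFamilyConfValueMap() splits on '&' and '=',
        // URL-decodes both sides and rebuilds the byte[]-keyed TreeMap.
        return sb.toString();
    }
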
@@ -645,13 +798,18 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
      * Configure <code>job</code> with a TotalOrderPartitioner, partitioning 
against
     * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
      */
-    static void configurePartitioner(Job job, List<ImmutableBytesWritable> 
splitPoints) throws IOException {
+    static void configurePartitioner(Job job, List<ImmutableBytesWritable> 
splitPoints, boolean
+            writeMultipleTables)
+            throws IOException {
         Configuration conf = job.getConfiguration();
         // create the partitions file
         FileSystem fs = FileSystem.get(conf);
-        Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), 
"partitions_" + RandomUtil.randomUUID());
+        String hbaseTmpFsDir =
+                conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+                        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+        Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + 
RandomUtil.randomUUID());
         fs.makeQualified(partitionsPath);
-        writePartitions(conf, partitionsPath, splitPoints);
+        writePartitions(conf, partitionsPath, splitPoints, 
writeMultipleTables);
         fs.deleteOnExit(partitionsPath);
 
         // configure job to use it
@@ -659,134 +817,134 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable,
         TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
     }
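
A side note on the hunk above: the partitions file for the TotalOrderPartitioner is now resolved through HConstants.TEMPORARY_FS_DIRECTORY_KEY ("hbase.fs.tmp.dir") with HBase's default staging directory as fallback, instead of reading the literal key with no default. Jobs can still override it explicitly; the path below is only an example:

    // Optional per-job override of where the partitions_<uuid> file is written.
    Configuration conf = job.getConfiguration();
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, "/user/kylin/hbase-staging");
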
 
-    /**
-     * Serialize column family to compression algorithm map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     *
-     * @param table to read the properties from
-     * @param conf to persist serialized values into
-     * @throws IOException
-     *           on failure to read column family descriptors
-     */
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
     @VisibleForTesting
-    static void configureCompression(Configuration conf, HTableDescriptor 
tableDescriptor)
+    static String 
serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, 
List<TableDescriptor> allTables)
             throws UnsupportedEncodingException {
-        StringBuilder compressionConfigValue = new StringBuilder();
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+        StringBuilder attributeValue = new StringBuilder();
         int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                compressionConfigValue.append('&');
+        for (TableDescriptor tableDescriptor : allTables) {
+            if (tableDescriptor == null) {
+                // could happen with mock table instance
+                // CODEREVIEW: Can I set an empty string in conf if mock table 
instance?
+                return "";
+            }
+            for (ColumnFamilyDescriptor familyDescriptor : 
tableDescriptor.getColumnFamilies()) {
+                if (i++ > 0) {
+                    attributeValue.append('&');
+                }
+                attributeValue.append(URLEncoder.encode(
+                        
Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), 
familyDescriptor.getName())),
+                        "UTF-8"));
+                attributeValue.append('=');
+                
attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
             }
-            
compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
 "UTF-8"));
-            compressionConfigValue.append('=');
-            
compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(),
 "UTF-8"));
         }
         // Get rid of the last ampersand
-        conf.set(COMPRESSION_FAMILIES_CONF_KEY, 
compressionConfigValue.toString());
+        return attributeValue.toString();
     }
 
     /**
-     * Serialize column family to block size map to configuration.
+     * Serialize column family to compression algorithm map to configuration.
      * Invoked while configuring the MR job for incremental load.
+     *
      * @param tableDescriptor to read the properties from
      * @param conf to persist serialized values into
+     * @throws IOException
+     * on failure to read column family descriptors
+     */
+    @VisibleForTesting
+    static Function<ColumnFamilyDescriptor, String> compressionDetails = 
familyDescriptor ->
+            familyDescriptor.getCompressionType().getName();
+
+    /**
+     * Serialize column family to block size map to configuration. Invoked 
while
+     * configuring the MR job for incremental load.
      *
+     * @param tableDescriptor
+     * to read the properties from
+     * @param conf
+     * to persist serialized values into
      * @throws IOException
-     *           on failure to read column family descriptors
+     * on failure to read column family descriptors
      */
     @VisibleForTesting
-    static void configureBlockSize(HTableDescriptor tableDescriptor, 
Configuration conf)
-            throws UnsupportedEncodingException {
-        StringBuilder blockSizeConfigValue = new StringBuilder();
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                blockSizeConfigValue.append('&');
-            }
-            
blockSizeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
 "UTF-8"));
-            blockSizeConfigValue.append('=');
-            
blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(familyDescriptor.getBlocksize()),
 "UTF-8"));
-        }
-        // Get rid of the last ampersand
-        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, 
blockSizeConfigValue.toString());
-    }
+    static Function<ColumnFamilyDescriptor, String> blockSizeDetails = 
familyDescriptor -> String
+            .valueOf(familyDescriptor.getBlocksize());
 
     /**
-     * Serialize column family to bloom type map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     * @param tableDescriptor to read the properties from
-     * @param conf to persist serialized values into
+     * Serialize column family to bloom type map to configuration. Invoked 
while
+     * configuring the MR job for incremental load.
      *
+     * @param tableDescriptor
+     * to read the properties from
+     * @param conf
+     * to persist serialized values into
      * @throws IOException
-     *           on failure to read column family descriptors
+     * on failure to read column family descriptors
      */
     @VisibleForTesting
-    static void configureBloomType(HTableDescriptor tableDescriptor, 
Configuration conf)
-            throws UnsupportedEncodingException {
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        StringBuilder bloomTypeConfigValue = new StringBuilder();
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                bloomTypeConfigValue.append('&');
-            }
-            
bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
 "UTF-8"));
-            bloomTypeConfigValue.append('=');
-            String bloomType = 
familyDescriptor.getBloomFilterType().toString();
-            if (bloomType == null) {
-                bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
-            }
-            bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = 
familyDescriptor -> {
+        String bloomType = familyDescriptor.getBloomFilterType().toString();
+        if (bloomType == null) {
+            bloomType = 
ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
         }
-        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, 
bloomTypeConfigValue.toString());
-    }
+        return bloomType;
+    };
 
     /**
      * Serialize column family to data block encoding map to configuration.
      * Invoked while configuring the MR job for incremental load.
      *
-     * @param table to read the properties from
-     * @param conf to persist serialized values into
+     * @param tableDescriptor
+     * to read the properties from
+     * @param conf
+     * to persist serialized values into
      * @throws IOException
-     *           on failure to read column family descriptors
+     * on failure to read column family descriptors
      */
     @VisibleForTesting
-    static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, 
Configuration conf)
-            throws UnsupportedEncodingException {
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
+    static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = 
familyDescriptor -> {
+        DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+        if (encoding == null) {
+            encoding = DataBlockEncoding.NONE;
         }
-        StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                dataBlockEncodingConfigValue.append('&');
-            }
-            
dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
 "UTF-8"));
-            dataBlockEncodingConfigValue.append('=');
-            DataBlockEncoding encoding = 
familyDescriptor.getDataBlockEncoding();
-            if (encoding == null) {
-                encoding = DataBlockEncoding.NONE;
-            }
-            
dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), 
"UTF-8"));
+        return encoding.toString();
+    };
+
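
The compressionDetails, blockSizeDetails, bloomTypeDetails and dataBlockEncodingDetails functions above replace the old configureCompression/configureBlockSize/configureBloomType/configureDataBlockEncoding methods: each one only extracts a single attribute from a column family, and serializeColumnFamilyAttribute does the per-table encoding. A hypothetical illustration of what they return for a descriptor built with defaults plus SNAPPY compression (Compression is org.apache.hadoop.hbase.io.compress.Compression):

    // Hypothetical descriptor; the family name is made up, everything else is HBase defaults.
    ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder
            .newBuilder(Bytes.toBytes("F1"))
            .setCompressionType(Compression.Algorithm.SNAPPY)
            .build();
    String compression = compressionDetails.apply(cf);        // compression name, e.g. "snappy"
    String blockSize   = blockSizeDetails.apply(cf);          // default block size, e.g. "65536"
    String bloom       = bloomTypeDetails.apply(cf);          // default bloom type, e.g. "ROW"
    String encoding    = dataBlockEncodingDetails.apply(cf);  // default encoding, e.g. "NONE"
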
+    /**
+     * Copied from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+     * so that its protected functions can be used.
+     */
+    final private static int validateCompositeKey(byte[] keyBytes) {
+
+        int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
+
+        // Either the separator was not found or a tablename wasn't present or 
a key wasn't present
+        if (separatorIdx == -1) {
+            throw new IllegalArgumentException("Invalid format for composite 
key [" + Bytes
+                    .toStringBinary(keyBytes) + "]. Cannot extract tablename 
and suffix from key");
         }
-        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 
dataBlockEncodingConfigValue.toString());
+        return separatorIdx;
     }
-}
+
+    /**
+     * Copied from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+     * so that its protected functions can be used.
+     */
+    protected static byte[] getTableName(byte[] keyBytes) {
+        int separatorIdx = validateCompositeKey(keyBytes);
+        return Bytes.copy(keyBytes, 0, separatorIdx);
+    }
+
+    /**
+     * Copied from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+     * so that its protected functions can be used.
+     */
+    protected static byte[] getSuffix(byte[] keyBytes) {
+        int separatorIdx = validateCompositeKey(keyBytes);
+        return Bytes.copy(keyBytes, separatorIdx + 1, keyBytes.length - 
separatorIdx - 1);
+    }
+
+}
\ No newline at end of file
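
A toy round trip through the composite-key helpers above. The separator is the tableSeparator constant defined earlier in this class (not shown in this hunk); the example only assumes it is a byte sequence that never occurs in a table name:

    // Illustration only: the table name and row key are made up.
    byte[] tableName = Bytes.toBytes("KYLIN_EXAMPLE_TABLE");
    byte[] rowKey = Bytes.toBytes("row-0001");
    byte[] composite = Bytes.add(tableName, tableSeparator, rowKey);
    // getTableName()/getSuffix() split the composite key at the first occurrence of the separator.
    assert Bytes.equals(getTableName(composite), tableName);
    assert Bytes.equals(getSuffix(composite), rowKey);
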
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index fc0adf1..268e162 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -217,6 +217,7 @@
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
@@ -224,7 +225,8 @@
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-runner</artifactId>
+            <artifactId>jetty-util</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
