KYLIN-2788 fix bug in HFileOutputFormat2

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/14eeb339
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/14eeb339
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/14eeb339

Branch: refs/heads/master
Commit: 14eeb3398db9e055c5a70ae55708ec2929a0b6b3
Parents: aae24e3
Author: shaofengshi <[email protected]>
Authored: Mon Sep 25 18:31:03 2017 +0800
Committer: Li Yang <[email protected]>
Committed: Tue Sep 26 18:14:24 2017 +0800

----------------------------------------------------------------------
 .../kylin/storage/hbase/steps/CubeHFileJob.java |   3 +-
 .../storage/hbase/steps/HFileOutputFormat3.java | 733 +++++++++++++++++++
 2 files changed, 734 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/14eeb339/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 1a624c4..7dc67e5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.SequenceFile;
@@ -95,7 +94,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase());
 
             // Automatic config !
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+            HFileOutputFormat3.configureIncrementalLoad(job, htable);
             reconfigurePartitions(hbaseConf, partitionFilePath);
 
             // set block replication to 3 for hfiles

http://git-wip-us.apache.org/repos/asf/kylin/blob/14eeb339/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
new file mode 100644
index 0000000..d50166e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with a fix attempt for KYLIN-2788.
+ *
+ * Writes HFiles. Passed Cells must arrive in order.
+ * Writes current time as the sequence id for the file. Sets the major compacted
+ * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
+ * all HFiles being written.
+ * <p>
+ * Using this class as part of a MapReduce job is best done
+ * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
+ */
[email protected]
[email protected]
+public class HFileOutputFormat3
+        extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+    static Log LOG = LogFactory.getLog(HFileOutputFormat3.class);
+
+    // The following constants are private since these are used by
+    // HFileOutputFormat2 to internally transfer data between job setup and
+    // reducer run using conf.
+    // These should not be changed by the client.
+    private static final String COMPRESSION_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.compression";
+    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.bloomtype";
+    private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.blocksize";
+    private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+
+    // This constant is public since the client can modify this when setting
+    // up their conf object and thus refer to this symbol.
+    // It is present for backwards compatibility reasons. Use it only to
+    // override the auto-detection of datablock encoding.
+    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
+    @Override
+    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+            final TaskAttemptContext context) throws IOException, InterruptedException {
+        return createRecordWriter(context);
+    }
+
+    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+    createRecordWriter(final TaskAttemptContext context)
+            throws IOException, InterruptedException {
+
+        final Configuration conf = context.getConfiguration();
+        Path outputPath = FileOutputFormat.getOutputPath(context);
+        // Get the path of the temporary output file
+        FileOutputCommitter committer = (FileOutputCommitter) ReflectionUtils.newInstance(conf.getClass(
+                "mapreduce.output.committer.class", FileOutputCommitter.class,
+                org.apache.hadoop.mapreduce.OutputCommitter.class), outputPath, context);
+
+        final Path outputdir = committer.getWorkPath();
+        LOG.debug("Task output path: " + outputdir);
+        final FileSystem fs = outputdir.getFileSystem(conf);
+        // These configs. are from hbase-*.xml
+        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+                HConstants.DEFAULT_MAX_FILE_SIZE);
+        // Invented config.  Add to hbase-*.xml if other than default compression.
+        final String defaultCompressionStr = conf.get("hfile.compression",
+                Compression.Algorithm.NONE.getName());
+        final Algorithm defaultCompression = AbstractHFileWriter
+                .compressionByName(defaultCompressionStr);
+        final boolean compactionExclude = conf.getBoolean(
+                "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+        // create a map from column family to the compression algorithm
+        final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+        final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+        final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+        String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+        final Map<byte[], DataBlockEncoding> datablockEncodingMap
+                = createFamilyDataBlockEncodingMap(conf);
+        final DataBlockEncoding overriddenEncoding;
+        if (dataBlockEncodingStr != null) {
+            overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+        } else {
+            overriddenEncoding = null;
+        }
+
+        return new RecordWriter<ImmutableBytesWritable, V>() {
+            // Map of families to writers and how much has been output on the writer.
+            private final Map<byte [], WriterLength> writers =
+                    new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
+            private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+            private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+            private boolean rollRequested = false;
+
+            @Override
+            public void write(ImmutableBytesWritable row, V cell)
+                    throws IOException {
+                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+                if (row == null && kv == null) {
+                    rollWriters();
+                    return;
+                }
+                byte [] rowKey = CellUtil.cloneRow(kv);
+                long length = kv.getLength();
+                byte [] family = CellUtil.cloneFamily(kv);
+                WriterLength wl = this.writers.get(family);
+                if (wl == null) {
+                    fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+                }
+                if (wl != null && wl.written + length >= maxsize) {
+                    this.rollRequested = true;
+                }
+                if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+                    rollWriters();
+                }
+                if (wl == null || wl.writer == null) {
+                    wl = getNewWriter(family, conf);
+                }
+                kv.updateLatestStamp(this.now);
+                wl.writer.append(kv);
+                wl.written += length;
+                this.previousRow = rowKey;
+            }
+
+            private void rollWriters() throws IOException {
+                for (WriterLength wl : this.writers.values()) {
+                    if (wl.writer != null) {
+                        LOG.info("Writer=" + wl.writer.getPath() +
+                                ((wl.written == 0)? "": ", wrote=" + wl.written));
+                        close(wl.writer);
+                    }
+                    wl.writer = null;
+                    wl.written = 0;
+                }
+                this.rollRequested = false;
+            }
+
+            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+                    justification="Not important")
+            private WriterLength getNewWriter(byte[] family, Configuration conf)
+                    throws IOException {
+                WriterLength wl = new WriterLength();
+                Path familydir = new Path(outputdir, Bytes.toString(family));
+                Algorithm compression = compressionMap.get(family);
+                compression = compression == null ? defaultCompression : compression;
+                BloomType bloomType = bloomTypeMap.get(family);
+                bloomType = bloomType == null ? BloomType.NONE : bloomType;
+                Integer blockSize = blockSizeMap.get(family);
+                blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+                DataBlockEncoding encoding = overriddenEncoding;
+                encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+                encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+                Configuration tempConf = new Configuration(conf);
+                tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+                HFileContextBuilder contextBuilder = new HFileContextBuilder()
+                        .withCompression(compression)
+                        .withChecksumType(HStore.getChecksumType(conf))
+                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+                        .withBlockSize(blockSize);
+                contextBuilder.withDataBlockEncoding(encoding);
+                HFileContext hFileContext = contextBuilder.build();
+
+                wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+                        .withOutputDir(familydir).withBloomType(bloomType)
+                        .withComparator(KeyValue.COMPARATOR)
+                        .withFileContext(hFileContext).build();
+
+                this.writers.put(family, wl);
+                return wl;
+            }
+
+            private void close(final StoreFile.Writer w) throws IOException {
+                if (w != null) {
+                    w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+                            Bytes.toBytes(System.currentTimeMillis()));
+                    w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+                            Bytes.toBytes(context.getTaskAttemptID().toString()));
+                    w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+                            Bytes.toBytes(true));
+                    w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+                            Bytes.toBytes(compactionExclude));
+                    w.appendTrackedTimestampsToMetadata();
+                    w.close();
+                }
+            }
+
+            @Override
+            public void close(TaskAttemptContext c)
+                    throws IOException, InterruptedException {
+                for (WriterLength wl: this.writers.values()) {
+                    close(wl.writer);
+                }
+            }
+        };
+    }
+
+    /*
+     * Data structure to hold a Writer and amount of data written on it.
+     */
+    static class WriterLength {
+        long written = 0;
+        StoreFile.Writer writer = null;
+    }
+
+    /**
+     * Return the start keys of all of the regions in this table,
+     * as a list of ImmutableBytesWritable.
+     */
+    private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
+            throws IOException {
+        byte[][] byteKeys = table.getStartKeys();
+        ArrayList<ImmutableBytesWritable> ret =
+                new ArrayList<ImmutableBytesWritable>(byteKeys.length);
+        for (byte[] byteKey : byteKeys) {
+            ret.add(new ImmutableBytesWritable(byteKey));
+        }
+        return ret;
+    }
+
+    /**
+     * Write out a {@link SequenceFile} that can be read by
+     * {@link TotalOrderPartitioner} that contains the split points in startKeys.
+     */
+    @SuppressWarnings("deprecation")
+    private static void writePartitions(Configuration conf, Path partitionsPath,
+                                        List<ImmutableBytesWritable> startKeys) throws IOException {
+        LOG.info("Writing partition information to " + partitionsPath);
+        if (startKeys.isEmpty()) {
+            throw new IllegalArgumentException("No regions passed");
+        }
+
+        // We're generating a list of split points, and we don't ever
+        // have keys < the first region (which has an empty start key)
+        // so we need to remove it. Otherwise we would end up with an
+        // empty reducer with index 0
+        TreeSet<ImmutableBytesWritable> sorted =
+                new TreeSet<ImmutableBytesWritable>(startKeys);
+
+        ImmutableBytesWritable first = sorted.first();
+        if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+            throw new IllegalArgumentException(
+                    "First region of table should have empty start key. Instead has: "
+                            + Bytes.toStringBinary(first.get()));
+        }
+        sorted.remove(first);
+
+        // Write the actual file
+        FileSystem fs = partitionsPath.getFileSystem(conf);
+        SequenceFile.Writer writer = SequenceFile.createWriter(
+                fs, conf, partitionsPath, ImmutableBytesWritable.class,
+                NullWritable.class);
+
+        try {
+            for (ImmutableBytesWritable startKey : sorted) {
+                writer.append(startKey, NullWritable.get());
+            }
+        } finally {
+            writer.close();
+        }
+    }
+
+    /**
+     * Configure a MapReduce Job to perform an incremental load into the given
+     * table. This
+     * <ul>
+     *   <li>Inspects the table to configure a total order partitioner</li>
+     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+     *     PutSortReducer)</li>
+     * </ul>
+     * The user should be sure to set the map output value class to either KeyValue or Put before
+     * running this function.
+     *
+     * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
+     */
+    @Deprecated
+    public static void configureIncrementalLoad(Job job, HTable table)
+            throws IOException {
+        configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
+    }
+
+    /**
+     * Configure a MapReduce Job to perform an incremental load into the given
+     * table. This
+     * <ul>
+     *   <li>Inspects the table to configure a total order partitioner</li>
+     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+     *     PutSortReducer)</li>
+     * </ul>
+     * The user should be sure to set the map output value class to either KeyValue or Put before
+     * running this function.
+     */
+    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+            throws IOException {
+        configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+    }
+
+    /**
+     * Configure a MapReduce Job to perform an incremental load into the given
+     * table. This
+     * <ul>
+     *   <li>Inspects the table to configure a total order partitioner</li>
+     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+     *     PutSortReducer)</li>
+     * </ul>
+     * The user should be sure to set the map output value class to either KeyValue or Put before
+     * running this function.
+     */
+    public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
+                                                RegionLocator regionLocator) throws IOException {
+        configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class);
+    }
+
+    static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
+                                         RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
+            UnsupportedEncodingException {
+        Configuration conf = job.getConfiguration();
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(KeyValue.class);
+        job.setOutputFormatClass(cls);
+
+        // Based on the configured map output class, set the correct reducer to properly
+        // sort the incoming values.
+        // TODO it would be nice to pick one or the other of these formats.
+        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(KeyValueSortReducer.class);
+        } else if (Put.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(PutSortReducer.class);
+        } else if (Text.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(TextSortReducer.class);
+        } else {
+            LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+        }
+
+        conf.setStrings("io.serializations", conf.get("io.serializations"),
+                MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+                KeyValueSerialization.class.getName());
+
+        // Use table's region boundaries for TOP split points.
+        LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
+        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
+        LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+                "to match current region count");
+        job.setNumReduceTasks(startKeys.size());
+
+        configurePartitioner(job, startKeys);
+        // Set compression algorithms based on column families
+        configureCompression(conf, tableDescriptor);
+        configureBloomType(tableDescriptor, conf);
+        configureBlockSize(tableDescriptor, conf);
+        configureDataBlockEncoding(tableDescriptor, conf);
+
+        TableMapReduceUtil.addDependencyJars(job);
+        TableMapReduceUtil.initCredentials(job);
+        LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
+    }
+
+    public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
+        Configuration conf = job.getConfiguration();
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(KeyValue.class);
+        job.setOutputFormatClass(HFileOutputFormat3.class);
+
+        // Set compression algorithms based on column families
+        configureCompression(conf, table.getTableDescriptor());
+        configureBloomType(table.getTableDescriptor(), conf);
+        configureBlockSize(table.getTableDescriptor(), conf);
+        HTableDescriptor tableDescriptor = table.getTableDescriptor();
+        configureDataBlockEncoding(tableDescriptor, conf);
+
+        TableMapReduceUtil.addDependencyJars(job);
+        TableMapReduceUtil.initCredentials(job);
+        LOG.info("Incremental table " + table.getName() + " output configured.");
+    }
+
+    /**
+     * Runs inside the task to deserialize column family to compression algorithm
+     * map from the configuration.
+     *
+     * @param conf to read the serialized values from
+     * @return a map from column family to the configured compression algorithm
+     */
+    @VisibleForTesting
+    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
+                                                                     conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                COMPRESSION_FAMILIES_CONF_KEY);
+        Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
+                Algorithm>(Bytes.BYTES_COMPARATOR);
+        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+            Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
+            compressionMap.put(e.getKey(), algorithm);
+        }
+        return compressionMap;
+    }
+
+    /**
+     * Runs inside the task to deserialize column family to bloom filter type
+     * map from the configuration.
+     *
+     * @param conf to read the serialized values from
+     * @return a map from column family to the configured bloom filter type
+     */
+    @VisibleForTesting
+    static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                BLOOM_TYPE_FAMILIES_CONF_KEY);
+        Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
+                BloomType>(Bytes.BYTES_COMPARATOR);
+        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+            BloomType bloomType = BloomType.valueOf(e.getValue());
+            bloomTypeMap.put(e.getKey(), bloomType);
+        }
+        return bloomTypeMap;
+    }
+
+    /**
+     * Runs inside the task to deserialize column family to block size
+     * map from the configuration.
+     *
+     * @param conf to read the serialized values from
+     * @return a map from column family to the configured block size
+     */
+    @VisibleForTesting
+    static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                BLOCK_SIZE_FAMILIES_CONF_KEY);
+        Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
+                Integer>(Bytes.BYTES_COMPARATOR);
+        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+            Integer blockSize = Integer.parseInt(e.getValue());
+            blockSizeMap.put(e.getKey(), blockSize);
+        }
+        return blockSizeMap;
+    }
+
+    /**
+     * Runs inside the task to deserialize column family to data block encoding
+     * type map from the configuration.
+     *
+     * @param conf to read the serialized values from
+     * @return a map from column family to HFileDataBlockEncoder for the
+     *         configured data block type for the family
+     */
+    @VisibleForTesting
+    static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+            Configuration conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+        Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
+                DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
+        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+            encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
+        }
+        return encoderMap;
+    }
+
+
+    /**
+     * Runs inside the task to deserialize column family to given conf value map.
+     *
+     * @param conf to read the serialized values from
+     * @param confName conf key to read from the configuration
+     * @return a map of column family to the given configuration value
+     */
+    private static Map<byte[], String> createFamilyConfValueMap(
+            Configuration conf, String confName) {
+        Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+        String confVal = conf.get(confName, "");
+        for (String familyConf : confVal.split("&")) {
+            String[] familySplit = familyConf.split("=");
+            if (familySplit.length != 2) {
+                continue;
+            }
+            try {
+                confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+                        URLDecoder.decode(familySplit[1], "UTF-8"));
+            } catch (UnsupportedEncodingException e) {
+                // will not happen with UTF-8 encoding
+                throw new AssertionError(e);
+            }
+        }
+        return confValMap;
+    }
+
+    /**
+     * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+     * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
+     */
+    static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+            throws IOException {
+        Configuration conf = job.getConfiguration();
+        // create the partitions file
+        FileSystem fs = FileSystem.get(conf);
+        Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + UUID.randomUUID());
+        fs.makeQualified(partitionsPath);
+        writePartitions(conf, partitionsPath, splitPoints);
+        fs.deleteOnExit(partitionsPath);
+
+        // configure job to use it
+        job.setPartitionerClass(TotalOrderPartitioner.class);
+        TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
+    }
+
+    /**
+     * Serialize column family to compression algorithm map to configuration.
+     * Invoked while configuring the MR job for incremental load.
+     *
+     * @param tableDescriptor to read the properties from
+     * @param conf to persist serialized values into
+     * @throws IOException
+     *           on failure to read column family descriptors
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+            value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+    @VisibleForTesting
+    static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
+            throws UnsupportedEncodingException {
+        StringBuilder compressionConfigValue = new StringBuilder();
+        if (tableDescriptor == null) {
+            // could happen with mock table instance
+            return;
+        }
+        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+        int i = 0;
+        for (HColumnDescriptor familyDescriptor : families) {
+            if (i++ > 0) {
+                compressionConfigValue.append('&');
+            }
+            compressionConfigValue.append(URLEncoder.encode(
+                    familyDescriptor.getNameAsString(), "UTF-8"));
+            compressionConfigValue.append('=');
+            compressionConfigValue.append(URLEncoder.encode(
+                    familyDescriptor.getCompression().getName(), "UTF-8"));
+        }
+        // Get rid of the last ampersand
+        conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+    }
+
+    /**
+     * Serialize column family to block size map to configuration.
+     * Invoked while configuring the MR job for incremental load.
+     * @param tableDescriptor to read the properties from
+     * @param conf to persist serialized values into
+     *
+     * @throws IOException
+     *           on failure to read column family descriptors
+     */
+    @VisibleForTesting
+    static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
+            throws UnsupportedEncodingException {
+        StringBuilder blockSizeConfigValue = new StringBuilder();
+        if (tableDescriptor == null) {
+            // could happen with mock table instance
+            return;
+        }
+        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+        int i = 0;
+        for (HColumnDescriptor familyDescriptor : families) {
+            if (i++ > 0) {
+                blockSizeConfigValue.append('&');
+            }
+            blockSizeConfigValue.append(URLEncoder.encode(
+                    familyDescriptor.getNameAsString(), "UTF-8"));
+            blockSizeConfigValue.append('=');
+            blockSizeConfigValue.append(URLEncoder.encode(
+                    String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+        }
+        // Get rid of the last ampersand
+        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
+    }
+
+    /**
+     * Serialize column family to bloom type map to configuration.
+     * Invoked while configuring the MR job for incremental load.
+     * @param tableDescriptor to read the properties from
+     * @param conf to persist serialized values into
+     *
+     * @throws IOException
+     *           on failure to read column family descriptors
+     */
+    @VisibleForTesting
+    static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
+            throws UnsupportedEncodingException {
+        if (tableDescriptor == null) {
+            // could happen with mock table instance
+            return;
+        }
+        StringBuilder bloomTypeConfigValue = new StringBuilder();
+        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+        int i = 0;
+        for (HColumnDescriptor familyDescriptor : families) {
+            if (i++ > 0) {
+                bloomTypeConfigValue.append('&');
+            }
+            bloomTypeConfigValue.append(URLEncoder.encode(
+                    familyDescriptor.getNameAsString(), "UTF-8"));
+            bloomTypeConfigValue.append('=');
+            String bloomType = familyDescriptor.getBloomFilterType().toString();
+            if (bloomType == null) {
+                bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+            }
+            bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+        }
+        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
+    }
+
+    /**
+     * Serialize column family to data block encoding map to configuration.
+     * Invoked while configuring the MR job for incremental load.
+     *
+     * @param tableDescriptor to read the properties from
+     * @param conf to persist serialized values into
+     * @throws IOException
+     *           on failure to read column family descriptors
+     */
+    @VisibleForTesting
+    static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
+                                           Configuration conf) throws UnsupportedEncodingException {
+        if (tableDescriptor == null) {
+            // could happen with mock table instance
+            return;
+        }
+        StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
+        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+        int i = 0;
+        for (HColumnDescriptor familyDescriptor : families) {
+            if (i++ > 0) {
+                dataBlockEncodingConfigValue.append('&');
+            }
+            dataBlockEncodingConfigValue.append(
+                    URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+            dataBlockEncodingConfigValue.append('=');
+            DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+            if (encoding == null) {
+                encoding = DataBlockEncoding.NONE;
+            }
+            dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
+                    "UTF-8"));
+        }
+        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+                dataBlockEncodingConfigValue.toString());
+    }
+}
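
----------------------------------------------------------------------
For context only (not part of the commit): a minimal, hypothetical driver sketch showing how a job can be wired to the new HFileOutputFormat3, mirroring the CubeHFileJob change above. The mapper and input format are omitted, and the table name "EXAMPLE_HTABLE" and the output path are placeholders, not anything defined by this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.kylin.storage.hbase.steps.HFileOutputFormat3;

public class HFileOutputFormat3Example {
    public static void main(String[] args) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
        Job job = Job.getInstance(hbaseConf, "hfile-output-format3-example");
        job.setJarByClass(HFileOutputFormat3Example.class);

        // A real job would also set an InputFormat and a Mapper that emits
        // ImmutableBytesWritable/KeyValue pairs; both are omitted in this sketch.
        // The map output value class must be set before configureIncrementalLoad
        // so that it can pick KeyValueSortReducer.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // Same HTable-based overload that CubeHFileJob calls above; it configures
        // the output format, reducer, reduce-task count and total order partitioner.
        HTable htable = new HTable(hbaseConf, "EXAMPLE_HTABLE");
        HFileOutputFormat3.configureIncrementalLoad(job, htable);

        // HFiles are written under this directory, one subdirectory per column family.
        FileOutputFormat.setOutputPath(job, new Path("/tmp/example-hfiles"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
----------------------------------------------------------------------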
