This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master-hadoop3 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e230d8bac0e54ae96f2de69c560f6ff605d7e84c Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Wed Jun 24 20:46:44 2020 +0800 Fix dependency issue in HDP3 and revert code change to HBase bulk load --- core-metrics/pom.xml | 1 + pom.xml | 19 +- server-base/pom.xml | 2 +- server/pom.xml | 2 +- .../kylin/storage/hbase/steps/CreateHTableJob.java | 14 +- .../kylin/storage/hbase/steps/CubeHFileJob.java | 22 +- .../storage/hbase/steps/HFileOutputFormat3.java | 818 ++++++++++++--------- stream-receiver/pom.xml | 4 +- 8 files changed, 524 insertions(+), 358 deletions(-) diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml index eca9e5e..90056bb 100644 --- a/core-metrics/pom.xml +++ b/core-metrics/pom.xml @@ -48,6 +48,7 @@ <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> + <scope>compile</scope> </dependency> <!-- Env & Test --> diff --git a/pom.xml b/pom.xml index 77e2338..7ffe25f 100644 --- a/pom.xml +++ b/pom.xml @@ -594,7 +594,7 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> - <scope>provided</scope> + <scope>compile</scope> </dependency> <dependency> <groupId>com.jcraft</groupId> @@ -704,11 +704,23 @@ <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>${hive.version}</version> + <exclusions> + <exclusion> + <artifactId>jetty-runner</artifactId> + <groupId>org.eclipse.jetty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog-core</artifactId> <version>${hive-hcatalog.version}</version> + <exclusions> + <exclusion> + <artifactId>jetty-runner</artifactId> + <groupId>org.eclipse.jetty</groupId> + </exclusion> + </exclusions> </dependency> <!-- Yarn dependencies --> <dependency> @@ -1084,11 +1096,6 @@ </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-runner</artifactId> - <version>${jetty.version}</version> - </dependency> -
<dependency> - <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> <scope>test</scope> diff --git a/server-base/pom.xml b/server-base/pom.xml index 09a6858..cf8c380 100644 --- a/server-base/pom.xml +++ b/server-base/pom.xml @@ -266,7 +266,7 @@ <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> - <scope>provided</scope> + <scope>compile</scope> </dependency> </dependencies> diff --git a/server/pom.xml b/server/pom.xml index dc7bb44..19c013c 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -35,7 +35,7 @@ <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> - <scope>provided</scope> + <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.kylin</groupId> diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index b26f336..d961849 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.storage.hbase.steps; @@ -135,7 +135,7 @@ public class CreateHTableJob extends AbstractHadoopJob { Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); HTable htable = (HTable) conn.getTable(TableName.valueOf(hbaseTableName)); - HFileOutputFormat3.configureIncrementalLoadMap(job, htable); + HFileOutputFormat3.configureIncrementalLoadMap(job, htable.getDescriptor()); logger.info("Saving HBase configuration to {}", hbaseConfPath); FileSystem fs = HadoopUtil.getWorkingFileSystem(); @@ -160,7 +160,7 @@ public class CreateHTableJob extends AbstractHadoopJob { } public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, - final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) + final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); @@ -258,7 +258,7 @@ public class CreateHTableJob extends AbstractHadoopJob { } protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, - final Path outputFolder, final KylinConfig kylinConfig) throws IOException { + final Path outputFolder, final KylinConfig kylinConfig) throws IOException { if (outputFolder == null) { logger.warn("outputFolder for hfile split file is null, skip inner region split"); @@ -346,4 +346,4 @@ public class CreateHTableJob extends AbstractHadoopJob { int exitCode = ToolRunner.run(new CreateHTableJob(), args); System.exit(exitCode); } -} +} \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 28752ca..1e0e216 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -6,20 +6,18 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.storage.hbase.steps; -import static org.apache.hadoop.hbase.HBaseConfiguration.merge; - import java.io.IOException; import java.util.Collection; @@ -54,6 +52,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hbase.HBaseConfiguration.merge; + /** * @author George Song (ysong1) */ @@ -61,7 +61,6 @@ public class CubeHFileJob extends AbstractHadoopJob { protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class); - @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -84,7 +83,7 @@ public class CubeHFileJob extends AbstractHadoopJob { CubeInstance cube = cubeMgr.getCube(cubeName); - // construct configuration for the MR job cluster + // use current hbase configuration Configuration configuration = new Configuration(HBaseConnection.getCurrentHBaseConfiguration()); String[] allServices = getAllServices(configuration); merge(configuration, getConf()); @@ -110,7 +109,6 @@ public class CubeHFileJob extends AbstractHadoopJob { RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName)); // Automatic config ! 
HFileOutputFormat3.configureIncrementalLoad(job, table, regionLocator); - HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir()); reconfigurePartitions(hbaseConf, partitionFilePath); job.setInputFormatClass(SequenceFileInputFormat.class); @@ -121,7 +119,7 @@ public class CubeHFileJob extends AbstractHadoopJob { job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class); // set block replication to 3 for hfiles - job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); + configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); this.deletePath(job.getConfiguration(), output); @@ -173,9 +171,9 @@ public class CubeHFileJob extends AbstractHadoopJob { private String[] getAllServices(Configuration hbaseConf) { Collection<String> hbaseHdfsServices - = hbaseConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES); + = hbaseConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES); Collection<String> mainNameServices - = getConf().getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES); + = getConf().getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES); mainNameServices.addAll(hbaseHdfsServices); return mainNameServices.toArray(new String[0]); } @@ -185,4 +183,4 @@ public class CubeHFileJob extends AbstractHadoopJob { System.exit(exitCode); } -} +} \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java index 2f139b5..8579ded 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java @@ -14,49 +14,52 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ + package org.apache.kylin.storage.hbase.steps; -import java.io.File; -import java.io.FileOutputStream; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; + import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.net.URLDecoder; import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.function.Function; +import java.util.stream.Collectors; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparatorImpl; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.HStoreFile; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import 
org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; @@ -66,17 +69,21 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; -import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; import org.apache.hadoop.hbase.mapreduce.PutSortReducer; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TextSortReducer; +import org.apache.hadoop.hbase.mapreduce.CellSerialization; +import org.apache.hadoop.hbase.mapreduce.CellSortReducer; +import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.MapReduceExtendedCell; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ 
-89,71 +96,134 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; import org.apache.kylin.common.util.RandomUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.kylin.shaded.com.google.common.base.Strings; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** - * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788 - * + * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-4293|HBASE-22887 + * <p> * Writes HFiles. Passed Cells must arrive in order. * Writes current time as the sequence id for the file. Sets the major compacted * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll * all HFiles being written. * <p> * Using this class as part of a MapReduce job is best done - * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}. + * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}. 
*/ @InterfaceAudience.Public -@InterfaceStability.Evolving -public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, Cell> { - static Log LOG = LogFactory.getLog(HFileOutputFormat3.class); +public class HFileOutputFormat3 + extends FileOutputFormat<ImmutableBytesWritable, Cell> { + private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat3.class); + + static class TableInfo { + private TableDescriptor tableDesctiptor; + private RegionLocator regionLocator; + + public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) { + this.tableDesctiptor = tableDesctiptor; + this.regionLocator = regionLocator; + } + + /** + * The modification for the returned HTD doesn't affect the inner TD. + * + * @return A clone of inner table descriptor + * @deprecated use {@link #getTableDescriptor} + */ + @Deprecated + public HTableDescriptor getHTableDescriptor() { + return new HTableDescriptor(tableDesctiptor); + } + + public TableDescriptor getTableDescriptor() { + return tableDesctiptor; + } + + public RegionLocator getRegionLocator() { + return regionLocator; + } + } + + protected static final byte[] tableSeparator = Bytes.toBytes(";"); + + protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) { + return Bytes.add(tableName, tableSeparator, suffix); + } // The following constants are private since these are used by // HFileOutputFormat2 to internally transfer data between job setup and // reducer run using conf. // These should not be changed by the client. 
- private static final String COMPRESSION_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.compression"; - private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; - private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; - private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; + static final String COMPRESSION_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.compression"; + static final String BLOOM_TYPE_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.bloomtype"; + static final String BLOCK_SIZE_FAMILIES_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.blocksize"; + static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; // This constant is public since the client can modify this when setting // up their conf object and thus refer to this symbol. // It is present for backwards compatibility reasons. Use it only to // override the auto-detection of datablock encoding. - public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding"; + public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.datablock.encoding"; /** * Keep locality while generating HFiles for bulkload. 
See HBASE-12596 */ - public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled"; + public static final String LOCALITY_SENSITIVE_CONF_KEY = + "hbase.bulkload.locality.sensitive.enabled"; private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; - private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; + static final String OUTPUT_TABLE_NAME_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.table.name"; + static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = + "hbase.mapreduce.use.multi.table.hfileoutputformat"; - private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration"; + public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; + public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; @Override - public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context) - throws IOException, InterruptedException { + public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter( + final TaskAttemptContext context) throws IOException, InterruptedException { return createRecordWriter(context, this.getOutputCommitter(context)); } - static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(final TaskAttemptContext context, - final OutputCommitter committer) throws IOException, InterruptedException { + protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) { + return combineTableNameSuffix(tableName, family); + } + + static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> + createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer) + throws IOException { // Get the path of the temporary output file - final Path outputdir = ((FileOutputCommitter) committer).getWorkPath(); + final Path outputDir = ((FileOutputCommitter) 
committer).getWorkPath(); final Configuration conf = context.getConfiguration(); - LOG.debug("Task output path: " + outputdir); - final FileSystem fs = outputdir.getFileSystem(conf); + final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); + if (writeTableNames == null || writeTableNames.isEmpty()) { + throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY + + " cannot be empty"); + } + final FileSystem fs = outputDir.getFileSystem(conf); // These configs. are from hbase-*.xml - final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); + final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE); // Invented config. Add to hbase-*.xml if other than default compression. - final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); - final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr); - final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", - false); + final String defaultCompressionStr = conf.get("hfile.compression", + Compression.Algorithm.NONE.getName()); + final Algorithm defaultCompression = HFileWriterImpl + .compressionByName(defaultCompressionStr); + final boolean compactionExclude = conf.getBoolean( + "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); + + final Set<String> allTableNames = Arrays.stream(writeTableNames.split( + Bytes.toString(tableSeparator))).collect(Collectors.toSet()); // create a map from column family to the compression algorithm final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); @@ -161,7 +231,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, final Map<byte[], Integer> blockSizeMap = 
createFamilyBlockSizeMap(conf); String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); - final Map<byte[], DataBlockEncoding> datablockEncodingMap = createFamilyDataBlockEncodingMap(conf); + final Map<byte[], DataBlockEncoding> datablockEncodingMap + = createFamilyDataBlockEncodingMap(conf); final DataBlockEncoding overriddenEncoding; if (dataBlockEncodingStr != null) { overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); @@ -169,55 +240,83 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, overriddenEncoding = null; } - final Configuration hConnectionConf = getConfigureHConnection(conf); - return new RecordWriter<ImmutableBytesWritable, V>() { // Map of families to writers and how much has been output on the writer. - private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR); - private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte[] now = Bytes.toBytes(System.currentTimeMillis()); - private boolean rollRequested = false; + private final Map<byte[], WriterLength> writers = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + private final Map<byte[], byte[]> previousRows = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + private final long now = EnvironmentEdgeManager.currentTime(); @Override - public void write(ImmutableBytesWritable row, V cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + public void write(ImmutableBytesWritable row, V cell) + throws IOException { + Cell kv = cell; + // null input == user explicitly wants to flush if (row == null && kv == null) { - rollWriters(); + rollWriters(null); return; } + byte[] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); + int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT; byte[] family = CellUtil.cloneFamily(kv); - WriterLength wl = this.writers.get(family); - if (wl == null) { - fs.mkdirs(new Path(outputdir, 
Bytes.toString(family))); + byte[] tableNameBytes = null; + if (writeMultipleTables) { + tableNameBytes = HFileOutputFormat3.getTableName(row.get()); + if (!allTableNames.contains(Bytes.toString(tableNameBytes))) { + throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) + + "' not" + " expected"); + } + } else { + tableNameBytes = Bytes.toBytes(writeTableNames); } - if (wl != null && wl.written + length >= maxsize) { - this.rollRequested = true; + byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); + WriterLength wl = this.writers.get(tableAndFamily); + + // If this is a new column family, verify that the directory exists + if (wl == null) { + Path writerPath = null; + if (writeMultipleTables) { + writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes + .toString(family))); + } else { + writerPath = new Path(outputDir, Bytes.toString(family)); + } + fs.mkdirs(writerPath); + configureStoragePolicy(conf, fs, tableAndFamily, writerPath); } - if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { - rollWriters(); + + // This can only happen once a row is finished though + if (wl != null && wl.written + length >= maxsize + && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) { + rollWriters(wl); } + + // create a new WAL writer, if necessary if (wl == null || wl.writer == null) { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { HRegionLocation loc = null; - String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); + + String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { - try (Connection connection = ConnectionFactory.createConnection(hConnectionConf); - RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { + try (Connection connection = ConnectionFactory.createConnection(conf); + RegionLocator locator = + connection.getRegionLocator(TableName.valueOf(tableName))) { loc = 
locator.getRegionLocation(rowKey); } catch (Throwable e) { - LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e); + LOG.warn("There's something wrong when locating rowkey: " + + Bytes.toString(rowKey) + " for tablename: " + tableName, e); loc = null; } } if (null == loc) { if (LOG.isTraceEnabled()) { - LOG.trace("failed to get region location, so use default writer: " + + LOG.trace("failed to get region location, so use default writer for rowkey: " + Bytes.toString(rowKey)); } - wl = getNewWriter(family, conf, null); + wl = getNewWriter(tableNameBytes, family, conf, null); } else { if (LOG.isDebugEnabled()) { LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); @@ -229,86 +328,124 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" + loc.getPort() + ", so use default writer"); } - wl = getNewWriter(family, conf, null); + wl = getNewWriter(tableNameBytes, family, conf, null); } else { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); } - wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); + wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[]{initialIsa + }); } } } else { - wl = getNewWriter(family, conf, null); + wl = getNewWriter(tableNameBytes, family, conf, null); } } - kv.updateLatestStamp(this.now); + + // we now have the proper WAL writer. full steam ahead + PrivateCellUtil.updateLatestStamp(cell, this.now); wl.writer.append(kv); wl.written += length; - this.previousRow = rowKey; + + // Copy the row so we know when a row transition. + this.previousRows.put(family, rowKey); } - private void rollWriters() throws IOException { - for (WriterLength wl : this.writers.values()) { - if (wl.writer != null) { - LOG.info("Writer=" + wl.writer.getPath() + ((wl.written == 0) ? 
"" : ", wrote=" + wl.written)); - close(wl.writer); + private void rollWriters(WriterLength writerLength) throws IOException { + if (writerLength != null) { + closeWriter(writerLength); + } else { + for (WriterLength wl : this.writers.values()) { + closeWriter(wl); } - wl.writer = null; - wl.written = 0; } - this.rollRequested = false; } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important") - private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes) - throws IOException { + private void closeWriter(WriterLength wl) throws IOException { + if (wl.writer != null) { + LOG.info( + "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written)); + close(wl.writer); + } + wl.writer = null; + wl.written = 0; + } + + /* + * Create a new StoreFile.Writer. + * @param family + * @return A WriterLength, containing a new StoreFile.Writer. + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", + justification = "Not important") + private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration + conf, InetSocketAddress[] favoredNodes) throws IOException { + byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family); + Path familydir = new Path(outputDir, Bytes.toString(family)); + if (writeMultipleTables) { + familydir = new Path(outputDir, + new Path(Bytes.toString(tableName), Bytes.toString(family))); + } WriterLength wl = new WriterLength(); - Path familydir = new Path(outputdir, Bytes.toString(family)); - Algorithm compression = compressionMap.get(family); + Algorithm compression = compressionMap.get(tableAndFamily); compression = compression == null ? defaultCompression : compression; - BloomType bloomType = bloomTypeMap.get(family); + BloomType bloomType = bloomTypeMap.get(tableAndFamily); bloomType = bloomType == null ? 
BloomType.NONE : bloomType; - Integer blockSize = blockSizeMap.get(family); + Integer blockSize = blockSizeMap.get(tableAndFamily); blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; DataBlockEncoding encoding = overriddenEncoding; - encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; + encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding; encoding = encoding == null ? DataBlockEncoding.NONE : encoding; Configuration tempConf = new Configuration(conf); tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression) + HFileContextBuilder contextBuilder = new HFileContextBuilder() + .withCompression(compression) .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize); + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(blockSize); + + if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { + contextBuilder.withIncludesTags(true); + } + contextBuilder.withDataBlockEncoding(encoding); HFileContext hFileContext = contextBuilder.build(); - if (null == favoredNodes) { - StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs); - wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType) - .withComparator(new CellComparatorImpl.MetaCellComparator()).withFileContext(hFileContext).build(); + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(bloomType) + .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build(); } else { - StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs); - wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType) - .withComparator(new 
CellComparatorImpl.MetaCellComparator()) - .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build(); + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(bloomType) + .withComparator(CellComparator.getInstance()).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build(); } - this.writers.put(family, wl); + this.writers.put(tableAndFamily, wl); return wl; } private void close(final StoreFileWriter w) throws IOException { if (w != null) { - w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); - w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString())); - w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true)); - w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude)); + w.appendFileInfo(BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())); + w.appendFileInfo(BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + w.appendFileInfo(MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)); + w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); w.appendTrackedTimestampsToMetadata(); w.close(); } } @Override - public void close(TaskAttemptContext c) throws IOException, InterruptedException { + public void close(TaskAttemptContext c) + throws IOException, InterruptedException { for (WriterLength wl : this.writers.values()) { close(wl.writer); } @@ -316,6 +453,21 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, }; } + /** + * Configure block storage policy for CF after the directory is created. 
+ */ + static void configureStoragePolicy(final Configuration conf, final FileSystem fs, + byte[] tableAndFamily, Path cfPath) { + if (null == conf || null == fs || null == tableAndFamily || null == cfPath) { + return; + } + + String policy = + conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily), + conf.get(STORAGE_POLICY_PROPERTY)); + FSUtils.setStoragePolicy(fs, cfPath, policy); + } + /* * Data structure to hold a Writer and amount of data written on it. */ @@ -328,11 +480,27 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, * Return the start keys of all of the regions in this table, * as a list of ImmutableBytesWritable. */ - private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException { - byte[][] byteKeys = table.getStartKeys(); - ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(byteKeys.length); - for (byte[] byteKey : byteKeys) { - ret.add(new ImmutableBytesWritable(byteKey)); + private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators, + boolean writeMultipleTables) + throws IOException { + + ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(); + for (RegionLocator regionLocator : regionLocators) { + TableName tableName = regionLocator.getName(); + LOG.info("Looking up current regions for table " + tableName); + byte[][] byteKeys = regionLocator.getStartKeys(); + for (byte[] byteKey : byteKeys) { + byte[] fullKey = byteKey; //HFileOutputFormat2 use case + if (writeMultipleTables) { + //MultiTableHFileOutputFormat use case + fullKey = combineTableNameSuffix(tableName.getName(), byteKey); + } + if (LOG.isDebugEnabled()) { + LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + + Bytes.toStringBinary(fullKey) + "]"); + } + ret.add(new ImmutableBytesWritable(fullKey)); + } } return ret; } @@ -342,8 +510,8 @@ public class HFileOutputFormat3 extends 
FileOutputFormat<ImmutableBytesWritable, * {@link TotalOrderPartitioner} that contains the split points in startKeys. */ @SuppressWarnings("deprecation") - private static void writePartitions(Configuration conf, Path partitionsPath, List<ImmutableBytesWritable> startKeys) - throws IOException { + private static void writePartitions(Configuration conf, Path partitionsPath, + List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException { LOG.info("Writing partition information to " + partitionsPath); if (startKeys.isEmpty()) { throw new IllegalArgumentException("No regions passed"); @@ -353,18 +521,22 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, // have keys < the first region (which has an empty start key) // so we need to remove it. Otherwise we would end up with an // empty reducer with index 0 - TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(startKeys); - + TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys); ImmutableBytesWritable first = sorted.first(); - if (!Arrays.equals(first.get(), HConstants.EMPTY_BYTE_ARRAY)) { - throw new IllegalArgumentException("First region of table should have empty start key. Instead has: " - + Bytes.toStringBinary(first.get())); + if (writeMultipleTables) { + first = new ImmutableBytesWritable(HFileOutputFormat3.getSuffix(sorted.first().get())); + } + if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { + throw new IllegalArgumentException( + "First region of table should have empty start key. 
Instead has: " + + Bytes.toStringBinary(first.get())); } - sorted.remove(first); + sorted.remove(sorted.first()); // Write the actual file FileSystem fs = partitionsPath.getFileSystem(conf); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, ImmutableBytesWritable.class, + SequenceFile.Writer writer = SequenceFile.createWriter( + fs, conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class); try { @@ -376,70 +548,6 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, } } - public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException { - File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml"); - tempFile.deleteOnExit(); - - FileOutputStream os = new FileOutputStream(tempFile); - hConnectionConf.writeXml(os); - os.close(); - - String tmpFiles = job.getConfiguration().get("tmpfiles", null); - if (tmpFiles == null) { - tmpFiles = fixWindowsPath("file://" + tempFile.getAbsolutePath()); - } else { - tmpFiles += "," + fixWindowsPath("file://" + tempFile.getAbsolutePath()); - } - job.getConfiguration().set("tmpfiles", tmpFiles); - LOG.info("A temporary file " + tempFile.getAbsolutePath() - + " is created for storing hconnection related configuration!!!"); - - job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName()); - return tempFile; - } - - public static Configuration getConfigureHConnection(Configuration jobConf) { - if (Strings.isNullOrEmpty(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY))) { - return jobConf; - } - File tempFile = new File(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY)); - Configuration hConnectionConf = new Configuration(false); - hConnectionConf.addResource(new Path(tempFile.toURI())); - return hConnectionConf; - } - - public static String fixWindowsPath(String path) { - // fix windows path - if (path.startsWith("file://") && !path.startsWith("file:///") && 
path.contains(":\\")) { - path = path.replace("file://", "file:///"); - } - if (path.startsWith("file:///")) { - path = path.replace('\\', '/'); - } - return path; - } - - /** - * Configure a MapReduce Job to perform an incremental load into the given - * table. This - * <ul> - * <li>Inspects the table to configure a total order partitioner</li> - * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> - * <li>Sets the number of reduce tasks to match the current number of regions</li> - * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> - * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or - * PutSortReducer)</li> - * </ul> - * The user should be sure to set the map output value class to either KeyValue or Put before - * running this function. - * - * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead. - */ - @Deprecated - public static void configureIncrementalLoad(Job job, HTable table) throws IOException { - configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator()); - } - /** * Configure a MapReduce Job to perform an incremental load into the given * table. This @@ -454,8 +562,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, * The user should be sure to set the map output value class to either KeyValue or Put before * running this function. 
*/ - public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) throws IOException { - configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) + throws IOException { + configureIncrementalLoad(job, table.getDescriptor(), regionLocator); } /** @@ -472,23 +581,34 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, * The user should be sure to set the map output value class to either KeyValue or Put before * running this function. */ - public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator) - throws IOException { - configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class); + public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor, + RegionLocator regionLocator) throws IOException { + ArrayList<TableInfo> singleTableInfo = new ArrayList<>(); + singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); + configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat3.class); } - static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator, - Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException { + static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, + Class<? 
extends OutputFormat<?, ?>> cls) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(MapReduceExtendedCell.class); job.setOutputFormatClass(cls); + if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { + throw new IllegalArgumentException("Duplicate entries found in TableInfo argument"); + } + boolean writeMultipleTables = false; + if (MultiTableHFileOutputFormat.class.equals(cls)) { + writeMultipleTables = true; + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true); + } // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. - if (KeyValue.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(KeyValueSortReducer.class); + if (KeyValue.class.equals(job.getMapOutputValueClass()) + || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(CellSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else if (Text.class.equals(job.getMapOutputValueClass())) { @@ -497,50 +617,75 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); } - conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), - ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + CellSerialization.class.getName()); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { - // record this table name for creating writer by favored nodes LOG.info("bulkload 
locality sensitive enabled"); - conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString()); } - + + /* Now get the region start keys for every table required */ + List<String> allTableNames = new ArrayList<>(multiTableInfo.size()); + List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size()); + List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size()); + + for (TableInfo tableInfo : multiTableInfo) { + regionLocators.add(tableInfo.getRegionLocator()); + allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString()); + tableDescriptors.add(tableInfo.getTableDescriptor()); + } + // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes + .toString(tableSeparator))); + List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); // Use table's region boundaries for TOP split points. 
- LOG.info("Looking up current regions for table " + tableDescriptor.getTableName()); - List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator); - LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); + LOG.info("Configuring " + startKeys.size() + " reduce partitions " + + "to match current region count for all tables"); job.setNumReduceTasks(startKeys.size()); - configurePartitioner(job, startKeys); + configurePartitioner(job, startKeys, writeMultipleTables); // Set compression algorithms based on column families - configureCompression(conf, tableDescriptor); - configureBloomType(tableDescriptor, conf); - configureBlockSize(tableDescriptor, conf); - configureDataBlockEncoding(tableDescriptor, conf); + + conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails, + tableDescriptors)); + conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails, + tableDescriptors)); + conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails, + tableDescriptors)); + conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors)); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + regionLocator.getName() + " output configured."); + LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); } - public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { + public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws + IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); + job.setOutputValueClass(MapReduceExtendedCell.class); job.setOutputFormatClass(HFileOutputFormat3.class); + ArrayList<TableDescriptor> 
singleTableDescriptor = new ArrayList<>(1); + singleTableDescriptor.add(tableDescriptor); + + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString()); // Set compression algorithms based on column families - configureCompression(conf, table.getTableDescriptor()); - configureBloomType(table.getTableDescriptor(), conf); - configureBlockSize(table.getTableDescriptor(), conf); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); - configureDataBlockEncoding(tableDescriptor, conf); + conf.set(COMPRESSION_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor)); + conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor)); + conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor)); + conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor)); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + table.getName() + " output configured."); + LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured."); } /** @@ -551,9 +696,11 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, * @return a map from column family to the configured compression algorithm */ @VisibleForTesting - static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY); - Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR); + static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration + conf) { + Map<byte[], String> stringMap = createFamilyConfValueMap(conf, + COMPRESSION_FAMILIES_CONF_KEY); + Map<byte[], Algorithm> compressionMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR); for (Map.Entry<byte[], String> e : stringMap.entrySet()) { Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue()); compressionMap.put(e.getKey(), algorithm); @@ -570,8 +717,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, */ @VisibleForTesting static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY); - Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR); + Map<byte[], String> stringMap = createFamilyConfValueMap(conf, + BLOOM_TYPE_FAMILIES_CONF_KEY); + Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (Map.Entry<byte[], String> e : stringMap.entrySet()) { BloomType bloomType = BloomType.valueOf(e.getValue()); bloomTypeMap.put(e.getKey(), bloomType); @@ -588,8 +736,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, */ @VisibleForTesting static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY); - Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); + Map<byte[], String> stringMap = createFamilyConfValueMap(conf, + BLOCK_SIZE_FAMILIES_CONF_KEY); + Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (Map.Entry<byte[], String> e : stringMap.entrySet()) { Integer blockSize = Integer.parseInt(e.getValue()); blockSizeMap.put(e.getKey(), blockSize); @@ -603,27 +752,31 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, * * @param conf to read the serialized values from * @return a map from column family to HFileDataBlockEncoder for the - * configured data block type for the family + * configured data block type for the family */ @VisibleForTesting - static 
Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) { - Map<byte[], String> stringMap = createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY); - Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR); + static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap( + Configuration conf) { + Map<byte[], String> stringMap = createFamilyConfValueMap(conf, + DATABLOCK_ENCODING_FAMILIES_CONF_KEY); + Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (Map.Entry<byte[], String> e : stringMap.entrySet()) { encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue()))); } return encoderMap; } + /** * Run inside the task to deserialize column family to given conf value map. * - * @param conf to read the serialized values from + * @param conf to read the serialized values from * @param confName conf key to read from the configuration * @return a map of column family to the given configuration value */ - private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) { - Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR); + private static Map<byte[], String> createFamilyConfValueMap( + Configuration conf, String confName) { + Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); String confVal = conf.get(confName, ""); for (String familyConf : confVal.split("&")) { String[] familySplit = familyConf.split("="); @@ -631,7 +784,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, continue; } try { - confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8), + confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")), URLDecoder.decode(familySplit[1], "UTF-8")); } catch (UnsupportedEncodingException e) { // will not happen with UTF-8 encoding @@ -645,13 +798,18 @@ 
public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against * <code>splitPoints</code>. Cleans up the partitions file after job exists. */ - static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException { + static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean + writeMultipleTables) + throws IOException { Configuration conf = job.getConfiguration(); // create the partitions file FileSystem fs = FileSystem.get(conf); - Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID()); + String hbaseTmpFsDir = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + RandomUtil.randomUUID()); fs.makeQualified(partitionsPath); - writePartitions(conf, partitionsPath, splitPoints); + writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); fs.deleteOnExit(partitionsPath); // configure job to use it @@ -659,134 +817,134 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); } - /** - * Serialize column family to compression algorithm map to configuration. - * Invoked while configuring the MR job for incremental load. 
- * - * @param table to read the properties from - * @param conf to persist serialized values into - * @throws IOException - * on failure to read column family descriptors - */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") @VisibleForTesting - static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor) + static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables) throws UnsupportedEncodingException { - StringBuilder compressionConfigValue = new StringBuilder(); - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); + StringBuilder attributeValue = new StringBuilder(); int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - compressionConfigValue.append('&'); + for (TableDescriptor tableDescriptor : allTables) { + if (tableDescriptor == null) { + // could happen with mock table instance + // CODEREVIEW: Can I set an empty string in conf if mock table instance? 
+ return ""; + } + for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { + if (i++ > 0) { + attributeValue.append('&'); + } + attributeValue.append(URLEncoder.encode( + Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())), + "UTF-8")); + attributeValue.append('='); + attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8")); } - compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - compressionConfigValue.append('='); - compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8")); } // Get rid of the last ampersand - conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString()); + return attributeValue.toString(); } /** - * Serialize column family to block size map to configuration. + * Serialize column family to compression algorithm map to configuration. * Invoked while configuring the MR job for incremental load. + * * @param tableDescriptor to read the properties from * @param conf to persist serialized values into + * @throws IOException + * on failure to read column family descriptors + */ + @VisibleForTesting + static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor -> + familyDescriptor.getCompressionType().getName(); + + /** + * Serialize column family to block size map to configuration. Invoked while + * configuring the MR job for incremental load. 
* + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into * @throws IOException - * on failure to read column family descriptors + * on failure to read column family descriptors */ @VisibleForTesting - static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - StringBuilder blockSizeConfigValue = new StringBuilder(); - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - blockSizeConfigValue.append('&'); - } - blockSizeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - blockSizeConfigValue.append('='); - blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(familyDescriptor.getBlocksize()), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString()); - } + static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String + .valueOf(familyDescriptor.getBlocksize()); /** - * Serialize column family to bloom type map to configuration. - * Invoked while configuring the MR job for incremental load. - * @param tableDescriptor to read the properties from - * @param conf to persist serialized values into + * Serialize column family to bloom type map to configuration. Invoked while + * configuring the MR job for incremental load. 
* + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into * @throws IOException - * on failure to read column family descriptors + * on failure to read column family descriptors */ @VisibleForTesting - static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - StringBuilder bloomTypeConfigValue = new StringBuilder(); - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - bloomTypeConfigValue.append('&'); - } - bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - bloomTypeConfigValue.append('='); - String bloomType = familyDescriptor.getBloomFilterType().toString(); - if (bloomType == null) { - bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; - } - bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); + static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> { + String bloomType = familyDescriptor.getBloomFilterType().toString(); + if (bloomType == null) { + bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name(); } - conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString()); - } + return bloomType; + }; /** * Serialize column family to data block encoding map to configuration. * Invoked while configuring the MR job for incremental load. 
* - * @param table to read the properties from - * @param conf to persist serialized values into + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into * @throws IOException - * on failure to read column family descriptors + * on failure to read column family descriptors */ @VisibleForTesting - static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - if (tableDescriptor == null) { - // could happen with mock table instance - return; + static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> { + DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); + if (encoding == null) { + encoding = DataBlockEncoding.NONE; } - StringBuilder dataBlockEncodingConfigValue = new StringBuilder(); - Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - dataBlockEncodingConfigValue.append('&'); - } - dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - dataBlockEncodingConfigValue.append('='); - DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); - if (encoding == null) { - encoding = DataBlockEncoding.NONE; - } - dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8")); + return encoding.toString(); + }; + + /** + * Copy from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat, + * so that it's protect function can be used. 
+ */ + final private static int validateCompositeKey(byte[] keyBytes) { + + int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator); + + // Either the separator was not found or a tablename wasn't present or a key wasn't present + if (separatorIdx == -1) { + throw new IllegalArgumentException("Invalid format for composite key [" + Bytes + .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key"); } - conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString()); + return separatorIdx; } -} + + /** + * Copy from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat, + * so that it's protect function can be used. + */ + protected static byte[] getTableName(byte[] keyBytes) { + int separatorIdx = validateCompositeKey(keyBytes); + return Bytes.copy(keyBytes, 0, separatorIdx); + } + + /** + * Copy from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat, + * so that it's protect function can be used. + */ + + protected static byte[] getSuffix(byte[] keyBytes) { + int separatorIdx = validateCompositeKey(keyBytes); + return Bytes.copy(keyBytes, separatorIdx + 1, keyBytes.length - separatorIdx - 1); + } + +} \ No newline at end of file diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml index fc0adf1..268e162 100644 --- a/stream-receiver/pom.xml +++ b/stream-receiver/pom.xml @@ -217,6 +217,7 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> + <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> @@ -224,7 +225,8 @@ </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-runner</artifactId> + <artifactId>jetty-util</artifactId> + <scope>compile</scope> </dependency> <dependency>