This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d1d2a5d0992d045ebdd9ac217a6a54975c339f87
Author: Cheng Wang <cheng.w...@kyligence.io>
AuthorDate: Tue Apr 25 18:45:57 2017 +0800

    KYLIN-2565, upgrade to Hadoop3.0
---
 .../mr/common/DefaultSslProtocolSocketFactory.java | 150 ------
 .../kylin/engine/mr/common/HadoopStatusGetter.java | 280 ++++++++++
 .../apache/kylin/engine/spark/SparkCountDemo.java  |  80 +++
 .../org/apache/kylin/engine/spark/SparkCubing.java | 591 +++++++++++++++++++++
 pom.xml                                            |  19 +-
 server-base/pom.xml                                |   5 +
 .../org/apache/kylin/rest/security/MockHTable.java | 112 ++--
 .../kylin/storage/hbase/HBaseConnection.java       |   5 +
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java    |  15 +-
 .../v2/coprocessor/endpoint/CubeVisitService.java  |   4 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  12 +
 .../storage/hbase/steps/HBaseCuboidWriter.java     | 133 +++++
 .../kylin/storage/hbase/util/CubeMigrationCLI.java |   2 +-
 .../storage/hbase/util/DeployCoprocessorCLI.java   |   3 +-
 .../storage/hbase/util/ExtendCubeToHybridCLI.java  |   2 +-
 .../hbase/util/GridTableHBaseBenchmark.java        |   2 +-
 .../kylin/storage/hbase/util/PingHBaseCLI.java     |   3 +-
 .../storage/hbase/steps/CubeHFileMapperTest.java   |  22 +-
 .../kylin/storage/hbase/steps/TestHbaseClient.java |  14 +-
 .../org/apache/kylin/tool/CubeMigrationCLI.java    |  14 +-
 .../apache/kylin/tool/CubeMigrationCheckCLI.java   |  17 +-
 .../apache/kylin/tool/ExtendCubeToHybridCLI.java   |   2 +-
 .../org/apache/kylin/tool/StorageCleanupJob.java   |   1 +
 23 files changed, 1240 insertions(+), 248 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index d66e4eb..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.common;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
-    /** Log object for this class.
*/ - private static Logger logger = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class); - private SSLContext sslcontext = null; - - /** - * Constructor for DefaultSslProtocolSocketFactory. - */ - public DefaultSslProtocolSocketFactory() { - super(); - } - - /** - * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int) - */ - public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException { - return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort); - } - - /** - * Attempts to get a new socket connection to the given host within the - * given time limit. - * - * <p> - * To circumvent the limitations of older JREs that do not support connect - * timeout a controller thread is executed. The controller thread attempts - * to create a new socket within the given limit of time. If socket - * constructor does not return until the timeout expires, the controller - * terminates and throws an {@link ConnectTimeoutException} - * </p> - * - * @param host - * the host name/IP - * @param port - * the port on the host - * @param localAddress - * the local host name/IP to bind the socket to - * @param localPort - * the port on the local machine - * @param params - * {@link HttpConnectionParams Http connection parameters} - * - * @return Socket a new socket - * - * @throws IOException - * if an I/O error occurs while creating the socket - * @throws UnknownHostException - * if the IP address of the host cannot be determined - * @throws ConnectTimeoutException - * DOCUMENT ME! - * @throws IllegalArgumentException - * DOCUMENT ME! - */ - public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException { - if (params == null) { - throw new IllegalArgumentException("Parameters may not be null"); - } - - int timeout = params.getConnectionTimeout(); - - if (timeout == 0) { - return createSocket(host, port, localAddress, localPort); - } else { - // To be eventually deprecated when migrated to Java 1.4 or above - return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout); - } - } - - /** - * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int) - */ - public Socket createSocket(String host, int port) throws IOException, UnknownHostException { - return getSSLContext().getSocketFactory().createSocket(host, port); - } - - /** - * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean) - */ - public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException { - return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose); - } - - public boolean equals(Object obj) { - return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class)); - } - - public int hashCode() { - return DefaultX509TrustManager.class.hashCode(); - } - - private static SSLContext createEasySSLContext() { - try { - SSLContext context = SSLContext.getInstance("TLS"); - context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null); - - return context; - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new HttpClientError(e.toString()); - } - } - - private SSLContext getSSLContext() { - if (this.sslcontext == null) { - 
this.sslcontext = createEasySSLContext(); - } - - return this.sslcontext; - } -} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java new file mode 100644 index 0000000..0245c1c --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.nio.charset.Charset; +import java.security.KeyManagementException; +import java.security.Principal; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.auth.AuthSchemeRegistry; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.params.AuthPolicy; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.scheme.SchemeRegistry; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.DefaultHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + */ +public class HadoopStatusGetter { + + private final String mrJobId; + private final String yarnUrl; + + protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusGetter.class); + + public HadoopStatusGetter(String yarnUrl, String mrJobId) { + this.yarnUrl = yarnUrl; + this.mrJobId = mrJobId; + } + + public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberosAuth) throws IOException { + String applicationId = mrJobId.replace("job", "application"); + String url = yarnUrl.replace("${job_id}", applicationId); + String response = useKerberosAuth ? 
getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); + logger.debug("Hadoop job " + mrJobId + " status : " + response); + JsonNode root = new ObjectMapper().readTree(response); + RMAppState state = RMAppState.valueOf(root.findValue("state").textValue()); + FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").textValue()); + return Pair.of(state, finalStatus); + } + + private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; + + private String getHttpResponseWithKerberosAuth(String url) throws IOException { + String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); + if (krb5ConfigPath == null) { + krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; + } + boolean skipPortAtKerberosDatabaseLookup = true; + System.setProperty("java.security.krb5.conf", krb5ConfigPath); + System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); + + DefaultHttpClient client = new DefaultHttpClient(); + AuthSchemeRegistry authSchemeRegistry = new AuthSchemeRegistry(); + authSchemeRegistry.register(AuthPolicy.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)); + client.setAuthSchemes(authSchemeRegistry); + + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + Credentials useJaasCreds = new Credentials() { + public String getPassword() { + return null; + } + + public Principal getUserPrincipal() { + return null; + } + }; + credentialsProvider.setCredentials(new AuthScope(null, -1, null), useJaasCreds); + client.setCredentialsProvider(credentialsProvider); + + String response = null; + while (response == null) { + if (url.startsWith("https://")) { + registerEasyHttps(client); + } + if (url.contains("anonymous=true") == false) { + url += url.contains("?") ? "&" : "?"; + url += "anonymous=true"; + } + HttpGet httpget = new HttpGet(url); + httpget.addHeader("accept", "application/json"); + try { + HttpResponse httpResponse = client.execute(httpget); + String redirect = null; + org.apache.http.Header h = httpResponse.getFirstHeader("Location"); + if (h != null) { + redirect = h.getValue(); + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000L); + continue; + } + } else { + h = httpResponse.getFirstHeader("Refresh"); + if (h != null) { + String s = h.getValue(); + int cut = s.indexOf("url="); + if (cut >= 0) { + redirect = s.substring(cut + 4); + + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000L); + continue; + } + } + } + } + + if (redirect == null) { + response = IOUtils.toString(httpResponse.getEntity().getContent(), Charset.defaultCharset()); + logger.debug("Job " + mrJobId + " get status check result.\n"); + } else { + url = redirect; + logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error(e.getMessage()); + } finally { + httpget.releaseConnection(); + } + } + + return response; + } + + private String getHttpResponse(String url) throws IOException { + HttpClient client = new DefaultHttpClient(); + + String response = null; + while (response == null) { // follow redirects via 'refresh' + if (url.startsWith("https://")) { + registerEasyHttps(client); + } + if (url.contains("anonymous=true") == false) { + url += url.contains("?") ? 
"&" : "?"; + url += "anonymous=true"; + } + + HttpGet get = new HttpGet(url); + get.addHeader("accept", "application/json"); + + try { + HttpResponse res = client.execute(get); + + String redirect = null; + Header h = res.getFirstHeader("Location"); + if (h != null) { + redirect = h.getValue(); + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000L); + continue; + } + } else { + h = res.getFirstHeader("Refresh"); + if (h != null) { + String s = h.getValue(); + int cut = s.indexOf("url="); + if (cut >= 0) { + redirect = s.substring(cut + 4); + + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000L); + continue; + } + } + } + } + + if (redirect == null) { + response = res.getStatusLine().toString(); + logger.debug("Job " + mrJobId + " get status check result.\n"); + } else { + url = redirect; + logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error(e.getMessage()); + } finally { + get.releaseConnection(); + } + } + + return response; + } + + private static void registerEasyHttps(HttpClient client) { + SSLContext sslContext; + try { + sslContext = SSLContext.getInstance("SSL"); + + // set up a TrustManager that trusts everything + try { + sslContext.init(null, new TrustManager[] { new DefaultX509TrustManager(null) { + public X509Certificate[] getAcceptedIssuers() { + logger.debug("getAcceptedIssuers"); + return null; + } + + public void checkClientTrusted(X509Certificate[] certs, String authType) { + logger.debug("checkClientTrusted"); + } + + public void checkServerTrusted(X509Certificate[] certs, String authType) { + logger.debug("checkServerTrusted"); + } + } }, new SecureRandom()); + } catch (KeyManagementException e) { + } + SSLSocketFactory ssf = new SSLSocketFactory(sslContext, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + ClientConnectionManager ccm = client.getConnectionManager(); + SchemeRegistry sr = ccm.getSchemeRegistry(); + sr.register(new Scheme("https", 443, ssf)); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + private static boolean isValidURL(String value) { + if (StringUtils.isNotEmpty(value)) { + java.net.URL url; + try { + url = new java.net.URL(value); + } catch (MalformedURLException var5) { + return false; + } + + return StringUtils.isNotEmpty(url.getProtocol()) && StringUtils.isNotEmpty(url.getHost()); + } + + return false; + } + +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java new file mode 100644 index 0000000..a079a57 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.spark; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; + +import scala.Tuple2; + +/** + */ +public class SparkCountDemo extends AbstractApplication { + + private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input"); + + private Options options; + + public SparkCountDemo() { + options = new Options(); + // options.addOption(OPTION_INPUT_PATH); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system + SparkConf conf = new SparkConf().setAppName("Simple Application"); + JavaSparkContext sc = new JavaSparkContext(conf); + final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() { + + @Override + public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, s.length()); + } + }).sortByKey(); + logData.persist(StorageLevel.MEMORY_AND_DISK_SER()); + + System.out.println("line number:" + logData.count()); + + logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() { + @Override + public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { + ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes()); + KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes()); + return new Tuple2(key, value); + } + }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class); + + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java new file mode 100644 index 0000000..a87d66b --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -0,0 +1,591 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.spark; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; +import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.kv.CubeDimEncMap; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.cube.model.DimensionDesc; +import org.apache.kylin.cube.model.RowKeyDesc; +import org.apache.kylin.cube.util.CubingUtils; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; +import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; +import org.apache.kylin.engine.spark.util.IteratorUtils; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.metadata.model.FunctionDesc; +import 
org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.steps.CreateHTableJob; +import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; +import org.apache.spark.Partitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkFiles; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.hive.HiveContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.primitives.UnsignedBytes; + +import scala.Tuple2; + +/** + */ +public class SparkCubing extends AbstractApplication { + + protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class); + + private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); + private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); + private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); + private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); + private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor"); + + private Options options; + + public SparkCubing() { + options = new Options(); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_CONF_PATH); + options.addOption(OPTION_COPROCESSOR); + + } + + @Override + protected Options getOptions() { + return options; + } + + public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException { + File metaDir = new File(folder); + if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) { + System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath()); + logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath()); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath()); + kylinConfig.setMetadataUrl(metaDir.getAbsolutePath()); + return kylinConfig; + } else { + return 
KylinConfig.getInstanceFromEnv(); + } + } + + private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { + ClassUtil.addClasspath(confPath); + final File[] files = new File(confPath).listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + if (pathname.getAbsolutePath().endsWith(".xml")) { + return true; + } + if (pathname.getAbsolutePath().endsWith(".properties")) { + return true; + } + return false; + } + }); + if (files == null) { + return; + } + for (File file : files) { + sc.addFile(file.getAbsolutePath()); + } + } + + private void writeDictionary(DataFrame intermediateTable, String cubeName, String segmentId) throws Exception { + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeManager cubeManager = CubeManager.getInstance(kylinConfig); + final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); + final String[] columns = intermediateTable.columns(); + final CubeSegment seg = cubeInstance.getSegmentById(segmentId); + final CubeDesc cubeDesc = cubeInstance.getDescriptor(); + final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap(); + final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc); + final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); + final long start = System.currentTimeMillis(); + final RowKeyDesc rowKey = cubeDesc.getRowkey(); + for (int i = 0; i < baseCuboidColumn.size(); i++) { + TblColRef col = baseCuboidColumn.get(i); + if (!rowKey.isUseDictionary(col)) { + continue; + } + final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i]; + tblColRefMap.put(rowKeyColumnIndex, col); + } + + Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap(); + for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) { + final String column = columns[entry.getKey()]; + final TblColRef tblColRef = entry.getValue(); + final DataFrame frame = intermediateTable.select(column).distinct(); + + final Row[] rows = frame.collect(); + dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() { + @Override + public Iterator<String> iterator() { + return new Iterator<String>() { + int i = 0; + + @Override + public boolean hasNext() { + return i < rows.length; + } + + @Override + public String next() { + if (hasNext()) { + final Row row = rows[i++]; + final Object o = row.get(0); + return o != null ? 
o.toString() : null; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }))); + } + final long end = System.currentTimeMillis(); + CubingUtils.writeDictionary(seg, dictionaryMap, start, end); + try { + CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); + cubeBuilder.setToUpdateSegs(seg); + cubeManager.updateCube(cubeBuilder); + } catch (IOException e) { + throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage()); + } + } + + private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception { + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); + CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + CubeDesc cubeDesc = cubeInstance.getDescriptor(); + CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc); + List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds(); + final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap(); + for (Long id : allCuboidIds) { + zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); + } + + CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + + final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes(); + final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); + final ByteArray[] row_hashcodes = new ByteArray[nRowKey]; + + for (Long cuboidId : allCuboidIds) { + Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)]; + + long mask = Long.highestOneBit(baseCuboidId); + int position = 0; + for (int i = 0; i < nRowKey; i++) { + if ((mask & cuboidId) > 0) { + cuboidBitSet[position] = i; + position++; + } + mask = mask >> 1; + } + allCuboidsBitSet.put(cuboidId, cuboidBitSet); + } + for (int i = 0; i < nRowKey; ++i) { + row_hashcodes[i] = new ByteArray(); + } + + final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() { + + final HashFunction hashFunction = Hashing.murmur3_128(); + + @Override + public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception { + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hashFunction.newHasher(); + String colValue = v2.get(rowKeyColumnIndexes[i]); + if (colValue != null) { + row_hashcodes[i].set(hc.putString(colValue).hash().asBytes()); + } else { + row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); + } + } + + for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) { + Hasher hc = hashFunction.newHasher(); + HLLCounter counter = v1.get(entry.getKey()); + final Integer[] cuboidBitSet = entry.getValue(); + for (int position = 0; position < cuboidBitSet.length; position++) { + hc.putBytes(row_hashcodes[cuboidBitSet[position]].array()); + } + counter.add(hc.hash().asBytes()); + } + return v1; + } + }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() { + @Override + public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception { + Preconditions.checkArgument(v1.size() == v2.size()); 
+ Preconditions.checkArgument(v1.size() > 0); + for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) { + final HLLCounter counter1 = entry.getValue(); + final HLLCounter counter2 = v2.get(entry.getKey()); + counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null")); + } + return v1; + } + + }); + return samplingResult; + } + + /** return hfile location */ + private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception { + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + CubeDesc cubeDesc = cubeInstance.getDescriptor(); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); + final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap(); + final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap(); + for (TblColRef tblColRef : baseCuboidColumn) { + columnLengthMap.put(tblColRef, dimEncMap.get(tblColRef).getLengthOfEncoding()); + } + final Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap(); + for (DimensionDesc dim : cubeDesc.getDimensions()) { + // dictionary + for (TblColRef col : dim.getColumnRefs()) { + if (cubeDesc.getRowkey().isUseDictionary(col)) { + Dictionary<String> dict = cubeSegment.getDictionary(col); + if (dict == null) { + System.err.println("Dictionary for " + col + " was not found."); + continue; + } + dictionaryMap.put(col, dict); + System.out.println("col:" + col + " dictionary size:" + dict.getSize()); + } + } + } + + for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { + FunctionDesc func = measureDesc.getFunction(); + List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func); + for (TblColRef col : colRefs) { + dictionaryMap.put(col, cubeSegment.getDictionary(col)); + } + } + + final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() { + + @Override + public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception { + long t = System.currentTimeMillis(); + prepare(); + + final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + + LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue(); + System.out.println("load properties finished"); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); + AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap); + final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap)); + Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter)); + try { + while (listIterator.hasNext()) { + for (List<String> row : listIterator.next()) { + blockingQueue.put(row); + } + } + blockingQueue.put(Collections.<String> emptyList()); + } catch (Exception e) { + throw new RuntimeException(e); + } + System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms"); + return sparkCuboidWriter.getResult().iterator(); + } + }); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Configuration conf = 
getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier()); + Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString()); + Preconditions.checkArgument(!FileSystem.get(conf).exists(path)); + String url = conf.get("fs.defaultFS") + path.toString(); + System.out.println("use " + url + " as hfile"); + List<MeasureDesc> measuresDescs = cubeDesc.getMeasures(); + final int measureSize = measuresDescs.size(); + final String[] dataTypes = new String[measureSize]; + for (int i = 0; i < dataTypes.length; i++) { + dataTypes[i] = measuresDescs.get(i).getFunction().getReturnType(); + } + final MeasureAggregators aggs = new MeasureAggregators(measuresDescs); + writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url); + return url; + } + + private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) { + javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() { + @Override + public int numPartitions() { + return splitKeys.length + 1; + } + + @Override + public int getPartition(Object key) { + Preconditions.checkArgument(key instanceof byte[]); + for (int i = 0, n = splitKeys.length; i < n; ++i) { + if (UnsignedBytes.lexicographicalComparator().compare((byte[]) key, splitKeys[i]) < 0) { + return i; + } + } + return splitKeys.length; + } + }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() { + @Override + public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception { + Iterable<Tuple2<byte[], byte[]>> iterable = new Iterable<Tuple2<byte[], byte[]>>() { + final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes); + final Object[] input = new Object[measureSize]; + final Object[] result = new Object[measureSize]; + + @Override + public Iterator<Tuple2<byte[], byte[]>> iterator() { + return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() { + @Override + public byte[] call(Iterable<byte[]> v1) throws Exception { + final LinkedList<byte[]> list = Lists.newLinkedList(v1); + if (list.size() == 1) { + return list.get(0); + } + aggs.reset(); + for (byte[] v : list) { + codec.decode(ByteBuffer.wrap(v), input); + aggs.aggregate(input); + } + aggs.collectStates(result); + ByteBuffer buffer = codec.encode(result); + byte[] bytes = new byte[buffer.position()]; + System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position()); + return bytes; + } + }); + } + }; + return iterable.iterator(); + } + }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() { + @Override + public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception { + ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1()); + KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2()); + return new Tuple2(key, value); + } + }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf); + } + + public static void prepare() throws Exception { + final File file = new File(SparkFiles.get("kylin.properties")); + final String confPath = file.getParentFile().getAbsolutePath(); + 
System.out.println("conf directory:" + confPath); + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + ClassUtil.addClasspath(confPath); + } + + private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception { + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100); + final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap); + System.out.println("cube size estimation:" + cubeSizeMap); + final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder' + CubeHTableUtil.createHTable(cubeSegment, splitKeys); + System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created"); + return splitKeys; + } + + private Configuration getConfigurationForHFile(String hTableName) throws IOException { + final Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + Job job = Job.getInstance(conf); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + Connection connection = HBaseConnection.get(); + Table table = connection.getTable(TableName.valueOf(hTableName)); + HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(hTableName))); + return conf; + } + + private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception { + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration(); + + FsShell shell = new FsShell(hbaseConf); + try { + shell.run(new String[] { "-chmod", "-R", "777", hfileLocation }); + } catch (Exception e) { + logger.error("Couldnt change the file permissions ", e); + throw new IOException(e); + } + + String[] newArgs = new String[2]; + newArgs[0] = hfileLocation; + newArgs[1] = cubeSegment.getStorageLocationIdentifier(); + + int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs); + System.out.println("incremental load result:" + ret); + + cubeSegment.setStatus(SegmentStatusEnum.READY); + try { + CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); + cubeInstance.setStatus(RealizationStatusEnum.READY); + cubeSegment.setStatus(SegmentStatusEnum.READY); + cubeBuilder.setToUpdateSegs(cubeSegment); + CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder); + } catch (IOException e) { + throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage()); + } + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + SparkConf conf = new SparkConf().setAppName("Simple Application"); + //memory conf + conf.set("spark.executor.memory", "6g"); + conf.set("spark.storage.memoryFraction", "0.3"); + + //serialization conf + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + 
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator"); + conf.set("spark.kryo.registrationRequired", "true"); + + JavaSparkContext sc = new JavaSparkContext(conf); + HiveContext sqlContext = new HiveContext(sc.sc()); + final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable); + final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); + final String coprocessor = optionsHelper.getOptionValue(OPTION_COPROCESSOR); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + kylinConfig.overrideCoprocessorLocalJar(coprocessor); + + setupClasspath(sc, confPath); + intermediateTable.cache(); + writeDictionary(intermediateTable, cubeName, segmentId); + final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() { + @Override + public List<String> call(Row v1) throws Exception { + ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size()); + for (int i = 0; i < v1.size(); i++) { + final Object o = v1.get(i); + if (o != null) { + result.add(o.toString()); + } else { + result.add(null); + } + } + return result; + + } + }); + + final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId); + final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult); + + final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys); + bulkLoadHFile(cubeName, segmentId, hfile); + } + +} diff --git a/pom.xml b/pom.xml index 9365289..ebb2f12 100644 --- a/pom.xml +++ b/pom.xml @@ -39,21 +39,21 @@ <properties> <!-- General Properties --> - <javaVersion>1.7</javaVersion> + <javaVersion>1.8</javaVersion> <maven-model.version>3.3.9</maven-model.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- Hadoop versions --> - <hadoop2.version>2.7.1</hadoop2.version> - <yarn.version>2.7.1</yarn.version> + <hadoop2.version>3.0.0-alpha2</hadoop2.version> + <yarn.version>3.0.0-alpha2</yarn.version> <!-- Hive versions --> - <hive.version>1.2.1</hive.version> - <hive-hcatalog.version>1.2.1</hive-hcatalog.version> + <hive.version>2.1.0</hive.version> + <hive-hcatalog.version>2.1.0</hive-hcatalog.version> <!-- HBase versions --> - <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version> + <hbase-hadoop2.version>2.0.0-SNAPSHOT</hbase-hadoop2.version> <!-- Kafka versions --> <kafka.version>1.0.0</kafka.version> @@ -68,6 +68,7 @@ <!-- Scala versions --> <scala.version>2.11.0</scala.version> + <commons-configuration.version>1.6</commons-configuration.version> <!-- <reflections.version>0.9.10</reflections.version> --> <!-- Calcite Version --> @@ -578,6 +579,12 @@ <version>${yarn.version}</version> </dependency> + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>${commons-configuration.version}</version> + </dependency> + <!-- Calcite dependencies --> <dependency> <groupId>org.apache.calcite</groupId> diff --git a/server-base/pom.xml b/server-base/pom.xml index 5f803fc..4d18501 100644 --- a/server-base/pom.xml +++ b/server-base/pom.xml @@ -194,6 +194,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>commons-configuration</groupId> + 
<artifactId>commons-configuration</artifactId> + <scope>provided</scope> + </dependency> </dependencies> <repositories> diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java index 9eb9bb7..fd53b5b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java +++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java @@ -43,6 +43,8 @@ import java.util.TreeMap; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; @@ -51,7 +53,6 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.CompareFilter; @@ -98,7 +100,7 @@ public class MockHTable implements Table { private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<>(Bytes.BYTES_COMPARATOR); - private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { + private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions); } @@ -163,8 +165,8 @@ public class MockHTable implements Table { throw new RuntimeException(this.getClass() + " does NOT implement this method."); } - private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { - List<KeyValue> ret = new ArrayList<KeyValue>(); + private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { + List<Cell> ret = new ArrayList<>(); for (byte[] family : rowdata.keySet()) for (byte[] qualifier : rowdata.get(family).keySet()) { int versionsAdded = 0; @@ -208,7 +210,6 @@ public class MockHTable implements Table { /** * {@inheritDoc} */ - @Override public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { Object[] results = new Object[actions.size()]; // same size. for (int i = 0; i < actions.size(); i++) { @@ -242,11 +243,6 @@ public class MockHTable implements Table { } - @Override - public <R> Object[] batchCallback(List<? 
extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException { - return new Object[0]; - } - /** * {@inheritDoc} */ @@ -255,7 +251,7 @@ public class MockHTable implements Table { if (!data.containsKey(get.getRow())) return new Result(); byte[] row = get.getRow(); - List<KeyValue> kvs = new ArrayList<KeyValue>(); + List<Cell> kvs = new ArrayList<>(); if (!get.hasFamilies()) { kvs = toKeyValue(row, data.get(row), get.getMaxVersions()); } else { @@ -280,7 +276,7 @@ public class MockHTable implements Table { kvs = filter(filter, kvs); } - return new Result(kvs); + return Result.create(kvs); } /** @@ -318,11 +314,11 @@ public class MockHTable implements Table { break; } - List<KeyValue> kvs = null; + List<Cell> kvs = null; if (!scan.hasFamilies()) { kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions()); } else { - kvs = new ArrayList<KeyValue>(); + kvs = new ArrayList<>(); for (byte[] family : scan.getFamilyMap().keySet()) { if (data.get(row).get(family) == null) continue; @@ -354,7 +350,7 @@ public class MockHTable implements Table { } } if (!kvs.isEmpty()) { - ret.add(new Result(kvs)); + ret.add(Result.create(kvs)); } } @@ -389,12 +385,14 @@ public class MockHTable implements Table { public void close() { } + @Override public boolean renewLease() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + return false; } + @Override public ScanMetrics getScanMetrics() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + return null; } }; } @@ -406,10 +404,10 @@ public class MockHTable implements Table { * @param kvs List of a row's KeyValues * @return List of KeyValues that were not filtered. */ - private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws IOException { + private List<Cell> filter(Filter filter, List<Cell> kvs) throws IOException { filter.reset(); - List<KeyValue> tmp = new ArrayList<KeyValue>(kvs.size()); + List<Cell> tmp = new ArrayList<>(kvs.size()); tmp.addAll(kvs); /* @@ -418,9 +416,9 @@ public class MockHTable implements Table { * See Figure 4-2 on p. 163. 
*/ boolean filteredOnRowKey = false; - List<KeyValue> nkvs = new ArrayList<KeyValue>(tmp.size()); - for (KeyValue kv : tmp) { - if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) { + List<Cell> nkvs = new ArrayList<>(tmp.size()); + for (Cell kv : tmp) { + if (filter.filterRowKey(kv)) { filteredOnRowKey = true; break; } @@ -483,16 +481,16 @@ public class MockHTable implements Table { public void put(Put put) throws IOException { byte[] row = put.getRow(); NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR)); - for (byte[] family : put.getFamilyMap().keySet()) { + for (byte[] family : put.getFamilyCellMap().keySet()) { if (columnFamilies.contains(new String(family)) == false) { throw new RuntimeException("Not Exists columnFamily : " + new String(family)); } NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR)); - for (KeyValue kv : put.getFamilyMap().get(family)) { - kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis())); - byte[] qualifier = kv.getQualifier(); + for (Cell kv : put.getFamilyCellMap().get(family)) { + CellUtil.updateLatestStamp(kv, System.currentTimeMillis()); + byte[] qualifier = kv.getQualifierArray(); NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>()); - qualifierData.put(kv.getTimestamp(), kv.getValue()); + qualifierData.put(kv.getTimestamp(), kv.getValueArray()); } } } @@ -540,22 +538,22 @@ public class MockHTable implements Table { byte[] row = delete.getRow(); if (data.get(row) == null) return; - if (delete.getFamilyMap().size() == 0) { + if (delete.getFamilyCellMap().size() == 0) { data.remove(row); return; } - for (byte[] family : delete.getFamilyMap().keySet()) { + for (byte[] family : delete.getFamilyCellMap().keySet()) { if (data.get(row).get(family) == null) continue; - if (delete.getFamilyMap().get(family).isEmpty()) { + if (delete.getFamilyCellMap().get(family).isEmpty()) { data.get(row).remove(family); continue; } - for (KeyValue kv : delete.getFamilyMap().get(family)) { - if (kv.isDelete()) { - data.get(row).get(kv.getFamily()).clear(); + for (Cell kv : delete.getFamilyCellMap().get(family)) { + if (CellUtil.isDelete(kv)) { + data.get(row).get(kv.getFamilyArray()).clear(); } else { - data.get(row).get(kv.getFamily()).remove(kv.getQualifier()); + data.get(row).get(kv.getFamilyArray()).remove(kv.getQualifierArray()); } } if (data.get(row).get(family).isEmpty()) { @@ -675,40 +673,48 @@ public class MockHTable implements Table { } - public void setOperationTimeout(int operationTimeout) { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + /*** + * + * All values are default + * + * **/ + @Override + public void setOperationTimeout(int i) { + } + @Override public int getOperationTimeout() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + return 0; } - /** @deprecated */ - @Deprecated + @Override public int getRpcTimeout() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + return 0; } - /** @deprecated */ - @Deprecated - public void setRpcTimeout(int rpcTimeout) { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); - } + @Override + public void setRpcTimeout(int i) { - public int 
getWriteRpcTimeout() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); } - public void setWriteRpcTimeout(int writeRpcTimeout) { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + @Override + public int getReadRpcTimeout() { + return 0; } - public int getReadRpcTimeout() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + @Override + public void setReadRpcTimeout(int i) { + } - public void setReadRpcTimeout(int readRpcTimeout) { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); + @Override + public int getWriteRpcTimeout() { + return 0; } + @Override + public void setWriteRpcTimeout(int i) { + + } } \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index 53e8a68..0f71797 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -239,6 +239,11 @@ public class HBaseConnection { // ============================================================================ + public static Connection get() { + String url = KylinConfig.getInstanceFromEnv().getStorageUrl(); + return get(url); + } + // returned Connection can be shared by multiple threads and does not require close() @SuppressWarnings("resource") public static Connection get(StorageURL url) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index a8f4fd8..48dce1f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -18,11 +18,8 @@ package org.apache.kylin.storage.hbase.cube.v2; -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -47,8 +44,10 @@ import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; /** * for test use only @@ -181,7 +180,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { public List<Cell> next() { List<Cell> result = allResultsIterator.next().listCells(); for (Cell cell : result) { - scannedBytes += CellUtil.estimatedSizeOf(cell); + scannedBytes += CellUtil.estimatedSerializedSizeOf(cell); } scannedRows++; return result; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index fd54e2b..89fe56d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -38,9 
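In the CubeHBaseScanRPC hunk above, scannedBytes switches from CellUtil.estimatedSizeOf(cell) to CellUtil.estimatedSerializedSizeOf(cell); the same rename shows up in CubeVisitService below. A small sketch of that accounting over one row's cells:

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;

    public class ScanByteCounter {
        // Approximate serialized size of one row's cells, as the scan RPCs track scannedBytes.
        public static long sizeOf(List<Cell> row) {
            long bytes = 0;
            for (Cell cell : row) {
                bytes += CellUtil.estimatedSerializedSizeOf(cell);
            }
            return bytes;
        }
    }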
+38,9 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig; import org.apache.kylin.common.exceptions.KylinTimeoutException; @@ -178,7 +178,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement List<Cell> result = delegate.next(); rowCount++; for (Cell cell : result) { - rowBytes += CellUtil.estimatedSizeOf(cell); + rowBytes += CellUtil.estimatedSerializedSizeOf(cell); } return result; } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 5ffdd48..01158a7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -27,6 +27,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; @@ -58,6 +63,7 @@ public class CubeHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); + Connection connection = null; try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); @@ -93,6 +99,10 @@ public class CubeHFileJob extends AbstractHadoopJob { HTable htable = new HTable(configuration, getOptionValue(OPTION_HTABLE_NAME)); + String hTableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); + connection = ConnectionFactory.createConnection(hbaseConf); + Table table = connection.getTable(TableName.valueOf(hTableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName)); // Automatic config ! HFileOutputFormat3.configureIncrementalLoad(job, htable); reconfigurePartitions(configuration, partitionFilePath); @@ -113,6 +123,8 @@ public class CubeHFileJob extends AbstractHadoopJob { } finally { if (job != null) cleanupTempConfFile(job.getConfiguration()); + if (null != connection) + connection.close(); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java new file mode 100644 index 0000000..afc2b4c --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
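CubeHFileJob above now opens a Connection to obtain a Table and a RegionLocator next to the legacy HTable it still constructs. For reference, a rough sketch of the fully connection-based wiring; it uses stock HFileOutputFormat2 as a stand-in for Kylin's HFileOutputFormat3, and the configure() helper is hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    public class HFileJobSketch {
        public static void configure(Job job, String hTableName) throws IOException {
            Configuration hbaseConf = HBaseConfiguration.create(job.getConfiguration());
            TableName tableName = TableName.valueOf(hTableName);
            try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
                    Table table = connection.getTable(tableName);
                    RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
                // The Table supplies column family metadata, the RegionLocator the split points.
                HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
            }
        }
    }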
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.steps; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.HBaseColumnDesc; +import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; +import org.apache.kylin.gridtable.GTRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + */ +public class HBaseCuboidWriter implements ICuboidWriter { + + private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class); + + private static final int BATCH_PUT_THRESHOLD = 10000; + + private final List<KeyValueCreator> keyValueCreators; + private final int nColumns; + private final Table hTable; + private final CubeDesc cubeDesc; + private final CubeSegment cubeSegment; + private final Object[] measureValues; + + private List<Put> puts = Lists.newArrayList(); + private AbstractRowKeyEncoder rowKeyEncoder; + private byte[] keybuf; + + public HBaseCuboidWriter(CubeSegment segment, Table hTable) { + this.keyValueCreators = Lists.newArrayList(); + this.cubeSegment = segment; + this.cubeDesc = cubeSegment.getCubeDesc(); + for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) { + for (HBaseColumnDesc colDesc : cfDesc.getColumns()) { + keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc)); + } + } + this.nColumns = keyValueCreators.size(); + this.hTable = hTable; + this.measureValues = new Object[cubeDesc.getMeasures().size()]; + } + + private byte[] copy(byte[] array, int offset, int length) { + byte[] result = new byte[length]; + System.arraycopy(array, offset, result, 0, length); + return result; + } + + //TODO:shardingonstreaming + private byte[] createKey(Long cuboidId, GTRecord record) { + if (rowKeyEncoder == null || rowKeyEncoder.getCuboidID() != cuboidId) { + rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, + Cuboid.findForMandatory(cubeDesc, cuboidId)); + keybuf = rowKeyEncoder.createBuf(); + } + rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keybuf); + return keybuf; + + } + + @Override + public void write(long cuboidId, GTRecord record) throws IOException { + byte[] key = createKey(cuboidId, record); + final Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidId); + final int nDims = cuboid.getColumns().size(); + final ImmutableBitSet bitSet = new 
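HBaseCuboidWriter's copy(array, offset, length) helper exists because the KeyValue getters now expose array/offset/length triplets rather than ready-made byte arrays. The idiom in isolation (CellUtil.cloneFamily() and friends perform the same copy internally):

    import org.apache.hadoop.hbase.KeyValue;

    public class KeyValueSliceSketch {
        // Extract the column family bytes from a KeyValue by copying the addressed slice;
        // equivalent to CellUtil.cloneFamily(kv).
        public static byte[] familyOf(KeyValue kv) {
            byte[] family = new byte[kv.getFamilyLength()];
            System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(), family, 0, kv.getFamilyLength());
            return family;
        }
    }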
ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size()); + + for (int i = 0; i < nColumns; i++) { + final Object[] values = record.getValues(bitSet, measureValues); + final KeyValue keyValue = keyValueCreators.get(i).create(key, 0, key.length, values); + final Put put = new Put(copy(key, 0, key.length)); + byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength()); + byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength()); + byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength()); + put.addColumn(family, qualifier, value); + puts.add(put); + } + if (puts.size() >= BATCH_PUT_THRESHOLD) { + flush(); + } + } + + @Override + public final void flush() throws IOException { + if (!puts.isEmpty()) { + long t = System.currentTimeMillis(); + if (hTable != null) { + hTable.put(puts); + } + logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); + puts.clear(); + } + } + + @Override + public void close() throws IOException { + flush(); + IOUtils.closeQuietly(hTable); + } + +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 292d9d6..b9d5599 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -459,7 +459,7 @@ public class CubeMigrationCLI { value = Bytes.toBytes(valueString); } Put put = new Put(Bytes.toBytes(cubeId)); - put.add(family, column, value); + put.addColumn(family, column, value); destAclHtable.put(put); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index 23ec77f..46363b2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.Bytes; @@ -501,7 +500,7 @@ public class DeployCoprocessorCLI { Matcher keyMatcher; Matcher valueMatcher; - for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) { + for (Map.Entry<org.apache.hadoop.hbase.util.Bytes, org.apache.hadoop.hbase.util.Bytes> e : tableDescriptor.getValues().entrySet()) { keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get())); if (!keyMatcher.matches()) { continue; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index 092023e..0f9466c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -249,7 +249,7 
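The write()/flush()/close() trio above buffers Puts and pushes them to the Table in batches of BATCH_PUT_THRESHOLD. Stripped of the cuboid encoding, the buffering pattern looks roughly like this (class name and threshold are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    public class BatchedPutWriter implements AutoCloseable {
        private static final int BATCH_THRESHOLD = 10000;

        private final Table table;
        private final List<Put> buffer = new ArrayList<>();

        public BatchedPutWriter(Connection connection, String tableName) throws IOException {
            this.table = connection.getTable(TableName.valueOf(tableName));
        }

        public void write(byte[] row, byte[] family, byte[] qualifier, byte[] value) throws IOException {
            Put put = new Put(row);
            put.addColumn(family, qualifier, value);   // addColumn() replaces the old Put.add(family, qualifier, value)
            buffer.add(put);
            if (buffer.size() >= BATCH_THRESHOLD) {
                flush();
            }
        }

        public void flush() throws IOException {
            if (!buffer.isEmpty()) {
                table.put(buffer);    // one multi-put round trip per batch
                buffer.clear();
            }
        }

        @Override
        public void close() throws IOException {
            flush();
            table.close();
        }
    }

The same Put.addColumn(...) replacement accounts for the one-line changes in CubeMigrationCLI above and in GridTableHBaseBenchmark and the ExtendCubeToHybridCLI variants below.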
@@ public class ExtendCubeToHybridCLI { value = Bytes.toBytes(valueString); } Put put = new Put(Bytes.toBytes(newCubeId)); - put.add(family, column, value); + put.addColumn(family, column, value); aclHtable.put(put); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index a317110..3f034cf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -233,7 +233,7 @@ public class GridTableHBaseBenchmark { byte[] rowkey = Bytes.toBytes(i); Put put = new Put(rowkey); byte[] cell = randomBytes(); - put.add(CF, QN, cell); + put.addColumn(CF, QN, cell); table.put(put); nBytes += cell.length; dot(i, N_ROWS); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java index bba6745..ff038d1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java @@ -50,7 +50,8 @@ public class PingHBaseCLI { if (User.isHBaseSecurityEnabled(hconf)) { try { System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser()); + Connection connection = HBaseConnection.get(); + TokenUtil.obtainAndCacheToken(connection, User.create(UserGroupInformation.getCurrentUser())); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java index eba4a37..ff2ef91 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java @@ -68,13 +68,23 @@ public class CubeHFileMapperTest { Pair<RowKeyWritable, KeyValue> p2 = result.get(1); assertEquals(key, p1.getFirst()); - assertEquals("cf1", new String(p1.getSecond().getFamily())); - assertEquals("usd_amt", new String(p1.getSecond().getQualifier())); - assertEquals("35.43", new String(p1.getSecond().getValue())); + assertEquals("cf1", new String(copy(p1.getSecond()))); + assertEquals("usd_amt", new String(copy(p1.getSecond()))); + assertEquals("35.43", new String(copy(p1.getSecond()))); assertEquals(key, p2.getFirst()); - assertEquals("cf1", new String(p2.getSecond().getFamily())); - assertEquals("item_count", new String(p2.getSecond().getQualifier())); - assertEquals("2", new String(p2.getSecond().getValue())); + assertEquals("cf1", new String(copy(p2.getSecond()))); + assertEquals("item_count", new String(copy(p2.getSecond()))); + assertEquals("2", new String(copy(p2.getSecond()))); + } + + private byte[] copy(KeyValue kv) { + return copy(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()); + } + + private byte[] copy(byte[] array, int offset, int length) { + byte[] result = new byte[length]; + System.arraycopy(array, offset, 
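The PingHBaseCLI hunk above switches token acquisition to the Connection plus HBase User based TokenUtil call. A sketch of that path, assuming the caller already holds an open Connection:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosTokenSketch {
        // Obtain and cache an HBase delegation token for the current Kerberos user.
        public static void obtainToken(Connection connection) throws IOException, InterruptedException {
            User user = User.create(UserGroupInformation.getCurrentUser());
            TokenUtil.obtainAndCacheToken(connection, user);
        }
    }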
result, 0, length); + return result; } } diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java index 2b8ecae..b77d2cb 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java @@ -22,8 +22,11 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.Bytes; /** @@ -89,13 +92,16 @@ public class TestHbaseClient { conf.set("hbase.zookeeper.quorum", "hbase_host"); conf.set("zookeeper.znode.parent", "/hbase-unsecure"); - HTable table = new HTable(conf, "test1"); + Connection connection = ConnectionFactory.createConnection(conf); + + Table table = connection.getTable(TableName.valueOf("test1")); Put put = new Put(Bytes.toBytes("row1")); - put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1")); - put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2")); + put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1")); + put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2")); table.put(put); table.close(); + connection.close(); } } diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index 5426b62..3b95a50 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HTableDescriptor; @@ -174,6 +175,7 @@ public class CubeMigrationCLI extends AbstractApplication { Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); hbaseAdmin = new HBaseAdmin(conf); + hdfsFS = HadoopUtil.getWorkingFileSystem(); operations = new ArrayList<Opt>(); copyFilesInMetaStore(cube); @@ -418,10 +420,10 @@ public class CubeMigrationCLI extends AbstractApplication { String tableName = (String) opt.params[0]; System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); logger.info("CHANGE_HTABLE_HOST is completed"); break; } @@ -580,10 +582,10 @@ public class CubeMigrationCLI extends AbstractApplication { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - 
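The CHANGE_HTABLE_HOST hunks here and in CubeMigrationCheckCLI below pass TableName objects because newer HBase drops the String overloads of disableTable/modifyTable/enableTable. The disable, retag, modify, enable sequence reduced to a sketch over a plain Admin handle (method and parameter names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;

    public class RetagHTableSketch {
        public static void retag(Admin admin, String table, String key, String value) throws IOException {
            TableName name = TableName.valueOf(table);
            HTableDescriptor desc = admin.getTableDescriptor(name);
            admin.disableTable(name);                  // TableName-based overloads throughout
            desc.setValue(key, value);
            admin.modifyTable(name, desc);
            admin.enableTable(name);
        }
    }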
hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); break; } case COPY_FILE_IN_META: { diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java index 54fbbc0..52bad9d 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java @@ -29,7 +29,9 @@ import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.cube.CubeInstance; @@ -61,7 +63,8 @@ public class CubeMigrationCheckCLI { private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube"); private KylinConfig dstCfg; - private HBaseAdmin hbaseAdmin; + private Admin hbaseAdmin; + private Connection connection; private List<String> issueExistHTables; private List<String> inconsistentHTables; @@ -123,6 +126,7 @@ public class CubeMigrationCheckCLI { } fixInconsistent(); printIssueExistingHTables(); + connection.close(); } public CubeMigrationCheckCLI(KylinConfig kylinConfig, Boolean isFix) throws IOException { @@ -130,7 +134,8 @@ public class CubeMigrationCheckCLI { this.ifFix = isFix; Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - hbaseAdmin = new HBaseAdmin(conf); + connection = ConnectionFactory.createConnection(conf); + hbaseAdmin = connection.getAdmin(); issueExistHTables = Lists.newArrayList(); inconsistentHTables = Lists.newArrayList(); @@ -189,10 +194,10 @@ public class CubeMigrationCheckCLI { String[] sepNameList = segFullName.split(","); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0])); logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix()); - hbaseAdmin.disableTable(sepNameList[0]); + hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0])); desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(sepNameList[0], desc); - hbaseAdmin.enableTable(sepNameList[0]); + hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc); + hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0])); } } else { logger.info("------ Inconsistent HTables Needed To Be Fixed ------"); diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java index 9c6cba6..b5a8440 100644 --- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java @@ -245,7 +245,7 @@ public class ExtendCubeToHybridCLI { 
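CubeMigrationCheckCLI above replaces new HBaseAdmin(conf) with an Admin obtained from a Connection, and closes that Connection after printIssueExistingHTables(). The lifecycle in isolation, using try-with-resources for brevity:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class AdminSketch {
        // Admin handles come from the Connection now; both are Closeable.
        public static void withAdmin(Configuration conf) throws IOException {
            try (Connection connection = ConnectionFactory.createConnection(conf);
                    Admin admin = connection.getAdmin()) {
                System.out.println("tables: " + admin.listTableNames().length);
            }
        }
    }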
 value = Bytes.toBytes(valueString);
 }
 Put put = new Put(Bytes.toBytes(newCubeId));
-put.add(family, column, value);
+put.addColumn(family, column, value);
 aclHtable.put(put);
 }
 }
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 16aa5ff..f6099eb 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -22,6 +22,7 @@ package org.apache.kylin.tool;
  * Created by xiefan on 17-4-20.
  */
 public class StorageCleanupJob {
+
 public static void main(String[] args) throws Exception {
 org.apache.kylin.rest.job.StorageCleanupJob cli = new org.apache.kylin.rest.job.StorageCleanupJob();
 cli.execute(args);