This is an automated email from the ASF dual-hosted git repository. alexott pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 3a31d6a [ZEPPELIN-4818] Additional refactoring of Cassandra interpreter for new driver versions 3a31d6a is described below commit 3a31d6a9dc33341bb79b7c066864a3deeedb6522 Author: Alex Ott <alex...@gmail.com> AuthorDate: Mon Jun 22 19:25:23 2020 +0200 [ZEPPELIN-4818] Additional refactoring of Cassandra interpreter for new driver versions ### What is this PR for? The new DataStax Java Driver allows configuration options to be overridden in different ways - via config file, system properties, etc. This PR implements the following: * Allows overriding any option of the Java driver by putting additional options into the interpreter's configuration. For example, this makes it possible to use DataStax Astra (Cassandra as a Service) without making any changes to the code of the interpreter. It also allows configuring any additional parameter not exposed directly by the interpreter * Upgrades the driver to the latest released version * Configures embedded Cassandra for faster startup when executing unit tests ### What type of PR is it? Improvement ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4818 ### How should this be tested? 
* https://travis-ci.org/github/alexott/zeppelin/builds/700969578 * tested manually Author: Alex Ott <alex...@gmail.com> Closes #3815 from alexott/ZEPPELIN-4818 and squashes the following commits: f34738f75 [Alex Ott] [ZEPPELIN-4818] Upgrade to driver 4.7.2 5ef5c8823 [Alex Ott] [ZEPPELIN-4818] Allow to provide any Java driver setting a458df7ba [Alex Ott] [ZEPPELIN-4818] Explicitly set timeout for DDL statements 42dc4e65c [Alex Ott] [ZEPPELIN-4818] Upgrade driver to 4.7.1 db3950f1b [Alex Ott] [ZEPPELIN-4818] Speedup unit test --- cassandra/pom.xml | 2 +- .../zeppelin/cassandra/CassandraInterpreter.java | 229 ++++++++++++++++----- .../src/main/resources/interpreter-setting.json | 14 +- .../zeppelin/cassandra/EnhancedSession.scala | 13 +- .../zeppelin/cassandra/JavaDriverConfig.scala | 207 ------------------- .../cassandra/CassandraInterpreterTest.java | 17 ++ docs/interpreter/cassandra.md | 19 +- 7 files changed, 226 insertions(+), 275 deletions(-) diff --git a/cassandra/pom.xml b/cassandra/pom.xml index 9a93fd3..3d8a71a 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -33,7 +33,7 @@ <description>Zeppelin cassandra support</description> <properties> - <cassandra.driver.version>4.6.1</cassandra.driver.version> + <cassandra.driver.version>4.7.2</cassandra.driver.version> <snappy.version>1.1.7.3</snappy.version> <lz4.version>1.6.0</lz4.version> <scalate.version>1.7.1</scalate.version> diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java index 0f8197b..7d397e5 100644 --- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java +++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java @@ -20,8 +20,8 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import 
com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.DriverOption; import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; -import com.datastax.oss.driver.internal.core.loadbalancing.DcInferringLoadBalancingPolicy; import com.datastax.oss.driver.shaded.guava.common.net.InetAddresses; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.Interpreter; @@ -42,7 +42,9 @@ import java.nio.file.Paths; import java.security.KeyStore; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import static java.lang.Integer.parseInt; @@ -129,14 +131,16 @@ public class CassandraInterpreter extends Interpreter { public static final String CASSANDRA_FORMAT_LOCALE = "cassandra.format.locale"; + public static final String NONE_VALUE = "none"; + public static final String DEFAULT_VALUE = "DEFAULT"; public static final String DEFAULT_HOST = "127.0.0.1"; public static final String DEFAULT_PORT = "9042"; public static final String DEFAULT_KEYSPACE = "system"; public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT"; - public static final String DEFAULT_COMPRESSION = "none"; + public static final String DEFAULT_COMPRESSION = NONE_VALUE; public static final String DEFAULT_CONNECTIONS_PER_HOST = "1"; public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024"; - public static final String DEFAULT_POLICY = "DEFAULT"; + public static final String DEFAULT_POLICY = DEFAULT_VALUE; public static final String DEFAULT_PARALLELISM = "10"; public static final String DEFAULT_POOL_TIMEOUT = "5000"; public static final String DEFAULT_HEARTBEAT_INTERVAL = "30"; @@ -149,10 +153,19 @@ public class CassandraInterpreter extends Interpreter { public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12"; static final List NO_COMPLETION = new ArrayList<>(); + 
public static final String DATASTAX_JAVA_DRIVER_PREFIX = "datastax-java-driver."; + public static final String MILLISECONDS_STR = " milliseconds"; + public static final String SECONDS_STR = " seconds"; InterpreterLogic helper; CqlSession session; - private JavaDriverConfig driverConfig = new JavaDriverConfig(); + private static final Map<String, DriverOption> optionMap = new HashMap<>(); + + static { + for (DefaultDriverOption opt: DefaultDriverOption.values()) { + optionMap.put(opt.getPath(), opt); + } + } public CassandraInterpreter(Properties properties) { super(properties); @@ -160,64 +173,52 @@ public class CassandraInterpreter extends Interpreter { @Override public void open() { - - final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST).split(","); + final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST) + .trim().split(","); final int port = parseInt(getProperty(CASSANDRA_PORT, DEFAULT_PORT)); Collection<InetSocketAddress> hosts = new ArrayList<>(); for (String address : addresses) { - if (InetAddresses.isInetAddress(address)) { - hosts.add(new InetSocketAddress(address, port)); - } else { - // TODO(alex): maybe it won't be necessary in 4.4 - hosts.add(InetSocketAddress.createUnresolved(address, port)); + if (!StringUtils.isBlank(address)) { + logger.debug("Adding contact point: {}", address); + if (InetAddresses.isInetAddress(address)) { + hosts.add(new InetSocketAddress(address, port)); + } else { + hosts.add(InetSocketAddress.createUnresolved(address, port)); + } } } - LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " + - getProperty(CASSANDRA_HOSTS) + "on port " + port); - - // start generation of the config - ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder(); - - driverConfig.setCompressionProtocol(this, configBuilder); - driverConfig.setPoolingOptions(this, configBuilder); - driverConfig.setProtocolVersion(this, configBuilder); - driverConfig.setQueryOptions(this, 
configBuilder); - driverConfig.setReconnectionPolicy(this, configBuilder); - driverConfig.setRetryPolicy(this, configBuilder); - driverConfig.setSocketOptions(this, configBuilder); - driverConfig.setSpeculativeExecutionPolicy(this, configBuilder); - - // - configBuilder.withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, - DcInferringLoadBalancingPolicy.class); - configBuilder.withBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, false); + LOGGER.info("Bootstrapping Cassandra Java Driver to connect to {} on port {}", + getProperty(CASSANDRA_HOSTS), port); - configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, - parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS, - DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS))); - - DriverConfigLoader loader = configBuilder.endProfile().build(); - // TODO(alex): think how to dump built configuration... - logger.debug(loader.toString()); - // end generation of config + DriverConfigLoader loader = createLoader(); + LOGGER.debug("Creating cluster builder"); CqlSessionBuilder clusterBuilder = CqlSession.builder() - .addContactPoints(hosts) - .withAuthCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME), - getProperty(CASSANDRA_CREDENTIALS_PASSWORD)) - .withApplicationName("") + .withApplicationName("Zeppelin") .withApplicationVersion(""); + if (!hosts.isEmpty()) { + LOGGER.debug("Adding contact points"); + clusterBuilder.addContactPoints(hosts); + } + + String username = getProperty(CASSANDRA_CREDENTIALS_USERNAME, NONE_VALUE).trim(); + String password = getProperty(CASSANDRA_CREDENTIALS_PASSWORD, NONE_VALUE).trim(); + if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password) && + !NONE_VALUE.equalsIgnoreCase(username) && !NONE_VALUE.equalsIgnoreCase(password)) { + LOGGER.debug("Adding credentials. 
Username = {}", username); + clusterBuilder.withAuthCredentials(username, password); + } String keyspace = getProperty(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE); if (StringUtils.isNotBlank(keyspace) && !DEFAULT_KEYSPACE.equalsIgnoreCase(keyspace)) { + LOGGER.debug("Set default keyspace"); clusterBuilder.withKeyspace(keyspace); } - final String runWithSSL = getProperty(CASSANDRA_WITH_SSL); - if (runWithSSL != null && runWithSSL.equals("true")) { - LOGGER.debug("Cassandra Interpreter: Using SSL"); - + final String runWithSSL = getProperty(CASSANDRA_WITH_SSL, "false"); + if ("true".equalsIgnoreCase(runWithSSL)) { + LOGGER.debug("Using SSL"); try { final SSLContext sslContext; { @@ -235,19 +236,149 @@ public class CassandraInterpreter extends Interpreter { } clusterBuilder = clusterBuilder.withSslContext(sslContext); } catch (Exception e) { - LOGGER.error(e.toString()); + LOGGER.error("Exception initializing SSL {}", e.toString()); } } else { - LOGGER.debug("Cassandra Interpreter: Not using SSL"); + LOGGER.debug("Not using SSL"); } + LOGGER.debug("Creating CqlSession"); session = clusterBuilder.withConfigLoader(loader).build(); + LOGGER.debug("Session configuration"); + for (Map.Entry<String, Object> entry: + session.getContext().getConfig().getDefaultProfile().entrySet()) { + logger.debug("{} = {}", entry.getKey(), entry.getValue().toString()); + } + LOGGER.debug("Creating helper"); helper = new InterpreterLogic(session, properties); } + private DriverConfigLoader createLoader() { + logger.debug("Creating programmatic config loader"); + // start generation of the config + ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder(); + + Map<DriverOption, String> allOptions = new HashMap<>(); + + // set options from main configuration + String ts = getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, + CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT) + MILLISECONDS_STR; + 
allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts); + allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, ts); + allOptions.put(DefaultDriverOption.REQUEST_TIMEOUT, + getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, + CassandraInterpreter.DEFAULT_READ_TIMEOUT) + MILLISECONDS_STR); + addIfNotBlank(allOptions, + getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY), + DefaultDriverOption.SOCKET_TCP_NODELAY); + addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_KEEP_ALIVE), + DefaultDriverOption.SOCKET_KEEP_ALIVE); + addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES), + DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE); + addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES), + DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE); + addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS), + DefaultDriverOption.SOCKET_REUSE_ADDRESS); + addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SO_LINGER), + DefaultDriverOption.SOCKET_LINGER_INTERVAL); + addIfNotBlank(allOptions, + getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE), + DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE); + allOptions.put(DefaultDriverOption.REQUEST_CONSISTENCY, + getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, + CassandraInterpreter.DEFAULT_CONSISTENCY)); + allOptions.put(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, + getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, + CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY)); + allOptions.put(DefaultDriverOption.REQUEST_PAGE_SIZE, + getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, + CassandraInterpreter.DEFAULT_FETCH_SIZE)); + ts = getProperty(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION); + if (!DEFAULT_VALUE.equalsIgnoreCase(ts)) { + // for compatibility with previous configurations + if (ts.equals("4") || ts.equals("3")) { + ts = "V" + ts; + } + allOptions.put(DefaultDriverOption.PROTOCOL_VERSION, ts); + } + 
addIfNotBlank(allOptions, getProperty(CASSANDRA_COMPRESSION_PROTOCOL, + CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase(), + DefaultDriverOption.PROTOCOL_COMPRESSION); + addIfNotBlankOrDefault(allOptions, getProperty(CASSANDRA_RETRY_POLICY, DEFAULT_POLICY), + DefaultDriverOption.RETRY_POLICY_CLASS); + addIfNotBlankOrDefault(allOptions, + getProperty(CASSANDRA_RECONNECTION_POLICY, DEFAULT_POLICY), + DefaultDriverOption.RECONNECTION_POLICY_CLASS); + addIfNotBlankOrDefault(allOptions, + getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, DEFAULT_POLICY), + DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS); + allOptions.put(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, + getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL, + DEFAULT_CONNECTIONS_PER_HOST)); + allOptions.put(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, + getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE, + DEFAULT_CONNECTIONS_PER_HOST)); + allOptions.put(DefaultDriverOption.CONNECTION_MAX_REQUESTS, + getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION, + DEFAULT_MAX_REQUEST_PER_CONNECTION)); + allOptions.put(DefaultDriverOption.HEARTBEAT_INTERVAL, + getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, + DEFAULT_HEARTBEAT_INTERVAL) + SECONDS_STR); + ts = getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, + DEFAULT_POOL_TIMEOUT) + MILLISECONDS_STR; + allOptions.put(DefaultDriverOption.HEARTBEAT_TIMEOUT, ts); + allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts); + allOptions.put(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, + "DcInferringLoadBalancingPolicy"); + allOptions.put(DefaultDriverOption.RESOLVE_CONTACT_POINTS, "false"); + allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, + getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS, + DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS) + SECONDS_STR); + + // extract additional options that may override values set by main configuration + for (String pname: 
properties.stringPropertyNames()) { + if (pname.startsWith(DATASTAX_JAVA_DRIVER_PREFIX)) { + String pvalue = properties.getProperty(pname); + logger.info("Custom config values: {} = {}", pname, pvalue); + String shortName = pname.substring(DATASTAX_JAVA_DRIVER_PREFIX.length()); + if (optionMap.containsKey(shortName)) { + allOptions.put(optionMap.get(shortName), pvalue); + } else { + logger.warn("Incorrect option name: {}", pname); + } + } + } + + for (Map.Entry<DriverOption, String> entry: allOptions.entrySet()) { + configBuilder.withString(entry.getKey(), entry.getValue()); + } + + DriverConfigLoader loader = configBuilder.endProfile().build(); + logger.debug("Config loader is created"); + + return loader; + } + + private static void addIfNotBlank(Map<DriverOption, String> allOptions, + String value, + DefaultDriverOption option) { + if (!StringUtils.isBlank(value)) { + allOptions.put(option, value); + } + } + + private static void addIfNotBlankOrDefault(Map<DriverOption, String> allOptions, + String value, + DefaultDriverOption option) { + if (!StringUtils.isBlank(value) && !DEFAULT_VALUE.equalsIgnoreCase(value)) { + allOptions.put(option, value); + } + } + @Override public void close() { - session.close(); + if (session != null) + session.close(); } @Override diff --git a/cassandra/src/main/resources/interpreter-setting.json b/cassandra/src/main/resources/interpreter-setting.json index 34de7a1..5c32ad3 100644 --- a/cassandra/src/main/resources/interpreter-setting.json +++ b/cassandra/src/main/resources/interpreter-setting.json @@ -21,8 +21,8 @@ "cassandra.protocol.version": { "envName": null, "propertyName": "cassandra.protocol.version", - "defaultValue": "4", - "description": "Cassandra protocol version. Default = 4", + "defaultValue": "DEFAULT", + "description": "Cassandra protocol version. 
Default = auto-detect", "type": "string" }, "cassandra.cluster": { @@ -105,22 +105,22 @@ "cassandra.pooling.connection.per.host.local": { "envName": null, "propertyName": "cassandra.pooling.connection.per.host.local", - "defaultValue": "8", - "description": "Cassandra connections per host local. Protocol V2 and below default = 8 Protocol V3 and above default = 1", + "defaultValue": "1", + "description": "Cassandra connections per host local. Protocol V3 and above default = 1", "type": "number" }, "cassandra.pooling.connection.per.host.remote": { "envName": null, "propertyName": "cassandra.pooling.connection.per.host.remote", - "defaultValue": "2", - "description": "Cassandra connections per host remote. Protocol V2 and below default = 2 Protocol V3 and above default = 1", + "defaultValue": "1", + "description": "Cassandra connections per host remote. Protocol V3 and above default = 1", "type": "number" }, "cassandra.pooling.max.request.per.connection": { "envName": null, "propertyName": "cassandra.pooling.max.request.per.connection", "defaultValue": "1024", - "description": "Cassandra max requests per connection. Protocol V2 and below default = 128 Protocol V3 and above default = 1024", + "description": "Cassandra max requests per connection. 
Protocol V3 and above default = 1024", "type": "number" }, "cassandra.pooling.pool.timeout.millisecs": { diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala index 31f5142..46ce83e 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala @@ -16,6 +16,7 @@ */ package org.apache.zeppelin.cassandra +import java.time.Duration import java.util.regex.Pattern import com.datastax.oss.driver.api.core.CqlSession @@ -46,8 +47,10 @@ class EnhancedSession(val session: CqlSession) { private val noResultDisplay = DisplaySystem.NoResultDisplay private val DEFAULT_CHECK_TIME: Int = 200 private val MAX_SCHEMA_AGREEMENT_WAIT: Int = 120000 // 120 seconds + private val defaultDDLTimeout: Duration = Duration.ofSeconds(MAX_SCHEMA_AGREEMENT_WAIT / 10000) private val LOGGER = LoggerFactory.getLogger(classOf[EnhancedSession]) + val HTML_MAGIC = "%html \n" val displayNoResult: String = HTML_MAGIC + noResultDisplay.formatNoResult @@ -191,8 +194,14 @@ class EnhancedSession(val session: CqlSession) { } private def executeStatement[StatementT <: Statement[StatementT]](st: StatementT): Any = { - val rs: ResultSet = session.execute(st) - if (EnhancedSession.isDDLStatement(st)) { + val isDDL = EnhancedSession.isDDLStatement(st) + val newSt = if (isDDL) { + st.setTimeout(defaultDDLTimeout) + } else { + st + } + val rs: ResultSet = session.execute(newSt) + if (isDDL) { if (!rs.getExecutionInfo.isSchemaInAgreement) { val startTime = System.currentTimeMillis() while(!session.checkSchemaAgreement()) { diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala deleted file mode 100644 index 1834e7e..0000000 --- 
a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cassandra - -import java.lang.Boolean._ - -import com.datastax.oss.driver.api.core.ProtocolVersion -import com.datastax.oss.driver.api.core.config.{DefaultDriverOption, ProgrammaticDriverConfigLoaderBuilder} -import org.apache.commons.lang3.StringUtils._ -import org.apache.zeppelin.interpreter.Interpreter -import org.apache.zeppelin.cassandra.CassandraInterpreter._ -import org.slf4j.{Logger, LoggerFactory} - -/** - * Utility class to extract and configure the Java driver - */ -class JavaDriverConfig { - val LOGGER: Logger = LoggerFactory.getLogger(classOf[JavaDriverConfig]) - - def setSocketOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, - CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT).toInt - configBuilder.withInt(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, connectTimeoutMillis) - configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, connectTimeoutMillis) - - val 
readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, - CassandraInterpreter.DEFAULT_READ_TIMEOUT).toInt - configBuilder.withInt(DefaultDriverOption.REQUEST_TIMEOUT, readTimeoutMillis) - - val tcpNoDelay = intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY) - if (isNotBlank(tcpNoDelay)) { - configBuilder.withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, parseBoolean(tcpNoDelay)) - } - - val keepAlive: String = intpr.getProperty(CASSANDRA_SOCKET_KEEP_ALIVE) - if (isNotBlank(keepAlive)) { - configBuilder.withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, parseBoolean(keepAlive)) - } - - val receivedBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES) - if (isNotBlank(receivedBuffSize)) { - configBuilder.withInt(DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE, receivedBuffSize.toInt) - } - - val sendBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES) - if (isNotBlank(sendBuffSize)) { - configBuilder.withInt(DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE, sendBuffSize.toInt) - } - - val reuseAddress: String = intpr.getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS) - if (isNotBlank(reuseAddress)) { - configBuilder.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, parseBoolean(reuseAddress)) - } - - val soLinger: String = intpr.getProperty(CASSANDRA_SOCKET_SO_LINGER) - if (isNotBlank(soLinger)) { - configBuilder.withInt(DefaultDriverOption.SOCKET_LINGER_INTERVAL, soLinger.toInt) - } - } - - def setQueryOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val consistencyLevel = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, - CassandraInterpreter.DEFAULT_CONSISTENCY) - configBuilder.withString(DefaultDriverOption.REQUEST_CONSISTENCY, consistencyLevel) - - val serialConsistencyLevel = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, - CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY) - 
configBuilder.withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, serialConsistencyLevel) - - val fetchSize = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, - CassandraInterpreter.DEFAULT_FETCH_SIZE).toInt - configBuilder.withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, fetchSize) - - configBuilder.withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE, - parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE))) - } - - val PROTOCOL_MAPPING: Map[String, ProtocolVersion] = Map("3" -> ProtocolVersion.V3, "4" -> ProtocolVersion.V4, - "5" -> ProtocolVersion.V5, "DSE1" -> ProtocolVersion.DSE_V1, "DSE2" -> ProtocolVersion.DSE_V2) - - def setProtocolVersion(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION, - CassandraInterpreter.DEFAULT_PROTOCOL_VERSION) - LOGGER.debug("Protocol version : " + protocolVersion) - - protocolVersion match { - case "1" | "2" => - throw new RuntimeException(s"Protocol V${protocolVersion} isn't supported") - case _ => - configBuilder.withString(DefaultDriverOption.PROTOCOL_VERSION, - PROTOCOL_MAPPING.getOrElse(protocolVersion, ProtocolVersion.DEFAULT).name()) - } - } - - def setPoolingOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val poolingOptionsInfo: StringBuilder = new StringBuilder("Pooling options : \n\n") - - val coreConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL, - DEFAULT_CONNECTIONS_PER_HOST).toInt - configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, coreConnPerHostLocal) - poolingOptionsInfo - .append("\t") - .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL) - .append(" : ") - .append(coreConnPerHostLocal) - .append("\n") - - val coreConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE, - DEFAULT_CONNECTIONS_PER_HOST).toInt - 
configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, coreConnPerHostRemote) - poolingOptionsInfo - .append("\t") - .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE) - .append(" : ") - .append(coreConnPerHostRemote) - .append("\n") - - val maxReqPerConnLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION, - DEFAULT_MAX_REQUEST_PER_CONNECTION).toInt - configBuilder.withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, maxReqPerConnLocal) - poolingOptionsInfo - .append("\t") - .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION) - .append(" : ") - .append(maxReqPerConnLocal) - .append("\n") - - val heartbeatIntervalSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, - DEFAULT_HEARTBEAT_INTERVAL).toInt - configBuilder.withInt(DefaultDriverOption.HEARTBEAT_INTERVAL, heartbeatIntervalSeconds) - poolingOptionsInfo - .append("\t") - .append(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS) - .append(" : ") - .append(heartbeatIntervalSeconds) - .append("\n") - - val idleTimeoutSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, - DEFAULT_POOL_TIMEOUT).toInt - configBuilder.withInt(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeoutSeconds) - poolingOptionsInfo - .append("\t") - .append(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS) - .append(" : ") - .append(idleTimeoutSeconds) - .append("\n") - - LOGGER.debug(poolingOptionsInfo.append("\n").toString) - } - - def setCompressionProtocol(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val compressionProtocol = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL, - CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase - LOGGER.debug("Compression protocol : " + compressionProtocol) - - compressionProtocol match { - case "snappy" | "lz4" => - configBuilder.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, compressionProtocol) - case _ => () - } - } - - private def isNotDefaultParameter(param: String) = { - 
!(isBlank(param) || DEFAULT_POLICY == param) - } - - def setRetryPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY) - LOGGER.debug("Retry Policy : " + retryPolicy) - - if (isNotDefaultParameter(retryPolicy)) { - configBuilder.withString(DefaultDriverOption.RETRY_POLICY_CLASS, retryPolicy) - } - } - - def setReconnectionPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val reconnectionPolicy: String = intpr.getProperty(CASSANDRA_RECONNECTION_POLICY) - LOGGER.debug("Reconnection Policy : " + reconnectionPolicy) - - if (isNotDefaultParameter(reconnectionPolicy)) { - configBuilder.withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, reconnectionPolicy) - } - } - - def setSpeculativeExecutionPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = { - val specExecPolicy: String = intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY) - LOGGER.debug("Speculative Execution Policy : " + specExecPolicy) - - if (isNotDefaultParameter(specExecPolicy)) { - configBuilder.withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, specExecPolicy) - } - } -} diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java index ffdb7d9..5f09394 100644 --- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java +++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.cassandra; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import org.apache.zeppelin.display.AngularObjectRegistry; import 
org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; @@ -73,6 +75,11 @@ public class CassandraInterpreterTest { //extends AbstractCassandraUnit4CQLTestC @BeforeClass public static synchronized void setUp() throws IOException, InterruptedException { + System.setProperty("cassandra.skip_wait_for_gossip_to_settle", "0"); + System.setProperty("cassandra.load_ring_state", "false"); + System.setProperty("cassandra.initial_token", "0"); + System.setProperty("cassandra.num_tokens", "nil"); + System.setProperty("cassandra.allocate_tokens_for_local_replication_factor", "nil"); EmbeddedCassandraServerHelper.startEmbeddedCassandra(); CqlSession session = EmbeddedCassandraServerHelper.getSession(); new CQLDataLoader(session).load(new ClassPathCQLDataSet("prepare_all.cql", "zeppelin")); @@ -106,6 +113,7 @@ public class CassandraInterpreterTest { //extends AbstractCassandraUnit4CQLTestC properties.setProperty(CASSANDRA_HOSTS, EmbeddedCassandraServerHelper.getHost()); properties.setProperty(CASSANDRA_PORT, Integer.toString(EmbeddedCassandraServerHelper.getNativeTransportPort())); + properties.setProperty("datastax-java-driver.advanced.connection.pool.local.size", "1"); interpreter = new CassandraInterpreter(properties); interpreter.open(); } @@ -122,6 +130,15 @@ public class CassandraInterpreterTest { //extends AbstractCassandraUnit4CQLTestC } @Test + public void should_set_custom_option() throws Exception { + assertThat(interpreter.session).isNotNull(); + DriverExecutionProfile config = interpreter.session.getContext() + .getConfig().getDefaultProfile(); + assertThat(config.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 10)) + .isEqualTo(1); + } + + @Test public void should_interpret_simple_select() throws Exception { //Given diff --git a/docs/interpreter/cassandra.md b/docs/interpreter/cassandra.md index b4613e2..69487e4 100644 --- a/docs/interpreter/cassandra.md +++ b/docs/interpreter/cassandra.md @@ -668,10 +668,9 @@ The **isolated** 
mode is the most extreme and will create as many JVM/`com.datas ## Interpreter Configuration To configure the **Cassandra** interpreter, go to the **Interpreter** menu and scroll down to change the parameters. -The **Cassandra** interpreter is using the official **[Datastax Java Driver for Apache Cassandra]®** and most of the parameters are used -to configure the Java driver +The **Cassandra** interpreter is using the official **[Datastax Java Driver for Apache Cassandra]®** and most of the parameters are used to configure the Java driver -Below are the configuration parameters and their default values. +Below are the configuration parameters supported by interpreter and their default values. <table class="table-configuration"> <tr> @@ -741,12 +740,12 @@ Below are the configuration parameters and their default values. </tr> <tr> <td>`cassandra.pooling.connection.per.host.local`</td> - <td>Protocol V2 and below default = 2. Protocol V3 and above default = 1</td> - <td>2</td> + <td>Protocol V3 and above default = 1</td> + <td>1</td> </tr> <tr> <td>`cassandra.pooling.connection.per.host.remote`</td> - <td>Protocol V2 and below default = 1. Protocol V3 and above default = 1</td> + <td>Protocol V3 and above default = 1</td> <td>1</td> </tr> <tr> @@ -756,8 +755,8 @@ Below are the configuration parameters and their default values. </tr> <tr> <td>`cassandra.pooling.max.request.per.connection`</td> - <td>Protocol V2 and below default = 128. Protocol V3 and above default = 1024</td> - <td>128</td> + <td>Protocol V3 and above default = 1024</td> + <td>1024</td> </tr> <tr> <td>`cassandra.pooling.pool.timeout.millisecs`</td> @@ -766,7 +765,7 @@ Below are the configuration parameters and their default values. 
</tr> <tr> <td>`cassandra.protocol.version`</td> - <td>Cassandra binary protocol version (`3`, `4`, `DSE1`, `DSE2`)</td> + <td>Cassandra binary protocol version (`V3`, `V4`, ...)</td> <td>`DEFAULT` (detected automatically)</td> </tr> <tr> @@ -902,6 +901,8 @@ Below are the configuration parameters and their default values. </tr> </table> +Besides these parameters, it's also possible to set other driver parameters by adding them into interpreter configuration. The configuration key should have full form with `datastax-java-driver` prefix, as [described in documentation](https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/). For example, to specify 5 seconds request timeout, you can use `datastax-java-driver.basic.request.timeout` with value of `5 seconds`. Full list of available configura [...] + ## Change Log **4.0** _(Zeppelin {{ site.ZEPPELIN_VERSION }})_ :