This is an automated email from the ASF dual-hosted git repository.

alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a31d6a  [ZEPPELIN-4818] Additional refactoring of Cassandra 
interpreter for new driver versions
3a31d6a is described below

commit 3a31d6a9dc33341bb79b7c066864a3deeedb6522
Author: Alex Ott <alex...@gmail.com>
AuthorDate: Mon Jun 22 19:25:23 2020 +0200

    [ZEPPELIN-4818] Additional refactoring of Cassandra interpreter for new 
driver versions
    
    ### What is this PR for?
    
    The new DataStax Java Driver allows configuration options to be overridden in 
different ways - via config file, system properties, etc.
    
    This PR implements the following:
    
    * Allows any option of the Java driver to be overridden by putting additional 
options into the interpreter's configuration.  For example, this makes it possible to use 
DataStax Astra (Cassandra as a Service) without making any changes to the 
code of the interpreter.  It also allows configuring any additional parameter 
not exposed directly by the interpreter.
    * Upgrades the driver to the latest released version
    * Configures embedded Cassandra for faster startup when executing unit tests
    
    ### What type of PR is it?
    
    Improvement
    
    ### What is the Jira issue?
    
    * https://issues.apache.org/jira/browse/ZEPPELIN-4818
    
    ### How should this be tested?
    
    * https://travis-ci.org/github/alexott/zeppelin/builds/700969578
    * tested manually
    
    Author: Alex Ott <alex...@gmail.com>
    
    Closes #3815 from alexott/ZEPPELIN-4818 and squashes the following commits:
    
    f34738f75 [Alex Ott] [ZEPPELIN-4818] Upgrade to driver 4.7.2
    5ef5c8823 [Alex Ott] [ZEPPELIN-4818] Allow to provide any Java driver 
setting
    a458df7ba [Alex Ott] [ZEPPELIN-4818] Explicitly set timeout for DDL 
statements
    42dc4e65c [Alex Ott] [ZEPPELIN-4818] Upgrade driver to 4.7.1
    db3950f1b [Alex Ott] [ZEPPELIN-4818] Speedup unit test
---
 cassandra/pom.xml                                  |   2 +-
 .../zeppelin/cassandra/CassandraInterpreter.java   | 229 ++++++++++++++++-----
 .../src/main/resources/interpreter-setting.json    |  14 +-
 .../zeppelin/cassandra/EnhancedSession.scala       |  13 +-
 .../zeppelin/cassandra/JavaDriverConfig.scala      | 207 -------------------
 .../cassandra/CassandraInterpreterTest.java        |  17 ++
 docs/interpreter/cassandra.md                      |  19 +-
 7 files changed, 226 insertions(+), 275 deletions(-)

diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index 9a93fd3..3d8a71a 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -33,7 +33,7 @@
     <description>Zeppelin cassandra support</description>
 
     <properties>
-        <cassandra.driver.version>4.6.1</cassandra.driver.version>
+        <cassandra.driver.version>4.7.2</cassandra.driver.version>
         <snappy.version>1.1.7.3</snappy.version>
         <lz4.version>1.6.0</lz4.version>
         <scalate.version>1.7.1</scalate.version>
diff --git 
a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
 
b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index 0f8197b..7d397e5 100644
--- 
a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ 
b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -20,8 +20,8 @@ import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.CqlSessionBuilder;
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
 import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.config.DriverOption;
 import 
com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
-import 
com.datastax.oss.driver.internal.core.loadbalancing.DcInferringLoadBalancingPolicy;
 import com.datastax.oss.driver.shaded.guava.common.net.InetAddresses;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -42,7 +42,9 @@ import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static java.lang.Integer.parseInt;
@@ -129,14 +131,16 @@ public class CassandraInterpreter extends Interpreter {
   public static final String CASSANDRA_FORMAT_LOCALE =
           "cassandra.format.locale";
 
+  public static final String NONE_VALUE = "none";
+  public static final String DEFAULT_VALUE = "DEFAULT";
   public static final String DEFAULT_HOST = "127.0.0.1";
   public static final String DEFAULT_PORT = "9042";
   public static final String DEFAULT_KEYSPACE = "system";
   public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT";
-  public static final String DEFAULT_COMPRESSION = "none";
+  public static final String DEFAULT_COMPRESSION = NONE_VALUE;
   public static final String DEFAULT_CONNECTIONS_PER_HOST = "1";
   public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024";
-  public static final String DEFAULT_POLICY = "DEFAULT";
+  public static final String DEFAULT_POLICY = DEFAULT_VALUE;
   public static final String DEFAULT_PARALLELISM = "10";
   public static final String DEFAULT_POOL_TIMEOUT = "5000";
   public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
@@ -149,10 +153,19 @@ public class CassandraInterpreter extends Interpreter {
   public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12";
 
   static final List NO_COMPLETION = new ArrayList<>();
+  public static final String DATASTAX_JAVA_DRIVER_PREFIX = 
"datastax-java-driver.";
+  public static final String MILLISECONDS_STR = " milliseconds";
+  public static final String SECONDS_STR = " seconds";
 
   InterpreterLogic helper;
   CqlSession session;
-  private JavaDriverConfig driverConfig = new JavaDriverConfig();
+  private static final Map<String, DriverOption> optionMap = new HashMap<>();
+
+  static {
+    for (DefaultDriverOption opt: DefaultDriverOption.values()) {
+      optionMap.put(opt.getPath(), opt);
+    }
+  }
 
   public CassandraInterpreter(Properties properties) {
     super(properties);
@@ -160,64 +173,52 @@ public class CassandraInterpreter extends Interpreter {
 
   @Override
   public void open() {
-
-    final String[] addresses = getProperty(CASSANDRA_HOSTS, 
DEFAULT_HOST).split(",");
+    final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST)
+            .trim().split(",");
     final int port = parseInt(getProperty(CASSANDRA_PORT, DEFAULT_PORT));
     Collection<InetSocketAddress> hosts = new ArrayList<>();
     for (String address : addresses) {
-      if (InetAddresses.isInetAddress(address)) {
-        hosts.add(new InetSocketAddress(address, port));
-      } else {
-        // TODO(alex): maybe it won't be necessary in 4.4
-        hosts.add(InetSocketAddress.createUnresolved(address, port));
+      if (!StringUtils.isBlank(address)) {
+        logger.debug("Adding contact point: {}", address);
+        if (InetAddresses.isInetAddress(address)) {
+          hosts.add(new InetSocketAddress(address, port));
+        } else {
+          hosts.add(InetSocketAddress.createUnresolved(address, port));
+        }
       }
     }
 
-    LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " +
-            getProperty(CASSANDRA_HOSTS) + "on port " + port);
-
-    // start generation of the config
-    ProgrammaticDriverConfigLoaderBuilder configBuilder = 
DriverConfigLoader.programmaticBuilder();
-
-    driverConfig.setCompressionProtocol(this, configBuilder);
-    driverConfig.setPoolingOptions(this, configBuilder);
-    driverConfig.setProtocolVersion(this, configBuilder);
-    driverConfig.setQueryOptions(this, configBuilder);
-    driverConfig.setReconnectionPolicy(this, configBuilder);
-    driverConfig.setRetryPolicy(this, configBuilder);
-    driverConfig.setSocketOptions(this, configBuilder);
-    driverConfig.setSpeculativeExecutionPolicy(this, configBuilder);
-
-    //
-    configBuilder.withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
-            DcInferringLoadBalancingPolicy.class);
-    configBuilder.withBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, 
false);
+    LOGGER.info("Bootstrapping Cassandra Java Driver to connect to {} on port 
{}",
+            getProperty(CASSANDRA_HOSTS), port);
 
-    
configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
-            parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
-                    DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)));
-
-    DriverConfigLoader loader = configBuilder.endProfile().build();
-    // TODO(alex): think how to dump built configuration...
-    logger.debug(loader.toString());
-    // end generation of config
+    DriverConfigLoader loader = createLoader();
 
+    LOGGER.debug("Creating cluster builder");
     CqlSessionBuilder clusterBuilder = CqlSession.builder()
-            .addContactPoints(hosts)
-            .withAuthCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
-                    getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
-            .withApplicationName("")
+            .withApplicationName("Zeppelin")
             .withApplicationVersion("");
+    if (!hosts.isEmpty()) {
+      LOGGER.debug("Adding contact points");
+      clusterBuilder.addContactPoints(hosts);
+    }
+
+    String username = getProperty(CASSANDRA_CREDENTIALS_USERNAME, 
NONE_VALUE).trim();
+    String password = getProperty(CASSANDRA_CREDENTIALS_PASSWORD, 
NONE_VALUE).trim();
+    if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password) &&
+            !NONE_VALUE.equalsIgnoreCase(username) && 
!NONE_VALUE.equalsIgnoreCase(password)) {
+      LOGGER.debug("Adding credentials. Username = {}", username);
+      clusterBuilder.withAuthCredentials(username, password);
+    }
 
     String keyspace = getProperty(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE);
     if (StringUtils.isNotBlank(keyspace) && 
!DEFAULT_KEYSPACE.equalsIgnoreCase(keyspace)) {
+      LOGGER.debug("Set default keyspace");
       clusterBuilder.withKeyspace(keyspace);
     }
 
-    final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
-    if (runWithSSL != null && runWithSSL.equals("true")) {
-      LOGGER.debug("Cassandra Interpreter: Using SSL");
-
+    final String runWithSSL = getProperty(CASSANDRA_WITH_SSL, "false");
+    if ("true".equalsIgnoreCase(runWithSSL)) {
+      LOGGER.debug("Using SSL");
       try {
         final SSLContext sslContext;
         {
@@ -235,19 +236,149 @@ public class CassandraInterpreter extends Interpreter {
         }
         clusterBuilder = clusterBuilder.withSslContext(sslContext);
       } catch (Exception e) {
-        LOGGER.error(e.toString());
+        LOGGER.error("Exception initializing SSL {}", e.toString());
       }
     } else {
-      LOGGER.debug("Cassandra Interpreter: Not using SSL");
+      LOGGER.debug("Not using SSL");
     }
 
+    LOGGER.debug("Creating CqlSession");
     session = clusterBuilder.withConfigLoader(loader).build();
+    LOGGER.debug("Session configuration");
+    for (Map.Entry<String, Object> entry:
+            session.getContext().getConfig().getDefaultProfile().entrySet()) {
+      logger.debug("{} = {}", entry.getKey(), entry.getValue().toString());
+    }
+    LOGGER.debug("Creating helper");
     helper = new InterpreterLogic(session, properties);
   }
 
+  private DriverConfigLoader createLoader() {
+    logger.debug("Creating programmatic config loader");
+    // start generation of the config
+    ProgrammaticDriverConfigLoaderBuilder configBuilder = 
DriverConfigLoader.programmaticBuilder();
+
+    Map<DriverOption, String> allOptions = new HashMap<>();
+
+    // set options from main configuration
+    String ts = getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
+            CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT) + 
MILLISECONDS_STR;
+    allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
+    allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, ts);
+    allOptions.put(DefaultDriverOption.REQUEST_TIMEOUT,
+            getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
+                    CassandraInterpreter.DEFAULT_READ_TIMEOUT) + 
MILLISECONDS_STR);
+    addIfNotBlank(allOptions,
+            getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, 
CassandraInterpreter.DEFAULT_TCP_NO_DELAY),
+            DefaultDriverOption.SOCKET_TCP_NODELAY);
+    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_KEEP_ALIVE),
+            DefaultDriverOption.SOCKET_KEEP_ALIVE);
+    addIfNotBlank(allOptions, 
getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES),
+            DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE);
+    addIfNotBlank(allOptions, 
getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES),
+            DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE);
+    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS),
+            DefaultDriverOption.SOCKET_REUSE_ADDRESS);
+    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SO_LINGER),
+            DefaultDriverOption.SOCKET_LINGER_INTERVAL);
+    addIfNotBlank(allOptions,
+            getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE),
+            DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE);
+    allOptions.put(DefaultDriverOption.REQUEST_CONSISTENCY,
+            getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
+                    CassandraInterpreter.DEFAULT_CONSISTENCY));
+    allOptions.put(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY,
+            getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
+                    CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY));
+    allOptions.put(DefaultDriverOption.REQUEST_PAGE_SIZE,
+            getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
+                    CassandraInterpreter.DEFAULT_FETCH_SIZE));
+    ts = getProperty(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION);
+    if (!DEFAULT_VALUE.equalsIgnoreCase(ts)) {
+      // for compatibility with previous configurations
+      if (ts.equals("4") || ts.equals("3")) {
+        ts = "V" + ts;
+      }
+      allOptions.put(DefaultDriverOption.PROTOCOL_VERSION, ts);
+    }
+    addIfNotBlank(allOptions, getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
+            CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase(),
+            DefaultDriverOption.PROTOCOL_COMPRESSION);
+    addIfNotBlankOrDefault(allOptions, getProperty(CASSANDRA_RETRY_POLICY, 
DEFAULT_POLICY),
+            DefaultDriverOption.RETRY_POLICY_CLASS);
+    addIfNotBlankOrDefault(allOptions,
+            getProperty(CASSANDRA_RECONNECTION_POLICY, DEFAULT_POLICY),
+            DefaultDriverOption.RECONNECTION_POLICY_CLASS);
+    addIfNotBlankOrDefault(allOptions,
+            getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, 
DEFAULT_POLICY),
+            DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS);
+    allOptions.put(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE,
+            getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
+                    DEFAULT_CONNECTIONS_PER_HOST));
+    allOptions.put(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE,
+            getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
+                    DEFAULT_CONNECTIONS_PER_HOST));
+    allOptions.put(DefaultDriverOption.CONNECTION_MAX_REQUESTS,
+            getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
+            DEFAULT_MAX_REQUEST_PER_CONNECTION));
+    allOptions.put(DefaultDriverOption.HEARTBEAT_INTERVAL,
+            getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
+                    DEFAULT_HEARTBEAT_INTERVAL) + SECONDS_STR);
+    ts = getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
+            DEFAULT_POOL_TIMEOUT) + MILLISECONDS_STR;
+    allOptions.put(DefaultDriverOption.HEARTBEAT_TIMEOUT, ts);
+    allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
+    allOptions.put(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
+            "DcInferringLoadBalancingPolicy");
+    allOptions.put(DefaultDriverOption.RESOLVE_CONTACT_POINTS, "false");
+    allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
+            getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
+                    DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS) + SECONDS_STR);
+
+    // extract additional options that may override values set by main 
configuration
+    for (String pname: properties.stringPropertyNames()) {
+      if (pname.startsWith(DATASTAX_JAVA_DRIVER_PREFIX)) {
+        String pvalue = properties.getProperty(pname);
+        logger.info("Custom config values: {} = {}", pname, pvalue);
+        String shortName = 
pname.substring(DATASTAX_JAVA_DRIVER_PREFIX.length());
+        if (optionMap.containsKey(shortName)) {
+          allOptions.put(optionMap.get(shortName), pvalue);
+        } else {
+          logger.warn("Incorrect option name: {}", pname);
+        }
+      }
+    }
+
+    for (Map.Entry<DriverOption, String> entry: allOptions.entrySet()) {
+      configBuilder.withString(entry.getKey(), entry.getValue());
+    }
+
+    DriverConfigLoader loader = configBuilder.endProfile().build();
+    logger.debug("Config loader is created");
+
+    return loader;
+  }
+
+  private static void addIfNotBlank(Map<DriverOption, String> allOptions,
+                                    String value,
+                                    DefaultDriverOption option) {
+    if (!StringUtils.isBlank(value)) {
+      allOptions.put(option, value);
+    }
+  }
+
+  private static void addIfNotBlankOrDefault(Map<DriverOption, String> 
allOptions,
+                                             String value,
+                                    DefaultDriverOption option) {
+    if (!StringUtils.isBlank(value) && !DEFAULT_VALUE.equalsIgnoreCase(value)) 
{
+      allOptions.put(option, value);
+    }
+  }
+
   @Override
   public void close() {
-    session.close();
+    if (session != null)
+      session.close();
   }
 
   @Override
diff --git a/cassandra/src/main/resources/interpreter-setting.json 
b/cassandra/src/main/resources/interpreter-setting.json
index 34de7a1..5c32ad3 100644
--- a/cassandra/src/main/resources/interpreter-setting.json
+++ b/cassandra/src/main/resources/interpreter-setting.json
@@ -21,8 +21,8 @@
       "cassandra.protocol.version": {
         "envName": null,
         "propertyName": "cassandra.protocol.version",
-        "defaultValue": "4",
-        "description": "Cassandra protocol version. Default = 4",
+        "defaultValue": "DEFAULT",
+        "description": "Cassandra protocol version. Default = auto-detect",
         "type": "string"
       },
       "cassandra.cluster": {
@@ -105,22 +105,22 @@
       "cassandra.pooling.connection.per.host.local": {
         "envName": null,
         "propertyName": "cassandra.pooling.connection.per.host.local",
-        "defaultValue": "8",
-        "description": "Cassandra connections per host local. Protocol V2 and 
below default = 8 Protocol V3 and above default = 1",
+        "defaultValue": "1",
+        "description": "Cassandra connections per host local. Protocol V3 and 
above default = 1",
         "type": "number"
       },
       "cassandra.pooling.connection.per.host.remote": {
         "envName": null,
         "propertyName": "cassandra.pooling.connection.per.host.remote",
-        "defaultValue": "2",
-        "description": "Cassandra connections per host remote. Protocol V2 and 
below default = 2 Protocol V3 and above default = 1",
+        "defaultValue": "1",
+        "description": "Cassandra connections per host remote. Protocol V3 and 
above default = 1",
         "type": "number"
       },
       "cassandra.pooling.max.request.per.connection": {
         "envName": null,
         "propertyName": "cassandra.pooling.max.request.per.connection",
         "defaultValue": "1024",
-        "description": "Cassandra max requests per connection. Protocol V2 and 
below default = 128 Protocol V3 and above default = 1024",
+        "description": "Cassandra max requests per connection. Protocol V3 and 
above default = 1024",
         "type": "number"
       },
       "cassandra.pooling.pool.timeout.millisecs": {
diff --git 
a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala 
b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
index 31f5142..46ce83e 100644
--- 
a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
+++ 
b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.zeppelin.cassandra
 
+import java.time.Duration
 import java.util.regex.Pattern
 
 import com.datastax.oss.driver.api.core.CqlSession
@@ -46,8 +47,10 @@ class EnhancedSession(val session: CqlSession) {
   private val noResultDisplay = DisplaySystem.NoResultDisplay
   private val DEFAULT_CHECK_TIME: Int = 200
   private val MAX_SCHEMA_AGREEMENT_WAIT: Int = 120000 // 120 seconds
+  private val defaultDDLTimeout: Duration = 
Duration.ofSeconds(MAX_SCHEMA_AGREEMENT_WAIT / 10000)
   private val LOGGER = LoggerFactory.getLogger(classOf[EnhancedSession])
 
+
   val HTML_MAGIC = "%html \n"
 
   val displayNoResult: String = HTML_MAGIC + noResultDisplay.formatNoResult
@@ -191,8 +194,14 @@ class EnhancedSession(val session: CqlSession) {
   }
 
   private def executeStatement[StatementT <: Statement[StatementT]](st: 
StatementT): Any = {
-    val rs: ResultSet = session.execute(st)
-    if (EnhancedSession.isDDLStatement(st)) {
+    val isDDL = EnhancedSession.isDDLStatement(st)
+    val newSt = if (isDDL) {
+      st.setTimeout(defaultDDLTimeout)
+    } else {
+      st
+    }
+    val rs: ResultSet = session.execute(newSt)
+    if (isDDL) {
       if (!rs.getExecutionInfo.isSchemaInAgreement) {
         val startTime = System.currentTimeMillis()
         while(!session.checkSchemaAgreement()) {
diff --git 
a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala 
b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
deleted file mode 100644
index 1834e7e..0000000
--- 
a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cassandra
-
-import java.lang.Boolean._
-
-import com.datastax.oss.driver.api.core.ProtocolVersion
-import com.datastax.oss.driver.api.core.config.{DefaultDriverOption, 
ProgrammaticDriverConfigLoaderBuilder}
-import org.apache.commons.lang3.StringUtils._
-import org.apache.zeppelin.interpreter.Interpreter
-import org.apache.zeppelin.cassandra.CassandraInterpreter._
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * Utility class to extract and configure the Java driver
- */
-class JavaDriverConfig {
-  val LOGGER: Logger = LoggerFactory.getLogger(classOf[JavaDriverConfig])
-
-  def setSocketOptions(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val connectTimeoutMillis: Int = 
intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
-      CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT).toInt
-    configBuilder.withInt(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, 
connectTimeoutMillis)
-    configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, 
connectTimeoutMillis)
-
-    val readTimeoutMillis: Int = 
intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
-      CassandraInterpreter.DEFAULT_READ_TIMEOUT).toInt
-    configBuilder.withInt(DefaultDriverOption.REQUEST_TIMEOUT, 
readTimeoutMillis)
-
-    val tcpNoDelay = intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, 
CassandraInterpreter.DEFAULT_TCP_NO_DELAY)
-    if (isNotBlank(tcpNoDelay)) {
-      configBuilder.withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, 
parseBoolean(tcpNoDelay))
-    }
-
-    val keepAlive: String = intpr.getProperty(CASSANDRA_SOCKET_KEEP_ALIVE)
-    if (isNotBlank(keepAlive)) {
-      configBuilder.withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, 
parseBoolean(keepAlive))
-    }
-
-    val receivedBuffSize: String = 
intpr.getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES)
-    if (isNotBlank(receivedBuffSize)) {
-      configBuilder.withInt(DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE, 
receivedBuffSize.toInt)
-    }
-
-    val sendBuffSize: String = 
intpr.getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES)
-    if (isNotBlank(sendBuffSize)) {
-      configBuilder.withInt(DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE, 
sendBuffSize.toInt)
-    }
-
-    val reuseAddress: String = 
intpr.getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS)
-    if (isNotBlank(reuseAddress)) {
-      configBuilder.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, 
parseBoolean(reuseAddress))
-    }
-
-    val soLinger: String = intpr.getProperty(CASSANDRA_SOCKET_SO_LINGER)
-    if (isNotBlank(soLinger)) {
-      configBuilder.withInt(DefaultDriverOption.SOCKET_LINGER_INTERVAL, 
soLinger.toInt)
-    }
-  }
-
-  def setQueryOptions(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val consistencyLevel = 
intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
-      CassandraInterpreter.DEFAULT_CONSISTENCY)
-    configBuilder.withString(DefaultDriverOption.REQUEST_CONSISTENCY, 
consistencyLevel)
-
-    val serialConsistencyLevel = 
intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
-      CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY)
-    configBuilder.withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, 
serialConsistencyLevel)
-
-    val fetchSize = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
-      CassandraInterpreter.DEFAULT_FETCH_SIZE).toInt
-    configBuilder.withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, fetchSize)
-
-    configBuilder.withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE,
-      parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE)))
-  }
-
-  val PROTOCOL_MAPPING: Map[String, ProtocolVersion] = Map("3" -> 
ProtocolVersion.V3, "4" -> ProtocolVersion.V4,
-    "5" -> ProtocolVersion.V5, "DSE1" -> ProtocolVersion.DSE_V1, "DSE2" -> 
ProtocolVersion.DSE_V2)
-
-  def setProtocolVersion(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION,
-      CassandraInterpreter.DEFAULT_PROTOCOL_VERSION)
-    LOGGER.debug("Protocol version : " + protocolVersion)
-
-    protocolVersion match {
-      case "1" | "2" =>
-        throw new RuntimeException(s"Protocol V${protocolVersion} isn't 
supported")
-      case _ =>
-        configBuilder.withString(DefaultDriverOption.PROTOCOL_VERSION,
-          PROTOCOL_MAPPING.getOrElse(protocolVersion, 
ProtocolVersion.DEFAULT).name())
-    }
-  }
-
-  def setPoolingOptions(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val poolingOptionsInfo: StringBuilder = new StringBuilder("Pooling options 
: \n\n")
-
-    val coreConnPerHostLocal: Int = 
intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
-      DEFAULT_CONNECTIONS_PER_HOST).toInt
-    configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 
coreConnPerHostLocal)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL)
-      .append(" : ")
-      .append(coreConnPerHostLocal)
-      .append("\n")
-
-    val coreConnPerHostRemote: Int = 
intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
-      DEFAULT_CONNECTIONS_PER_HOST).toInt
-    configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 
coreConnPerHostRemote)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE)
-      .append(" : ")
-      .append(coreConnPerHostRemote)
-      .append("\n")
-
-    val maxReqPerConnLocal: Int = 
intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
-      DEFAULT_MAX_REQUEST_PER_CONNECTION).toInt
-    configBuilder.withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, 
maxReqPerConnLocal)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION)
-      .append(" : ")
-      .append(maxReqPerConnLocal)
-      .append("\n")
-
-    val heartbeatIntervalSeconds: Int = 
intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
-      DEFAULT_HEARTBEAT_INTERVAL).toInt
-    configBuilder.withInt(DefaultDriverOption.HEARTBEAT_INTERVAL, 
heartbeatIntervalSeconds)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS)
-      .append(" : ")
-      .append(heartbeatIntervalSeconds)
-      .append("\n")
-
-    val idleTimeoutSeconds: Int = 
intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
-      DEFAULT_POOL_TIMEOUT).toInt
-    configBuilder.withInt(DefaultDriverOption.HEARTBEAT_TIMEOUT, 
idleTimeoutSeconds)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS)
-      .append(" : ")
-      .append(idleTimeoutSeconds)
-      .append("\n")
-
-    LOGGER.debug(poolingOptionsInfo.append("\n").toString)
-  }
-
-  def setCompressionProtocol(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val compressionProtocol = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
-      CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase
-    LOGGER.debug("Compression protocol : " + compressionProtocol)
-
-    compressionProtocol match {
-      case "snappy" | "lz4" =>
-        configBuilder.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, 
compressionProtocol)
-      case _ => ()
-    }
-  }
-
-  private def isNotDefaultParameter(param: String) = {
-    !(isBlank(param) || DEFAULT_POLICY == param)
-  }
-
-  def setRetryPolicy(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY)
-    LOGGER.debug("Retry Policy : " + retryPolicy)
-
-    if (isNotDefaultParameter(retryPolicy)) {
-      configBuilder.withString(DefaultDriverOption.RETRY_POLICY_CLASS, 
retryPolicy)
-    }
-  }
-
-  def setReconnectionPolicy(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val reconnectionPolicy: String = 
intpr.getProperty(CASSANDRA_RECONNECTION_POLICY)
-    LOGGER.debug("Reconnection Policy : " + reconnectionPolicy)
-
-    if (isNotDefaultParameter(reconnectionPolicy)) {
-      configBuilder.withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, 
reconnectionPolicy)
-    }
-  }
-
-  def setSpeculativeExecutionPolicy(intpr: Interpreter, configBuilder: 
ProgrammaticDriverConfigLoaderBuilder): Unit = {
-    val specExecPolicy: String = 
intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY)
-    LOGGER.debug("Speculative Execution Policy : " + specExecPolicy)
-
-    if (isNotDefaultParameter(specExecPolicy)) {
-      
configBuilder.withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS,
 specExecPolicy)
-    }
-  }
-}
diff --git 
a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
 
b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
index ffdb7d9..5f09394 100644
--- 
a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
+++ 
b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
@@ -17,6 +17,8 @@
 package org.apache.zeppelin.cassandra;
 
 import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -73,6 +75,11 @@ public class CassandraInterpreterTest { //extends 
AbstractCassandraUnit4CQLTestC
 
   @BeforeClass
   public static synchronized void setUp() throws IOException, 
InterruptedException {
+    System.setProperty("cassandra.skip_wait_for_gossip_to_settle", "0");
+    System.setProperty("cassandra.load_ring_state", "false");
+    System.setProperty("cassandra.initial_token", "0");
+    System.setProperty("cassandra.num_tokens", "nil");
+    
System.setProperty("cassandra.allocate_tokens_for_local_replication_factor", 
"nil");
     EmbeddedCassandraServerHelper.startEmbeddedCassandra();
     CqlSession session = EmbeddedCassandraServerHelper.getSession();
     new CQLDataLoader(session).load(new ClassPathCQLDataSet("prepare_all.cql", 
"zeppelin"));
@@ -106,6 +113,7 @@ public class CassandraInterpreterTest { //extends 
AbstractCassandraUnit4CQLTestC
     properties.setProperty(CASSANDRA_HOSTS, 
EmbeddedCassandraServerHelper.getHost());
     properties.setProperty(CASSANDRA_PORT,
             
Integer.toString(EmbeddedCassandraServerHelper.getNativeTransportPort()));
+    
properties.setProperty("datastax-java-driver.advanced.connection.pool.local.size",
 "1");
     interpreter = new CassandraInterpreter(properties);
     interpreter.open();
   }
@@ -122,6 +130,15 @@ public class CassandraInterpreterTest { //extends 
AbstractCassandraUnit4CQLTestC
   }
 
   @Test
+  public void should_set_custom_option() throws Exception {
+    assertThat(interpreter.session).isNotNull();
+    DriverExecutionProfile config = interpreter.session.getContext()
+            .getConfig().getDefaultProfile();
+    assertThat(config.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 
10))
+            .isEqualTo(1);
+  }
+
+  @Test
   public void should_interpret_simple_select() throws Exception {
     //Given
 
diff --git a/docs/interpreter/cassandra.md b/docs/interpreter/cassandra.md
index b4613e2..69487e4 100644
--- a/docs/interpreter/cassandra.md
+++ b/docs/interpreter/cassandra.md
@@ -668,10 +668,9 @@ The **isolated** mode is the most extreme and will create 
as many JVM/`com.datas
 ## Interpreter Configuration
 
 To configure the **Cassandra** interpreter, go to the **Interpreter** menu and 
scroll down to change the parameters.
-The **Cassandra** interpreter is using the official **[Datastax Java Driver 
for Apache Cassandra]®** and most of the parameters are used
-to configure the Java driver
+The **Cassandra** interpreter uses the official **[Datastax Java Driver 
for Apache Cassandra]®**, and most of the parameters are used to configure the 
Java driver.
 
-Below are the configuration parameters and their default values.
+Below are the configuration parameters supported by the interpreter and their 
default values.
 
  <table class="table-configuration">
    <tr>
@@ -741,12 +740,12 @@ Below are the configuration parameters and their default 
values.
    </tr>
    <tr>
      <td>`cassandra.pooling.connection.per.host.local`</td>
-     <td>Protocol V2 and below default = 2. Protocol V3 and above default = 
1</td>
-     <td>2</td>
+     <td>Protocol V3 and above default = 1</td>
+     <td>1</td>
    </tr>
    <tr>
      <td>`cassandra.pooling.connection.per.host.remote`</td>
-     <td>Protocol V2 and below default = 1. Protocol V3 and above default = 
1</td>
+     <td>Protocol V3 and above default = 1</td>
      <td>1</td>
    </tr>
    <tr>
@@ -756,8 +755,8 @@ Below are the configuration parameters and their default 
values.
    </tr>
    <tr>
      <td>`cassandra.pooling.max.request.per.connection`</td>
-     <td>Protocol V2 and below default = 128. Protocol V3 and above default = 
1024</td>
-     <td>128</td>
+     <td>Protocol V3 and above default = 1024</td>
+     <td>1024</td>
    </tr>
    <tr>
      <td>`cassandra.pooling.pool.timeout.millisecs`</td>
@@ -766,7 +765,7 @@ Below are the configuration parameters and their default 
values.
    </tr>
    <tr>
      <td>`cassandra.protocol.version`</td>
-     <td>Cassandra binary protocol version (`3`, `4`, `DSE1`, `DSE2`)</td>
+     <td>Cassandra binary protocol version (`V3`, `V4`, ...)</td>
      <td>`DEFAULT` (detected automatically)</td>
    </tr>
    <tr>
@@ -902,6 +901,8 @@ Below are the configuration parameters and their default 
values.
    </tr>
  </table>
 
+Besides these parameters, it's also possible to set other driver parameters by 
adding them to the interpreter configuration.  The configuration key should use the 
full form with the `datastax-java-driver` prefix, as [described in the 
documentation](https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/).
  For example, to specify a 5-second request timeout, you can use 
`datastax-java-driver.basic.request.timeout` with a value of `5 seconds`.  Full 
list of available configura [...]
+
 ## Change Log
 
 **4.0** _(Zeppelin {{ site.ZEPPELIN_VERSION }})_ :

Reply via email to