This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 6ed42d2 [ZEPPELIN-4444] Fix schema disagreement when execute DDL statements 6ed42d2 is described below commit 6ed42d20a1994a6a0a050cb157a0bc89c641eb47 Author: Alex Ott <alex...@gmail.com> AuthorDate: Sun Nov 24 16:22:41 2019 +0100 [ZEPPELIN-4444] Fix schema disagreement when execute DDL statements ### What is this PR for? When executing DDL statements (`CREATE`/`ALTER`/`DROP`) it's possible to get schema disagreement in Cassandra, especially for big/geo-distributed clusters. The Java driver has special handling for such statements, with an increased timeout, but it's still possible that the next DDL statement will be executed before all nodes agree on the schema version, and this could lead to a schema disagreement that needs to be resolved by administrators. ### What type of PR is it? Improvement ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4444 ### How should this be tested? * Added the unit test * Tested manually * Travis CI: https://travis-ci.org/alexott/zeppelin/builds/620614700 Author: Alex Ott <alex...@gmail.com> Closes #3527 from alexott/ZEPPELIN-4444 and squashes the following commits: 4b803b7ab [Alex Ott] [ZEPPELIN-4444] Fix schema disagreement when execute DDL statements --- .../zeppelin/cassandra/EnhancedSession.scala | 43 ++++++++++++- .../zeppelin/cassandra/InterpreterLogic.scala | 2 +- .../zeppelin/cassandra/EnhancedSessionTest.scala | 74 ++++++++++++++++++++++ 3 files changed, 116 insertions(+), 3 deletions(-) diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala index a0c475a..988cdd2 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala @@ -16,9 +16,12 @@ */ package org.apache.zeppelin.cassandra +import java.util.regex.Pattern + 
import com.datastax.driver.core._ import org.apache.zeppelin.cassandra.TextBlockHierarchy._ import org.apache.zeppelin.interpreter.InterpreterException +import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ @@ -38,7 +41,8 @@ class EnhancedSession(val session: Session) { val materializedViewDisplay = DisplaySystem.MaterializedViewDisplay val helpDisplay = DisplaySystem.HelpDisplay private val noResultDisplay = DisplaySystem.NoResultDisplay - + private val DEFAULT_CHECK_TIME = 200 // half second + private val LOGGER = LoggerFactory.getLogger(classOf[EnhancedSession]) val HTML_MAGIC = "%html \n" @@ -181,6 +185,20 @@ class EnhancedSession(val session: Session) { } + private def execute(st: Statement): Any = { + val rs = session.execute(st) + if (EnhancedSession.isDDLStatement(st)) { + if (!rs.getExecutionInfo.isSchemaInAgreement) { + val metadata = session.getCluster.getMetadata + while(!metadata.checkSchemaAgreement) { + LOGGER.info("Schema is still not in agreement, waiting...") + Thread.sleep(DEFAULT_CHECK_TIME) + } + } + } + rs + } + def execute(st: Any): Any = { st match { case x:DescribeClusterCmd => execute(x) @@ -197,8 +215,29 @@ class EnhancedSession(val session: Session) { case x:DescribeMaterializedViewCmd => execute(x) case x:DescribeMaterializedViewsCmd => execute(x) case x:HelpCmd => execute(x) - case x:Statement => session.execute(x) + case x:Statement => execute(x) case _ => throw new InterpreterException(s"Cannot execute statement '$st' of type ${st.getClass}") } } } + +object EnhancedSession { + private val DDL_REGEX = Pattern.compile("^(CREATE|DROP|ALTER) .*", Pattern.CASE_INSENSITIVE) + + def isDDLStatement(query: String): Boolean = { + DDL_REGEX.matcher(query.trim).matches() + } + + def isDDLStatement(st: Statement): Boolean = { + st match { + case x:BoundStatement => + isDDLStatement(x.preparedStatement.getQueryString) + case x:BatchStatement => + x.getStatements.asScala.seq.exists(isDDLStatement) + case x:RegularStatement => + 
isDDLStatement(x.getQueryString) + case _ => // only should be for StatementWrapper + true + } + } +} \ No newline at end of file diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala index dd8456a..a871abd 100644 --- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala @@ -332,7 +332,7 @@ class InterpreterLogic(val session: Session) { case Some(value) => statement.replaceAll(escapedExp,value.toString) case None => { val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList - val paramOptions= listChoices.map(choice => new ParamOption(choice, choice)) + val paramOptions = listChoices.map(choice => new ParamOption(choice, choice)) val selected = context.getGui.select(variable, paramOptions.toArray, listChoices.head) statement.replaceAll(escapedExp,selected.toString) } diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala new file mode 100644 index 0000000..c9ba19f --- /dev/null +++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.cassandra + +import com.datastax.driver.core.{BatchStatement, SimpleStatement} +import org.scalatest.FlatSpec + +class EnhancedSessionTest extends FlatSpec { + + "Query" should "be detected as DDL for create" in { + assertResult(true){ + EnhancedSession.isDDLStatement("create TABLE if not exists test.test(id int primary key);") + } + } + + it should "be detected as DDL for drop" in { + assertResult(true) { + EnhancedSession.isDDLStatement("DROP KEYSPACE if exists test;") + } + } + + it should "be detected as DDL for alter" in { + assertResult(true) { + EnhancedSession.isDDLStatement("ALTER TABLE test.test WITH comment = 'some comment' ;") + } + } + + it should "not be detected as DDL for select" in { + assertResult(false) { + EnhancedSession.isDDLStatement("select * from test.test;") + } + } + + it should "be detected as DDL for create in simple statement" in { + assertResult(true) { + EnhancedSession.isDDLStatement(new SimpleStatement("create TABLE if not exists test.test(id int primary key);")) + } + } + + it should "be detected as DDL for create in batch statement" in { + val batch = new BatchStatement + batch.add(new SimpleStatement("create TABLE if not exists test.test(id int primary key);")) + batch.add(new SimpleStatement("insert into test.test(id) values(1);")) + assertResult(true) { + EnhancedSession.isDDLStatement(batch) + } + } + + it should "not be detected as DDL for only inserts in batch statement" in { + val batch = new BatchStatement + batch.add(new SimpleStatement("insert into test.test(id) 
values(1);")) + batch.add(new SimpleStatement("insert into test.test(id) values(2);")) + batch.add(new SimpleStatement("insert into test.test(id) values(3);")) + assertResult(false) { + EnhancedSession.isDDLStatement(batch) + } + } + +}