This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 7d1fa4238749a6dc18af55e288cef8437694cdf0 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Fri Oct 15 13:03:04 2021 +0800 [Dependency] Upgrade thirdparty libs (#6766) Upgrade the following dependencies: libevent -> 2.1.12 OpenSSL 1.0.2k -> 1.1.1l thrift 0.9.3 -> 0.13.0 protobuf 3.5.1 -> 3.14.0 gflags 2.2.0 -> 2.2.2 glog 0.3.3 -> 0.4.0 googletest 1.8.0 -> 1.10.0 snappy 1.1.7 -> 1.1.8 gperftools 2.7 -> 2.9.1 lz4 1.7.5 -> 1.9.3 curl 7.54.1 -> 7.79.0 re2 2017-05-01 -> 2021-02-02 zstd 1.3.7 -> 1.5.0 brotli 1.0.7 -> 1.0.9 flatbuffers 1.10.0 -> 2.0.0 apache-arrow 0.15.1 -> 5.0.0 CRoaring 0.2.60 -> 0.3.4 orc 1.5.8 -> 1.6.6 libdivide 4.0.0 -> 5.0 brpc 0.97 -> 1.0.0-rc02 librdkafka 1.7.0 -> 1.8.0 after this PR, compiling Doris requires build-env:1.4.0 --- pom.xml | 55 +++++++++++++-- pom_3.0.xml | 79 ++++++++++++++++++++-- .../apache/doris/spark/backend/BackendClient.java | 28 ++++---- .../apache/doris/spark/rdd/ScalaValueReader.scala | 20 +++--- .../doris/spark/serialization/TestRowBatch.java | 6 +- 5 files changed, 151 insertions(+), 37 deletions(-) diff --git a/pom.xml b/pom.xml index e015f06..d8ebfe7 100644 --- a/pom.xml +++ b/pom.xml @@ -31,8 +31,8 @@ <properties> <scala.version>2.11</scala.version> <spark.version>2.3.4</spark.version> - <libthrift.version>0.9.3</libthrift.version> - <arrow.version>0.15.1</arrow.version> + <libthrift.version>0.13.0</libthrift.version> + <arrow.version>5.0.0</arrow.version> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version> <maven-source-plugin.version>3.2.1</maven-source-plugin.version> @@ -107,7 +107,12 @@ <artifactId>arrow-vector</artifactId> <version>${arrow.version}</version> </dependency> - + <dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-memory-netty</artifactId> <version>${arrow.version}</version> <scope>runtime</scope> </dependency> <!--Test--> <dependency> <groupId>org.hamcrest</groupId> @@ -145,14 
+150,55 @@ <version>${spark.version}</version> <scope>test</scope> </dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.10.0</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.10.0</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.27.Final</version> + <scope>provided</scope> + </dependency> + </dependencies> <build> <plugins> + <!-- add gensrc java build src dir --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.2.0</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <!-- add arbitrary num of src dirs here --> + <source>${project.build.directory}/generated-sources/thrift/</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.thrift.tools</groupId> <artifactId>maven-thrift-plugin</artifactId> <version>0.1.11</version> + <configuration> + <generator>java:fullcamel</generator> + </configuration> <executions> <execution> <id>thrift-sources</id> @@ -242,7 +288,8 @@ <version>0.7.8</version> <configuration> <excludes> - <exclude>**/thrift/**</exclude></excludes> + <exclude>**/thrift/**</exclude> + </excludes> </configuration> <executions> <execution> diff --git a/pom_3.0.xml b/pom_3.0.xml index 4973ff8..6c8eee5 100644 --- a/pom_3.0.xml +++ b/pom_3.0.xml @@ -31,9 +31,13 @@ <properties> <scala.version>2.12</scala.version> <spark.version>3.1.2</spark.version> - <libthrift.version>0.9.3</libthrift.version> - <arrow.version>1.0.1</arrow.version> + <libthrift.version>0.13.0</libthrift.version> + <arrow.version>5.0.0</arrow.version> + <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> + 
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version> + <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty> </properties> <profiles> @@ -103,7 +107,13 @@ <artifactId>arrow-vector</artifactId> <version>${arrow.version}</version> </dependency> - + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${arrow.version}</version> + <scope>runtime</scope> + </dependency> + <!--Test--> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> @@ -154,12 +164,36 @@ </dependencies> - <build> + <build> <plugins> + <!-- add gensrc java build src dir --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.2.0</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <!-- add arbitrary num of src dirs here --> + <source>${project.build.directory}/generated-sources/thrift/</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.thrift.tools</groupId> <artifactId>maven-thrift-plugin</artifactId> <version>0.1.11</version> + <configuration> + <generator>java:fullcamel</generator> + </configuration> <executions> <execution> <id>thrift-sources</id> @@ -277,14 +311,47 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.8.1</version> + <version>${maven-compiler-plugin.version}</version> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>${maven-javadoc-plugin.version}</version> + <configuration> + 
<source>8</source> + <failOnError>false</failOnError> + <aggregate>true</aggregate> + </configuration> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>${maven-source-plugin.version}</version> + <configuration> + <attach>true</attach> + </configuration> + <executions> + <execution> + <phase>compile</phase> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> - diff --git a/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/src/main/java/org/apache/doris/spark/backend/BackendClient.java index 90baf79..b5f6112 100644 --- a/src/main/java/org/apache/doris/spark/backend/BackendClient.java +++ b/src/main/java/org/apache/doris/spark/backend/BackendClient.java @@ -127,14 +127,14 @@ public class BackendClient { for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to openScanner {}.", attempt, routing); try { - TScanOpenResult result = client.open_scanner(openParams); + TScanOpenResult result = client.openScanner(openParams); if (result == null) { logger.warn("Open scanner result from {} is null.", routing); continue; } - if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) { + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { logger.warn("The status of open scanner result from {} is '{}', error message is: {}.", - routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()); + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); continue; } return result; @@ -163,14 +163,14 @@ public class BackendClient { for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to getNext {}.", attempt, routing); try { - result = client.get_next(nextBatchParams); + result = 
client.getNext(nextBatchParams); if (result == null) { logger.warn("GetNext result from {} is null.", routing); continue; } - if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) { + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { logger.warn("The status of get next result from {} is '{}', error message is: {}.", - routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()); + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); continue; } return result; @@ -179,11 +179,11 @@ public class BackendClient { ex = e; } } - if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code()))) { - logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(), - result.getStatus().getError_msgs()); - throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(), - result.getStatus().getError_msgs()); + if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) { + logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + throw new DorisInternalException(routing.toString(), result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); } logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); throw new ConnectedFailedException(routing.toString(), ex); @@ -206,14 +206,14 @@ public class BackendClient { for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to closeScanner {}.", attempt, routing); try { - TScanCloseResult result = client.close_scanner(closeParams); + TScanCloseResult result = client.closeScanner(closeParams); if (result == null) { logger.warn("CloseScanner result from {} is null.", routing); continue; } - if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) { + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { logger.warn("The status 
of get next result from {} is '{}', error message is: {}.", - routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()); + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); continue; } break; diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index f3334b9..a1b26e4 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -107,9 +107,9 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { DORIS_EXEC_MEM_LIMIT_DEFAULT } - params.setBatch_size(batchSize) - params.setQuery_timeout(queryDorisTimeout) - params.setMem_limit(execMemLimit) + params.setBatchSize(batchSize) + params.setQueryTimeout(queryDorisTimeout) + params.setMemLimit(execMemLimit) params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) @@ -117,25 +117,25 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { s"cluster: ${params.getCluster}, " + s"database: ${params.getDatabase}, " + s"table: ${params.getTable}, " + - s"tabletId: ${params.getTablet_ids}, " + + s"tabletId: ${params.getTabletIds}, " + s"batch size: $batchSize, " + s"query timeout: $queryDorisTimeout, " + s"execution memory limit: $execMemLimit, " + s"user: ${params.getUser}, " + - s"query plan: ${params.opaqued_query_plan}") + s"query plan: ${params.getOpaquedQueryPlan}") params } protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams)) - protected val contextId: String = openResult.getContext_id + protected val contextId: String = openResult.getContextId protected val schema: Schema = - SchemaUtils.convertToSchema(openResult.getSelected_columns) + SchemaUtils.convertToSchema(openResult.getSelectedColumns) protected val asyncThread: Thread = new 
Thread { override def run { val nextBatchParams = new TScanNextBatchParams - nextBatchParams.setContext_id(contextId) + nextBatchParams.setContextId(contextId) while (!eos.get) { nextBatchParams.setOffset(offset) val nextResult = lockClient(_.getNext(nextBatchParams)) @@ -194,7 +194,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { rowBatch.close } val nextBatchParams = new TScanNextBatchParams - nextBatchParams.setContext_id(contextId) + nextBatchParams.setContextId(contextId) nextBatchParams.setOffset(offset) val nextResult = lockClient(_.getNext(nextBatchParams)) eos.set(nextResult.isEos) @@ -221,7 +221,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { def close(): Unit = { val closeParams = new TScanCloseParams - closeParams.context_id = contextId + closeParams.setContextId(contextId) lockClient(_.closeScanner(closeParams)) } diff --git a/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java index 64eaf63..e5f5ee7 100644 --- a/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java +++ b/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java @@ -222,7 +222,7 @@ public class TestRowBatch { arrowStreamWriter.close(); TStatus status = new TStatus(); - status.setStatus_code(TStatusCode.OK); + status.setStatusCode(TStatusCode.OK); TScanBatchResult scanBatchResult = new TScanBatchResult(); scanBatchResult.setStatus(status); scanBatchResult.setEos(false); @@ -344,7 +344,7 @@ public class TestRowBatch { arrowStreamWriter.close(); TStatus status = new TStatus(); - status.setStatus_code(TStatusCode.OK); + status.setStatusCode(TStatusCode.OK); TScanBatchResult scanBatchResult = new TScanBatchResult(); scanBatchResult.setStatus(status); scanBatchResult.setEos(false); @@ -406,7 +406,7 @@ public class TestRowBatch { arrowStreamWriter.close(); TStatus status = new TStatus(); - 
status.setStatus_code(TStatusCode.OK); + status.setStatusCode(TStatusCode.OK); TScanBatchResult scanBatchResult = new TScanBatchResult(); scanBatchResult.setStatus(status); scanBatchResult.setEos(false); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org