This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 00e69022127 branch-2.1: [chore](arrow-flight-sql) Add Arrow Flight Sql demo for Java #45306 (#45389) 00e69022127 is described below commit 00e69022127fa8793cd0bbe34483421b1c5cdd3e Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Mon Dec 16 14:56:56 2024 +0800 branch-2.1: [chore](arrow-flight-sql) Add Arrow Flight Sql demo for Java #45306 (#45389) Cherry-picked from #45306 Co-authored-by: Xinyi Zou <zouxi...@selectdb.com> --- samples/arrow-flight-sql/java/README.md | 35 ++++ samples/arrow-flight-sql/java/pom.xml | 186 +++++++++++++++++++++ .../doris/arrowflight/demo/ArrowBatchReader.java | 110 ++++++++++++ .../java/doris/arrowflight/demo/Configuration.java | 42 +++++ .../doris/arrowflight/demo/FlightAdbcDriver.java | 90 ++++++++++ .../doris/arrowflight/demo/FlightJdbcDriver.java | 86 ++++++++++ .../doris/arrowflight/demo/FlightSqlClient.java | 144 ++++++++++++++++ .../doris/arrowflight/demo/JdbcDriverManager.java | 90 ++++++++++ .../arrowflight/demo/JdbcResultSetReader.java | 63 +++++++ .../src/main/java/doris/arrowflight/demo/Main.java | 30 ++++ .../doris/arrowflight/demo/ConfigurationTest.java | 35 ++++ 11 files changed, 911 insertions(+) diff --git a/samples/arrow-flight-sql/java/README.md b/samples/arrow-flight-sql/java/README.md new file mode 100644 index 00000000000..1eb574e2ab6 --- /dev/null +++ b/samples/arrow-flight-sql/java/README.md @@ -0,0 +1,35 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to use: + + 1. mvn clean install -U + 2. mvn package + 3. java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -cp java-0.1.jar doris.arrowflight.demo.Main "sql" "fe_ip" "fe_arrow_flight_port" "fe_query_port" + +# What this demo does: + + This is a Java demo for Doris Arrow Flight SQL. You can use it to test various connection + methods for sending queries to the Doris Arrow Flight server, to learn how to use Arrow Flight SQL, + and to test performance. Maven must be installed before running this demo. + +# Performance test + + Section 6.2 of https://github.com/apache/doris/issues/25514 contains the performance test + results of Doris Arrow Flight SQL using Java. \ No newline at end of file diff --git a/samples/arrow-flight-sql/java/pom.xml b/samples/arrow-flight-sql/java/pom.xml new file mode 100644 index 00000000000..d08e30d69ae --- /dev/null +++ b/samples/arrow-flight-sql/java/pom.xml @@ -0,0 +1,186 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>doris.arrowflight.demo</groupId> + <artifactId>java</artifactId> + <version>0.1</version> + + <properties> + <maven.compiler.source>17</maven.compiler.source> + <maven.compiler.target>17</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <!-- + Notice !!! Arrow and ADBC versions cannot be combined arbitrarily. + + 1. For ADBC 0.15.0, please use Arrow 18.0.0 and after, not compatible with previous versions of Arrow. + 2. 
Try not to use Arrow 17.0.0, may get the following error: + ``` + Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2 + at com.google.protobuf.GeneratedMessageV3$FieldAccessorTable.ensureFieldAccessorsInitialized(GeneratedMessageV3.java:2094) + at org.apache.arrow.flight.sql.impl.FlightSql$ActionCreatePreparedStatementRequest.internalGetFieldAccessorTable(FlightSql.java:16332) + at com.google.protobuf.GeneratedMessageV3.getDescriptorForType(GeneratedMessageV3.java:139) + at com.google.protobuf.Any.pack(Any.java:61) + at org.apache.arrow.flight.sql.FlightSqlClient.prepare(FlightSqlClient.java:767) + at org.apache.arrow.flight.sql.FlightSqlClient.prepare(FlightSqlClient.java:746) + at org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.prepare(ArrowFlightSqlClientHandler.java:310) + ``` + similar issue: https://github.com/protocolbuffers/protobuf/issues/15762 + 3. A more stable version is Arrow 15.0.2 and ADBC 0.12.0, but we always hope to embrace the future with new versions! + --> + <arrow.version>18.1.0</arrow.version> + <adbc.version>0.15.0</adbc.version> + <log4j.version>2.17.1</log4j.version> + </properties> + <dependencies> + <!-- If Maven cannot find the Arrow or Adbc version of the dependency, update the Maven central repository. + Arrow and Adbc are updated frequently, and we always try to use the latest version as much as possible. 
--> + <dependency> + <groupId>org.apache.arrow.adbc</groupId> + <artifactId>adbc-driver-jdbc</artifactId> + <version>${adbc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow.adbc</groupId> + <artifactId>adbc-driver-flight-sql</artifactId> + <version>${adbc.version}</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>8.0.33</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-core</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-sql</artifactId> + <version>${arrow.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.arrow.adbc</groupId> + <artifactId>adbc-core</artifactId> + <version>${adbc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow.adbc</groupId> + <artifactId>adbc-driver-manager</artifactId> + <version>${adbc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow.adbc</groupId> + <artifactId>adbc-sql</artifactId> + <version>${adbc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-sql-jdbc-core</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + <version>4.13.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>RELEASE</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Java Compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.10.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.4.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/ArrowBatchReader.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/ArrowBatchReader.java new file mode 100644 index 00000000000..2522187db6d --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/ArrowBatchReader.java @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package doris.arrowflight.demo; + +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Field; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + + +/** + * Iterate over each batch in ArrowReader. + * ArrowReader is the iterator returned by ADBC Client when executing a query. + */ +public class ArrowBatchReader { + + @FunctionalInterface + public interface LoadArrowBatchFunc { + void load(ArrowReader reader) throws IOException; + } + + /** + * Print one row in VectorSchemaRoot, if the output format is incorrect, may need to modify + * the output method of different types of ValueVector. 
+ */ + public static void printRow(VectorSchemaRoot root, int rowIndex) { + if (root == null || rowIndex < 0 || rowIndex >= root.getRowCount()) { + System.out.println("Invalid row index: " + rowIndex); + return; + } + + System.out.print("> "); + for (Field field : root.getSchema().getFields()) { + ValueVector vector = root.getVector(field.getName()); + if (vector != null) { + if (vector instanceof org.apache.arrow.vector.DateDayVector && !vector.isNull(rowIndex)) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + int dayOffset = ((org.apache.arrow.vector.DateDayVector) vector).get(rowIndex); + LocalDate date = LocalDate.ofEpochDay(dayOffset); + System.out.print(date.format(formatter)); + } else if (vector instanceof org.apache.arrow.vector.BitVector && !vector.isNull(rowIndex)) { + System.out.print(((org.apache.arrow.vector.BitVector) vector).get(rowIndex) == 1); + } else { + // other types and NULL values: print(Object) renders null as "null" instead of throwing an NPE + System.out.print(vector.getObject(rowIndex)); + } + System.out.print(", "); + } + } + System.out.println(); + } + + /** + * Iterate over each batch in ArrowReader with the least cost, only record the number of rows and batches, + * usually used to test performance. + */ + public static LoadArrowBatchFunc loadArrowBatch = reader -> { + int rowCount = 0; + int batchCount = 0; + while (reader.loadNextBatch()) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + if (batchCount == 0) { + System.out.println("> " + root.getSchema().toString()); + printRow(root, 0); // only print first line (row indexes are 0-based) + } + rowCount += root.getRowCount(); + batchCount += 1; + } + System.out.println("> batchCount: " + batchCount + ", rowCount: " + rowCount); + }; + + /** + * Iterate over each batch in ArrowReader and convert the batch to String, this will take more time.
+ */ + public static LoadArrowBatchFunc loadArrowBatchToString = reader -> { + int rowCount = 0; + List<String> result = new ArrayList<>(); + while (reader.loadNextBatch()) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + if (result.size() == 0) { + System.out.println("> " + root.getSchema().toString()); + printRow(root, 0); // only print first line + } + result.add(root.contentToTSVString()); + rowCount += root.getRowCount(); + } + System.out.println("> batchCount: " + result.size() + ", rowCount: " + rowCount); + }; +} diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/Configuration.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/Configuration.java new file mode 100644 index 00000000000..c40a46ca057 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/Configuration.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package doris.arrowflight.demo; + +public class Configuration { + public String sql = ""; // require + public String ip = "127.0.0.1"; // require + public String arrowFlightPort = "9090"; // require + public String mysqlPort = "9030"; + public int retryTimes = 2; // The first execution is cold run + public String user = "root"; + public String password = ""; + + Configuration(String[] args) { + for (int i = 0; i < args.length; i++) { + switch (i) { + case 0 -> sql = args[i]; + case 1 -> ip = args[i]; + case 2 -> arrowFlightPort = args[i]; + case 3 -> mysqlPort = args[i]; + case 4 -> retryTimes = Integer.parseInt(args[i]); + case 5 -> user = args[i]; + case 6 -> password = args[i]; + } + } + } +} diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java new file mode 100644 index 00000000000..3c2202b2b07 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package doris.arrowflight.demo; + +import doris.arrowflight.demo.ArrowBatchReader.LoadArrowBatchFunc; +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.core.AdbcStatement.QueryResult; +import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +import java.util.HashMap; +import java.util.Map; + +/** + * Use the Arrow Flight ADBC driver to connect to the Doris Arrow Flight server and execute query. + */ +public class FlightAdbcDriver { + private static void connectAndExecute(Configuration configuration, LoadArrowBatchFunc loadArrowReader) { + final BufferAllocator allocator = new RootAllocator(); + FlightSqlDriver driver = new FlightSqlDriver(allocator); + Map<String, Object> parameters = new HashMap<>(); + AdbcDriver.PARAM_URI.set(parameters, + Location.forGrpcInsecure(configuration.ip, Integer.parseInt(configuration.arrowFlightPort)).getUri() + .toString()); + AdbcDriver.PARAM_USERNAME.set(parameters, configuration.user); + AdbcDriver.PARAM_PASSWORD.set(parameters, configuration.password); + + try { + AdbcDatabase adbcDatabase = driver.open(parameters); + AdbcConnection connection = adbcDatabase.connect(); + AdbcStatement stmt = connection.createStatement(); + long start = System.currentTimeMillis(); + stmt.setSqlQuery(configuration.sql); + + // executeQuery, two steps: + // 1. Execute Query and get returned FlightInfo; + // 2. 
Create FlightInfoReader to sequentially traverse each Endpoint; + QueryResult queryResult = stmt.executeQuery(); + ArrowReader reader = queryResult.getReader(); + loadArrowReader.load(reader); + System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start)); + + reader.close(); + queryResult.close(); + stmt.close(); + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run(Configuration configuration) { + System.out.println("*************************************"); + System.out.println("| FlightAdbcDriver |"); + System.out.println("*************************************"); + + System.out.println("FlightAdbcDriver > loadArrowBatch"); + connectAndExecute(configuration, ArrowBatchReader.loadArrowBatch); + System.out.println("FlightAdbcDriver > loadArrowBatchToString"); + connectAndExecute(configuration, ArrowBatchReader.loadArrowBatchToString); + } + + public static void main(String[] args) throws Exception { + Configuration configuration = new Configuration(args); + for (int i = 0; i < configuration.retryTimes; i++) { + run(configuration); + } + } +} diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java new file mode 100644 index 00000000000..1560e5172d7 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package doris.arrowflight.demo; + +import doris.arrowflight.demo.ArrowBatchReader.LoadArrowBatchFunc; +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.driver.jdbc.JdbcDriver; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +import java.util.HashMap; +import java.util.Map; + +/** + * Use the Arrow Flight JDBC driver to connect to the Doris Arrow Flight server and execute query. + * Unlike the Java JDBC DriverManager, this is a JDBC Driver provided by Arrow Flight, which may contain + * some optimizations (although no performance advantage was observed). 
+ */ +public class FlightJdbcDriver { + private static void connectAndExecute(Configuration configuration, LoadArrowBatchFunc loadArrowReader) { + String DB_URL = "jdbc:arrow-flight-sql://" + configuration.ip + ":" + configuration.arrowFlightPort + + "?useServerPrepStmts=false" + "&cachePrepStmts=true&useSSL=false&useEncryption=false"; + final Map<String, Object> parameters = new HashMap<>(); + AdbcDriver.PARAM_URI.set(parameters, DB_URL); + AdbcDriver.PARAM_USERNAME.set(parameters, configuration.user); + AdbcDriver.PARAM_PASSWORD.set(parameters, configuration.password); + + try { + BufferAllocator allocator = new RootAllocator(); + AdbcDatabase db = new JdbcDriver(allocator).open(parameters); + AdbcConnection connection = db.connect(); + AdbcStatement stmt = connection.createStatement(); + + long start = System.currentTimeMillis(); + stmt.setSqlQuery(configuration.sql); + AdbcStatement.QueryResult queryResult = stmt.executeQuery(); + ArrowReader reader = queryResult.getReader(); + loadArrowReader.load(reader); + System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start)); + + reader.close(); + queryResult.close(); + stmt.close(); + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run(Configuration configuration) { + System.out.println("*************************************"); + System.out.println("| FlightJdbcDriver |"); + System.out.println("*************************************"); + + System.out.println("FlightJdbcDriver > loadArrowBatch"); + connectAndExecute(configuration, ArrowBatchReader.loadArrowBatch); + System.out.println("FlightJdbcDriver > loadArrowBatchToString"); + connectAndExecute(configuration, ArrowBatchReader.loadArrowBatchToString); + } + + public static void main(String[] args) { + Configuration configuration = new Configuration(args); + for (int i = 0; i < configuration.retryTimes; i++) { + run(configuration); + } + } +} diff --git 
a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightSqlClient.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightSqlClient.java new file mode 100644 index 00000000000..3a01ea16370 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightSqlClient.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package doris.arrowflight.demo; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.auth2.BearerCredentialWriter; +import org.apache.arrow.flight.grpc.CredentialCallOption; +import org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; + +import java.net.URI; +import java.net.URISyntaxException; + +/** + * Manually execute Arrow Flight SQL Rpc process, usually used for debug. 
+ */ +public class FlightSqlClient { + public record FlightInfoResult<T, T2, T3>(T first, T2 second, T3 third) { + } + + public record DummyFlightInfoResult<T, T2>(T first, T2 second) { + } + + /** + * Connect to FE Arrow Flight Server to obtain Bearertoken and execute Query to get Ticket. + */ + public static FlightInfoResult<FlightInfo, CredentialCallOption, org.apache.arrow.flight.sql.FlightSqlClient.PreparedStatement> getFlightInfoFromDorisFe( + Configuration configuration) throws URISyntaxException { + BufferAllocator allocatorFE = new RootAllocator(Integer.MAX_VALUE); + final Location clientLocationFE = new Location( + new URI("grpc", null, configuration.ip, Integer.parseInt(configuration.arrowFlightPort), + null, null, null)); + FlightClient clientFE = FlightClient.builder(allocatorFE, clientLocationFE).build(); + org.apache.arrow.flight.sql.FlightSqlClient sqlClinetFE = new org.apache.arrow.flight.sql.FlightSqlClient( + clientFE); + + // Use username and password authentication to obtain a Bearertoken for subsequent access to the Doris Arrow Flight Server. + CredentialCallOption credentialCallOption = clientFE.authenticateBasicToken(configuration.user, + configuration.password).get(); + final org.apache.arrow.flight.sql.FlightSqlClient.PreparedStatement preparedStatement = sqlClinetFE.prepare( + configuration.sql, + credentialCallOption); + final FlightInfo info = preparedStatement.execute(credentialCallOption); + return new FlightInfoResult<>(info, credentialCallOption, preparedStatement); + } + + /** + * Use the correct Bearertoken and the correct Ticket, and the expected return result is normal. 
+ */ + public static void getResultFromDorisBe(FlightInfo info, Ticket ticket, CredentialCallOption credentialCallOption) + throws Exception { + final Location locationBE = info.getEndpoints().get(0).getLocations().get(0); + + // Connect to the BE Arrow Flight Server. try-with-resources closes the stream, client and allocator + // even when reading fails (the dummy-ticket call in run() is expected to throw). + try (BufferAllocator allocatorBE = new RootAllocator(Integer.MAX_VALUE); + FlightClient clientBE = FlightClient.builder(allocatorBE, locationBE).build(); + FlightStream stream = new org.apache.arrow.flight.sql.FlightSqlClient(clientBE) + .getStream(ticket, credentialCallOption)) { + int rowCount = 0; + int batchCount = 0; + while (stream.next()) { + VectorSchemaRoot root = stream.getRoot(); + if (batchCount == 0) { + System.out.println("> " + root.getSchema().toString()); + ArrowBatchReader.printRow(root, 0); // only print first line (row indexes are 0-based) + } + rowCount += root.getRowCount(); + batchCount += 1; + } + System.out.println("> batchCount: " + batchCount + ", rowCount: " + rowCount); + } + } + + /** + * Construct a dummy Ticket and CredentialCallOption to simulate the BE Arrow Flight Server being hacked, + * to analyze data security.
+ * + * @return get error `INVALID_ARGUMENT: Malformed ticket` + */ + public static DummyFlightInfoResult<Ticket, CredentialCallOption> constructDummyFlightInfo() { + String Bearertoken = "ojatddjr72k1ss20sqkatkhtd7"; + String queryId = "18c64b4e15094922-af5fea3da80fb89f"; + String query = "select * from clickbench.hits limit 10;"; + CredentialCallOption dummyCredentialCallOption = new CredentialCallOption( + new BearerCredentialWriter(Bearertoken)); + final ByteString handle = ByteString.copyFromUtf8(queryId + ":" + query); + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle).build(); + final Ticket dummyTicket = new Ticket(Any.pack(ticketStatement).toByteArray()); + return new DummyFlightInfoResult<>(dummyTicket, dummyCredentialCallOption); + } + + public static void run(Configuration configuration) { + System.out.println("*************************************"); + System.out.println("| FlightSqlClient |"); + System.out.println("*************************************"); + + try { + System.out.println("FlightSqlClient > getFlightInfoFromDorisFe"); + var flightInfo = getFlightInfoFromDorisFe(configuration); + getResultFromDorisBe(flightInfo.first(), flightInfo.first().getEndpoints().get(0).getTicket(), + flightInfo.second()); + System.out.println(); + + System.out.println( + "FlightSqlClient > constructDummyFlightInfo, don't be afraid! 
expected to get error `INVALID_ARGUMENT: Malformed ticket`"); + var dummyFlightInfo = constructDummyFlightInfo(); + getResultFromDorisBe(flightInfo.first(), dummyFlightInfo.first(), dummyFlightInfo.second()); + System.out.println(); + + flightInfo.third().close(flightInfo.second()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + Configuration configuration = new Configuration(args); + run(configuration); + } +} diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/JdbcDriverManager.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/JdbcDriverManager.java new file mode 100644 index 00000000000..3cc07c73847 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/JdbcDriverManager.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package doris.arrowflight.demo; + +import doris.arrowflight.demo.JdbcResultSetReader.LoadJdbcResultSetFunc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Objects; + +/** + * Use the Java JDBC DriverManager to connect to the Doris Arrow Flight server and execute query. + * Usually, DriverManager is used to connect to the database using the Mysql protocol in Java. only need to replace + * `jdbc:mysql` in the URI with `jdbc:arrow-flight-sql` to connect to the database using the Arrow Flight SQL protocol + * (provided that the database implements the Arrow Flight server). + */ +public class JdbcDriverManager { + private static void connectAndExecute(Configuration configuration, String urlPrefix, String port, + LoadJdbcResultSetFunc loadJdbcResultSetFunc) { + String DB_URL = urlPrefix + "://" + configuration.ip + ":" + port + "?useServerPrepStmts=false" + + "&cachePrepStmts=true&useSSL=false&useEncryption=false"; + try { + long start = System.currentTimeMillis(); + Connection conn = DriverManager.getConnection(DB_URL, configuration.user, configuration.password); + Statement stmt = conn.createStatement(); + stmt.execute(configuration.sql); + + final ResultSet resultSet = stmt.getResultSet(); + loadJdbcResultSetFunc.load(resultSet); + System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start)); + + stmt.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run(Configuration configuration) { + System.out.println("*************************************"); + System.out.println("| JdbcDriverManager |"); + System.out.println("*************************************"); + + try { + if (!Objects.equals(configuration.mysqlPort, "")) { + Class.forName("com.mysql.cj.jdbc.Driver"); + System.out.println("JdbcDriverManager > jdbc:mysql > loadJdbcResult"); + connectAndExecute(configuration, "jdbc:mysql", configuration.mysqlPort, 
+ JdbcResultSetReader.loadJdbcResult); + System.out.println("JdbcDriverManager > jdbc:mysql > loadJdbcResultToString"); + connectAndExecute(configuration, "jdbc:mysql", configuration.mysqlPort, + JdbcResultSetReader.loadJdbcResultToString); + } + + Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver"); + System.out.println("JdbcDriverManager > jdbc:arrow-flight-sql > loadJdbcResultToString"); + connectAndExecute(configuration, "jdbc:arrow-flight-sql", configuration.arrowFlightPort, + JdbcResultSetReader.loadJdbcResult); + System.out.println("JdbcDriverManager > jdbc:arrow-flight-sql > loadJdbcResultToString"); + connectAndExecute(configuration, "jdbc:arrow-flight-sql", configuration.arrowFlightPort, + JdbcResultSetReader.loadJdbcResultToString); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) throws ClassNotFoundException { + Configuration configuration = new Configuration(args); + for (int i = 0; i < configuration.retryTimes; i++) { + run(configuration); + } + } +} diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/JdbcResultSetReader.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/JdbcResultSetReader.java new file mode 100644 index 00000000000..ac9f24a5ca5 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/JdbcResultSetReader.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package doris.arrowflight.demo; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +/** + * Iterate over each row in jdbc ResultSet. + */ +public class JdbcResultSetReader { + + @FunctionalInterface + public interface LoadJdbcResultSetFunc { + void load(ResultSet resultSet) throws IOException, SQLException; + } + + public static LoadJdbcResultSetFunc loadJdbcResult = resultSet -> { + int rowCount = 0; + final int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + rowCount += 1; + } + System.out.println("> rowCount: " + rowCount + ", columnCount: " + columnCount); + }; + + public static LoadJdbcResultSetFunc loadJdbcResultToString = resultSet -> { + int rowCount = 0; + final int columnCount = resultSet.getMetaData().getColumnCount(); + List<String> result = new ArrayList<>(); + while (resultSet.next()) { + StringBuilder line = new StringBuilder(); + for (int i = 1; i <= columnCount; i++) { + line.append(resultSet.getString(i)).append(","); + } + if (rowCount == 0) { // only print first line + System.out.println("> " + line); + } + rowCount += 1; + result.add(line.toString()); + } + System.out.println( + "> rowCount: " + rowCount + ", columnCount: " + columnCount + " resultSize: " + result.size()); + }; +} diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/Main.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/Main.java new file mode 100644 index 
00000000000..2f9a8d20a87 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/Main.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package doris.arrowflight.demo; + +public class Main { + public static void main(String[] args) { + Configuration configuration = new Configuration(args); + for (int i = 0; i < configuration.retryTimes; i++) { + FlightAdbcDriver.run(configuration); + FlightJdbcDriver.run(configuration); + JdbcDriverManager.run(configuration); + } + FlightSqlClient.run(configuration); + } +} diff --git a/samples/arrow-flight-sql/java/src/test/java/doris/arrowflight/demo/ConfigurationTest.java b/samples/arrow-flight-sql/java/src/test/java/doris/arrowflight/demo/ConfigurationTest.java new file mode 100644 index 00000000000..4614730ede6 --- /dev/null +++ b/samples/arrow-flight-sql/java/src/test/java/doris/arrowflight/demo/ConfigurationTest.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package doris.arrowflight.demo; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; + +/** + * Unit test for simple App. + */ +public class ConfigurationTest { + + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() { + assertTrue(true); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org