This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 74c94c3587ce Revert "[SPARK-50849][CONNECT] Add example project to demonstrate Spark Connect Server Libraries"
74c94c3587ce is described below
commit 74c94c3587ce8149d31331cff9d8078bc51c0193
Author: yangjie01 <[email protected]>
AuthorDate: Tue Jun 24 21:36:30 2025 +0800
Revert "[SPARK-50849][CONNECT] Add example project to demonstrate Spark
Connect Server Libraries"
### What changes were proposed in this pull request?
This PR reverts the example code provided in SPARK-50849 because it fails
to compile properly.
### Why are the changes needed?
An example that fails to compile properly should not be provided to users.
The following error occurs when executing `mvn clean install -DskipTests` in the `connect-examples/server-library-example` directory:
```
[ERROR] /Users/yangjie01/SourceCode/git/spark-maven/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala:87: overloaded method setExtension with alternatives:
  (x$1: org.sparkproject.com.google.protobuf.Any.Builder)org.apache.spark.connect.proto.Command.Builder <and>
  (x$1: org.sparkproject.com.google.protobuf.Any)org.apache.spark.connect.proto.Command.Builder
 cannot be applied to (com.google.protobuf.Any)
[ERROR] /Users/yangjie01/SourceCode/git/spark-maven/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala:115: overloaded method setExtension with alternatives:
  (x$1: org.sparkproject.com.google.protobuf.Any.Builder)org.apache.spark.connect.proto.Relation.Builder <and>
  (x$1: org.sparkproject.com.google.protobuf.Any)org.apache.spark.connect.proto.Relation.Builder
 cannot be applied to (com.google.protobuf.Any)
[ERROR] /Users/yangjie01/SourceCode/git/spark-maven/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala:115: overloaded method setExtension with alternatives:
  (x$1: org.sparkproject.com.google.protobuf.Any.Builder)org.apache.spark.connect.proto.Command.Builder <and>
  (x$1: org.sparkproject.com.google.protobuf.Any)org.apache.spark.connect.proto.Command.Builder
 cannot be applied to (com.google.protobuf.Any)
[ERROR] three errors found
```
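For context, all three errors stem from the same protobuf shading mismatch: the example's client code packs messages with the unshaded `com.google.protobuf.Any`, while the `setExtension` overloads generated in `spark-connect-common` only accept the relocated `org.sparkproject.com.google.protobuf.Any`. Below is a minimal sketch (not part of this revert) of one way such a call site could be made to compile, assuming the relocated classes are available under the `org.sparkproject` prefix as the compiler output indicates; the `ShadingWorkaroundSketch` object and the repacking approach are illustrative only:
```scala
package org.apache.connect.examples.serverlibrary

import com.google.protobuf.{Any => UnshadedAny}
// Assumption: spark-connect-common relocates protobuf under org.sparkproject,
// per the compiler output above.
import org.sparkproject.com.google.protobuf.{Any => ShadedAny}
import org.apache.spark.connect.proto.Command

object ShadingWorkaroundSketch {
  // Hypothetical helper: wrap the example's CustomCommand in a Connect Command.
  def buildCommand(customCommand: proto.CustomCommand): Command = {
    // The common module generates CustomCommand against unshaded protobuf-java.
    val packed: UnshadedAny = UnshadedAny.pack(customCommand)
    // Command.newBuilder().setExtension(packed) // does not compile: expects the shaded Any
    // Repack by serializing and reparsing as the relocated type (same wire format).
    val repacked: ShadedAny = ShadedAny.parseFrom(packed.toByteArray)
    Command.newBuilder().setExtension(repacked).build()
  }
}
```
An alternative would be for the example modules to compile their protos against the shaded classes directly rather than an unshaded `protobuf-java`.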
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51261 from LuciferYang/revert-SPARK-50849.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
connect-examples/server-library-example/README.md | 133 ------------------
.../server-library-example/client/pom.xml | 112 ----------------
.../client/src/main/resources/log4j2.xml | 30 -----
.../examples/serverlibrary/CustomTable.scala | 149 ---------------------
.../serverlibrary/CustomTableBuilder.scala | 135 -------------------
.../serverlibrary/CustomTableExample.scala | 60 ---------
.../server-library-example/common/pom.xml | 76 -----------
.../common/src/main/protobuf/base.proto | 28 ----
.../common/src/main/protobuf/commands.proto | 61 ---------
.../common/src/main/protobuf/relations.proto | 34 -----
connect-examples/server-library-example/pom.xml | 42 ------
.../resources/dummy_data.data | 6 -
.../server-library-example/server/pom.xml | 90 -------------
.../serverlibrary/CustomCommandPlugin.scala | 134 ------------------
.../examples/serverlibrary/CustomPluginBase.scala | 27 ----
.../serverlibrary/CustomRelationPlugin.scala | 91 -------------
.../examples/serverlibrary/CustomTable.scala | 131 ------------------
17 files changed, 1339 deletions(-)
diff --git a/connect-examples/server-library-example/README.md b/connect-examples/server-library-example/README.md
deleted file mode 100644
index adf4830d58ff..000000000000
--- a/connect-examples/server-library-example/README.md
+++ /dev/null
@@ -1,133 +0,0 @@
-# Spark Server Library Example - Custom Datasource Handler
-
-This example demonstrates a modular maven-based project architecture with separate client, server
-and common components. It leverages the extensibility of Spark Connect to create a server library
-that may be attached to the server to extend the functionality of the Spark Connect server as a whole. Below is a detailed overview of the setup and functionality.
-
-## Project Structure
-
-```
-├── common/ # Shared protobuf/utilities/classes
-├── client/ # Sample client implementation
-│ ├── src/ # Source code for client functionality
-│ ├── pom.xml # Maven configuration for the client
-├── server/ # Server-side plugin extension
-│ ├── src/ # Source code for server functionality
-│ ├── pom.xml # Maven configuration for the server
-├── resources/ # Static resources
-├── pom.xml # Parent Maven configuration
-```
-
-## Functionality Overview
-
-To demonstrate the extensibility of Spark Connect, a custom datasource handler, `CustomTable` is
-implemented in the server module. The class handles reading, writing and processing data stored in
-a custom format, here we simply use the `.custom` extension (which itself is a wrapper over `.csv`
-files).
-
-First and foremost, the client and the server must be able to communicate with each other through
-custom messages that 'understand' our custom data format. This is achieved by defining custom
-protobuf messages in the `common` module. The client and server modules both depend on the `common`
-module to access these messages.
-- `common/src/main/protobuf/base.proto`: Defines the base `CustomTable` which is simply represented
-by a path and a name.
-```protobuf
-message CustomTable {
- string path = 1;
- string name = 2;
-}
-```
-- `common/src/main/protobuf/commands.proto`: Defines the custom commands that the client can send
-to the server. These commands are typically operations that the server can perform, such as cloning
-an existing custom table.
-```protobuf
-message CustomCommand {
- oneof command_type {
- CreateTable create_table = 1;
- CloneTable clone_table = 2;
- }
-}
-```
-- `common/src/main/protobuf/relations.proto`: Defines custom `relations`, which are a mechanism through which an optional input dataset is transformed into an
- output dataset such as a Scan.
-```protobuf
-message Scan {
- CustomTable table = 1;
-}
-```
-
-On the client side, the `CustomTable` class mimics the style of Spark's `Dataset` API, allowing the
-user to perform and chain operations on a `CustomTable` object.
-
-On the server side, a similar `CustomTable` class is implemented to handle the core functionality of
-reading, writing and processing data in the custom format. The plugins (`CustomCommandPlugin` and
-`CustomRelationPlugin`) are responsible for processing the custom protobuf messages sent from the client
-(those defined in the `common` module) and delegating the appropriate actions to the `CustomTable`.
-
-
-
-## Build and Run Instructions
-
-1. **Navigate to the sample project from `SPARK_HOME`**:
- ```bash
- cd connect-examples/server-library-example
- ```
-
-2. **Build and package the modules**:
- ```bash
- mvn clean package
- ```
-
-3. **Download the `4.0.0-preview2` release to use as the Spark Connect Server**:
-   - Choose a distribution from https://archive.apache.org/dist/spark/spark-4.0.0-preview2/.
-   - Example: `curl -L https://archive.apache.org/dist/spark/spark-4.0.0-preview2/spark-4.0.0-preview2-bin-hadoop3.tgz | tar xz`
-
-4. **Copy relevant JARs to the root of the unpacked Spark distribution**:
-   ```bash
-   cp \
-   <SPARK_HOME>/connect-examples/server-library-example/common/target/spark-daria_2.13-1.2.3.jar \
-   <SPARK_HOME>/connect-examples/server-library-example/common/target/spark-server-library-example-common-1.0.0.jar \
-   <SPARK_HOME>/connect-examples/server-library-example/server/target/spark-server-library-example-server-extension-1.0.0.jar \
-   .
-   ```
-5. **Start the Spark Connect Server with the relevant JARs**:
-   ```bash
-   bin/spark-connect-shell \
-     --jars spark-server-library-example-server-extension-1.0.0.jar,spark-server-library-example-common-1.0.0.jar,spark-daria_2.13-1.2.3.jar \
-     --conf spark.connect.extensions.relation.classes=org.apache.connect.examples.serverlibrary.CustomRelationPlugin \
-     --conf spark.connect.extensions.command.classes=org.apache.connect.examples.serverlibrary.CustomCommandPlugin
-   ```
-6. **In a different terminal, navigate back to the root of the sample project and start the client**:
-   ```bash
-   java -cp client/target/spark-server-library-client-package-scala-1.0.0.jar org.apache.connect.examples.serverlibrary.CustomTableExample
-   ```
-7. **Notice the printed output in the client terminal as well as the creation of the cloned table**:
-```protobuf
-Explaining plan for custom table: sample_table with path: <SPARK_HOME>/spark/connect-examples/server-library-example/client/../resources/dummy_data.custom
-== Parsed Logical Plan ==
-Relation [id#2,name#3] csv
-
-== Analyzed Logical Plan ==
-id: int, name: string
-Relation [id#2,name#3] csv
-
-== Optimized Logical Plan ==
-Relation [id#2,name#3] csv
-
-== Physical Plan ==
-FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
-
-Explaining plan for custom table: cloned_table with path: <SPARK_HOME>/connect-examples/server-library-example/client/../resources/cloned_data.data
-== Parsed Logical Plan ==
-Relation [id#2,name#3] csv
-
-== Analyzed Logical Plan ==
-id: int, name: string
-Relation [id#2,name#3] csv
-
-== Optimized Logical Plan ==
-Relation [id#2,name#3] csv
-
-== Physical Plan ==
-FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
-```
\ No newline at end of file
diff --git a/connect-examples/server-library-example/client/pom.xml b/connect-examples/server-library-example/client/pom.xml
deleted file mode 100644
index 364920a2ec22..000000000000
--- a/connect-examples/server-library-example/client/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.connect.examples.serverlibrary</groupId>
- <artifactId>spark-server-library-example</artifactId>
- <version>1.0.0</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>spark-server-library-client-package-scala</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <!-- Custom Proto definitions are in the common module -->
- <dependency>
- <groupId>org.apache.connect.examples.serverlibrary</groupId>
- <artifactId>spark-server-library-example-common</artifactId>
- <version>1.0.0</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- spark-connect-common contains proto definitions that we require to build custom commands/relations/expressions -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-connect-common_${scala.binary}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <!-- Dependency on the spark connect client module to interact with the Spark server -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-connect-client-jvm_${scala.binary}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <!-- Dependency on the scala library -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <plugins>
- <!-- Scala -->
- <plugin>
- <!-- see http://davidb.github.com/scala-maven-plugin -->
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>4.9.2</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- Shade plugin for creating a fat JAR -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.5.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <!--SPARK-42228: Add `ServicesResourceTransformer` to relocation class names in META-INF/services for grpc-->
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/connect-examples/server-library-example/client/src/main/resources/log4j2.xml b/connect-examples/server-library-example/client/src/main/resources/log4j2.xml
deleted file mode 100644
index 21b0d9719193..000000000000
--- a/connect-examples/server-library-example/client/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<Configuration status="WARN">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n" />
- </Console>
- </Appenders>
- <Loggers>
- <Root level="info">
- <AppenderRef ref="Console" />
- </Root>
- </Loggers>
-</Configuration>
diff --git a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
deleted file mode 100644
index 782a246d9298..000000000000
--- a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import com.google.protobuf.Any
-import org.apache.spark.connect.proto.Command
-import org.apache.spark.sql.{functions, Column, DataFrame, Dataset, Row, SparkSession}
-
-import org.apache.connect.examples.serverlibrary.proto
-import org.apache.connect.examples.serverlibrary.proto.CreateTable.Column.{DataType => ProtoDataType}
-
-/**
- * Represents a custom table with associated DataFrame and metadata.
- *
- * @param df The underlying DataFrame.
- * @param table The metadata of the custom table.
- */
-class CustomTable private (private val df: Dataset[Row], private val table: proto.CustomTable) {
-
- /**
- * Returns the Spark session associated with the DataFrame.
- */
- private def spark = df.sparkSession
-
- /**
- * Converts the custom table to a DataFrame.
- *
- * @return The underlying DataFrame.
- */
- def toDF: Dataset[Row] = df
-
- /**
- * Prints the execution plan of the custom table.
- */
- def explain(): Unit = {
- println(s"Explaining plan for custom table: ${table.getName} with path: ${table.getPath}")
- df.explain("extended")
- }
-
- /**
- * Clones the custom table to a new location with a new name.
- *
- * @param target The target path for the cloned table.
- * @param newName The new name for the cloned table.
- * @param replace Whether to replace the target location if it exists.
- * @return A new `CustomTable` instance representing the cloned table.
- */
- def clone(target: String, newName: String, replace: Boolean): CustomTable = {
- val cloneTableProto = proto.CloneTable
- .newBuilder()
- .setTable(
- proto.CustomTable
- .newBuilder()
- .setName(table.getName)
- .setPath(table.getPath))
- .setClone(
- proto.CustomTable
- .newBuilder()
- .setName(newName)
- .setPath(target))
- .setReplace(replace)
- .build()
- val customCommand = proto.CustomCommand
- .newBuilder()
- .setCloneTable(cloneTableProto)
- .build()
- // Pack the CustomCommand into Any
- val customCommandAny = Any.pack(customCommand)
- // Set the Any as the extension of a Command
- val commandProto = Command
- .newBuilder()
- .setExtension(customCommandAny)
- .build()
- // Execute the command
- spark.execute(commandProto)
- CustomTable.from(spark, newName, target)
- }
-}
-
-object CustomTable {
- /**
- * Creates a `CustomTable` from the given Spark session, name, and path.
- *
- * @param spark The Spark session.
- * @param name The name of the table.
- * @param path The path of the table.
- * @return A new `CustomTable` instance.
- */
- def from(spark: SparkSession, name: String, path: String): CustomTable = {
- val table = proto.CustomTable
- .newBuilder()
- .setName(name)
- .setPath(path)
- .build()
- val relation = proto.CustomRelation
- .newBuilder()
- .setScan(proto.Scan.newBuilder().setTable(table))
- .build()
- val customRelation = Any.pack(relation)
- val df = spark.newDataFrame(f => f.setExtension(customRelation))
- new CustomTable(df, table)
- }
-
- /**
- * Creates a new `CustomTableBuilder` instance.
- *
- * @param spark The Spark session.
- * @return A new `CustomTableBuilder` instance.
- */
- def create(spark: SparkSession): CustomTableBuilder = new CustomTableBuilder(spark)
-
- /**
- * Enumeration for data types.
- */
- object DataType extends Enumeration {
- type DataType = Value
- val Int, String, Float, Boolean = Value
-
- /**
- * Converts a `DataType` to its corresponding `ProtoDataType`.
- *
- * @param dataType The data type to convert.
- * @return The corresponding `ProtoDataType`.
- */
- def toProto(dataType: DataType): ProtoDataType = {
- dataType match {
- case Int => ProtoDataType.INT
- case String => ProtoDataType.STRING
- case Float => ProtoDataType.FLOAT
- case Boolean => ProtoDataType.BOOLEAN
- }
- }
- }
-}
diff --git a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala
deleted file mode 100644
index a1b8ffdb8dd7..000000000000
--- a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import com.google.protobuf.Any
-import org.apache.spark.connect.proto.Command
-import org.apache.spark.sql.SparkSession
-
-import org.apache.connect.examples.serverlibrary.CustomTable
-
-/**
- * Builder class for constructing a `CustomTable` instance.
- *
- * @param spark The Spark session.
- */
-class CustomTableBuilder private[serverlibrary] (spark: SparkSession) {
- import CustomTableBuilder._
-
- private var name: String = _
- private var path: String = _
- private var columns: Seq[Column] = Seq.empty
-
- /**
- * Sets the name of the custom table.
- *
- * @param name The name of the table.
- * @return The current `CustomTableBuilder` instance.
- */
- def name(name: String): CustomTableBuilder = {
- this.name = name
- this
- }
-
- /**
- * Sets the path of the custom table.
- *
- * @param path The path of the table.
- * @return The current `CustomTableBuilder` instance.
- */
- def path(path: String): CustomTableBuilder = {
- this.path = path
- this
- }
-
- /**
- * Adds a column to the custom table.
- *
- * @param name The name of the column.
- * @param dataType The data type of the column.
- * @return The current `CustomTableBuilder` instance.
- */
- def addColumn(name: String, dataType: CustomTable.DataType.Value): CustomTableBuilder = {
- columns = columns :+ Column(name, dataType)
- this
- }
-
- /**
- * Builds the `CustomTable` instance.
- *
- * @return A new `CustomTable` instance.
- * @throws IllegalArgumentException if name, path, or columns are not set.
- */
- def build(): CustomTable = {
- require(name != null, "Name must be set")
- require(path != null, "Path must be set")
- require(columns.nonEmpty, "At least one column must be added")
-
- // Define the table creation proto
- val createTableProtoBuilder = proto.CreateTable
- .newBuilder()
- .setTable(
- proto.CustomTable
- .newBuilder()
- .setPath(path)
- .setName(name)
- .build())
-
- // Add columns to the table creation proto
- columns.foreach { column =>
- createTableProtoBuilder.addColumns(
- proto.CreateTable.Column
- .newBuilder()
- .setName(column.name)
- .setDataType(CustomTable.DataType.toProto(column.dataType))
- .build())
- }
- val createTableProto = createTableProtoBuilder.build() // Build the CreateTable object
-
- // Wrap the CreateTable proto in CustomCommand
- val customCommand = proto.CustomCommand
- .newBuilder()
- .setCreateTable(createTableProto)
- .build()
-
- // Pack the CustomCommand into Any
- val customCommandAny = Any.pack(customCommand)
- // Set the Any as the extension of a Command
- val commandProto = Command
- .newBuilder()
- .setExtension(customCommandAny)
- .build()
-
- // Execute the command
- spark.execute(commandProto)
-
- // After the command is executed, create a client-side representation of the table with the
- // leaf node of the client-side dataset being the Scan node for the custom table.
- CustomTable.from(spark, name, path)
- }
-}
-
-object CustomTableBuilder {
- /**
- * Case class representing a column in the custom table.
- *
- * @param name The name of the column.
- * @param dataType The data type of the column.
- */
- private case class Column(name: String, dataType: CustomTable.DataType.Value)
-}
diff --git a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableExample.scala b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableExample.scala
deleted file mode 100644
index 8470465cd7a0..000000000000
--- a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableExample.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import java.nio.file.{Path, Paths}
-
-import com.google.protobuf.Any
-import org.apache.spark.connect.proto.Command
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
-
-import org.apache.connect.examples.serverlibrary.proto
-import org.apache.connect.examples.serverlibrary.proto.CreateTable.Column.{DataType => ProtoDataType}
-
-object CustomTableExample {
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder().remote("sc://localhost").build()
-
- // Step 1: Create a custom table from existing data
- val tableName = "sample_table"
- val fileName = "dummy_data.data"
-
- val workingDirectory = System.getProperty("user.dir")
- val fullPath = Paths.get(workingDirectory, s"resources/$fileName")
- val customTable = CustomTable
- .create(spark)
- .name(tableName)
- .path(fullPath.toString)
- .addColumn("id", CustomTable.DataType.Int)
- .addColumn("name", CustomTable.DataType.String)
- .build()
-
- // Step 2: Verify
- customTable.explain()
-
- // Step 3: Clone the custom table
- val clonedPath = fullPath.getParent.resolve("cloned_data.data")
- val clonedName = "cloned_table"
- val clonedTable =
- customTable.clone(target = clonedPath.toString, newName = clonedName, replace = true)
-
- // Step 4: Verify
- clonedTable.explain()
- }
-}
diff --git a/connect-examples/server-library-example/common/pom.xml b/connect-examples/server-library-example/common/pom.xml
deleted file mode 100644
index 592c43f26770..000000000000
--- a/connect-examples/server-library-example/common/pom.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.connect.examples.serverlibrary</groupId>
- <artifactId>spark-server-library-example</artifactId>
- <version>1.0.0</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>spark-server-library-example-common</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <!--Required to compile the proto files-->
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <!-- os-maven-plugin helps resolve ${os.detected.classifier} automatically -->
- <plugin>
- <groupId>kr.motd.maven</groupId>
- <artifactId>os-maven-plugin</artifactId>
- <version>1.7.0</version>
- <extensions>true</extensions>
- </plugin>
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <version>0.6.1</version>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
- <protoSourceRoot>src/main/protobuf</protoSourceRoot>
- </configuration>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/connect-examples/server-library-example/common/src/main/protobuf/base.proto b/connect-examples/server-library-example/common/src/main/protobuf/base.proto
deleted file mode 100644
index 9d902a587ed3..000000000000
--- a/connect-examples/server-library-example/common/src/main/protobuf/base.proto
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = 'proto3';
-
-option java_multiple_files = true;
-option java_package = "org.apache.connect.examples.serverlibrary.proto";
-
-message CustomTable {
- // Path to the custom table.
- string path = 1;
- // Name of the custom table.
- string name = 2;
-}
diff --git a/connect-examples/server-library-example/common/src/main/protobuf/commands.proto b/connect-examples/server-library-example/common/src/main/protobuf/commands.proto
deleted file mode 100644
index 13d9945cfe61..000000000000
--- a/connect-examples/server-library-example/common/src/main/protobuf/commands.proto
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = 'proto3';
-
-option java_multiple_files = true;
-option java_package = "org.apache.connect.examples.serverlibrary.proto";
-
-import "base.proto";
-
-message CustomCommand {
- oneof command_type {
- CreateTable create_table = 1;
- CloneTable clone_table = 2;
- }
-}
-
-message CreateTable {
-
- // Column in the schema of the table.
- message Column {
- // (Required) Name of the column.
- string name = 1;
- // (Required) Data type of the column.
- enum DataType {
- DATA_TYPE_UNSPECIFIED = 0; // Default value
- INT = 1; // Integer data type
- STRING = 2; // String data type
- FLOAT = 3; // Float data type
- BOOLEAN = 4; // Boolean data type
- }
- DataType data_type = 2;
- }
- // (Required) Table properties.
- CustomTable table = 1;
- // (Required) List of columns in the schema of the table.
- repeated Column columns = 2;
-}
-
-message CloneTable {
- // (Required) The source table to clone.
- CustomTable table = 1;
- // (Required) Path to the location where the data of the cloned table should be stored.
- CustomTable clone = 2;
- // (Required) Overwrites the target location when true.
- bool replace = 3;
-}
diff --git a/connect-examples/server-library-example/common/src/main/protobuf/relations.proto b/connect-examples/server-library-example/common/src/main/protobuf/relations.proto
deleted file mode 100644
index 1ebf0e640bef..000000000000
--- a/connect-examples/server-library-example/common/src/main/protobuf/relations.proto
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = 'proto3';
-
-option java_multiple_files = true;
-option java_package = "org.apache.connect.examples.serverlibrary.proto";
-
-import "base.proto";
-
-message CustomRelation {
- oneof relation_type {
- Scan scan = 1;
- }
-}
-
-message Scan {
- // (Required) Table to scan.
- CustomTable table = 1;
-}
diff --git a/connect-examples/server-library-example/pom.xml b/connect-examples/server-library-example/pom.xml
deleted file mode 100644
index 1723f3b0154f..000000000000
--- a/connect-examples/server-library-example/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.apache.connect.examples.serverlibrary</groupId>
- <artifactId>spark-server-library-example</artifactId>
- <version>1.0.0</version>
- <packaging>pom</packaging>
- <modules>
- <module>common</module>
- <module>server</module>
- <module>client</module>
- </modules>
- <properties>
- <maven.compiler.source>17</maven.compiler.source>
- <maven.compiler.target>17</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <scala.binary>2.13</scala.binary>
- <scala.version>2.13.15</scala.version>
- <protobuf.version>3.25.4</protobuf.version>
- <spark.version>4.0.0-preview2</spark.version>
- </properties>
-</project>
diff --git a/connect-examples/server-library-example/resources/dummy_data.data b/connect-examples/server-library-example/resources/dummy_data.data
deleted file mode 100644
index 0a6645b75722..000000000000
--- a/connect-examples/server-library-example/resources/dummy_data.data
+++ /dev/null
@@ -1,6 +0,0 @@
-id,name
-1,John Doe
-2,Jane Smith
-3,Bob Johnson
-4,Alice Williams
-5,Charlie Brown
\ No newline at end of file
diff --git a/connect-examples/server-library-example/server/pom.xml b/connect-examples/server-library-example/server/pom.xml
deleted file mode 100644
index b13a7537f9c1..000000000000
--- a/connect-examples/server-library-example/server/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.connect.examples.serverlibrary</groupId>
- <artifactId>spark-server-library-example</artifactId>
- <version>1.0.0</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>spark-server-library-example-server-extension</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <!-- Custom Proto definitions are in the common module -->
- <dependency>
- <groupId>org.apache.connect.examples.serverlibrary</groupId>
- <artifactId>spark-server-library-example-common</artifactId>
- <version>1.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-connect_${scala.binary}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- Include Spark Daria for utility Dataframe write methods -->
- <dependency>
- <groupId>com.github.mrpowers</groupId>
- <artifactId>spark-daria_${scala.binary}</artifactId>
- <version>1.2.3</version>
- </dependency>
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <plugins>
- <!-- Scala -->
- <plugin>
- <!-- see http://davidb.github.com/scala-maven-plugin -->
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>4.9.2</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomCommandPlugin.scala b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomCommandPlugin.scala
deleted file mode 100644
index 2253c4b238be..000000000000
--- a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomCommandPlugin.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import scala.collection.JavaConverters._
-
-import com.google.protobuf.Any
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.connect.plugin.CommandPlugin
-import org.apache.spark.sql.types.{StringType, IntegerType, FloatType, DoubleType, BooleanType, LongType, StructType, StructField, DataType}
-
-import org.apache.connect.examples.serverlibrary.CustomTable
-import org.apache.connect.examples.serverlibrary.proto
-import org.apache.connect.examples.serverlibrary.proto.CreateTable.Column.{DataType => ProtoDataType}
-
-/**
- * Commands are distinct actions that can be executed. Unlike relations, which focus on the
- * transformation and nesting of output data, commands represent singular operations that perform
- * specific tasks on the data.
- * In this example, the `CustomCommandPlugin` handles operations related to creating and duplicating
- * custom tables.
- */
-class CustomCommandPlugin extends CommandPlugin with CustomPluginBase {
-
- /**
- * Processes the raw byte array containing the command.
- *
- * @param raw The raw byte array of the command.
- * @param planner The SparkConnectPlanner instance.
- * @return True if the command was processed, false otherwise.
- */
- override def process(raw: Array[Byte], planner: SparkConnectPlanner): Boolean = {
- val command = Any.parseFrom(raw)
- if (command.is(classOf[proto.CustomCommand])) {
- processInternal(command.unpack(classOf[proto.CustomCommand]), planner)
- true
- } else {
- false
- }
- }
-
- /**
- * Processes the unpacked CustomCommand.
- *
- * @param command The unpacked CustomCommand.
- * @param planner The SparkConnectPlanner instance.
- */
- private def processInternal(
- command: proto.CustomCommand,
- planner: SparkConnectPlanner): Unit = {
- command.getCommandTypeCase match {
- case proto.CustomCommand.CommandTypeCase.CREATE_TABLE =>
- processCreateTable(planner, command.getCreateTable)
- case proto.CustomCommand.CommandTypeCase.CLONE_TABLE =>
- processCloneTable(planner, command.getCloneTable)
- case _ =>
- throw new IllegalArgumentException(
- s"Unsupported command type: ${command.getCommandTypeCase}")
- }
- }
-
- /**
- * Processes the CreateTable command.
- *
- * @param planner The SparkConnectPlanner instance.
- * @param createTable The CreateTable message.
- */
- private def processCreateTable(
- planner: SparkConnectPlanner,
- createTable: proto.CreateTable): Unit = {
- val tableName = createTable.getTable.getName
- val tablePath = createTable.getTable.getPath
-
- // Convert the list of columns from the protobuf message to a Spark schema
- val schema = StructType(createTable.getColumnsList.asScala.toSeq.map { column =>
- StructField(
- column.getName,
- protoDataTypeToSparkType(column.getDataType),
- nullable = true // Assuming all columns are nullable for simplicity
- )
- })
-
- // Create the table using the CustomTable utility
- CustomTable.createTable(tableName, tablePath, planner.session, schema)
- }
-
- /**
- * Converts a protobuf DataType to a Spark DataType.
- *
- * @param protoType The protobuf DataType.
- * @return The corresponding Spark DataType.
- */
- private def protoDataTypeToSparkType(protoType: ProtoDataType): DataType = {
- protoType match {
- case ProtoDataType.INT => IntegerType
- case ProtoDataType.STRING => StringType
- case ProtoDataType.FLOAT => FloatType
- case ProtoDataType.BOOLEAN => BooleanType
- case _ =>
- throw new IllegalArgumentException(s"Unsupported or unknown data type:
${protoType}")
- }
- }
-
- /**
- * Processes the CloneTable command.
- *
- * @param planner The SparkConnectPlanner instance.
- * @param msg The CloneTable message.
- */
- private def processCloneTable(planner: SparkConnectPlanner, msg: proto.CloneTable): Unit = {
- val sourceTable = getCustomTable(msg.getTable)
- CustomTable.cloneTable(
- sourceTable,
- msg.getClone.getName,
- msg.getClone.getPath,
- msg.getReplace)
- }
-}
diff --git a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomPluginBase.scala b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomPluginBase.scala
deleted file mode 100644
index df73e0d9a0fb..000000000000
--- a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomPluginBase.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import org.apache.connect.examples.serverlibrary.proto
-import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-
-trait CustomPluginBase {
- protected def getCustomTable(table: proto.CustomTable): CustomTable = {
- CustomTable.getTable(table.getName, table.getPath)
- }
-}
diff --git a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomRelationPlugin.scala b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomRelationPlugin.scala
deleted file mode 100644
index 7b444803a065..000000000000
--- a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomRelationPlugin.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import java.util.Optional
-
-import com.google.protobuf.Any
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.connect.plugin.RelationPlugin
-
-import org.apache.connect.examples.serverlibrary.{CustomPluginBase, CustomTable}
-import org.apache.connect.examples.serverlibrary.proto
-
-/**
- * Relations are fundamental to dataset transformations, acting as the mechanism through which an
- * input dataset is transformed into an output dataset. Conceptually, relations can be likened to
- * tables within a database, manipulated to achieve desired outcomes.
- * In this example, the `CustomRelationPlugin` handles the transformation related to scanning
- * custom tables. The scan relation would appear as leaf nodes in a dataset's associated logical
- * plan node when it involves reads from the custom tables.
- */
-class CustomRelationPlugin extends RelationPlugin with CustomPluginBase {
-
- /**
- * Transforms the raw byte array containing the relation into a Spark logical plan.
- *
- * @param raw The raw byte array of the relation.
- * @param planner The SparkConnectPlanner instance.
- * @return An Optional containing the LogicalPlan if the relation was processed, empty otherwise.
- */
- override def transform(
- raw: Array[Byte],
- planner: SparkConnectPlanner): Optional[LogicalPlan] = {
- val rel = Any.parseFrom(raw)
- if (rel.is(classOf[proto.CustomRelation])) {
- val customRelation = rel.unpack(classOf[proto.CustomRelation])
- // Transform the custom relation
- Optional.of(transformInner(customRelation, planner))
- } else {
- Optional.empty()
- }
- }
-
- /**
- * Transforms the unpacked CustomRelation into a Spark logical plan.
- *
- * @param relation The unpacked CustomRelation.
- * @param planner The SparkConnectPlanner instance.
- * @return The corresponding Spark LogicalPlan.
- */
- private def transformInner(
- relation: proto.CustomRelation,
- planner: SparkConnectPlanner): LogicalPlan = {
- relation.getRelationTypeCase match {
- case proto.CustomRelation.RelationTypeCase.SCAN =>
- transformScan(relation.getScan, planner)
- case _ =>
- throw new IllegalArgumentException(
- s"Unsupported relation type: ${relation.getRelationTypeCase}")
- }
- }
-
- /**
- * Transforms the Scan relation into a Spark logical plan.
- *
- * @param scan The Scan message.
- * @param planner The SparkConnectPlanner instance.
- * @return The corresponding Spark LogicalPlan.
- */
- private def transformScan(scan: proto.Scan, planner: SparkConnectPlanner): LogicalPlan = {
- val customTable = getCustomTable(scan.getTable)
- // Convert the custom table to a DataFrame and get its logical plan
- customTable.toDF().queryExecution.analyzed
- }
-}
diff --git a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
deleted file mode 100644
index e1630cc25edd..000000000000
--- a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.connect.examples.serverlibrary
-
-import java.util.UUID
-
-import com.github.mrpowers.spark.daria.sql.DariaWriters
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-
-/**
- * Represents a custom table with an identifier and a DataFrame.
- *
- * @param identifier The unique identifier for the table.
- * @param df The DataFrame associated with the table.
- */
-class CustomTable private (identifier: CustomTable.Identifier, df: Dataset[Row]) {
-
- /**
- * Returns the DataFrame associated with the table.
- *
- * @return The DataFrame.
- */
- def toDF(): Dataset[Row] = df
-
- /**
- * Writes the DataFrame to disk as a CSV file.
- */
- def flush(): Unit = {
- // Write dataset to disk as a CSV file
- DariaWriters.writeSingleFile(
- df = df,
- format = "csv",
- sc = df.sparkSession.sparkContext,
- tmpFolder = s"./${UUID.randomUUID().toString}",
- filename = identifier.path,
- saveMode = "overwrite")
- }
-}
-
-object CustomTable {
-
- /**
- * Represents the unique identifier for a custom table.
- *
- * @param name The name of the table.
- * @param path The path where the table is stored.
- */
- private case class Identifier(name: String, path: String)
-
- // Collection holding all the CustomTable instances and searchable by the identifier
- private val tablesByIdentifier = scala.collection.mutable.Map[Identifier, CustomTable]()
-
- /**
- * Creates a new custom table.
- *
- * @param name The name of the table.
- * @param path The path where the table is stored.
- * @param spark The SparkSession instance.
- * @param schema The schema of the table.
- * @return The created CustomTable instance.
- */
- private[serverlibrary] def createTable(
- name: String,
- path: String,
- spark: SparkSession,
- schema: StructType): CustomTable = {
- val identifier = Identifier(name, path)
- val df = spark.read
- .option("header", "true")
- .schema(schema)
- .csv(path)
- val table = new CustomTable(identifier, df)
- tablesByIdentifier(identifier) = table
- table
- }
-
- /**
- * Clones an existing custom table.
- *
- * @param sourceTable The source table to clone.
- * @param newName The name of the new table.
- * @param newPath The path where the new table will be stored.
- * @param replace Whether to replace the existing table if it exists.
- * @return The cloned CustomTable instance.
- */
- private[serverlibrary] def cloneTable(
- sourceTable: CustomTable,
- newName: String,
- newPath: String,
- replace: Boolean): CustomTable = {
- val newIdentifier = Identifier(newName, newPath)
- val clonedDf = sourceTable.toDF()
- val clonedTable = new CustomTable(newIdentifier, clonedDf)
- clonedTable.flush()
- tablesByIdentifier(newIdentifier) = clonedTable
- clonedTable
- }
-
- /**
- * Retrieves a custom table based on its identifier.
- *
- * @param name The name of the table.
- * @param path The path where the table is stored.
- * @return The CustomTable instance.
- * @throws IllegalArgumentException if the table is not found.
- */
- def getTable(name: String, path: String): CustomTable = {
- val identifier = Identifier(name, path)
- tablesByIdentifier.get(identifier) match {
- case Some(table) => table
- case None =>
- throw new IllegalArgumentException(s"Table with identifier $identifier
not found")
- }
- }
-}