This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd2b4782ccf8 [SPARK-50849][CONNECT] Add example project to demonstrate
Spark Connect Server Libraries
bd2b4782ccf8 is described below
commit bd2b4782ccf88575a60505e4064417689d6236a9
Author: vicennial <[email protected]>
AuthorDate: Mon Feb 17 09:43:59 2025 -0400
[SPARK-50849][CONNECT] Add example project to demonstrate Spark Connect
Server Libraries
### What changes were proposed in this pull request?
This PR adds a sample project, `server-library-example` (under a new
directory `connect-examples`), to demonstrate how to build and use Spark
Connect Server Libraries (see https://github.com/apache/spark/pull/48922 for
context).
The sample project contains several modules (`common`, `server` and
`client`) to showcase how a user may choose to extend the Spark Connect
protocol with custom functionality.
### Why are the changes needed?
Currently, there are limited resources and documentation to aid a user in
building their own Spark Connect Server Libraries. This PR aims to bridge this
gap by providing an exoskeleton of a project to work with.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Copilot
-------------------- Render of `README.md` below ----------------
# Spark Server Library Example - Custom Datasource Handler
This example demonstrates a modular Maven-based project architecture with
separate client, server
and common components. It leverages the extensibility of Spark Connect to
create a server library
that can be attached to the Spark Connect server to extend its functionality.
Below is a detailed overview of the setup and functionality.
## Project Structure
```
├── common/        # Shared protobuf/utilities/classes
├── client/        # Sample client implementation
│   ├── src/       # Source code for client functionality
│   ├── pom.xml    # Maven configuration for the client
├── server/        # Server-side plugin extension
│   ├── src/       # Source code for server functionality
│   ├── pom.xml    # Maven configuration for the server
├── resources/     # Static resources
├── pom.xml        # Parent Maven configuration
```
## Functionality Overview
To demonstrate the extensibility of Spark Connect, a custom datasource
handler, `CustomTable`, is
implemented in the server module. The class handles reading, writing and
processing data stored in
a custom format; here, we simply use the `.custom` extension (which itself
is a wrapper over `.csv`
files).
First and foremost, the client and the server must be able to communicate
with each other through
custom messages that 'understand' our custom data format. This is achieved
by defining custom
protobuf messages in the `common` module. The client and server modules
both depend on the `common`
module to access these messages.
- `common/src/main/protobuf/base.proto`: Defines the base `CustomTable`
message, which is simply
represented by a path and a name.
```protobuf
message CustomTable {
string path = 1;
string name = 2;
}
```
- `common/src/main/protobuf/commands.proto`: Defines the custom commands
that the client can send
to the server. These commands are typically operations that the server can
perform, such as cloning
an existing custom table.
```protobuf
message CustomCommand {
oneof command_type {
CreateTable create_table = 1;
CloneTable clone_table = 2;
}
}
```
- `common/src/main/protobuf/relations.proto`: Defines custom `relations`, the
mechanism through which an optional input dataset is transformed into an
output dataset, such as a `Scan`.
```protobuf
message Scan {
CustomTable table = 1;
}
```
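To make the flow concrete, here is a condensed sketch of the client-side
plumbing, adapted from the client `CustomTable` class included in this PR
(`table` stands in for a previously built `proto.CustomTable`,
`cloneTableProto` for a `proto.CloneTable`, and `spark` for a Spark Connect
session):
```scala
import com.google.protobuf.Any
import org.apache.spark.connect.proto.Command

// Relation: pack a Scan into the relation extension backing a new DataFrame.
val relation = proto.CustomRelation
  .newBuilder()
  .setScan(proto.Scan.newBuilder().setTable(table))
  .build()
val df = spark.newDataFrame(f => f.setExtension(Any.pack(relation)))

// Command: pack a CustomCommand into a Command's extension field and run it.
val customCommand = proto.CustomCommand
  .newBuilder()
  .setCloneTable(cloneTableProto)
  .build()
val commandProto = Command
  .newBuilder()
  .setExtension(Any.pack(customCommand))
  .build()
spark.execute(commandProto)
```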
On the client side, the `CustomTable` class mimics the style of Spark's
`Dataset` API, allowing the
user to perform and chain operations on a `CustomTable` object.
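For instance, a table can be created, inspected and cloned fluently; the
sketch below is condensed from `CustomTableExample.scala` in this PR, with
the paths shortened for brevity:
```scala
val spark = SparkSession.builder().remote("sc://localhost").build()

// Create a custom table over existing data and register it on the server.
val customTable = CustomTable
  .create(spark)
  .name("sample_table")
  .path("resources/dummy_data.data")
  .addColumn("id", CustomTable.DataType.Int)
  .addColumn("name", CustomTable.DataType.String)
  .build()
customTable.explain() // prints the parsed/analyzed/optimized/physical plans

// Clone it via the custom command plugin and verify the result.
val clonedTable = customTable.clone(
  target = "resources/cloned_data.data",
  newName = "cloned_table",
  replace = true)
clonedTable.explain()
```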
On the server side, a similar `CustomTable` class is implemented to handle
the core functionality of
reading, writing and processing data in the custom format. The plugins
(`CustomCommandPlugin` and
`CustomRelationPlugin`) are responsible for processing the custom protobuf
messages sent from the client
(those defined in the `common` module) and delegating the appropriate
actions to the `CustomTable`.
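Each plugin inspects the incoming protobuf `Any` payload and claims it only
if it recognizes the message type; the dispatch logic below is taken from
`CustomCommandPlugin` as implemented in this PR:
```scala
override def process(raw: Array[Byte], planner: SparkConnectPlanner): Boolean = {
  val command = Any.parseFrom(raw)
  if (command.is(classOf[proto.CustomCommand])) {
    // One of our messages: unpack it and dispatch on the command type.
    processInternal(command.unpack(classOf[proto.CustomCommand]), planner)
    true
  } else {
    false // Not ours; leave it to other registered plugins.
  }
}
```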
## Build and Run Instructions
1. **Navigate to the sample project from `SPARK_HOME`**:
```bash
cd connect-examples/server-library-example
```
2. **Build and package the modules**:
```bash
mvn clean package
```
3. **Download the `4.0.0-preview2` release to use as the Spark Connect
Server**:
- Choose a distribution from
https://archive.apache.org/dist/spark/spark-4.0.0-preview2/.
- Example: `curl -L
https://archive.apache.org/dist/spark/spark-4.0.0-preview2/spark-4.0.0-preview2-bin-hadoop3.tgz
| tar xz`
4. **Copy relevant JARs to the root of the unpacked Spark distribution**:
```bash
cp \
<SPARK_HOME>/connect-examples/server-library-example/resources/spark-daria_2.13-1.2.3.jar
\
<SPARK_HOME>/connect-examples/server-library-example/common/target/spark-server-library-example-common-1.0.0.jar
\
<SPARK_HOME>/connect-examples/server-library-example/server/target/spark-server-library-example-server-extension-1.0.0.jar
\
.
```
5. **Start the Spark Connect Server with the relevant JARs**:
```bash
bin/spark-connect-shell \
--jars
spark-server-library-example-server-extension-1.0.0.jar,spark-server-library-example-common-1.0.0.jar,spark-daria_2.13-1.2.3.jar
\
--conf
spark.connect.extensions.relation.classes=org.apache.connect.examples.serverlibrary.CustomRelationPlugin
\
--conf
spark.connect.extensions.command.classes=org.apache.connect.examples.serverlibrary.CustomCommandPlugin
```
6. **In a different terminal, navigate back to the root of the sample
project and start the client**:
```bash
java -cp
client/target/spark-server-library-client-package-scala-1.0.0.jar
org.apache.connect.examples.serverlibrary.CustomTableExample
```
7. **Notice the printed output in the client terminal as well as the
creation of the cloned table**:
```
Explaining plan for custom table: sample_table with path:
<SPARK_HOME>/connect-examples/server-library-example/client/../resources/dummy_data.custom
== Parsed Logical Plan ==
Relation [id#2,name#3] csv
== Analyzed Logical Plan ==
id: int, name: string
Relation [id#2,name#3] csv
== Optimized Logical Plan ==
Relation [id#2,name#3] csv
== Physical Plan ==
FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV,
Location: InMemoryFileIndex(1
paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou...,
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
Explaining plan for custom table: cloned_table with path:
<SPARK_HOME>/connect-examples/server-library-example/client/../resources/cloned_data.data
== Parsed Logical Plan ==
Relation [id#2,name#3] csv
== Analyzed Logical Plan ==
id: int, name: string
Relation [id#2,name#3] csv
== Optimized Logical Plan ==
Relation [id#2,name#3] csv
== Physical Plan ==
FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV,
Location: InMemoryFileIndex(1
paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou...,
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
```
Closes #49604 from vicennial/connectExamples.
Authored-by: vicennial <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
connect-examples/server-library-example/README.md | 133 ++++++++++++++++++
.../server-library-example/client/pom.xml | 112 ++++++++++++++++
.../client/src/main/resources/log4j2.xml | 30 +++++
.../examples/serverlibrary/CustomTable.scala | 149 +++++++++++++++++++++
.../serverlibrary/CustomTableBuilder.scala | 135 +++++++++++++++++++
.../serverlibrary/CustomTableExample.scala | 60 +++++++++
.../server-library-example/common/pom.xml | 76 +++++++++++
.../common/src/main/protobuf/base.proto | 28 ++++
.../common/src/main/protobuf/commands.proto | 61 +++++++++
.../common/src/main/protobuf/relations.proto | 34 +++++
connect-examples/server-library-example/pom.xml | 42 ++++++
.../resources/dummy_data.data | 6 +
.../resources/spark-daria_2.13-1.2.3.jar | Bin 0 -> 187330 bytes
.../server-library-example/server/pom.xml | 91 +++++++++++++
.../serverlibrary/CustomCommandPlugin.scala | 134 ++++++++++++++++++
.../examples/serverlibrary/CustomPluginBase.scala | 27 ++++
.../serverlibrary/CustomRelationPlugin.scala | 91 +++++++++++++
.../examples/serverlibrary/CustomTable.scala | 131 ++++++++++++++++++
18 files changed, 1340 insertions(+)
diff --git a/connect-examples/server-library-example/README.md
b/connect-examples/server-library-example/README.md
new file mode 100644
index 000000000000..6028a66cd5c7
--- /dev/null
+++ b/connect-examples/server-library-example/README.md
@@ -0,0 +1,133 @@
+# Spark Server Library Example - Custom Datasource Handler
+
+This example demonstrates a modular Maven-based project architecture with
separate client, server
+and common components. It leverages the extensibility of Spark Connect to
create a server library
+that can be attached to the Spark Connect server to extend its
functionality. Below is a detailed overview of the setup and functionality.
+
+## Project Structure
+
+```
+├── common/        # Shared protobuf/utilities/classes
+├── client/        # Sample client implementation
+│   ├── src/       # Source code for client functionality
+│   ├── pom.xml    # Maven configuration for the client
+├── server/        # Server-side plugin extension
+│   ├── src/       # Source code for server functionality
+│   ├── pom.xml    # Maven configuration for the server
+├── resources/     # Static resources
+├── pom.xml        # Parent Maven configuration
+```
+
+## Functionality Overview
+
+To demonstrate the extensibility of Spark Connect, a custom datasource
handler, `CustomTable`, is
+implemented in the server module. The class handles reading, writing and
processing data stored in
+a custom format; here, we simply use the `.custom` extension (which itself is
a wrapper over `.csv`
+files).
+
+First and foremost, the client and the server must be able to communicate with
each other through
+custom messages that 'understand' our custom data format. This is achieved by
defining custom
+protobuf messages in the `common` module. The client and server modules both
depend on the `common`
+module to access these messages.
+- `common/src/main/protobuf/base.proto`: Defines the base `CustomTable`
message, which is simply
+represented by a path and a name.
+```protobuf
+message CustomTable {
+ string path = 1;
+ string name = 2;
+}
+```
+- `common/src/main/protobuf/commands.proto`: Defines the custom commands that
the client can send
+to the server. These commands are typically operations that the server can
perform, such as cloning
+an existing custom table.
+```protobuf
+message CustomCommand {
+ oneof command_type {
+ CreateTable create_table = 1;
+ CloneTable clone_table = 2;
+ }
+}
+```
+- `common/src/main/protobuf/relations.proto`: Defines custom `relations`, the
mechanism through which an optional input dataset is transformed into an
+ output dataset, such as a `Scan`.
+```protobuf
+message Scan {
+ CustomTable table = 1;
+}
+```
+
+On the client side, the `CustomTable` class mimics the style of Spark's
`Dataset` API, allowing the
+user to perform and chain operations on a `CustomTable` object.
+
+On the server side, a similar `CustomTable` class is implemented to handle the
core functionality of
+reading, writing and processing data in the custom format. The plugins
(`CustomCommandPlugin` and
+`CustomRelationPlugin`) are responsible for processing the custom protobuf
messages sent from the client
+(those defined in the `common` module) and delegating the appropriate actions
to the `CustomTable`.
+
+
+
+## Build and Run Instructions
+
+1. **Navigate to the sample project from `SPARK_HOME`**:
+ ```bash
+ cd connect-examples/server-library-example
+ ```
+
+2. **Build and package the modules**:
+ ```bash
+ mvn clean package
+ ```
+
+3. **Download the `4.0.0-preview2` release to use as the Spark Connect
Server**:
+ - Choose a distribution from
https://archive.apache.org/dist/spark/spark-4.0.0-preview2/.
+ - Example: `curl -L
https://archive.apache.org/dist/spark/spark-4.0.0-preview2/spark-4.0.0-preview2-bin-hadoop3.tgz
| tar xz`
+
+4. **Copy relevant JARs to the root of the unpacked Spark distribution**:
+ ```bash
+ cp \
+
<SPARK_HOME>/connect-examples/server-library-example/resources/spark-daria_2.13-1.2.3.jar
\
+
<SPARK_HOME>/connect-examples/server-library-example/common/target/spark-server-library-example-common-1.0.0.jar
\
+
<SPARK_HOME>/connect-examples/server-library-example/server/target/spark-server-library-example-server-extension-1.0.0.jar
\
+ .
+ ```
+5. **Start the Spark Connect Server with the relevant JARs**:
+ ```bash
+ bin/spark-connect-shell \
+ --jars
spark-server-library-example-server-extension-1.0.0.jar,spark-server-library-example-common-1.0.0.jar,spark-daria_2.13-1.2.3.jar
\
+ --conf
spark.connect.extensions.relation.classes=org.apache.connect.examples.serverlibrary.CustomRelationPlugin
\
+ --conf
spark.connect.extensions.command.classes=org.apache.connect.examples.serverlibrary.CustomCommandPlugin
+ ```
+6. **In a different terminal, navigate back to the root of the sample project
and start the client**:
+ ```bash
+ java -cp client/target/spark-server-library-client-package-scala-1.0.0.jar
org.apache.connect.examples.serverlibrary.CustomTableExample
+ ```
+7. **Notice the printed output in the client terminal as well as the creation
of the cloned table**:
+```
+Explaining plan for custom table: sample_table with path:
<SPARK_HOME>/connect-examples/server-library-example/client/../resources/dummy_data.custom
+== Parsed Logical Plan ==
+Relation [id#2,name#3] csv
+
+== Analyzed Logical Plan ==
+id: int, name: string
+Relation [id#2,name#3] csv
+
+== Optimized Logical Plan ==
+Relation [id#2,name#3] csv
+
+== Physical Plan ==
+FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV,
Location: InMemoryFileIndex(1
paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou...,
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
+
+Explaining plan for custom table: cloned_table with path:
<SPARK_HOME>/connect-examples/server-library-example/client/../resources/cloned_data.data
+== Parsed Logical Plan ==
+Relation [id#2,name#3] csv
+
+== Analyzed Logical Plan ==
+id: int, name: string
+Relation [id#2,name#3] csv
+
+== Optimized Logical Plan ==
+Relation [id#2,name#3] csv
+
+== Physical Plan ==
+FileScan csv [id#2,name#3] Batched: false, DataFilters: [], Format: CSV,
Location: InMemoryFileIndex(1
paths)[file:/Users/venkata.gudesa/spark/connect-examples/server-library-example/resou...,
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string>
+```
\ No newline at end of file
diff --git a/connect-examples/server-library-example/client/pom.xml
b/connect-examples/server-library-example/client/pom.xml
new file mode 100644
index 000000000000..364920a2ec22
--- /dev/null
+++ b/connect-examples/server-library-example/client/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.connect.examples.serverlibrary</groupId>
+ <artifactId>spark-server-library-example</artifactId>
+ <version>1.0.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-server-library-client-package-scala</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- Custom Proto definitions are in the common module -->
+ <dependency>
+ <groupId>org.apache.connect.examples.serverlibrary</groupId>
+ <artifactId>spark-server-library-example-common</artifactId>
+ <version>1.0.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- spark-connect-common contains proto definitions that we require to
build custom commands/relations/expressions -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-connect-common_${scala.binary}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <!-- Dependency on the spark connect client module to interact with the
Spark server -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-connect-client-jvm_${scala.binary}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <!-- Dependency on the scala library -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <plugins>
+ <!-- Scala -->
+ <plugin>
+ <!-- see http://davidb.github.com/scala-maven-plugin -->
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>4.9.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Shade plugin for creating a fat JAR -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.5.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <!--SPARK-42228: Add `ServicesResourceTransformer` to relocation
class names in META-INF/services for grpc-->
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connect-examples/server-library-example/client/src/main/resources/log4j2.xml
b/connect-examples/server-library-example/client/src/main/resources/log4j2.xml
new file mode 100644
index 000000000000..21b0d9719193
--- /dev/null
+++
b/connect-examples/server-library-example/client/src/main/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level
%logger{36} - %msg%n" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console" />
+ </Root>
+ </Loggers>
+</Configuration>
diff --git
a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
new file mode 100644
index 000000000000..782a246d9298
--- /dev/null
+++
b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import com.google.protobuf.Any
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.sql.{functions, Column, DataFrame, Dataset, Row,
SparkSession}
+
+import org.apache.connect.examples.serverlibrary.proto
+import
org.apache.connect.examples.serverlibrary.proto.CreateTable.Column.{DataType =>
ProtoDataType}
+
+/**
+ * Represents a custom table with associated DataFrame and metadata.
+ *
+ * @param df The underlying DataFrame.
+ * @param table The metadata of the custom table.
+ */
+class CustomTable private (private val df: Dataset[Row], private val table:
proto.CustomTable) {
+
+ /**
+ * Returns the Spark session associated with the DataFrame.
+ */
+ private def spark = df.sparkSession
+
+ /**
+ * Converts the custom table to a DataFrame.
+ *
+ * @return The underlying DataFrame.
+ */
+ def toDF: Dataset[Row] = df
+
+ /**
+ * Prints the execution plan of the custom table.
+ */
+ def explain(): Unit = {
+ println(s"Explaining plan for custom table: ${table.getName} with path:
${table.getPath}")
+ df.explain("extended")
+ }
+
+ /**
+ * Clones the custom table to a new location with a new name.
+ *
+ * @param target The target path for the cloned table.
+ * @param newName The new name for the cloned table.
+ * @param replace Whether to replace the target location if it exists.
+ * @return A new `CustomTable` instance representing the cloned table.
+ */
+ def clone(target: String, newName: String, replace: Boolean): CustomTable = {
+ val cloneTableProto = proto.CloneTable
+ .newBuilder()
+ .setTable(
+ proto.CustomTable
+ .newBuilder()
+ .setName(table.getName)
+ .setPath(table.getPath))
+ .setClone(
+ proto.CustomTable
+ .newBuilder()
+ .setName(newName)
+ .setPath(target))
+ .setReplace(replace)
+ .build()
+ val customCommand = proto.CustomCommand
+ .newBuilder()
+ .setCloneTable(cloneTableProto)
+ .build()
+ // Pack the CustomCommand into Any
+ val customCommandAny = Any.pack(customCommand)
+ // Set the Any as the extension of a Command
+ val commandProto = Command
+ .newBuilder()
+ .setExtension(customCommandAny)
+ .build()
+ // Execute the command
+ spark.execute(commandProto)
+ CustomTable.from(spark, newName, target)
+ }
+}
+
+object CustomTable {
+ /**
+ * Creates a `CustomTable` from the given Spark session, name, and path.
+ *
+ * @param spark The Spark session.
+ * @param name The name of the table.
+ * @param path The path of the table.
+ * @return A new `CustomTable` instance.
+ */
+ def from(spark: SparkSession, name: String, path: String): CustomTable = {
+ val table = proto.CustomTable
+ .newBuilder()
+ .setName(name)
+ .setPath(path)
+ .build()
+ val relation = proto.CustomRelation
+ .newBuilder()
+ .setScan(proto.Scan.newBuilder().setTable(table))
+ .build()
+ val customRelation = Any.pack(relation)
+ val df = spark.newDataFrame(f => f.setExtension(customRelation))
+ new CustomTable(df, table)
+ }
+
+ /**
+ * Creates a new `CustomTableBuilder` instance.
+ *
+ * @param spark The Spark session.
+ * @return A new `CustomTableBuilder` instance.
+ */
+ def create(spark: SparkSession): CustomTableBuilder = new
CustomTableBuilder(spark)
+
+ /**
+ * Enumeration for data types.
+ */
+ object DataType extends Enumeration {
+ type DataType = Value
+ val Int, String, Float, Boolean = Value
+
+ /**
+ * Converts a `DataType` to its corresponding `ProtoDataType`.
+ *
+ * @param dataType The data type to convert.
+ * @return The corresponding `ProtoDataType`.
+ */
+ def toProto(dataType: DataType): ProtoDataType = {
+ dataType match {
+ case Int => ProtoDataType.INT
+ case String => ProtoDataType.STRING
+ case Float => ProtoDataType.FLOAT
+ case Boolean => ProtoDataType.BOOLEAN
+ }
+ }
+ }
+}
diff --git
a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala
b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala
new file mode 100644
index 000000000000..a1b8ffdb8dd7
--- /dev/null
+++
b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableBuilder.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import com.google.protobuf.Any
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.sql.SparkSession
+
+import org.apache.connect.examples.serverlibrary.CustomTable
+
+/**
+ * Builder class for constructing a `CustomTable` instance.
+ *
+ * @param spark The Spark session.
+ */
+class CustomTableBuilder private[serverlibrary] (spark: SparkSession) {
+ import CustomTableBuilder._
+
+ private var name: String = _
+ private var path: String = _
+ private var columns: Seq[Column] = Seq.empty
+
+ /**
+ * Sets the name of the custom table.
+ *
+ * @param name The name of the table.
+ * @return The current `CustomTableBuilder` instance.
+ */
+ def name(name: String): CustomTableBuilder = {
+ this.name = name
+ this
+ }
+
+ /**
+ * Sets the path of the custom table.
+ *
+ * @param path The path of the table.
+ * @return The current `CustomTableBuilder` instance.
+ */
+ def path(path: String): CustomTableBuilder = {
+ this.path = path
+ this
+ }
+
+ /**
+ * Adds a column to the custom table.
+ *
+ * @param name The name of the column.
+ * @param dataType The data type of the column.
+ * @return The current `CustomTableBuilder` instance.
+ */
+ def addColumn(name: String, dataType: CustomTable.DataType.Value):
CustomTableBuilder = {
+ columns = columns :+ Column(name, dataType)
+ this
+ }
+
+ /**
+ * Builds the `CustomTable` instance.
+ *
+ * @return A new `CustomTable` instance.
+ * @throws IllegalArgumentException if name, path, or columns are not set.
+ */
+ def build(): CustomTable = {
+ require(name != null, "Name must be set")
+ require(path != null, "Path must be set")
+ require(columns.nonEmpty, "At least one column must be added")
+
+ // Define the table creation proto
+ val createTableProtoBuilder = proto.CreateTable
+ .newBuilder()
+ .setTable(
+ proto.CustomTable
+ .newBuilder()
+ .setPath(path)
+ .setName(name)
+ .build())
+
+ // Add columns to the table creation proto
+ columns.foreach { column =>
+ createTableProtoBuilder.addColumns(
+ proto.CreateTable.Column
+ .newBuilder()
+ .setName(column.name)
+ .setDataType(CustomTable.DataType.toProto(column.dataType))
+ .build())
+ }
+ val createTableProto = createTableProtoBuilder.build() // Build the
CreateTable object
+
+ // Wrap the CreateTable proto in CustomCommand
+ val customCommand = proto.CustomCommand
+ .newBuilder()
+ .setCreateTable(createTableProto)
+ .build()
+
+ // Pack the CustomCommand into Any
+ val customCommandAny = Any.pack(customCommand)
+ // Set the Any as the extension of a Command
+ val commandProto = Command
+ .newBuilder()
+ .setExtension(customCommandAny)
+ .build()
+
+ // Execute the command
+ spark.execute(commandProto)
+
+ // After the command is executed, create a client-side representation of
the table with the
+ // leaf node of the client-side dataset being the Scan node for the custom
table.
+ CustomTable.from(spark, name, path)
+ }
+}
+
+object CustomTableBuilder {
+ /**
+ * Case class representing a column in the custom table.
+ *
+ * @param name The name of the column.
+ * @param dataType The data type of the column.
+ */
+ private case class Column(name: String, dataType: CustomTable.DataType.Value)
+}
diff --git
a/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableExample.scala
b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableExample.scala
new file mode 100644
index 000000000000..8470465cd7a0
--- /dev/null
+++
b/connect-examples/server-library-example/client/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTableExample.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import java.nio.file.{Path, Paths}
+
+import com.google.protobuf.Any
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{StructType, StructField, StringType,
IntegerType}
+
+import org.apache.connect.examples.serverlibrary.proto
+import
org.apache.connect.examples.serverlibrary.proto.CreateTable.Column.{DataType =>
ProtoDataType}
+
+object CustomTableExample {
+ def main(args: Array[String]): Unit = {
+ val spark = SparkSession.builder().remote("sc://localhost").build()
+
+ // Step 1: Create a custom table from existing data
+ val tableName = "sample_table"
+ val fileName = "dummy_data.data"
+
+ val workingDirectory = System.getProperty("user.dir")
+ val fullPath = Paths.get(workingDirectory, s"resources/$fileName")
+ val customTable = CustomTable
+ .create(spark)
+ .name(tableName)
+ .path(fullPath.toString)
+ .addColumn("id", CustomTable.DataType.Int)
+ .addColumn("name", CustomTable.DataType.String)
+ .build()
+
+ // Step 2: Verify
+ customTable.explain()
+
+ // Step 3: Clone the custom table
+ val clonedPath = fullPath.getParent.resolve("cloned_data.data")
+ val clonedName = "cloned_table"
+ val clonedTable =
+ customTable.clone(target = clonedPath.toString, newName = clonedName,
replace = true)
+
+ // Step 4: Verify
+ clonedTable.explain()
+ }
+}
diff --git a/connect-examples/server-library-example/common/pom.xml
b/connect-examples/server-library-example/common/pom.xml
new file mode 100644
index 000000000000..592c43f26770
--- /dev/null
+++ b/connect-examples/server-library-example/common/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.connect.examples.serverlibrary</groupId>
+ <artifactId>spark-server-library-example</artifactId>
+ <version>1.0.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-server-library-example-common</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!--Required to compile the proto files-->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- os-maven-plugin helps resolve ${os.detected.classifier}
automatically -->
+ <plugin>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.7.0</version>
+ <extensions>true</extensions>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protoSourceRoot>src/main/protobuf</protoSourceRoot>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connect-examples/server-library-example/common/src/main/protobuf/base.proto
b/connect-examples/server-library-example/common/src/main/protobuf/base.proto
new file mode 100644
index 000000000000..9d902a587ed3
--- /dev/null
+++
b/connect-examples/server-library-example/common/src/main/protobuf/base.proto
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+option java_multiple_files = true;
+option java_package = "org.apache.connect.examples.serverlibrary.proto";
+
+message CustomTable {
+ // Path to the custom table.
+ string path = 1;
+ // Name of the custom table.
+ string name = 2;
+}
diff --git
a/connect-examples/server-library-example/common/src/main/protobuf/commands.proto
b/connect-examples/server-library-example/common/src/main/protobuf/commands.proto
new file mode 100644
index 000000000000..13d9945cfe61
--- /dev/null
+++
b/connect-examples/server-library-example/common/src/main/protobuf/commands.proto
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+option java_multiple_files = true;
+option java_package = "org.apache.connect.examples.serverlibrary.proto";
+
+import "base.proto";
+
+message CustomCommand {
+ oneof command_type {
+ CreateTable create_table = 1;
+ CloneTable clone_table = 2;
+ }
+}
+
+message CreateTable {
+
+ // Column in the schema of the table.
+ message Column {
+ // (Required) Name of the column.
+ string name = 1;
+ // (Required) Data type of the column.
+ enum DataType {
+ DATA_TYPE_UNSPECIFIED = 0; // Default value
+ INT = 1; // Integer data type
+ STRING = 2; // String data type
+ FLOAT = 3; // Float data type
+ BOOLEAN = 4; // Boolean data type
+ }
+ DataType data_type = 2;
+ }
+ // (Required) Table properties.
+ CustomTable table = 1;
+ // (Required) List of columns in the schema of the table.
+ repeated Column columns = 2;
+}
+
+message CloneTable {
+ // (Required) The source table to clone.
+ CustomTable table = 1;
+ // (Required) Path to the location where the data of the cloned table should
be stored.
+ CustomTable clone = 2;
+ // (Required) Overwrites the target location when true.
+ bool replace = 3;
+}
diff --git
a/connect-examples/server-library-example/common/src/main/protobuf/relations.proto
b/connect-examples/server-library-example/common/src/main/protobuf/relations.proto
new file mode 100644
index 000000000000..1ebf0e640bef
--- /dev/null
+++
b/connect-examples/server-library-example/common/src/main/protobuf/relations.proto
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+option java_multiple_files = true;
+option java_package = "org.apache.connect.examples.serverlibrary.proto";
+
+import "base.proto";
+
+message CustomRelation {
+ oneof relation_type {
+ Scan scan = 1;
+ }
+}
+
+message Scan {
+ // (Required) Table to scan.
+ CustomTable table = 1;
+}
diff --git a/connect-examples/server-library-example/pom.xml
b/connect-examples/server-library-example/pom.xml
new file mode 100644
index 000000000000..1723f3b0154f
--- /dev/null
+++ b/connect-examples/server-library-example/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.connect.examples.serverlibrary</groupId>
+ <artifactId>spark-server-library-example</artifactId>
+ <version>1.0.0</version>
+ <packaging>pom</packaging>
+ <modules>
+ <module>common</module>
+ <module>server</module>
+ <module>client</module>
+ </modules>
+ <properties>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <scala.binary>2.13</scala.binary>
+ <scala.version>2.13.15</scala.version>
+ <protobuf.version>3.25.4</protobuf.version>
+ <spark.version>4.0.0-preview2</spark.version>
+ </properties>
+</project>
diff --git a/connect-examples/server-library-example/resources/dummy_data.data
b/connect-examples/server-library-example/resources/dummy_data.data
new file mode 100644
index 000000000000..0a6645b75722
--- /dev/null
+++ b/connect-examples/server-library-example/resources/dummy_data.data
@@ -0,0 +1,6 @@
+id,name
+1,John Doe
+2,Jane Smith
+3,Bob Johnson
+4,Alice Williams
+5,Charlie Brown
\ No newline at end of file
diff --git
a/connect-examples/server-library-example/resources/spark-daria_2.13-1.2.3.jar
b/connect-examples/server-library-example/resources/spark-daria_2.13-1.2.3.jar
new file mode 100644
index 000000000000..31703de77709
Binary files /dev/null and
b/connect-examples/server-library-example/resources/spark-daria_2.13-1.2.3.jar
differ
diff --git a/connect-examples/server-library-example/server/pom.xml
b/connect-examples/server-library-example/server/pom.xml
new file mode 100644
index 000000000000..b95c5e3d6161
--- /dev/null
+++ b/connect-examples/server-library-example/server/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.connect.examples.serverlibrary</groupId>
+ <artifactId>spark-server-library-example</artifactId>
+ <version>1.0.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-server-library-example-server-extension</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- Custom Proto definitions are in the common module -->
+ <dependency>
+ <groupId>org.apache.connect.examples.serverlibrary</groupId>
+ <artifactId>spark-server-library-example-common</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-connect_${scala.binary}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Include Spark Daria for utility Dataframe write methods -->
+ <dependency>
+ <groupId>com.github.mrpowers</groupId>
+ <artifactId>spark-daria_${scala.binary}</artifactId>
+ <version>1.2.3</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <plugins>
+ <!-- Scala -->
+ <plugin>
+ <!-- see http://davidb.github.com/scala-maven-plugin -->
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>4.9.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomCommandPlugin.scala
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomCommandPlugin.scala
new file mode 100644
index 000000000000..2253c4b238be
--- /dev/null
+++
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomCommandPlugin.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.Any
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.plugin.CommandPlugin
+import org.apache.spark.sql.types.{StringType, IntegerType, FloatType,
DoubleType, BooleanType, LongType, StructType, StructField, DataType}
+
+import org.apache.connect.examples.serverlibrary.CustomTable
+import org.apache.connect.examples.serverlibrary.proto
+import
org.apache.connect.examples.serverlibrary.proto.CreateTable.Column.{DataType =>
ProtoDataType}
+
+/**
+ * Commands are distinct actions that can be executed. Unlike relations, which
focus on the
+ * transformation and nesting of output data, commands represent singular
operations that perform
+ * specific tasks on the data.
+ * In this example, the `CustomCommandPlugin` handles operations related to
creating and duplicating
+ * custom tables.
+ */
+class CustomCommandPlugin extends CommandPlugin with CustomPluginBase {
+
+ /**
+ * Processes the raw byte array containing the command.
+ *
+ * @param raw The raw byte array of the command.
+ * @param planner The SparkConnectPlanner instance.
+ * @return True if the command was processed, false otherwise.
+ */
+ override def process(raw: Array[Byte], planner: SparkConnectPlanner):
Boolean = {
+ val command = Any.parseFrom(raw)
+ if (command.is(classOf[proto.CustomCommand])) {
+ processInternal(command.unpack(classOf[proto.CustomCommand]), planner)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Processes the unpacked CustomCommand.
+ *
+ * @param command The unpacked CustomCommand.
+ * @param planner The SparkConnectPlanner instance.
+ */
+ private def processInternal(
+ command: proto.CustomCommand,
+ planner: SparkConnectPlanner): Unit = {
+ command.getCommandTypeCase match {
+ case proto.CustomCommand.CommandTypeCase.CREATE_TABLE =>
+ processCreateTable(planner, command.getCreateTable)
+ case proto.CustomCommand.CommandTypeCase.CLONE_TABLE =>
+ processCloneTable(planner, command.getCloneTable)
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Unsupported command type: ${command.getCommandTypeCase}")
+ }
+ }
+
+ /**
+ * Processes the CreateTable command.
+ *
+ * @param planner The SparkConnectPlanner instance.
+ * @param createTable The CreateTable message.
+ */
+ private def processCreateTable(
+ planner: SparkConnectPlanner,
+ createTable: proto.CreateTable): Unit = {
+ val tableName = createTable.getTable.getName
+ val tablePath = createTable.getTable.getPath
+
+ // Convert the list of columns from the protobuf message to a Spark schema
+ val schema = StructType(createTable.getColumnsList.asScala.toSeq.map {
column =>
+ StructField(
+ column.getName,
+ protoDataTypeToSparkType(column.getDataType),
+ nullable = true // Assuming all columns are nullable for simplicity
+ )
+ })
+
+ // Create the table using the CustomTable utility
+ CustomTable.createTable(tableName, tablePath, planner.session, schema)
+ }
+
+ /**
+ * Converts a protobuf DataType to a Spark DataType.
+ *
+ * @param protoType The protobuf DataType.
+ * @return The corresponding Spark DataType.
+ */
+ private def protoDataTypeToSparkType(protoType: ProtoDataType): DataType = {
+ protoType match {
+ case ProtoDataType.INT => IntegerType
+ case ProtoDataType.STRING => StringType
+ case ProtoDataType.FLOAT => FloatType
+ case ProtoDataType.BOOLEAN => BooleanType
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported or unknown data type:
${protoType}")
+ }
+ }
+
+ /**
+ * Processes the CloneTable command.
+ *
+ * @param planner The SparkConnectPlanner instance.
+ * @param msg The CloneTable message.
+ */
+ private def processCloneTable(planner: SparkConnectPlanner, msg:
proto.CloneTable): Unit = {
+ val sourceTable = getCustomTable(msg.getTable)
+ CustomTable.cloneTable(
+ sourceTable,
+ msg.getClone.getName,
+ msg.getClone.getPath,
+ msg.getReplace)
+ }
+}
diff --git
a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomPluginBase.scala
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomPluginBase.scala
new file mode 100644
index 000000000000..df73e0d9a0fb
--- /dev/null
+++
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomPluginBase.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import org.apache.connect.examples.serverlibrary.proto
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+
+trait CustomPluginBase {
+ protected def getCustomTable(table: proto.CustomTable): CustomTable = {
+ CustomTable.getTable(table.getName, table.getPath)
+ }
+}
diff --git
a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomRelationPlugin.scala
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomRelationPlugin.scala
new file mode 100644
index 000000000000..7b444803a065
--- /dev/null
+++
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomRelationPlugin.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import java.util.Optional
+
+import com.google.protobuf.Any
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.plugin.RelationPlugin
+
+import org.apache.connect.examples.serverlibrary.{CustomPluginBase,
CustomTable}
+import org.apache.connect.examples.serverlibrary.proto
+
+/**
+ * Relations are fundamental to dataset transformations, acting as the
mechanism through which an
+ * input dataset is transformed into an output dataset. Conceptually,
relations can be likened to
+ * tables within a database, manipulated to achieve desired outcomes.
+ * In this example, the `CustomRelationPlugin` handles the transformation
related to scanning
+ * custom tables. The scan relation would appear as leaf nodes in a dataset's
associated logical
+ * plan node when it involves reads from the custom tables.
+ */
+class CustomRelationPlugin extends RelationPlugin with CustomPluginBase {
+
+ /**
+ * Transforms the raw byte array containing the relation into a Spark
logical plan.
+ *
+ * @param raw The raw byte array of the relation.
+ * @param planner The SparkConnectPlanner instance.
+ * @return An Optional containing the LogicalPlan if the relation was
processed, empty otherwise.
+ */
+ override def transform(
+ raw: Array[Byte],
+ planner: SparkConnectPlanner): Optional[LogicalPlan] = {
+ val rel = Any.parseFrom(raw)
+ if (rel.is(classOf[proto.CustomRelation])) {
+ val customRelation = rel.unpack(classOf[proto.CustomRelation])
+ // Transform the custom relation
+ Optional.of(transformInner(customRelation, planner))
+ } else {
+ Optional.empty()
+ }
+ }
+
+ /**
+ * Transforms the unpacked CustomRelation into a Spark logical plan.
+ *
+ * @param relation The unpacked CustomRelation.
+ * @param planner The SparkConnectPlanner instance.
+ * @return The corresponding Spark LogicalPlan.
+ */
+ private def transformInner(
+ relation: proto.CustomRelation,
+ planner: SparkConnectPlanner): LogicalPlan = {
+ relation.getRelationTypeCase match {
+ case proto.CustomRelation.RelationTypeCase.SCAN =>
+ transformScan(relation.getScan, planner)
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Unsupported relation type: ${relation.getRelationTypeCase}")
+ }
+ }
+
+ /**
+ * Transforms the Scan relation into a Spark logical plan.
+ *
+ * @param scan The Scan message.
+ * @param planner The SparkConnectPlanner instance.
+ * @return The corresponding Spark LogicalPlan.
+ */
+ private def transformScan(scan: proto.Scan, planner: SparkConnectPlanner):
LogicalPlan = {
+ val customTable = getCustomTable(scan.getTable)
+ // Convert the custom table to a DataFrame and get its logical plan
+ customTable.toDF().queryExecution.analyzed
+ }
+}
diff --git
a/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
new file mode 100644
index 000000000000..e1630cc25edd
--- /dev/null
+++
b/connect-examples/server-library-example/server/src/main/scala/org/apache/connect/examples/serverlibrary/CustomTable.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.examples.serverlibrary
+
+import java.util.UUID
+
+import com.github.mrpowers.spark.daria.sql.DariaWriters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+/**
+ * Represents a custom table with an identifier and a DataFrame.
+ *
+ * @param identifier The unique identifier for the table.
+ * @param df The DataFrame associated with the table.
+ */
+class CustomTable private (identifier: CustomTable.Identifier, df:
Dataset[Row]) {
+
+ /**
+ * Returns the DataFrame associated with the table.
+ *
+ * @return The DataFrame.
+ */
+ def toDF(): Dataset[Row] = df
+
+ /**
+ * Writes the DataFrame to disk as a CSV file.
+ */
+ def flush(): Unit = {
+ // Write dataset to disk as a CSV file
+ DariaWriters.writeSingleFile(
+ df = df,
+ format = "csv",
+ sc = df.sparkSession.sparkContext,
+ tmpFolder = s"./${UUID.randomUUID().toString}",
+ filename = identifier.path,
+ saveMode = "overwrite")
+ }
+}
+
+object CustomTable {
+
+ /**
+ * Represents the unique identifier for a custom table.
+ *
+ * @param name The name of the table.
+ * @param path The path where the table is stored.
+ */
+ private case class Identifier(name: String, path: String)
+
+ // Collection holding all the CustomTable instances and searchable by the
identifier
+ private val tablesByIdentifier = scala.collection.mutable.Map[Identifier,
CustomTable]()
+
+ /**
+ * Creates a new custom table.
+ *
+ * @param name The name of the table.
+ * @param path The path where the table is stored.
+ * @param spark The SparkSession instance.
+ * @param schema The schema of the table.
+ * @return The created CustomTable instance.
+ */
+ private[serverlibrary] def createTable(
+ name: String,
+ path: String,
+ spark: SparkSession,
+ schema: StructType): CustomTable = {
+ val identifier = Identifier(name, path)
+ val df = spark.read
+ .option("header", "true")
+ .schema(schema)
+ .csv(path)
+ val table = new CustomTable(identifier, df)
+ tablesByIdentifier(identifier) = table
+ table
+ }
+
+ /**
+ * Clones an existing custom table.
+ *
+ * @param sourceTable The source table to clone.
+ * @param newName The name of the new table.
+ * @param newPath The path where the new table will be stored.
+ * @param replace Whether to replace the existing table if it exists.
+ * @return The cloned CustomTable instance.
+ */
+ private[serverlibrary] def cloneTable(
+ sourceTable: CustomTable,
+ newName: String,
+ newPath: String,
+ replace: Boolean): CustomTable = {
+ val newIdentifier = Identifier(newName, newPath)
+ val clonedDf = sourceTable.toDF()
+ val clonedTable = new CustomTable(newIdentifier, clonedDf)
+ clonedTable.flush()
+ tablesByIdentifier(newIdentifier) = clonedTable
+ clonedTable
+ }
+
+ /**
+ * Retrieves a custom table based on its identifier.
+ *
+ * @param name The name of the table.
+ * @param path The path where the table is stored.
+ * @return The CustomTable instance.
+ * @throws IllegalArgumentException if the table is not found.
+ */
+ def getTable(name: String, path: String): CustomTable = {
+ val identifier = Identifier(name, path)
+ tablesByIdentifier.get(identifier) match {
+ case Some(table) => table
+ case None =>
+ throw new IllegalArgumentException(s"Table with identifier $identifier
not found")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]