This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bb112e2089f5 [SPARK-52326][SQL] Add partitions related
ExternalCatalogEvent and post them in corresponding operations
bb112e2089f5 is described below
commit bb112e2089f5028d3b9e0200a1211517ea139351
Author: Xiang Li <[email protected]>
AuthorDate: Wed Dec 17 09:25:34 2025 +0800
[SPARK-52326][SQL] Add partitions related ExternalCatalogEvent and post
them in corresponding operations
### What changes were proposed in this pull request?
1. Add `partitions` related events, for the following external catalog
operations: create / drop / alter / rename. A base "PartitionsEvent" is added
by extending "TableEvent". One "PartitionsEvent" (and its subclasses) can describe
an operation on one or more partitions, so it is named "Partition`s`Event"
(the plural form of partition), as are its subclasses.
2. Post those events in the corresponding external catalog operations
### Why are the changes needed?
The operation list extracted from Spark events is of great help for users
to summarize what has been done against a db/table/function/partition, for the
purpose of logging and/or auditing.
In
[ExternalCatalogWithListener](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala),
there are events posted against db, table and function for all registered
listeners. But those operations against partition(s) do not have their events
posted.
### Does this PR introduce _any_ user-facing change?
With this change, partition-related operations are posted to registered
listeners as events.
### How was this patch tested?
Enriched unit tests and also tested on a local cluster.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53439 from waterlx/partitions_event.
Authored-by: Xiang Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalog/ExternalCatalogWithListener.scala | 10 ++
.../apache/spark/sql/catalyst/catalog/events.scala | 85 ++++++++++++++++
.../catalog/ExternalCatalogEventSuite.scala | 108 +++++++++++++++++++++
3 files changed, 203 insertions(+)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
index 33f088079caa..bfc47d2f348d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
@@ -204,7 +204,10 @@ class ExternalCatalogWithListener(delegate:
ExternalCatalog)
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
+ val partSpecs = parts.map(_.spec)
+ postToAll(CreatePartitionsPreEvent(db, table, partSpecs))
delegate.createPartitions(db, table, parts, ignoreIfExists)
+ postToAll(CreatePartitionsEvent(db, table, partSpecs))
}
override def dropPartitions(
@@ -214,7 +217,9 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = {
+ postToAll(DropPartitionsPreEvent(db, table, partSpecs))
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge,
retainData)
+ postToAll(DropPartitionsEvent(db, table, partSpecs))
}
override def renamePartitions(
@@ -222,14 +227,19 @@ class ExternalCatalogWithListener(delegate:
ExternalCatalog)
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
+ postToAll(RenamePartitionsPreEvent(db, table, specs, newSpecs))
delegate.renamePartitions(db, table, specs, newSpecs)
+ postToAll(RenamePartitionsEvent(db, table, specs, newSpecs))
}
override def alterPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition]): Unit = {
+ val partSpecs = parts.map(_.spec)
+ postToAll(AlterPartitionsPreEvent(db, table, partSpecs))
delegate.alterPartitions(db, table, parts)
+ postToAll(AlterPartitionsEvent(db, table, partSpecs))
}
override def getPartition(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index 974c225afbae..27f4f39aa5ab 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
/**
* Event emitted by the external catalog when it is modified. Events are
either fired before or
@@ -203,3 +204,87 @@ case class RenameFunctionEvent(
name: String,
newName: String)
extends FunctionEvent
+
+/**
+ * Event fired when some partitions (of a table) are created, dropped,
renamed, altered.
+ */
+trait PartitionsEvent extends TableEvent {
+ /**
+ * Specs of the partitions which are touched.
+ */
+ val partSpecs: Seq[TablePartitionSpec]
+}
+
+/**
+ * Event fired before some partitions (of a table) are created.
+ */
+case class CreatePartitionsPreEvent(
+ database: String,
+ name /* of table */: String,
+ partSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been created.
+ */
+case class CreatePartitionsEvent(
+ database: String,
+ name /* of table */: String,
+ partSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired before some partitions (of a table) are dropped.
+ */
+case class DropPartitionsPreEvent(
+ database: String,
+ name /* of table */ : String,
+ partSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been dropped.
+ */
+case class DropPartitionsEvent(
+ database: String,
+ name /* of table */ : String,
+ partSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired before some partitions (of a table) are renamed.
+ */
+case class RenamePartitionsPreEvent(
+ database: String,
+ name /* of table */ : String,
+ partSpecs: Seq[TablePartitionSpec],
+ newPartSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been renamed.
+ */
+case class RenamePartitionsEvent(
+ database: String,
+ name /* of table */ : String,
+ partSpecs: Seq[TablePartitionSpec],
+ newPartSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired before some partitions (of a table) are altered.
+ */
+case class AlterPartitionsPreEvent(
+ database: String,
+ name /* of table */ : String,
+ partSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been altered.
+ */
+case class AlterPartitionsEvent(
+ database: String,
+ name /* of table */ : String,
+ partSpecs: Seq[TablePartitionSpec])
+ extends PartitionsEvent
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 15858bf2cc69..f332393e503f 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -209,4 +209,112 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
catalog.dropFunction("db5", "fn4")
checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5",
"fn4") :: Nil)
}
+
+ testWithCatalog("partitions") { (catalog, checkEvents) =>
+ // Prepare db
+ val db = "db1"
+ val dbUri = preparePath(Files.createTempDirectory(db + "_"))
+ val dbDefinition = CatalogDatabase(
+ name = db,
+ description = "",
+ locationUri = dbUri,
+ properties = Map.empty)
+
+ catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+ checkEvents(
+ CreateDatabasePreEvent(db) ::
+ CreateDatabaseEvent(db) :: Nil)
+
+ // Prepare table
+ val table = "table1"
+ val tableUri = preparePath(Files.createTempDirectory(table + "_"))
+ val tableDefinition = CatalogTable(
+ identifier = TableIdentifier(table, Some(db)),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(locationUri =
Option(tableUri)),
+ schema = new StructType()
+ .add("year", "int")
+ .add("month", "int")
+ .add("sales", "long"))
+
+ catalog.createTable(tableDefinition, ignoreIfExists = false)
+ checkEvents(
+ CreateTablePreEvent(db, table) ::
+ CreateTableEvent(db, table) :: Nil)
+
+ // Prepare partitions
+ val storageFormat = CatalogStorageFormat(
+ locationUri = Some(tableUri),
+ inputFormat = Some("tableInputFormat"),
+ outputFormat = Some("tableOutputFormat"),
+ serde = None,
+ compressed = false,
+ properties = Map.empty)
+ val parts = Seq(CatalogTablePartition(Map("year" -> "2025", "month" ->
"Jan"), storageFormat))
+ val partSpecs = parts.map(_.spec)
+
+ val newPartSpecs = Seq(Map("year" -> "2026", "month" -> "Feb"))
+
+ // CREATE
+ catalog.createPartitions(db, table, parts, ignoreIfExists = false)
+ checkEvents(
+ CreatePartitionsPreEvent(db, table, partSpecs) ::
+ CreatePartitionsEvent(db, table, partSpecs) :: Nil)
+
+ // Re-create with ignoreIfExists as true
+ catalog.createPartitions(db, table, parts, ignoreIfExists = true)
+ checkEvents(
+ CreatePartitionsPreEvent(db, table, partSpecs) ::
+ CreatePartitionsEvent(db, table, partSpecs) :: Nil)
+
+ // createPartitions() failed because re-creating with ignoreIfExists as
false, so PreEvent only
+ intercept[AnalysisException] {
+ catalog.createPartitions(db, table, parts, ignoreIfExists = false)
+ }
+ checkEvents(CreatePartitionsPreEvent(db, table, partSpecs) :: Nil)
+
+ // ALTER
+ catalog.alterPartitions(db, table, parts)
+ checkEvents(
+ AlterPartitionsPreEvent(db, table, partSpecs) ::
+ AlterPartitionsEvent(db, table, partSpecs) ::
+ Nil)
+
+ // RENAME
+ catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
+ checkEvents(
+ RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) ::
+ RenamePartitionsEvent(db, table, partSpecs, newPartSpecs) :: Nil)
+
+ // renamePartitions() failed because partitions have been renamed
according to newPartSpecs,
+ // so PreEvent only
+ intercept[AnalysisException] {
+ catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
+ }
+ checkEvents(RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs)
:: Nil)
+
+ // DROP
+ // dropPartitions() failed
+ // because partition of (old) partSpecs do not exist and ignoreIfNotExists
is false,
+ // So PreEvent only
+ intercept[AnalysisException] {
+ catalog.dropPartitions(db, table, partSpecs,
+ ignoreIfNotExists = false, purge = true, retainData = true)
+ }
+ checkEvents(DropPartitionsPreEvent(db, table, partSpecs) :: Nil)
+
+ // Drop the renamed partitions
+ catalog.dropPartitions(db, table, newPartSpecs,
+ ignoreIfNotExists = false, purge = true, retainData = true)
+ checkEvents(
+ DropPartitionsPreEvent(db, table, newPartSpecs) ::
+ DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
+
+ // Re-drop with ignoreIfNotExists being true
+ catalog.dropPartitions(db, table, newPartSpecs,
+ ignoreIfNotExists = true, purge = true, retainData = true)
+ checkEvents(
+ DropPartitionsPreEvent(db, table, newPartSpecs) ::
+ DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]