This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3a3bc77f3de Revert "[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper"
3a3bc77f3de is described below
commit 3a3bc77f3dea368ca0b434a3f8a9629b5d69a5ca
Author: Gengliang Wang <[email protected]>
AuthorDate: Thu Jan 5 20:28:55 2023 -0800
Revert "[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for
StreamingQueryProgressWrapper"
### What changes were proposed in this pull request?
This reverts commit 915e9c67a9581a1f66e70321879092d854c9fb3b.
### Why are the changes needed?
When running end-to-end tests, there are 5 NPE errors from the following string fields (see the sketch after the list):
- SourceProgress.latestOffset
- SourceProgress.endOffset
- SourceProgress.startOffset
- StreamingQueryData.name
- StreamingQueryProgress.name
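These NPEs come from the protobuf builder setters, which reject null, while the Scala classes allow null for these fields. A minimal sketch of the kind of null guard the serializer would need; the `setIfNotNull` helper is hypothetical, not part of the reverted code:
```
// Hypothetical null-guard sketch: protobuf Builder setters such as
// setName or setStartOffset throw NullPointerException on null input,
// so nullable fields must be skipped rather than set directly.
object NullGuardSketch {
  def setIfNotNull(value: String)(setter: String => Unit): Unit = {
    if (value != null) setter(value)
  }

  def main(args: Array[String]): Unit = {
    val sb = new StringBuilder
    setIfNotNull(null)(s => sb.append(s))        // skipped, no NPE
    setIfNotNull("offset-42")(s => sb.append(s)) // applied
    println(sb) // offset-42
  }
}
```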
After fixing them, the following error occurs:
```
java.lang.UnsupportedOperationException
  at java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
  at org.apache.spark.sql.streaming.ui.StreamingQueryStatisticsPage.$anonfun$generateStatTable$27(StreamingQueryStatisticsPage.scala:401)
```
The deserialized map `StreamingQueryProgress.durationMs` needs to be
mutable.
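The stack trace above points at `Collections$UnmodifiableMap.remove`, i.e. the map handed to the UI page behaved like an unmodifiable view. A minimal sketch of the failure and of a hypothetical fix (copying into a fresh `java.util.HashMap` before constructing the progress object); this is illustrative, not the reverted code:
```
import java.util.{Collections, HashMap => JHashMap, Map => JMap}

object MutableMapSketch {
  def main(args: Array[String]): Unit = {
    val durations = new JHashMap[String, java.lang.Long]()
    durations.put("triggerExecution", 10L)
    durations.put("getBatch", 11L)

    // The deserialized durationMs behaved like this unmodifiable view:
    val view: JMap[String, java.lang.Long] = Collections.unmodifiableMap(durations)
    // view.remove("getBatch") // throws UnsupportedOperationException

    // Hypothetical fix: copy into a fresh mutable map before passing it on,
    // so the UI page can call remove() when rendering the stat table.
    val mutableCopy = new JHashMap[String, java.lang.Long](view)
    mutableCopy.remove("getBatch")
    println(mutableCopy) // {triggerExecution=10}
  }
}
```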
Given that StreamingQueryProgressWrapper contains nullable fields and a mutable
map, I suggest using the default JSON serializer for this class.
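As a side note, JSON round-tripping avoids both failure modes: Jackson keeps null strings as null and binds JSON objects to mutable maps by default. A small illustration with a plain Jackson `ObjectMapper` (not the actual KVStore code path, just the behavior being relied on):
```
import com.fasterxml.jackson.databind.ObjectMapper

object JsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val json = """{"name": null, "durationMs": {"triggerExecution": 10}}"""
    // Jackson binds a JSON object to a mutable java.util.LinkedHashMap
    // and preserves the null string, so neither error above can occur.
    val parsed = mapper.readValue(json, classOf[java.util.Map[String, Object]])
    parsed.remove("name") // mutable map: no UnsupportedOperationException
    println(parsed)       // {durationMs={triggerExecution=10}}
  }
}
```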
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA tests
Closes #39416 from gengliangwang/revertSS.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 51 -------
.../org.apache.spark.status.protobuf.ProtobufSerDe | 1 -
.../org/apache/spark/sql/streaming/progress.scala | 8 +-
.../ui/StreamingQueryStatusListener.scala | 2 +-
.../protobuf/sql/SinkProgressSerializer.scala | 42 -----
.../protobuf/sql/SourceProgressSerializer.scala | 65 --------
.../sql/StateOperatorProgressSerializer.scala | 75 ---------
.../sql/StreamingQueryProgressSerializer.scala | 89 -----------
.../StreamingQueryProgressWrapperSerializer.scala | 40 -----
.../sql/KVStoreProtobufSerializerSuite.scala | 170 +--------------------
10 files changed, 6 insertions(+), 537 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 1c3e5bfc49a..499fda34174 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -686,54 +686,3 @@ message ExecutorPeakMetricsDistributions {
repeated double quantiles = 1;
repeated ExecutorMetrics executor_metrics = 2;
}
-
-message StateOperatorProgress {
- string operator_name = 1;
- int64 num_rows_total = 2;
- int64 num_rows_updated = 3;
- int64 all_updates_time_ms = 4;
- int64 num_rows_removed = 5;
- int64 all_removals_time_ms = 6;
- int64 commit_time_ms = 7;
- int64 memory_used_bytes = 8;
- int64 num_rows_dropped_by_watermark = 9;
- int64 num_shuffle_partitions = 10;
- int64 num_state_store_instances = 11;
- map<string, int64> custom_metrics = 12;
-}
-
-message SourceProgress {
- string description = 1;
- string start_offset = 2;
- string end_offset = 3;
- string latest_offset = 4;
- int64 num_input_rows = 5;
- double input_rows_per_second = 6;
- double processed_rows_per_second = 7;
- map<string, string> metrics = 8;
-}
-
-message SinkProgress {
- string description = 1;
- int64 num_output_rows = 2;
- map<string, string> metrics = 3;
-}
-
-message StreamingQueryProgress {
- string id = 1;
- string run_id = 2;
- string name = 3;
- string timestamp = 4;
- int64 batch_id = 5;
- int64 batch_duration = 6;
- map<string, int64> duration_ms = 7;
- map<string, string> event_time = 8;
- repeated StateOperatorProgress state_operators = 9;
- repeated SourceProgress sources = 10;
- SinkProgress sink = 11;
- map<string, string> observed_metrics = 12;
-}
-
-message StreamingQueryProgressWrapper {
- StreamingQueryProgress progress = 1;
-}
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index e907d559349..7beff87d7ec 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,4 +18,3 @@
org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
-org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 1b755ed70c6..3d206e7780c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Evolving
-class StateOperatorProgress private[spark](
+class StateOperatorProgress private[sql](
val operatorName: String,
val numRowsTotal: Long,
val numRowsUpdated: Long,
@@ -125,7 +125,7 @@ class StateOperatorProgress private[spark](
* @since 2.1.0
*/
@Evolving
-class StreamingQueryProgress private[spark](
+class StreamingQueryProgress private[sql](
val id: UUID,
val runId: UUID,
val name: String,
@@ -190,7 +190,7 @@ class StreamingQueryProgress private[spark](
* @since 2.1.0
*/
@Evolving
-class SourceProgress protected[spark](
+class SourceProgress protected[sql](
val description: String,
val startOffset: String,
val endOffset: String,
@@ -236,7 +236,7 @@ class SourceProgress protected[spark](
* @since 2.1.0
*/
@Evolving
-class SinkProgress protected[spark](
+class SinkProgress protected[sql](
val description: String,
val numOutputRows: Long,
val metrics: ju.Map[String, String] = Map[String, String]().asJava)
extends Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index c5ecdb6395a..1bdc5e3f79a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
}
}
-private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
@JsonIgnore @KVIndex
private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
deleted file mode 100644
index 66f8e4942bf..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.protobuf.sql
-
-import org.apache.spark.sql.streaming.SinkProgress
-import org.apache.spark.status.protobuf.StoreTypes
-
-private[protobuf] object SinkProgressSerializer {
-
- def serialize(sink: SinkProgress): StoreTypes.SinkProgress = {
- val builder = StoreTypes.SinkProgress.newBuilder()
- builder.setDescription(sink.description)
- builder.setNumOutputRows(sink.numOutputRows)
- sink.metrics.forEach {
- case (k, v) => builder.putMetrics(k, v)
- }
- builder.build()
- }
-
- def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = {
- new SinkProgress(
- description = sink.getDescription,
- numOutputRows = sink.getNumOutputRows,
- metrics = sink.getMetricsMap
- )
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
deleted file mode 100644
index 4d53c7bbc7b..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.protobuf.sql
-
-import java.util.{List => JList}
-
-import org.apache.spark.sql.streaming.SourceProgress
-import org.apache.spark.status.protobuf.StoreTypes
-
-private[protobuf] object SourceProgressSerializer {
-
- def serialize(source: SourceProgress): StoreTypes.SourceProgress = {
- val builder = StoreTypes.SourceProgress.newBuilder()
- builder.setDescription(source.description)
- builder.setStartOffset(source.startOffset)
- builder.setEndOffset(source.endOffset)
- builder.setLatestOffset(source.latestOffset)
- builder.setNumInputRows(source.numInputRows)
- builder.setInputRowsPerSecond(source.inputRowsPerSecond)
- builder.setProcessedRowsPerSecond(source.processedRowsPerSecond)
- source.metrics.forEach {
- case (k, v) => builder.putMetrics(k, v)
- }
- builder.build()
- }
-
- def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = {
- val size = sourceList.size()
- val result = new Array[SourceProgress](size)
- var i = 0
- while (i < size) {
- result(i) = deserialize(sourceList.get(i))
- i += 1
- }
- result
- }
-
- private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = {
- new SourceProgress(
- description = source.getDescription,
- startOffset = source.getStartOffset,
- endOffset = source.getEndOffset,
- latestOffset = source.getLatestOffset,
- numInputRows = source.getNumInputRows,
- inputRowsPerSecond = source.getInputRowsPerSecond,
- processedRowsPerSecond = source.getProcessedRowsPerSecond,
- metrics = source.getMetricsMap
- )
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
deleted file mode 100644
index 6831c044c82..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.protobuf.sql
-
-import java.util.{List => JList}
-
-import org.apache.spark.sql.streaming.StateOperatorProgress
-import org.apache.spark.status.protobuf.StoreTypes
-
-object StateOperatorProgressSerializer {
-
- def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = {
- val builder = StoreTypes.StateOperatorProgress.newBuilder()
- builder.setOperatorName(stateOperator.operatorName)
- builder.setNumRowsTotal(stateOperator.numRowsTotal)
- builder.setNumRowsUpdated(stateOperator.numRowsUpdated)
- builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs)
- builder.setNumRowsRemoved(stateOperator.numRowsRemoved)
- builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs)
- builder.setCommitTimeMs(stateOperator.commitTimeMs)
- builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes)
- builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark)
- builder.setNumShufflePartitions(stateOperator.numShufflePartitions)
- builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances)
- stateOperator.customMetrics.forEach {
- case (k, v) => builder.putCustomMetrics(k, v)
- }
- builder.build()
- }
-
- def deserializeToArray(
- stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = {
- val size = stateOperatorList.size()
- val result = new Array[StateOperatorProgress](size)
- var i = 0
- while (i < size) {
- result(i) = deserialize(stateOperatorList.get(i))
- i += 1
- }
- result
- }
-
- private def deserialize(
- stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = {
- new StateOperatorProgress(
- operatorName = stateOperator.getOperatorName,
- numRowsTotal = stateOperator.getNumRowsTotal,
- numRowsUpdated = stateOperator.getNumRowsUpdated,
- allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs,
- numRowsRemoved = stateOperator.getNumRowsRemoved,
- allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs,
- commitTimeMs = stateOperator.getCommitTimeMs,
- memoryUsedBytes = stateOperator.getMemoryUsedBytes,
- numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark,
- numShufflePartitions = stateOperator.getNumShufflePartitions,
- numStateStoreInstances = stateOperator.getNumStateStoreInstances,
- customMetrics = stateOperator.getCustomMetricsMap
- )
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
deleted file mode 100644
index 32d62a18ca4..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.protobuf.sql
-
-import java.util.{HashMap => JHashMap, Map => JMap, UUID}
-
-import com.fasterxml.jackson.databind.json.JsonMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-import org.apache.spark.sql.streaming.StreamingQueryProgress
-import org.apache.spark.status.protobuf.StoreTypes
-
-private[protobuf] object StreamingQueryProgressSerializer {
-
- private val mapper: JsonMapper = JsonMapper.builder()
- .addModule(DefaultScalaModule)
- .build()
-
- def serialize(process: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = {
- val builder = StoreTypes.StreamingQueryProgress.newBuilder()
- builder.setId(process.id.toString)
- builder.setRunId(process.runId.toString)
- builder.setName(process.name)
- builder.setTimestamp(process.timestamp)
- builder.setBatchId(process.batchId)
- builder.setBatchDuration(process.batchDuration)
- process.durationMs.forEach {
- case (k, v) => builder.putDurationMs(k, v)
- }
- process.eventTime.forEach {
- case (k, v) => builder.putEventTime(k, v)
- }
- process.stateOperators.foreach(
- s => builder.addStateOperators(StateOperatorProgressSerializer.serialize(s)))
- process.sources.foreach(
- s => builder.addSources(SourceProgressSerializer.serialize(s))
- )
- builder.setSink(SinkProgressSerializer.serialize(process.sink))
- process.observedMetrics.forEach {
- case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
- }
- builder.build()
- }
-
- def deserialize(process: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = {
- new StreamingQueryProgress(
- id = UUID.fromString(process.getId),
- runId = UUID.fromString(process.getRunId),
- name = process.getName,
- timestamp = process.getTimestamp,
- batchId = process.getBatchId,
- batchDuration = process.getBatchDuration,
- durationMs = process.getDurationMsMap,
- eventTime = process.getEventTimeMap,
- stateOperators =
- StateOperatorProgressSerializer.deserializeToArray(process.getStateOperatorsList),
- sources = SourceProgressSerializer.deserializeToArray(process.getSourcesList),
- sink = SinkProgressSerializer.deserialize(process.getSink),
- observedMetrics = convertToObservedMetrics(process.getObservedMetricsMap)
- )
- }
-
- private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = {
- val observedMetrics = new JHashMap[String, Row](input.size())
- val classType = classOf[GenericRowWithSchema]
- input.forEach {
- case (k, v) =>
- observedMetrics.put(k, mapper.readValue(v, classType))
- }
- observedMetrics
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
deleted file mode 100644
index 3846af26754..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.protobuf.sql
-
-import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
-import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
-
-class StreamingQueryProgressWrapperSerializer extends ProtobufSerDe {
-
- override val supportClass: Class[_] =
- classOf[StreamingQueryProgressWrapper]
-
- override def serialize(input: Any): Array[Byte] = {
- val data = input.asInstanceOf[StreamingQueryProgressWrapper]
- val builder = StoreTypes.StreamingQueryProgressWrapper.newBuilder()
- builder.setProgress(StreamingQueryProgressSerializer.serialize(data.progress))
- builder.build().toByteArray
- }
-
- override def deserialize(bytes: Array[Byte]): StreamingQueryProgressWrapper = {
- val processWrapper = StoreTypes.StreamingQueryProgressWrapper.parseFrom(bytes)
- new StreamingQueryProgressWrapper(
- StreamingQueryProgressSerializer.deserialize(processWrapper.getProgress))
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index b590f6dd42c..f7b783ef3ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,18 +17,11 @@
package org.apache.spark.status.protobuf.sql
-import java.lang.{Long => JLong}
import java.util.UUID
-import scala.collection.JavaConverters._
-
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.ui._
-import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StateOperatorProgress, StreamingQueryProgress}
-import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.streaming.ui.StreamingQueryData
import org.apache.spark.status.api.v1.sql.SqlResourceSuite
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
@@ -243,165 +236,4 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
assert(result.startTimestamp == input.startTimestamp)
assert(result.endTimestamp == input.endTimestamp)
}
-
- test("StreamingQueryProgressWrapper") {
- // Generate input data
- val stateOperatorProgress0 = new StateOperatorProgress(
- operatorName = "op-0",
- numRowsTotal = 1L,
- numRowsUpdated = 2L,
- allUpdatesTimeMs = 3L,
- numRowsRemoved = 4L,
- allRemovalsTimeMs = 5L,
- commitTimeMs = 6L,
- memoryUsedBytes = 7L,
- numRowsDroppedByWatermark = 8L,
- numShufflePartitions = 9L,
- numStateStoreInstances = 10L,
- customMetrics = Map(
- "custom-metrics-00" -> JLong.valueOf("10"),
- "custom-metrics-01" -> JLong.valueOf("11")).asJava
- )
-
- val stateOperatorProgress1 = new StateOperatorProgress(
- operatorName = "op-1",
- numRowsTotal = 11L,
- numRowsUpdated = 12L,
- allUpdatesTimeMs = 13L,
- numRowsRemoved = 14L,
- allRemovalsTimeMs = 15L,
- commitTimeMs = 16L,
- memoryUsedBytes = 17L,
- numRowsDroppedByWatermark = 18L,
- numShufflePartitions = 19L,
- numStateStoreInstances = 20L,
- customMetrics = Map(
- "custom-metrics-10" -> JLong.valueOf("20"),
- "custom-metrics-11" -> JLong.valueOf("21")).asJava
- )
-
- val source0 = new SourceProgress(
- description = "description-0",
- startOffset = "startOffset-0",
- endOffset = "endOffset-0",
- latestOffset = "latestOffset-0",
- numInputRows = 10L,
- inputRowsPerSecond = 11.0,
- processedRowsPerSecond = 12.0,
- metrics = Map(
- "metrics-00" -> "10",
- "metrics-01" -> "11").asJava
- )
-
- val source1 = new SourceProgress(
- description = "description-",
- startOffset = "startOffset-1",
- endOffset = "endOffset-1",
- latestOffset = "latestOffset-1",
- numInputRows = 20L,
- inputRowsPerSecond = 21.0,
- processedRowsPerSecond = 22.0,
- metrics = Map(
- "metrics-10" -> "20",
- "metrics-11" -> "21").asJava
- )
-
- val sink = new SinkProgress(
- description = "sink-0",
- numOutputRows = 30,
- metrics = Map(
- "metrics-20" -> "30",
- "metrics-21" -> "31").asJava
- )
-
- val schema1 = new StructType()
- .add("c1", "long")
- .add("c2", "double")
- val schema2 = new StructType()
- .add("rc", "long")
- .add("min_q", "string")
- .add("max_q", "string")
-
- val observedMetrics = Map[String, Row](
- "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
- "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"),
schema2)
- ).asJava
-
- val progress = new StreamingQueryProgress(
- id = UUID.randomUUID(),
- runId = UUID.randomUUID(),
- name = "name-1",
- timestamp = "2023-01-03T09:14:04.175Z",
- batchId = 1L,
- batchDuration = 2L,
- durationMs = Map(
- "duration-0" -> JLong.valueOf("10"),
- "duration-1" -> JLong.valueOf("11")).asJava,
- eventTime = Map(
- "eventTime-0" -> "20",
- "eventTime-1" -> "21").asJava,
- stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
- sources = Array(source0, source1),
- sink = sink,
- observedMetrics = observedMetrics
- )
- val input = new StreamingQueryProgressWrapper(progress)
-
- // Do serialization and deserialization
- val bytes = serializer.serialize(input)
- val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
-
- // Assertion results
- val resultProcess = result.progress
- assert(progress.id == resultProcess.id)
- assert(progress.runId == resultProcess.runId)
- assert(progress.name == resultProcess.name)
- assert(progress.timestamp == resultProcess.timestamp)
- assert(progress.batchId == resultProcess.batchId)
- assert(progress.batchDuration == resultProcess.batchDuration)
- assert(progress.durationMs == resultProcess.durationMs)
- assert(progress.eventTime == resultProcess.eventTime)
-
- progress.stateOperators.zip(resultProcess.stateOperators).foreach {
- case (o1, o2) =>
- assert(o1.operatorName == o2.operatorName)
- assert(o1.numRowsTotal == o2.numRowsTotal)
- assert(o1.numRowsUpdated == o2.numRowsUpdated)
- assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
- assert(o1.numRowsRemoved == o2.numRowsRemoved)
- assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
- assert(o1.commitTimeMs == o2.commitTimeMs)
- assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
- assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
- assert(o1.numShufflePartitions == o2.numShufflePartitions)
- assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
- assert(o1.customMetrics == o2.customMetrics)
- }
-
- progress.sources.zip(resultProcess.sources).foreach {
- case (s1, s2) =>
- assert(s1.description == s2.description)
- assert(s1.startOffset == s2.startOffset)
- assert(s1.endOffset == s2.endOffset)
- assert(s1.latestOffset == s2.latestOffset)
- assert(s1.numInputRows == s2.numInputRows)
- assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
- assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
- assert(s1.metrics == s2.metrics)
- }
-
- Seq(progress.sink).zip(Seq(resultProcess.sink)).foreach {
- case (s1, s2) =>
- assert(s1.description == s2.description)
- assert(s1.numOutputRows == s2.numOutputRows)
- assert(s1.metrics == s2.metrics)
- }
-
- val resultObservedMetrics = resultProcess.observedMetrics
- assert(progress.observedMetrics.size() == resultObservedMetrics.size())
- assert(progress.observedMetrics.keySet() == resultObservedMetrics.keySet())
- progress.observedMetrics.entrySet().forEach { e =>
- assert(e.getValue == resultObservedMetrics.get(e.getKey))
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]