This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bd93dafb9fd [SPARK-52630][SS] Reorganize streaming operator and state 
mgmt code and dirs
2bd93dafb9fd is described below

commit 2bd93dafb9fda6cc5201ff00547e07b7c098b7cc
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Jul 3 12:01:45 2025 +0900

    [SPARK-52630][SS] Reorganize streaming operator and state mgmt code and dirs
    
    ### What changes were proposed in this pull request?
    Reorganize streaming operator and state mgmt code and dirs
    
    ### Why are the changes needed?
    Making it easier for operator and component extension in the future
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51327 from anishshri-db/task/SPARK-52630.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../stateful}/MergingSortWithSessionWindowStateIterator.scala             | 0
 .../streaming/{ => operators/stateful}/StatefulOperatorPartitioning.scala | 0
 .../{state => operators/stateful}/StreamingAggregationStateManager.scala  | 0
 .../stateful}/StreamingSessionWindowStateManager.scala                    | 0
 .../stateful/flatmapgroupswithstate}/FlatMapGroupsWithStateExec.scala     | 0
 .../flatmapgroupswithstate}/FlatMapGroupsWithStateExecHelper.scala        | 0
 .../{ => operators/stateful/flatmapgroupswithstate}/GroupStateImpl.scala  | 0
 .../{ => operators/stateful/join}/StreamingSymmetricHashJoinExec.scala    | 0
 .../{ => operators/stateful/join}/StreamingSymmetricHashJoinHelper.scala  | 0
 .../stateful/join}/SymmetricHashJoinStateManager.scala                    | 0
 .../execution/streaming/{ => operators/stateful}/statefulOperators.scala  | 0
 .../execution/streaming/{ => operators/stateful}/streamingLimits.scala    | 0
 .../stateful/transformwithstate}/StateStoreColumnFamilySchemaUtils.scala  | 0
 .../stateful/transformwithstate}/StateTypesEncoderUtils.scala             | 0
 .../stateful/transformwithstate}/TransformWithStateExec.scala             | 0
 .../stateful/transformwithstate}/TransformWithStateExecBase.scala         | 0
 .../stateful/transformwithstate}/TransformWithStateVariableUtils.scala    | 0
 .../statefulprocessor}/StatefulProcessorHandleImpl.scala                  | 0
 .../statefulprocessor}/StatefulProcessorHandleImplBase.scala              | 0
 .../stateful/transformwithstate/statevariables}/ListStateImpl.scala       | 0
 .../transformwithstate/statevariables}/ListStateMetricsImpl.scala         | 0
 .../stateful/transformwithstate/statevariables}/MapStateImpl.scala        | 0
 .../stateful/transformwithstate/statevariables}/ValueStateImpl.scala      | 0
 .../stateful/transformwithstate/timers}/ExpiredTimerInfoImpl.scala        | 0
 .../stateful/transformwithstate/timers}/TimerStateImpl.scala              | 0
 .../stateful/transformwithstate/timers}/TimerValuesImpl.scala             | 0
 .../stateful/transformwithstate/ttl}/ListStateImplWithTTL.scala           | 0
 .../stateful/transformwithstate/ttl}/MapStateImplWithTTL.scala            | 0
 .../{ => operators/stateful/transformwithstate/ttl}/TTLState.scala        | 0
 .../stateful/transformwithstate/ttl}/ValueStateImplWithTTL.scala          | 0
 .../apache/spark/sql/execution/streaming/{ => sinks}/FileStreamSink.scala | 0
 .../spark/sql/execution/streaming/{ => sinks}/FileStreamSinkLog.scala     | 0
 .../org/apache/spark/sql/execution/streaming/{ => sinks}/console.scala    | 0
 33 files changed, 0 insertions(+), 0 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/MergingSortWithSessionWindowStateIterator.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MergingSortWithSessionWindowStateIterator.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/MergingSortWithSessionWindowStateIterator.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorPartitioning.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/StatefulOperatorPartitioning.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorPartitioning.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/StatefulOperatorPartitioning.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/StreamingAggregationStateManager.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/StreamingAggregationStateManager.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/StreamingSessionWindowStateManager.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/StreamingSessionWindowStateManager.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/flatmapgroupswithstate/FlatMapGroupsWithStateExec.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/flatmapgroupswithstate/FlatMapGroupsWithStateExec.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/flatmapgroupswithstate/FlatMapGroupsWithStateExecHelper.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/flatmapgroupswithstate/FlatMapGroupsWithStateExecHelper.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/flatmapgroupswithstate/GroupStateImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/flatmapgroupswithstate/GroupStateImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/streamingLimits.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/streamingLimits.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateStoreColumnFamilySchemaUtils.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateStoreColumnFamilySchemaUtils.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateTypesEncoderUtils.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateTypesEncoderUtils.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExecBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExecBase.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateVariableUtils.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateVariableUtils.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statefulprocessor/StatefulProcessorHandleImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statefulprocessor/StatefulProcessorHandleImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statefulprocessor/StatefulProcessorHandleImplBase.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statefulprocessor/StatefulProcessorHandleImplBase.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/ListStateImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/ListStateImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/ListStateMetricsImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/ListStateMetricsImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/MapStateImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/MapStateImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/ValueStateImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statevariables/ValueStateImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/ExpiredTimerInfoImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/ExpiredTimerInfoImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/TimerStateImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/TimerStateImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerValuesImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/TimerValuesImpl.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerValuesImpl.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/TimerValuesImpl.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/ListStateImplWithTTL.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/ListStateImplWithTTL.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/MapStateImplWithTTL.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/MapStateImplWithTTL.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/TTLState.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/TTLState.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/ValueStateImplWithTTL.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/ValueStateImplWithTTL.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sinks/FileStreamSink.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sinks/FileStreamSink.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sinks/FileStreamSinkLog.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sinks/FileStreamSinkLog.scala
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sinks/console.scala
similarity index 100%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sinks/console.scala


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to