xloya opened a new pull request, #7096:
URL: https://github.com/apache/iceberg/pull/7096
When there are many data files, using a broadcast join for this association may cause an OOM.

**Physical plan before the change:**
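For context, here is a minimal sketch (with hypothetical dataset names, not this PR's actual diff) of how a `merge` join hint steers Spark away from `BroadcastHashJoin` and toward `SortMergeJoin`:

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeJoinHintSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("merge-join-hint-sketch")
        .getOrCreate();

    // Stand-ins for the action's actual-files and valid-files datasets.
    Dataset<Row> actualFiles = spark
        .createDataset(Arrays.asList("f1", "f2", "f3"), Encoders.STRING())
        .toDF("path");
    Dataset<Row> validFiles = spark
        .createDataset(Arrays.asList("f2", "f3"), Encoders.STRING())
        .toDF("path");

    // The "merge" hint asks the planner for a shuffle-based sort-merge join,
    // so neither side has to be collected and broadcast into memory.
    Dataset<Row> joined = actualFiles
        .hint("merge")
        .join(validFiles,
            actualFiles.col("path").equalTo(validFiles.col("path")),
            "leftouter");

    joined.explain(); // should print SortMergeJoin rather than BroadcastHashJoin
    spark.stop();
  }
}
```

With such a hint in place, `explain()` should show `SortMergeJoin` instead of `BroadcastHashJoin`, matching the before/after plans below.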
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0,
java.lang.String, true], true, false, true) AS value#219]
+- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2729/204334828@45ec7633, obj#218:
java.lang.String
+- DeserializeToObject newInstance(class scala.Tuple2), obj#217:
scala.Tuple2
+- BroadcastHashJoin [_1#211.path], [_2#212.path], LeftOuter,
BuildRight, false
:- Project [named_struct(authority, authority#62, path, path#63,
scheme, scheme#64, uriAsString, uriAsString#65) AS _1#211]
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#62, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#63, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#64, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#65]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1711b63f, obj#61:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- Scan[obj#52]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0,
struct<authority:string,path:string,scheme:string,uriAsString:string>,
false].path),false), [plan_id=232]
+- Union
:- Project [named_struct(authority, authority#123, path,
path#124, scheme, scheme#125, uriAsString, uriAsString#126) AS _2#212]
: +- Filter isnotnull(path#124)
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#123, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#124, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#125, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#126]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1c1e78eb, obj#122:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2780/770030446@1eae036e, obj#118:
org.apache.iceberg.spark.actions.FileInfo
: +- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.ManifestFileBean), (setPath,path#67.toString),
(setAddedSnapshotId,staticinvoke(class java.lang.Long, ObjectType(class
java.lang.Long), valueOf, addedSnapshotId#227L, true, false, true)),
(setContent,staticinvoke(class java.lang.Integer, ObjectType(class
java.lang.Integer), valueOf, content#221, true, false, true)),
(setPartitionSpecId,staticinvoke(class java.lang.Integer, ObjectType(class
java.lang.Integer), valueOf, partitionSpecId#225, true, false, true)),
(setLength,staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long),
valueOf, length#223L, true, false, true))), obj#117:
org.apache.iceberg.spark.actions.ManifestFileBean
: +- Exchange RoundRobinPartitioning(200),
REPARTITION_BY_NUM, [plan_id=200]
: +- HashAggregate(keys=[path#67],
functions=[first(content#66, false), first(length#68L, false),
first(partitionSpecId#92, false), first(addedSnapshotId#93L, false)],
output=[content#221, path#67, length#223L, partitionSpecId#225,
addedSnapshotId#227L])
: +- HashAggregate(keys=[path#67],
functions=[partial_first(content#66, false), partial_first(length#68L, false),
partial_first(partitionSpecId#92, false), partial_first(addedSnapshotId#93L,
false)], output=[path#67, first#245, valueSet#246, first#247L, valueSet#248,
first#249, valueSet#250, first#251L, valueSet#252])
: +- Project [content#66,
path#67, length#68L, partition_spec_id#69 AS partitionSpecId#92,
added_snapshot_id#70L AS addedSnapshotId#93L]
: +- BatchScan[content#66,
path#67, length#68L, partition_spec_id#69, added_snapshot_id#70L]
file:/tmp/junit7658548847041001892/junit5488406415725968442/#all_manifests
(branch=null) [filters=, groupedBy=] RuntimeFilters: []
:- Project [named_struct(authority, authority#166, path,
path#167, scheme, scheme#168, uriAsString, uriAsString#169) AS _2#228]
: +- Filter isnotnull(path#167)
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#166, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#167, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#168, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#169]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2729/204334828@29173f02, obj#165:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.FileInfo), (setPath,path#128.toString),
(setType,type#153.toString)), obj#164: org.apache.iceberg.spark.actions.FileInfo
: +- Project [path#128, Manifest AS type#153]
: +- BatchScan[path#128]
file:/tmp/junit7658548847041001892/junit5488406415725968442/#all_manifests
(branch=null) [filters=, groupedBy=] RuntimeFilters: []
:- Project [named_struct(authority, authority#177, path,
path#178, scheme, scheme#179, uriAsString, uriAsString#180) AS _2#229]
: +- Filter isnotnull(path#178)
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#177, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#178, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#179, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#180]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1b4d4949, obj#176:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.FileInfo), (setPath,path#170.toString),
(setType,type#171.toString)), obj#175: org.apache.iceberg.spark.actions.FileInfo
: +- LocalTableScan [path#170, type#171]
+- Project [named_struct(authority, authority#188, path,
path#189, scheme, scheme#190, uriAsString, uriAsString#191) AS _2#230]
+- Filter isnotnull(path#189)
+- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#188, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#189, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#190, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#191]
+- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1946579d, obj#187:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
+- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.FileInfo), (setPath,path#181.toString),
(setType,type#182.toString)), obj#186: org.apache.iceberg.spark.actions.FileInfo
+- LocalTableScan [path#181, type#182]
```

**Physical plan after the change:**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0,
java.lang.String, true], true, false, true) AS value#219]
+- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2728/869880469@7d1a1ee9, obj#218:
java.lang.String
+- DeserializeToObject newInstance(class scala.Tuple2), obj#217:
scala.Tuple2
+- SortMergeJoin [_1#211.path], [_2#212.path], LeftOuter
:- Sort [_1#211.path ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(_1#211.path, 200),
ENSURE_REQUIREMENTS, [plan_id=233]
: +- Project [named_struct(authority, authority#62, path,
path#63, scheme, scheme#64, uriAsString, uriAsString#65) AS _1#211]
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#62, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#63, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#64, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#65]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2728/869880469@122ff251, obj#61:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- Scan[obj#52]
+- Sort [_2#212.path ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_2#212.path, 200),
ENSURE_REQUIREMENTS, [plan_id=234]
+- Union
:- Project [named_struct(authority, authority#123,
path, path#124, scheme, scheme#125, uriAsString, uriAsString#126) AS _2#212]
: +- Filter isnotnull(path#124)
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#123, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#124, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#125, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#126]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2728/869880469@438fc55e, obj#122:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2779/1142936077@56567e9b, obj#118:
org.apache.iceberg.spark.actions.FileInfo
: +- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.ManifestFileBean), (setPath,path#67.toString),
(setAddedSnapshotId,staticinvoke(class java.lang.Long, ObjectType(class
java.lang.Long), valueOf, addedSnapshotId#227L, true, false, true)),
(setContent,staticinvoke(class java.lang.Integer, ObjectType(class
java.lang.Integer), valueOf, content#221, true, false, true)),
(setPartitionSpecId,staticinvoke(class java.lang.Integer, ObjectType(class
java.lang.Integer), valueOf, partitionSpecId#225, true, false, true)),
(setLength,staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long),
valueOf, length#223L, true, false, true))), obj#117:
org.apache.iceberg.spark.actions.ManifestFileBean
: +- Exchange
RoundRobinPartitioning(200), REPARTITION_BY_NUM, [plan_id=200]
: +- HashAggregate(keys=[path#67],
functions=[first(content#66, false), first(length#68L, false),
first(partitionSpecId#92, false), first(addedSnapshotId#93L, false)],
output=[content#221, path#67, length#223L, partitionSpecId#225,
addedSnapshotId#227L])
: +-
HashAggregate(keys=[path#67], functions=[partial_first(content#66, false),
partial_first(length#68L, false), partial_first(partitionSpecId#92, false),
partial_first(addedSnapshotId#93L, false)], output=[path#67, first#245,
valueSet#246, first#247L, valueSet#248, first#249, valueSet#250, first#251L,
valueSet#252])
: +- Project [content#66,
path#67, length#68L, partition_spec_id#69 AS partitionSpecId#92,
added_snapshot_id#70L AS addedSnapshotId#93L]
: +- BatchScan[content#66,
path#67, length#68L, partition_spec_id#69, added_snapshot_id#70L]
file:/tmp/junit8641658744871490880/junit9101792858122759973/#all_manifests
(branch=null) [filters=, groupedBy=] RuntimeFilters: []
:- Project [named_struct(authority, authority#166,
path, path#167, scheme, scheme#168, uriAsString, uriAsString#169) AS _2#228]
: +- Filter isnotnull(path#167)
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#166, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#167, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#168, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#169]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2728/869880469@4d121ab6, obj#165:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.FileInfo), (setPath,path#128.toString),
(setType,type#153.toString)), obj#164: org.apache.iceberg.spark.actions.FileInfo
: +- Project [path#128, Manifest AS
type#153]
: +- BatchScan[path#128]
file:/tmp/junit8641658744871490880/junit9101792858122759973/#all_manifests
(branch=null) [filters=, groupedBy=] RuntimeFilters: []
:- Project [named_struct(authority, authority#177,
path, path#178, scheme, scheme#179, uriAsString, uriAsString#180) AS _2#229]
: +- Filter isnotnull(path#178)
: +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#177, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#178, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#179, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#180]
: +- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2728/869880469@290def7d, obj#176:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
: +- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.FileInfo), (setPath,path#170.toString),
(setType,type#171.toString)), obj#175: org.apache.iceberg.spark.actions.FileInfo
: +- LocalTableScan [path#170, type#171]
+- Project [named_struct(authority, authority#188,
path, path#189, scheme, scheme#190, uriAsString, uriAsString#191) AS _2#230]
+- Filter isnotnull(path#189)
+- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getAuthority, true, false, true) AS authority#188, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getPath, true, false, true) AS path#189, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getScheme, true, false, true) AS scheme#190, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI,
true]).getUriAsString, true, false, true) AS uriAsString#191]
+- MapPartitions
org.apache.spark.sql.Dataset$$Lambda$2728/869880469@10ea1754, obj#187:
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI
+- DeserializeToObject
initializejavabean(newInstance(class
org.apache.iceberg.spark.actions.FileInfo), (setPath,path#181.toString),
(setType,type#182.toString)), obj#186: org.apache.iceberg.spark.actions.FileInfo
+- LocalTableScan [path#181, type#182]`
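A blunter alternative (noted here only for comparison; this is an assumption, not what this PR implements) is to disable Spark's size-based auto-broadcast for the whole session:

```java
import org.apache.spark.sql.SparkSession;

public class DisableAutoBroadcastSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("disable-auto-broadcast-sketch")
        .getOrCreate();

    // -1 turns off size-based broadcast planning; joins without explicit
    // broadcast hints then fall back to shuffle-based strategies such as
    // the sort-merge join shown in the plan above.
    spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");

    spark.stop();
  }
}
```

The downside is that this affects every join in the session, whereas a per-join hint changes only the one query.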