This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 27d74616441 [Opt](Iceberg) Simplify the code of getting time travel snapshotId (#34299) (#38101) 27d74616441 is described below commit 27d746164418df92fee1f0a0dc8c7fcb456963fc Author: Mingyu Chen <morning...@163.com> AuthorDate: Fri Jul 19 09:45:56 2024 +0800 [Opt](Iceberg) Simplify the code of getting time travel snapshotId (#34299) (#38101) bp #34299 Co-authored-by: Butao Zhang <zhangbu...@cmss.chinamobile.com> --- .../datasource/iceberg/source/IcebergScanNode.java | 29 +++------------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index bfb2a5aeb34..6ea58014003 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -58,25 +58,23 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; -import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; import java.io.IOException; import java.nio.ByteBuffer; -import java.time.Instant; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -287,8 +285,8 @@ public class IcebergScanNode extends FileQueryScanNode { if (type == TableSnapshot.VersionType.VERSION) { return tableSnapshot.getVersion(); } else { - long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); - return getSnapshotIdAsOfTime(icebergTable.history(), snapshotId); + long timestamp = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); + return SnapshotUtil.snapshotIdAsOfTime(icebergTable, timestamp); } } catch (IllegalArgumentException e) { throw new UserException(e); @@ -297,27 +295,6 @@ public class IcebergScanNode extends FileQueryScanNode { return null; } - private long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) { - // find history at or before asOfTimestamp - HistoryEntry latestHistory = null; - for (HistoryEntry entry : historyEntries) { - if (entry.timestampMillis() <= asOfTimestamp) { - if (latestHistory == null) { - latestHistory = entry; - continue; - } - if (entry.timestampMillis() > latestHistory.timestampMillis()) { - latestHistory = entry; - } - } - } - if (latestHistory == null) { - throw new NotFoundException("No version history at or before " - + Instant.ofEpochMilli(asOfTimestamp)); - } - return latestHistory.snapshotId(); - } - private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) { List<IcebergDeleteFileFilter> filters = new ArrayList<>(); for (DeleteFile delete : spitTask.deletes()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org