stevenzwu commented on code in PR #10308: URL: https://github.com/apache/iceberg/pull/10308#discussion_r1605496877
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Monitors an Iceberg table for changes */ +public class MonitorSource extends SingleThreadedIteratorSource<TableChange> { + private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); + + private final TableLoader tableLoader; + private final RateLimiterStrategy rateLimiterStrategy; + private final long maxReadBack; + + /** + * Creates a {@link org.apache.flink.api.connector.source.Source} which monitors an Iceberg table + * for changes. + * + * @param tableLoader used for accessing the table + * @param rateLimiterStrategy limits the frequency the table is checked + * @param maxReadBack sets the number of snapshots read before stopping change collection + */ + public MonitorSource( + TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should no be null"); + Preconditions.checkArgument(maxReadBack > 0, "Need to read at least 1 snapshot to work"); + + this.tableLoader = tableLoader; + this.rateLimiterStrategy = rateLimiterStrategy; + this.maxReadBack = maxReadBack; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public TypeInformation<TableChange> getProducedType() { + return TypeInformation.of(TableChange.class); + } + + @Override + Iterator<TableChange> createIterator() { + return new SchedulerEventIterator(tableLoader, null, maxReadBack); + } + + @Override + SimpleVersionedSerializer<Iterator<TableChange>> getIteratorSerializer() { + return new SchedulerEventIteratorSerializer(tableLoader, maxReadBack); + } + + @Override + public SourceReader<TableChange, GlobalSplit<TableChange>> createReader( + SourceReaderContext readerContext) throws Exception { + RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1); + return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter); + } + + /** The Iterator which returns the latest changes on an Iceberg table. */ + @VisibleForTesting + static class SchedulerEventIterator implements Iterator<TableChange> { + private Long lastSnapshotId; + private final long maxReadBack; + private final Table table; + + SchedulerEventIterator(TableLoader tableLoader, Long lastSnapshotId, long maxReadBack) { + this.lastSnapshotId = lastSnapshotId; + this.maxReadBack = maxReadBack; + tableLoader.open(); + this.table = tableLoader.loadTable(); + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public TableChange next() { + try { + table.refresh(); + Snapshot currentSnapshot = table.currentSnapshot(); + Long current = currentSnapshot != null ? currentSnapshot.snapshotId() : null; + Long checking = current; + TableChange event = new TableChange(0, 0, 0L, 0L, 0); + long readBack = 0; + while (checking != null && !checking.equals(lastSnapshotId) && ++readBack <= maxReadBack) { + Snapshot snapshot = table.snapshot(checking); + if (snapshot != null) { + LOG.debug("Reading snapshot {}", snapshot.snapshotId()); + event.merge(new TableChange(snapshot, table.io())); Review Comment: we should probably skip `DataOperations.REPLACE` ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Monitors an Iceberg table for changes */ +public class MonitorSource extends SingleThreadedIteratorSource<TableChange> { + private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); + + private final TableLoader tableLoader; + private final RateLimiterStrategy rateLimiterStrategy; + private final long maxReadBack; + + /** + * Creates a {@link org.apache.flink.api.connector.source.Source} which monitors an Iceberg table + * for changes. + * + * @param tableLoader used for accessing the table + * @param rateLimiterStrategy limits the frequency the table is checked + * @param maxReadBack sets the number of snapshots read before stopping change collection + */ + public MonitorSource( Review Comment: I found it a little strange with read limiter and iterator here. Let me bring up a couple of alternatives to discuss 1. MonitorSource enumerator can emit `TableChangeSplit` and reader just extract the `TableChange` and emit to downstream Trigger operator 2. MonitorSource enumerator just emit `Snapshot` split and reader can process snapshot and construct `TableChange` from manifest info. I like the second option better, the split id could be table UUID + snapshot Id. ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Implementation of the Source V2 API which uses an iterator to read the elements, and uses a + * single thread to do so. + * + * @param <T> The return type of the source + */ +public abstract class SingleThreadedIteratorSource<T> Review Comment: I am also wondering if `SingleThread` is needed. If users don't construct the source operator manually. the stitch code can set the operator parallelism as 1, like the `IcebergFilesCommitter` ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Monitors an Iceberg table for changes */ +public class MonitorSource extends SingleThreadedIteratorSource<TableChange> { + private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); + + private final TableLoader tableLoader; + private final RateLimiterStrategy rateLimiterStrategy; + private final long maxReadBack; + + /** + * Creates a {@link org.apache.flink.api.connector.source.Source} which monitors an Iceberg table + * for changes. + * + * @param tableLoader used for accessing the table + * @param rateLimiterStrategy limits the frequency the table is checked + * @param maxReadBack sets the number of snapshots read before stopping change collection + */ + public MonitorSource( + TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should no be null"); + Preconditions.checkArgument(maxReadBack > 0, "Need to read at least 1 snapshot to work"); + + this.tableLoader = tableLoader; + this.rateLimiterStrategy = rateLimiterStrategy; + this.maxReadBack = maxReadBack; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public TypeInformation<TableChange> getProducedType() { + return TypeInformation.of(TableChange.class); + } + + @Override + Iterator<TableChange> createIterator() { + return new SchedulerEventIterator(tableLoader, null, maxReadBack); + } + + @Override + SimpleVersionedSerializer<Iterator<TableChange>> getIteratorSerializer() { + return new SchedulerEventIteratorSerializer(tableLoader, maxReadBack); + } + + @Override + public SourceReader<TableChange, GlobalSplit<TableChange>> createReader( + SourceReaderContext readerContext) throws Exception { + RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1); + return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter); + } + + /** The Iterator which returns the latest changes on an Iceberg table. */ + @VisibleForTesting + static class SchedulerEventIterator implements Iterator<TableChange> { + private Long lastSnapshotId; + private final long maxReadBack; + private final Table table; + + SchedulerEventIterator(TableLoader tableLoader, Long lastSnapshotId, long maxReadBack) { + this.lastSnapshotId = lastSnapshotId; + this.maxReadBack = maxReadBack; + tableLoader.open(); + this.table = tableLoader.loadTable(); + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public TableChange next() { + try { + table.refresh(); + Snapshot currentSnapshot = table.currentSnapshot(); + Long current = currentSnapshot != null ? currentSnapshot.snapshotId() : null; + Long checking = current; + TableChange event = new TableChange(0, 0, 0L, 0L, 0); + long readBack = 0; + while (checking != null && !checking.equals(lastSnapshotId) && ++readBack <= maxReadBack) { Review Comment: maxReadBack seems problematic. let's see here is the snapshot history (backwards left to right: from recent snapshot to oldest) S7, S6, S5, S4, S3, S2,S1 lastSnapshotId=S2 before the while loop. maxReadBack=2 after the while loop lastSnapshotId=S5 S3 and S4 will be skipped in the next loop. You would want to scan forward from the last position. You can see how the regular source throttle the enumeration. https://github.com/apache/iceberg/blob/main/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java#L89 ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Objects; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +/** Event describing changes in an Iceberg table */ +class TableChange { Review Comment: you said the purpose of `MonitorSource` and `TableChange` are to provide input to `Trigger` operator so that the `Trigger` code. Here are the trigger modes discussed before 1. standalone streaming/long-running compaction job. This will use the `MonitorSource` to emit `TableChange` to `Trigger` operator, right? 2. embedded trigger mode in post commit stage of a write job. I assume post commit can emit the `TableChange` event. 3. external trigger mode (like Airflow). how is the Trigger input calculated in this case? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Monitors an Iceberg table for changes */ +public class MonitorSource extends SingleThreadedIteratorSource<TableChange> { + private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); + + private final TableLoader tableLoader; + private final RateLimiterStrategy rateLimiterStrategy; + private final long maxReadBack; + + /** + * Creates a {@link org.apache.flink.api.connector.source.Source} which monitors an Iceberg table + * for changes. + * + * @param tableLoader used for accessing the table + * @param rateLimiterStrategy limits the frequency the table is checked + * @param maxReadBack sets the number of snapshots read before stopping change collection + */ + public MonitorSource( + TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should no be null"); + Preconditions.checkArgument(maxReadBack > 0, "Need to read at least 1 snapshot to work"); + + this.tableLoader = tableLoader; + this.rateLimiterStrategy = rateLimiterStrategy; + this.maxReadBack = maxReadBack; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public TypeInformation<TableChange> getProducedType() { + return TypeInformation.of(TableChange.class); + } + + @Override + Iterator<TableChange> createIterator() { + return new SchedulerEventIterator(tableLoader, null, maxReadBack); + } + + @Override + SimpleVersionedSerializer<Iterator<TableChange>> getIteratorSerializer() { + return new SchedulerEventIteratorSerializer(tableLoader, maxReadBack); + } + + @Override + public SourceReader<TableChange, GlobalSplit<TableChange>> createReader( + SourceReaderContext readerContext) throws Exception { + RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1); + return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter); + } + + /** The Iterator which returns the latest changes on an Iceberg table. */ + @VisibleForTesting + static class SchedulerEventIterator implements Iterator<TableChange> { + private Long lastSnapshotId; Review Comment: how is this checkpointed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org