stevenzwu commented on code in PR #10308:
URL: https://github.com/apache/iceberg/pull/10308#discussion_r1599253290


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Implementation of the Source V2 API which uses an iterator to read the 
elements, and uses a
+ * single thread to do so.
+ *
+ * @param <T> The return type of the source
+ */
+public abstract class SingleThreadedIteratorSource<T>
+    implements Source<
+            T,
+            SingleThreadedIteratorSource.GlobalSplit<T>,
+            Collection<SingleThreadedIteratorSource.GlobalSplit<T>>>,
+        ResultTypeQueryable<T> {
+  private static final String PARALLELISM_ERROR = "Parallelism should be set 
to 1";
+
+  /**
+   * Creates the iterator to return the elements which then emitted by the 
source.
+   *
+   * @return iterator for the elements
+   */
+  public abstract Iterator<T> createIterator();
+
+  /**
+   * Serializes the iterator, which is used to save and restore the state of 
the source.
+   *
+   * @return serializer for the iterator
+   */
+  public abstract SimpleVersionedSerializer<Iterator<T>> 
getIteratorSerializer();
+
+  @Override
+  public SplitEnumerator<GlobalSplit<T>, Collection<GlobalSplit<T>>> 
createEnumerator(
+      SplitEnumeratorContext<GlobalSplit<T>> enumContext) {
+    Preconditions.checkArgument(enumContext.currentParallelism() == 1, 
PARALLELISM_ERROR);
+    return new IteratorSourceEnumerator<>(
+        enumContext, ImmutableList.of(new GlobalSplit<>(createIterator())));
+  }
+
+  @Override
+  public SplitEnumerator<GlobalSplit<T>, Collection<GlobalSplit<T>>> 
restoreEnumerator(
+      SplitEnumeratorContext<GlobalSplit<T>> enumContext, 
Collection<GlobalSplit<T>> checkpoint) {
+    Preconditions.checkArgument(enumContext.currentParallelism() == 1, 
PARALLELISM_ERROR);
+    return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<GlobalSplit<T>> getSplitSerializer() {
+    return new SplitSerializer<>(getIteratorSerializer());
+  }
+
+  @Override
+  public SimpleVersionedSerializer<Collection<GlobalSplit<T>>> 
getEnumeratorCheckpointSerializer() {
+    return new CheckpointSerializer<>(getIteratorSerializer());
+  }
+
+  @Override
+  public SourceReader<T, GlobalSplit<T>> createReader(SourceReaderContext 
readerContext)
+      throws Exception {
+    Preconditions.checkArgument(readerContext.getIndexOfSubtask() == 0, 
PARALLELISM_ERROR);
+    return new IteratorSourceReader<>(readerContext);
+  }
+
+  /** The single split of the {@link SingleThreadedIteratorSource}. */
+  public static class GlobalSplit<T> implements IteratorSourceSplit<T, 
Iterator<T>>, Serializable {

Review Comment:
   What does `global` mean here, in the name `GlobalSplit`?



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Implementation of the Source V2 API which uses an iterator to read the 
elements, and uses a
+ * single thread to do so.
+ *
+ * @param <T> The return type of the source
+ */
+public abstract class SingleThreadedIteratorSource<T>

Review Comment:
   Curious why we need a base class here. Do we envision another implementation 
besides `MonitorSource`?



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Objects;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/** Event describing changes in an Iceberg table */
+class TableChange {

Review Comment:
   How will this be used by downstream operators for planning purposes?
   
   Also, how does this work with maintenance tasks like snapshot expiration?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to