swuferhong commented on code in PR #2855:
URL: https://github.com/apache/fluss/pull/2855#discussion_r3025739949


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.RemoteStorageException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.FileLogRecords;
+import org.apache.fluss.record.LogRecordBatch;
+import org.apache.fluss.remote.RemoteLogSegment;
+import org.apache.fluss.server.log.remote.RemoteLogManager;
+import org.apache.fluss.server.log.remote.RemoteLogStorage;
+import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly;
+
+/**
+ * A utility class that fetches remote log segments and makes them available 
as {@link
+ * FileLogRecords} for KV recovery. It downloads remote log data into a local 
temporary directory
+ * using a UUID to avoid conflicts with other concurrent recovery operations.
+ *
+ * <p>The fetcher is {@link Closeable} and the caller must close it after use 
to clean up the
+ * temporary directory. It is recommended to use try-with-resources to ensure 
proper resource
+ * cleanup:
+ *
+ * <pre>{@code
+ * try (RemoteLogFetcher fetcher = new RemoteLogFetcher(...)) {
+ *     for (LogRecordBatch batch : fetcher.fetch(startOffset, 
localLogStartOffset)) {
+ *         // process batch
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used 
by a single thread
+ * only.
+ */
+public class RemoteLogFetcher implements Closeable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RemoteLogFetcher.class);
+
+    private static final String REMOTE_LOG_RECOVERY_DIR_PREFIX = 
"remote-log-recovery-";
+
+    private final RemoteLogManager remoteLogManager;
+    private final TableBucket tableBucket;
+    private final Path tempDir;
+
+    /** Tracks the currently active iterator to ensure proper cleanup on 
close. */
+    private volatile RemoteLogBatchIterator activeIterator;
+
+    public RemoteLogFetcher(
+            RemoteLogManager remoteLogManager, TableBucket tableBucket, File 
dataDir)
+            throws IOException {
+        this(
+                remoteLogManager,
+                tableBucket,
+                Files.createDirectories(
+                        dataDir.toPath()
+                                .resolve("tmp")
+                                .resolve(REMOTE_LOG_RECOVERY_DIR_PREFIX + 
UUID.randomUUID())));
+    }
+
+    @VisibleForTesting
+    RemoteLogFetcher(RemoteLogManager remoteLogManager, TableBucket 
tableBucket, Path tempDir)
+            throws IOException {
+        this.remoteLogManager = remoteLogManager;
+        this.tableBucket = tableBucket;
+        this.tempDir = tempDir;
+        Files.createDirectories(tempDir);
+    }
+
+    /**
+     * Fetches all relevant remote log segments that cover the range from 
{@code startOffset} up to
+     * {@code localLogStartOffset}, and iterates over the log record batches 
in order.
+     *
+     * <p>The returned {@link Iterable} is lazily loaded - remote log segments 
are downloaded and
+     * processed only when iterating through the batches. This means that file 
downloads and I/O
+     * operations occur during iteration, not when this method is called.
+     *
+     * @param startOffset the offset to start fetching from (inclusive)
+     * @param localLogStartOffset the local log start offset (exclusive, stop 
before this)
+     * @return an iterable over all {@link LogRecordBatch} from the fetched 
remote segments. The
+     *     iterator lazily downloads segments as needed.
+     * @throws Exception if any error occurs during fetching or reading
+     */
+    public Iterable<LogRecordBatch> fetch(long startOffset, long 
localLogStartOffset)
+            throws Exception {
+        List<RemoteLogSegment> segments =
+                remoteLogManager.relevantRemoteLogSegments(tableBucket, 
startOffset);
+        if (segments.isEmpty()) {
+            throw new RemoteStorageException(
+                    String.format(
+                            "No remote log segments found for table bucket %s 
at offset %d",
+                            tableBucket, startOffset));
+        }
+
+        LOG.info(
+                "Found {} remote log segments for table bucket {} from offset 
{} to local log start offset {}",
+                segments.size(),
+                tableBucket,
+                startOffset,
+                localLogStartOffset);
+
+        RemoteLogBatchIterator iterator =
+                new RemoteLogBatchIterator(segments, startOffset, 
localLogStartOffset);
+        this.activeIterator = iterator;
+        return () -> iterator;
+    }
+
+    @Override
+    public void close() {
+        // Close any active iterator to release file handles
+        if (activeIterator != null) {
+            activeIterator.close();
+            activeIterator = null;
+        }
+        LOG.info("Cleaning up remote log recovery temp dir: {}", tempDir);
+        deleteDirectoryQuietly(tempDir.toFile());

Review Comment:
   Ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to