swuferhong commented on code in PR #2855: URL: https://github.com/apache/fluss/pull/2855#discussion_r3025739949
########## fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.RemoteStorageException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.FileLogRecords; +import org.apache.fluss.record.LogRecordBatch; +import org.apache.fluss.remote.RemoteLogSegment; +import org.apache.fluss.server.log.remote.RemoteLogManager; +import org.apache.fluss.server.log.remote.RemoteLogStorage; +import org.apache.fluss.utils.FlussPaths; +import org.apache.fluss.utils.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly; + +/** + * A utility class that fetches remote log segments and makes them available as {@link + * FileLogRecords} for KV recovery. It downloads remote log data into a local temporary directory + * using a UUID to avoid conflicts with other concurrent recovery operations. + * + * <p>The fetcher is {@link Closeable} and the caller must close it after use to clean up the + * temporary directory. It is recommended to use try-with-resources to ensure proper resource + * cleanup: + * + * <pre>{@code + * try (RemoteLogFetcher fetcher = new RemoteLogFetcher(...)) { + * for (LogRecordBatch batch : fetcher.fetch(startOffset, localLogStartOffset)) { + * // process batch + * } + * } + * }</pre> + * + * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used by a single thread + * only. + */ +public class RemoteLogFetcher implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RemoteLogFetcher.class); + + private static final String REMOTE_LOG_RECOVERY_DIR_PREFIX = "remote-log-recovery-"; + + private final RemoteLogManager remoteLogManager; + private final TableBucket tableBucket; + private final Path tempDir; + + /** Tracks the currently active iterator to ensure proper cleanup on close. */ + private volatile RemoteLogBatchIterator activeIterator; + + public RemoteLogFetcher( + RemoteLogManager remoteLogManager, TableBucket tableBucket, File dataDir) + throws IOException { + this( + remoteLogManager, + tableBucket, + Files.createDirectories( + dataDir.toPath() + .resolve("tmp") + .resolve(REMOTE_LOG_RECOVERY_DIR_PREFIX + UUID.randomUUID()))); + } + + @VisibleForTesting + RemoteLogFetcher(RemoteLogManager remoteLogManager, TableBucket tableBucket, Path tempDir) + throws IOException { + this.remoteLogManager = remoteLogManager; + this.tableBucket = tableBucket; + this.tempDir = tempDir; + Files.createDirectories(tempDir); + } + + /** + * Fetches all relevant remote log segments that cover the range from {@code startOffset} up to + * {@code localLogStartOffset}, and iterates over the log record batches in order. + * + * <p>The returned {@link Iterable} is lazily loaded - remote log segments are downloaded and + * processed only when iterating through the batches. This means that file downloads and I/O + * operations occur during iteration, not when this method is called. + * + * @param startOffset the offset to start fetching from (inclusive) + * @param localLogStartOffset the local log start offset (exclusive, stop before this) + * @return an iterable over all {@link LogRecordBatch} from the fetched remote segments. The + * iterator lazily downloads segments as needed. + * @throws Exception if any error occurs during fetching or reading + */ + public Iterable<LogRecordBatch> fetch(long startOffset, long localLogStartOffset) + throws Exception { + List<RemoteLogSegment> segments = + remoteLogManager.relevantRemoteLogSegments(tableBucket, startOffset); + if (segments.isEmpty()) { + throw new RemoteStorageException( + String.format( + "No remote log segments found for table bucket %s at offset %d", + tableBucket, startOffset)); + } + + LOG.info( + "Found {} remote log segments for table bucket {} from offset {} to local log start offset {}", + segments.size(), + tableBucket, + startOffset, + localLogStartOffset); + + RemoteLogBatchIterator iterator = + new RemoteLogBatchIterator(segments, startOffset, localLogStartOffset); + this.activeIterator = iterator; + return () -> iterator; + } + + @Override + public void close() { + // Close any active iterator to release file handles + if (activeIterator != null) { + activeIterator.close(); + activeIterator = null; + } + LOG.info("Cleaning up remote log recovery temp dir: {}", tempDir); + deleteDirectoryQuietly(tempDir.toFile()); Review Comment: Ok. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
