wuchong commented on code in PR #2794: URL: https://github.com/apache/fluss/pull/2794#discussion_r2911133417
########## fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectLimitedRows; + +/** + * A {@link BatchScanner} that combines multiple {@link BatchScanner} instances into a single + * scanner. It polls the underlying scanners in a round-robin fashion: each {@link + * #pollBatch(Duration)} call is delegated to the next scanner in the queue, and scanners that still + * have data are re-enqueued while exhausted scanners are closed and removed. + * + * <p>When a {@code limit} is specified, rows are collected eagerly across all underlying scanners + * up to that limit and returned in a single batch. 
+ */ +@Internal +public class CompositeBatchScanner implements BatchScanner { + + /** Queue of underlying scanners to be polled in order. */ + private final LinkedList<BatchScanner> scannerQueue; + + /** Optional row limit; when set, rows are collected eagerly up to this count. */ + private final @Nullable Integer limit; + + public CompositeBatchScanner(List<BatchScanner> scanners, @Nullable Integer limit) { + this.scannerQueue = new LinkedList<>(scanners); + this.limit = limit; + } + + @Override + public void close() throws IOException { + scannerQueue.forEach(IOUtils::closeQuietly); + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + + while (!scannerQueue.isEmpty()) { + // Direct return limit scan which don't have so much data. + if (limit != null) { + CloseableIterator<InternalRow> iterator = + CloseableIterator.wrap(collectLimitedRows(scannerQueue, limit).iterator()); + scannerQueue.clear(); + return iterator; + } + + BatchScanner scanner = scannerQueue.poll(); + try { + CloseableIterator<InternalRow> iterator = scanner.pollBatch(timeout); + if (iterator != null) { + // If the scanner has more data, add it back to the queue + scannerQueue.add(scanner); + return iterator; + } else { + // Close the scanner if it has no more data, and not add it back to the queue + scanner.close(); + } + } catch (Exception e) { + // Ensure all scanners are closed on failure to avoid resource leaks + IOUtils.closeQuietly(scanner); + scannerQueue.forEach(IOUtils::closeQuietly); + scannerQueue.clear(); Review Comment: Should this logic be moved to the `close()` method? If a fatal exception occurs, the scanner owner is responsible for manually invoking `close()`. Placing it here might be problematic if the exception is transient and eligible for retry. 
########## fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java: ########## @@ -324,6 +325,24 @@ private void testSnapshotRead( table.close(); } + @Test + void testTableLevelScanRespectsLimit() throws Exception { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test-table-level-scan-limit"); + long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true); + + // insert 3 rows per bucket (9 total across 3 buckets) + putRows(tableId, tablePath, 9); + + int limit = 5; + try (Table table = conn.getTable(tablePath); + BatchScanner scanner = table.newScan().limit(limit).createBatchScanner()) { + List<InternalRow> actual = collectRows(scanner); + // collectLimitedRows stops once >= limit rows are collected + assertThat(actual.size()).isGreaterThanOrEqualTo(limit); + assertThat(actual.size()).isLessThanOrEqualTo(9); Review Comment: Just assert the size should be equal to `limit`? The current assertion looks like the returned result is not deterministic. ########## fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CompositeBatchScanner}. */ +class CompositeBatchScannerTest { + + private static final Duration TIMEOUT = Duration.ofMillis(10); + + // ------------------------------------------------------------------------- + // No-limit tests + // ------------------------------------------------------------------------- + + @Test + void testPollBatchWithNoLimit() throws IOException { + // Three scanners each holding rows [0], [1], [2]. + // CompositeBatchScanner should round-robin and eventually return all rows. + StubBatchScanner s1 = scanner(0); + StubBatchScanner s2 = scanner(1); + StubBatchScanner s3 = scanner(2); + + CompositeBatchScanner composite = + new CompositeBatchScanner(Arrays.asList(s1, s2, s3), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(collected).hasSize(3); + assertThat(intValues(collected)).containsExactlyInAnyOrder(0, 1, 2); + assertThat(s1.closed).isTrue(); + assertThat(s2.closed).isTrue(); + assertThat(s3.closed).isTrue(); + } + + @Test + void testPollBatchSkipsExhaustedScanner() throws IOException { + // s1 is already exhausted (returns null immediately), s2 has data. 
+ StubBatchScanner s1 = scanner(); // no rows → immediately returns null + StubBatchScanner s2 = scanner(99); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), null); + + List<InternalRow> collected = collectAll(composite); + + assertThat(intValues(collected)).containsExactly(99); + assertThat(s1.closed).isTrue(); + assertThat(s2.closed).isTrue(); + } + + @Test + void testPollBatchWithLimit() throws IOException { + // Two scanners with 3 rows each (one row per batch), limit = 3. + // collectLimitedRows collects until rows.size() >= limit. + StubBatchScanner s1 = scanner(1, 2, 3); + StubBatchScanner s2 = scanner(4, 5, 6); + + CompositeBatchScanner composite = new CompositeBatchScanner(Arrays.asList(s1, s2), 3); + + CloseableIterator<InternalRow> batch = composite.pollBatch(TIMEOUT); + assertThat(batch).isNotNull(); + + List<Integer> values = new ArrayList<>(); + while (batch.hasNext()) { + values.add(batch.next().getInt(0)); + } + assertThat(values.size()).isGreaterThanOrEqualTo(3); Review Comment: should be equal to `3`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
