wuchong commented on code in PR #2951:
URL: https://github.com/apache/fluss/pull/2951#discussion_r3029360185
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java:
##########
@@ -1494,13 +1504,91 @@ private LogReadInfo readRecords(FetchParams fetchParams, LogTablet logTablet)
// todo validate fetched epoch.
- FetchDataInfo fetchDataInfo =
- logTablet.read(
- readOffset,
- fetchParams.maxFetchBytes(),
- fetchParams.isolation(),
- fetchParams.minOneMessage(),
- fetchParams.projection());
+        // Create ReadContext for batch filtering if needed.
+        // Only ARROW format has batch-level statistics (V1+ magic) for filter evaluation.
+        // INDEXED and COMPACTED formats use V0 magic without statistics, so filter pushdown
+        // would be a no-op — skip it entirely to avoid unnecessary overhead.
+        Predicate resolvedFilter = null;
+        LogRecordReadContext readContext = null;
+        PredicateSchemaResolver predicateResolver = null;
+        FilterInfo filterInfo = fetchParams.getFilterInfo(tableBucket.getTableId());
+        if (filterInfo != null && logFormat == LogFormat.ARROW) {
+            try {
+                int filterSchemaId = filterInfo.getSchemaId();
+                RowType rowType = null;
+                int schemaIdForContext = -1;
+                if (filterSchemaId >= 0) {
+                    Schema filterSchema = schemaGetter.getSchema(filterSchemaId);
+                    if (filterSchema == null) {
+                        LOG.warn(
+                                "Filter schema not found (schemaId={}) for table {} bucket {}, falling back to unfiltered read.",
+                                filterSchemaId,
+                                tableInfo.getTablePath(),
+                                tableBucket);
+                    } else {
+                        rowType = filterSchema.getRowType();
+                        schemaIdForContext = filterSchemaId;
+                    }
+                } else {
+                    rowType = tableInfo.getSchema().getRowType();
+                    schemaIdForContext = tableInfo.getSchemaId();
+                }
+                if (rowType != null) {
+                    resolvedFilter =
+                            PredicateMessageUtils.toPredicate(filterInfo.getPbPredicate(), rowType);
+                    if (resolvedFilter != null) {
+                        readContext =
+                                LogRecordReadContext.createArrowReadContext(
+                                        rowType, schemaIdForContext, schemaGetter);
+                        predicateResolver =
+                                new PredicateSchemaResolver(
+                                        resolvedFilter, schemaIdForContext, schemaGetter);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.warn(
+                        "Failed to initialize filter context for {}, "
+                                + "falling back to unfiltered read.",
+                        tableBucket,
+                        e);
+                // Safe fallback: reset all variables to ensure consistent null state,
+                // so the read proceeds as if no filter was requested.
+                resolvedFilter = null;
+                if (readContext != null) {
+                    try {
+                        readContext.close();
+                    } catch (Exception closeEx) {
+                        LOG.debug("Failed to close readContext for {}", tableBucket, closeEx);
+                    }
+                }
+                readContext = null;
+                predicateResolver = null;
Review Comment:
It would be better to extract this into a separate method that initializes the 3 variables. Then, if any exception occurs, it is easy to fall back without affecting any local variables.
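A rough sketch of what this could look like (hypothetical names; the real Fluss types from `Replica.java` are stubbed here for illustration). The three variables are initialized together inside one method, and any exception cleans up and yields `Optional.empty()`, so the caller's locals are never left partially initialized:

```java
import java.util.Optional;

public class FilterContextInitExample {

    // Stand-ins for Predicate, LogRecordReadContext, PredicateSchemaResolver.
    public static class Predicate {}

    public static class ReadContext implements AutoCloseable {
        @Override
        public void close() {}
    }

    public static class SchemaResolver {}

    /** Immutable holder: all three fields are non-null by construction. */
    public static final class FilterContext {
        public final Predicate filter;
        public final ReadContext readContext;
        public final SchemaResolver resolver;

        public FilterContext(Predicate filter, ReadContext readContext, SchemaResolver resolver) {
            this.filter = filter;
            this.readContext = readContext;
            this.resolver = resolver;
        }
    }

    /** Returns the fully initialized context, or empty to fall back to an unfiltered read. */
    public static Optional<FilterContext> tryCreateFilterContext(boolean failForDemo) {
        ReadContext readContext = null;
        try {
            Predicate filter = new Predicate(); // resolve the pushed-down predicate
            readContext = new ReadContext(); // create the Arrow read context
            if (failForDemo) {
                throw new IllegalStateException("simulated init failure");
            }
            SchemaResolver resolver = new SchemaResolver(); // schema-evolution resolver
            return Optional.of(new FilterContext(filter, readContext, resolver));
        } catch (Exception e) {
            // Release the partially created resource, then signal the fallback.
            if (readContext != null) {
                readContext.close();
            }
            return Optional.empty();
        }
    }
}
```

The caller then either unwraps the context or reads unfiltered, with no reset logic in the catch block.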
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java:
##########
@@ -395,13 +396,29 @@ public LogAppendInfo appendAsFollower(MemoryLogRecords records) throws Exception
return append(records, false);
}
-    /** Read messages from the local log. */
+    /**
+     * Read messages from the local log.
+     *
+     * @param readOffset the offset to start reading from
+     * @param maxLength the maximum number of bytes to read
+     * @param fetchIsolation the fetch isolation level
+     * @param minOneMessage if true, at least one message is returned even if it exceeds maxLength
+     * @param projection the column projection to apply, or null for no projection
+     * @param recordBatchFilter the batch-level filter predicate, or null for no filtering
+     * @param readContext the read context for batch statistics extraction (must be non-null iff
+     *     recordBatchFilter is non-null)
+     * @param predicateResolver resolves predicates for evolved schemas, or null to use the
+     *     original predicate for all batches
+     */
public FetchDataInfo read(
long readOffset,
int maxLength,
FetchIsolation fetchIsolation,
boolean minOneMessage,
- @Nullable FileLogProjection projection)
+ @Nullable FileLogProjection projection,
+ @Nullable Predicate recordBatchFilter,
+ @Nullable LogRecordBatch.ReadContext readContext,
+ @Nullable PredicateSchemaResolver predicateResolver)
Review Comment:
Introducing three separate parameters is excessive. It would be better to encapsulate them in a dedicated structure, such as `FilterParams` or `FilterContext`. It will also make nullability easier to handle, because the 3 parameters should all be null or all be non-null (IIUC).
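One way the suggested structure could look (a hypothetical sketch with stubbed types; only the shape matters): the constructor enforces the all-or-nothing invariant, so `read(...)` only needs a single nullable parameter.

```java
import java.util.Objects;

public class FilterParamsExample {

    // Stand-ins for the real Fluss types.
    public static class Predicate {}

    public static class ReadContext {}

    public static class SchemaResolver {}

    /** Groups the three filter-related arguments; every field is non-null. */
    public static final class FilterParams {
        public final Predicate recordBatchFilter;
        public final ReadContext readContext;
        public final SchemaResolver predicateResolver;

        public FilterParams(Predicate filter, ReadContext readContext, SchemaResolver resolver) {
            this.recordBatchFilter = Objects.requireNonNull(filter, "recordBatchFilter");
            this.readContext = Objects.requireNonNull(readContext, "readContext");
            this.predicateResolver = Objects.requireNonNull(resolver, "predicateResolver");
        }
    }

    // read(...) then takes one @Nullable FilterParams instead of three
    // separately nullable parameters, e.g.:
    //     FetchDataInfo read(long readOffset, int maxLength, FetchIsolation isolation,
    //             boolean minOneMessage, @Nullable FileLogProjection projection,
    //             @Nullable FilterParams filterParams)
}
```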
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java:
##########
@@ -540,19 +647,154 @@ public FetchDataInfo read(
}
}
-    public void changeFileSuffixes(String oldSuffix, String newSuffix) throws IOException {
-        fileLogRecords.renameTo(
-                new File(
-                        FileUtils.replaceSuffix(
-                                fileLogRecords.file().getPath(), oldSuffix, newSuffix)));
-        lazyOffsetIndex.renameTo(
-                new File(
-                        FileUtils.replaceSuffix(
-                                lazyOffsetIndex.file().getPath(), oldSuffix, newSuffix)));
-        lazyTimeIndex.renameTo(
-                new File(
-                        FileUtils.replaceSuffix(
-                                lazyTimeIndex.file().getPath(), oldSuffix, newSuffix)));
+    // NOTE: Server-side filter pushdown currently only works for ARROW format log tables.
+    // INDEXED and COMPACTED formats use V0 batch magic which does not include batch-level
+    // statistics (min/max/nullCount). Without statistics, the predicate cannot be evaluated
+    // against a batch, so all batches are included as a safe fallback.
+    @Nullable
+    private FetchDataInfo readWithFilter(
+            long startOffset,
+            int maxSize,
+            long maxPosition,
+            boolean minOneMessage,
+            @Nullable FileLogProjection projection,
+            Predicate recordBatchFilter,
+            LogRecordBatch.ReadContext readContext,
+            @Nullable PredicateSchemaResolver predicateResolver)
+            throws IOException {
+
+        if (maxSize < 0) {
+            throw new IllegalArgumentException(
+                    "Invalid max size " + maxSize + " for log read from segment " + fileLogRecords);
+        }
+
+        // Use translateOffset to precisely locate the starting position, same as readWithoutFilter
+        FileLogRecords.LogOffsetPosition startOffsetAndSize = translateOffset(startOffset, 0);
+        if (startOffsetAndSize == null) {
+            return null;
+        }
+        int startPosition = startOffsetAndSize.getPosition();
+
+        // Iterate batches from the translated position.
+        // Note: AbstractIterator doesn't implement AutoCloseable, so no explicit cleanup needed.
+        // The iterator's internal state will be garbage collected when this method returns.
+        AbstractIterator<FileChannelLogRecordBatch> iter =
+                fileLogRecords.batchIterator(startPosition, (int) maxPosition);
+
+        MultiBytesView.Builder builder = null;
+        int accumulatedSize = 0;
+        FileChannelLogRecordBatch firstIncludedBatch = null;
+        FileChannelLogRecordBatch lastIncludedBatch = null;
+        FileChannelLogRecordBatch lastScannedBatch = null;
+        int adjustedMaxSize = maxSize;
+        int filterEvalFailures = 0;
+
+        while (iter.hasNext()) {
+            FileChannelLogRecordBatch batch = iter.next();
+
+            lastScannedBatch = batch;
+
+            // Apply filter using statistics. On any failure, fall back to including
+            // the batch so a single corrupt batch cannot break the entire fetch.
+            boolean include = true;
+            try {
+                Optional<LogRecordBatchStatistics> statsOpt = batch.getStatistics(readContext);
+                if (statsOpt.isPresent()) {
+                    LogRecordBatchStatistics stats = statsOpt.get();
+                    Predicate effectivePredicate =
+                            predicateResolver != null
+                                    ? predicateResolver.resolve(stats.getSchemaId())
+                                    : recordBatchFilter;
+                    if (effectivePredicate != null) {
+                        include =
+                                effectivePredicate.test(
+                                        batch.getRecordCount(),
+                                        stats.getMinValues(),
+                                        stats.getMaxValues(),
+                                        stats.getNullCounts());
+                    }
+                    // If effectivePredicate is null, cannot adapt -> include batch (safe fallback)
+                }
+            } catch (Exception e) {
+                filterEvalFailures++;
+                if (filterEvalFailures <= 3) {
+                    LOG.warn(
+                            "Failed to evaluate filter for batch at offset {} in segment {} ({}), "
+                                    + "including batch as safe fallback.",
+                            batch.baseLogOffset(),
+                            fileLogRecords,
+                            e.getClass().getSimpleName(),
+                            e);
+                }
+                include = true;
+            }
+
+            if (!include) {
+                continue;
+            }
+
+            int batchSize = batch.sizeInBytes();
+
+            if (projection == null) {
+                // No projection: use original batch size for limit check
+                if (firstIncludedBatch == null) {
+                    firstIncludedBatch = batch;
+                    adjustedMaxSize = minOneMessage ? Math.max(maxSize, batchSize) : maxSize;
+                } else if (accumulatedSize + batchSize > adjustedMaxSize) {
+                    break;
+                }
+                lastIncludedBatch = batch;
+                if (builder == null) {
+                    builder = MultiBytesView.builder();
+                }
+                builder.addBytes(fileLogRecords.channel(), batch.position(), batchSize);
+                accumulatedSize += batchSize;
+            } else {
+                // With projection: project first, then check size with projected size
+                BytesView projectedBytesView = projection.projectRecordBatch(batch);
Review Comment:
Throw an exception if `logFormat != LogFormat.ARROW`, like we do in `readWithoutFilter`.
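A minimal sketch of such a guard, assuming a `LogFormat` enum like the one in the PR (stubbed here so the snippet is self-contained; the exact exception type and message are placeholders):

```java
public class FormatGuardExample {

    // Stand-in for org.apache.fluss.metadata.LogFormat.
    public enum LogFormat {
        ARROW,
        INDEXED,
        COMPACTED
    }

    /** Rejects projected reads for log formats that projection does not support. */
    public static void checkProjectableFormat(LogFormat logFormat) {
        if (logFormat != LogFormat.ARROW) {
            throw new UnsupportedOperationException(
                    "Projection pushdown only supports ARROW log format, but the format is "
                            + logFormat);
        }
    }
}
```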
##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java:
##########
@@ -147,6 +168,9 @@ public InternalRow getMaxValues() {
@Override
public Long[] getNullCounts() {
Review Comment:
I think we can optimize `getNullCounts()` to return a primitive `int[]`.
1. An `int` count is enough, as you only use 4 bytes for each field's null count.
2. Using `int` rather than the boxed type `Integer` avoids boxing (which doubles the size), since you use `-1` for non-existent counts.
This can greatly reduce the memory footprint of `cachedNullCounts` and `statsNullCounts`, especially `cachedNullCounts` when the total field count is very large (1000+).
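A sketch of the proposed representation (hypothetical helper name): a primitive `int[]` with `-1` as the "no statistics" sentinel, instead of a `Long[]` with `null` entries. An `int` suffices because a batch cannot hold more than `Integer.MAX_VALUE` records, so per-field null counts always fit.

```java
public class NullCountsExample {

    /** Sentinel replacing a null boxed value in the primitive representation. */
    public static final int NULL_COUNT_UNAVAILABLE = -1;

    /** Converts the current boxed representation to the proposed primitive one. */
    public static int[] toPrimitiveNullCounts(Long[] boxed) {
        int[] counts = new int[boxed.length];
        for (int i = 0; i < boxed.length; i++) {
            counts[i] = boxed[i] == null ? NULL_COUNT_UNAVAILABLE : boxed[i].intValue();
        }
        return counts;
    }
}
```

With wide schemas (1000+ fields) this drops each entry from an 8-byte reference plus a boxed `Long` object to a flat 4 bytes.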
##########
fluss-common/src/main/java/org/apache/fluss/record/ArrowNullCountReader.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.FieldNode;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.Message;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Field;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache;
+import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ArrowUtils;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utility for extracting null counts from Arrow RecordBatch FlatBuffer metadata. Used by both
+ * FileChannelLogRecordBatch and DefaultLogRecordBatch to read null counts from Arrow metadata
+ * instead of the statistics binary format (V2).
+ */
+class ArrowNullCountReader {
+
+    private static final Cache<FieldNodeMappingKey, int[]> FIELD_NODE_MAPPING_CACHE =
+ CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .expireAfterAccess(Duration.ofMinutes(10))
+ .build();
+
+    /**
+     * Cached version of {@link #computeFieldNodeMapping}. Uses a bounded Guava cache keyed by
+     * (schemaId, statsIndexMapping). Safe for concurrent access. The cache is bounded to 1000
+     * entries with 10-minute expiry to prevent unbounded growth during schema evolution.
+     */
+    static int[] computeFieldNodeMappingCached(
+            RowType rowType, int[] statsIndexMapping, int schemaId) {
+        FieldNodeMappingKey key = new FieldNodeMappingKey(schemaId, statsIndexMapping);
+ int[] cached = FIELD_NODE_MAPPING_CACHE.getIfPresent(key);
+ if (cached != null) {
+ return cached;
+ }
+ int[] result = computeFieldNodeMapping(rowType, statsIndexMapping);
+ FIELD_NODE_MAPPING_CACHE.put(key, result);
+ return result;
+ }
+
+    /**
+     * Compute mapping from statistics column positions to Arrow FieldNode indexes. Handles nested
+     * types (ARRAY, MAP, ROW) which occupy multiple FieldNode slots.
+     *
+     * @param rowType the full table schema
+     * @param statsIndexMapping maps statistics positions to schema field indexes
+     * @return array where result[i] is the FieldNode index for statsIndexMapping[i]
+     */
+    static int[] computeFieldNodeMapping(RowType rowType, int[] statsIndexMapping) {
+ Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
+ List<Field> fields = arrowSchema.getFields();
+
+ // Build schema-field-index to FieldNode-index mapping
+ int[] schemaToFieldNode = new int[rowType.getFieldCount()];
+ int fieldNodeIndex = 0;
+ for (int i = 0; i < fields.size(); i++) {
+ schemaToFieldNode[i] = fieldNodeIndex;
+ fieldNodeIndex += countFieldNodes(fields.get(i));
+ }
+
+ // Map statsIndexMapping to FieldNode indexes
+ int[] result = new int[statsIndexMapping.length];
+ for (int i = 0; i < statsIndexMapping.length; i++) {
+ result[i] = schemaToFieldNode[statsIndexMapping[i]];
+ }
+ return result;
+ }
+
+    /** Count total FieldNodes for a field (1 for simple types, more for nested). */
+ private static int countFieldNodes(Field field) {
+ int count = 1; // the field itself
+ for (Field child : field.getChildren()) {
+ count += countFieldNodes(child);
+ }
+ return count;
+ }
+
+    /**
+     * Extract null counts from Arrow FlatBuffer metadata bytes.
+     *
+     * @param arrowMetadataBytes the Arrow FlatBuffer metadata (without IPC continuation/size
+     *     header)
+     * @param fieldNodeMapping pre-computed mapping from stats index to FieldNode index
+     * @return null counts array aligned with statsIndexMapping positions
+     */
+    static Long[] extractNullCounts(byte[] arrowMetadataBytes, int[] fieldNodeMapping) {
+ ByteBuffer buffer = ByteBuffer.wrap(arrowMetadataBytes);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ Message message = Message.getRootAsMessage(buffer);
+        RecordBatch recordBatch = (RecordBatch) message.header(new RecordBatch());
+        if (recordBatch == null) {
+            throw new IllegalStateException(
+                    "Arrow metadata does not contain a RecordBatch header (type="
+                            + message.headerType()
+                            + ")");
+        }
+
+ FieldNode node = new FieldNode();
+ Long[] nullCounts = new Long[fieldNodeMapping.length];
+ for (int i = 0; i < fieldNodeMapping.length; i++) {
+ int nodeIndex = fieldNodeMapping[i];
+ if (nodeIndex < 0 || nodeIndex >= recordBatch.nodesLength()) {
+ throw new IllegalStateException(
+ "FieldNode index "
+ + nodeIndex
+ + " out of range [0, "
+ + recordBatch.nodesLength()
+ + ")");
+ }
+ recordBatch.nodes(node, nodeIndex);
+ nullCounts[i] = node.nullCount();
+ }
+ return nullCounts;
+ }
+
+ /** Cache key combining schemaId and statsIndexMapping. */
+ static final class FieldNodeMappingKey {
+ private final int schemaId;
+ private final int[] statsIndexMapping;
Review Comment:
The cache key lacks `tableId`, so entries from different tables can collide when there are multiple tables.
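A sketch of a key that also carries `tableId` (field names follow the PR; the class is reproduced here standalone for illustration). Two tables can reuse the same `schemaId` values and even the same `statsIndexMapping`, so without `tableId` their entries would collide in the shared static cache:

```java
import java.util.Arrays;
import java.util.Objects;

public class FieldNodeMappingKeyExample {

    /** Cache key combining tableId, schemaId, and statsIndexMapping. */
    public static final class FieldNodeMappingKey {
        private final long tableId;
        private final int schemaId;
        private final int[] statsIndexMapping;

        public FieldNodeMappingKey(long tableId, int schemaId, int[] statsIndexMapping) {
            this.tableId = tableId;
            this.schemaId = schemaId;
            this.statsIndexMapping = statsIndexMapping;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof FieldNodeMappingKey)) {
                return false;
            }
            FieldNodeMappingKey that = (FieldNodeMappingKey) o;
            return tableId == that.tableId
                    && schemaId == that.schemaId
                    && Arrays.equals(statsIndexMapping, that.statsIndexMapping);
        }

        @Override
        public int hashCode() {
            // Mix the array contents (not its identity) into the hash.
            return Objects.hash(tableId, schemaId) * 31 + Arrays.hashCode(statsIndexMapping);
        }
    }
}
```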
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java:
##########
@@ -60,6 +61,19 @@ public interface Scan {
*/
Scan limit(int rowNumber);
+    /**
+     * Returns a new scan from this that will apply the given predicate filter.
+     *
+     * <p>Note: the filter currently only supports record batch level filtering for log scanners,
+     * not row level filtering. The computing engine still needs to perform secondary filtering on
+     * the results. Batch scanners do not support filter pushdown.
+     *
+     * @param predicate the predicate to apply for record batch level filtering
+     */
+    default Scan filter(@Nullable Predicate predicate) {
+        throw new UnsupportedOperationException("Filter pushdown is not supported by this Scan.");
Review Comment:
We don't need a default implementation, because this is not a user-defined interface. All implementations of this interface are provided by Fluss itself, so a plain interface method without a default implementation is much cleaner here.
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java:
##########
@@ -540,19 +647,154 @@ public FetchDataInfo read(
}
}
-    public void changeFileSuffixes(String oldSuffix, String newSuffix) throws IOException {
-        fileLogRecords.renameTo(
-                new File(
-                        FileUtils.replaceSuffix(
-                                fileLogRecords.file().getPath(), oldSuffix, newSuffix)));
-        lazyOffsetIndex.renameTo(
-                new File(
-                        FileUtils.replaceSuffix(
-                                lazyOffsetIndex.file().getPath(), oldSuffix, newSuffix)));
-        lazyTimeIndex.renameTo(
-                new File(
-                        FileUtils.replaceSuffix(
-                                lazyTimeIndex.file().getPath(), oldSuffix, newSuffix)));
+    // NOTE: Server-side filter pushdown currently only works for ARROW format log tables.
+    // INDEXED and COMPACTED formats use V0 batch magic which does not include batch-level
+    // statistics (min/max/nullCount). Without statistics, the predicate cannot be evaluated
+    // against a batch, so all batches are included as a safe fallback.
+    @Nullable
+    private FetchDataInfo readWithFilter(
+            long startOffset,
+            int maxSize,
+            long maxPosition,
+            boolean minOneMessage,
+            @Nullable FileLogProjection projection,
+            Predicate recordBatchFilter,
+            LogRecordBatch.ReadContext readContext,
+            @Nullable PredicateSchemaResolver predicateResolver)
+            throws IOException {
+
+        if (maxSize < 0) {
+            throw new IllegalArgumentException(
+                    "Invalid max size " + maxSize + " for log read from segment " + fileLogRecords);
+        }
+
+        // Use translateOffset to precisely locate the starting position, same as readWithoutFilter
+        FileLogRecords.LogOffsetPosition startOffsetAndSize = translateOffset(startOffset, 0);
+        if (startOffsetAndSize == null) {
+            return null;
+        }
+        int startPosition = startOffsetAndSize.getPosition();
+
+        // Iterate batches from the translated position.
+        // Note: AbstractIterator doesn't implement AutoCloseable, so no explicit cleanup needed.
+        // The iterator's internal state will be garbage collected when this method returns.
+        AbstractIterator<FileChannelLogRecordBatch> iter =
+                fileLogRecords.batchIterator(startPosition, (int) maxPosition);
+
+        MultiBytesView.Builder builder = null;
+        int accumulatedSize = 0;
+        FileChannelLogRecordBatch firstIncludedBatch = null;
+        FileChannelLogRecordBatch lastIncludedBatch = null;
+        FileChannelLogRecordBatch lastScannedBatch = null;
+        int adjustedMaxSize = maxSize;
+        int filterEvalFailures = 0;
+
+        while (iter.hasNext()) {
+            FileChannelLogRecordBatch batch = iter.next();
+
+            lastScannedBatch = batch;
+
+            // Apply filter using statistics. On any failure, fall back to including
+            // the batch so a single corrupt batch cannot break the entire fetch.
+            boolean include = true;
+            try {
+                Optional<LogRecordBatchStatistics> statsOpt = batch.getStatistics(readContext);
+                if (statsOpt.isPresent()) {
+                    LogRecordBatchStatistics stats = statsOpt.get();
+                    Predicate effectivePredicate =
+                            predicateResolver != null
+                                    ? predicateResolver.resolve(stats.getSchemaId())
Review Comment:
When will `predicateResolver` be null if this is a filtered read? I thought the 3 parameters should all be null or all be non-null.
##########
fluss-common/src/main/java/org/apache/fluss/record/FileLogRecords.java:
##########
@@ -309,8 +309,27 @@ public FileChannelChunk toChunk() {
    private AbstractIterator<FileChannelLogRecordBatch> batchIterator(int start)
            throws IOException {
+        return batchIterator(start, -1);
+    }
+
+    /**
+     * Get an iterator over the record batches in the file, starting at a specific position and
+     * ending at a specific position.
+     *
+     * @param start the start position in the file
+     * @param endPosition the end position in the file, or -1 to use the default end
+     * @return An iterator over batches starting from {@code start}
+     */
+    public AbstractIterator<FileChannelLogRecordBatch> batchIterator(int start, int endPosition)
+            throws IOException {
Review Comment:
The `IOException` is never thrown from the method.
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java:
##########
@@ -116,7 +131,8 @@ public LogScanner createLogScanner() {
conn.getClientMetricGroup(),
conn.getOrCreateRemoteFileDownloader(),
projectedColumns,
- schemaGetter);
+ schemaGetter,
+ recordBatchFilter);
Review Comment:
Throw an exception in all the other `createXxxScanner` methods if `recordBatchFilter` is not null, because none of them support the filter.
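For example, a small shared guard that the non-log factory methods could call (hypothetical name and message; the filter is typed as `Object` here only to keep the sketch self-contained):

```java
public class ScannerFilterGuardExample {

    /** Fails fast when a filter was configured but the scanner kind cannot push it down. */
    public static void checkNoFilter(Object recordBatchFilter, String scannerKind) {
        if (recordBatchFilter != null) {
            throw new UnsupportedOperationException(
                    "Filter pushdown is not supported by " + scannerKind + ".");
        }
    }
}
```

Failing fast in `createBatchScanner`-style methods is clearer than silently ignoring the configured filter.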
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java:
##########
@@ -202,8 +202,18 @@ public static FetchLogResultForBucket getFetchLogResultForBucket(
                        respForBucket.hasRecords()
                                ? MemoryLogRecords.pointToByteBuffer(recordsBuffer)
                                : MemoryLogRecords.EMPTY;
-        fetchLogResultForBucket =
-                new FetchLogResultForBucket(tb, records, respForBucket.getHighWatermark());
+        if (respForBucket.hasFilteredEndOffset()
+                && respForBucket.getFilteredEndOffset() >= 0) {
+            fetchLogResultForBucket =
+                    new FetchLogResultForBucket(
+                            tb,
Review Comment:
When `filteredEndOffset` is set, any records in the response are currently silently ignored. Although `records` is typically unset when `filteredEndOffset` is present, a specific edge case leads to inefficiency: if the first batch in a log file matches the filter but subsequent batches do not, the server returns only the next offset of the matched batch without including the `filteredEndOffset`. Consequently, the already-filtered batches must be re-evaluated in the next request, resulting in unnecessary resource consumption and performance degradation.
##########
fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java:
##########
@@ -524,4 +526,461 @@ private static void assertEquals(List<Object[]> actual, List<Object[]> expected)
assertThat(actual.get(i)).isEqualTo(expected.get(i));
}
}
+
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1, LOG_MAGIC_VALUE_V2})
+    void testProjectRecordBatch(byte recordBatchMagic) throws Exception {
+        // Use schemaId=2 which corresponds to DATA2_SCHEMA (3 columns) in testingSchemaGetter
+        int schemaIdForData2 = 2;
+
+        // Create test data with multiple batches
+        FileLogRecords fileLogRecords =
+                createFileLogRecords(
+                        schemaIdForData2,
+                        recordBatchMagic,
+                        TestData.DATA2_ROW_TYPE,
+                        TestData.DATA2,
+                        TestData.DATA2); // Use DATA2 which has 3 columns: a(int), b(string), c(string)
+
+        FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache());
+        projection.setCurrentProjection(
+                1L,
+                testingSchemaGetter,
+                DEFAULT_COMPRESSION,
+                new int[] {0, 2}); // Project columns a and c
+
+        // Get the first batch
+        FileLogInputStream.FileChannelLogRecordBatch batch =
+                fileLogRecords.batchIterator(0, -1).next();
+
+        // Perform projection
+        BytesView projectedBytes = projection.projectRecordBatch(batch);
+
+        // Verify the projected bytes are not empty
+        assertThat(projectedBytes.getBytesLength()).isGreaterThan(0);
+
+        // Verify the projected data by reading it back
+        LogRecords projectedRecords = new BytesViewLogRecords(projectedBytes);
+        RowType projectedType = TestData.DATA2_ROW_TYPE.project(new int[] {0, 2});
+
+        try (LogRecordReadContext context =
+                createArrowReadContext(projectedType, schemaIdForData2, testingSchemaGetter)) {
+            for (LogRecordBatch projectedBatch : projectedRecords.batches()) {
+                try (CloseableIterator<LogRecord> records = projectedBatch.records(context)) {
+                    int recordCount = 0;
+                    while (records.hasNext()) {
+                        LogRecord record = records.next();
+                        InternalRow row = record.getRow();
+
+                        // Verify projected row has correct number of fields
+                        assertThat(row.getFieldCount()).isEqualTo(2);
+
+                        // Verify the projected fields contain correct data
+                        assertThat(row.getInt(0))
+                                .isEqualTo(TestData.DATA2.get(recordCount)[0]); // column a
+                        assertThat(row.getString(1).toString())
+                                .isEqualTo(TestData.DATA2.get(recordCount)[2]); // column c
+
+                        recordCount++;
+                    }
+                    // Verify we got all records from the original batch
+                    assertThat(recordCount).isEqualTo(TestData.DATA2.size());
+                }
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1, LOG_MAGIC_VALUE_V2})
+    void testProjectRecordBatchEmptyBatch(byte recordBatchMagic) throws Exception {
+        // Create an empty batch (this would be a CDC batch with no records)
+        FileLogRecords fileLogRecords = FileLogRecords.open(new File(tempDir, "empty.tmp"));
+
+        // Create an empty memory log records
+        MemoryLogRecords emptyRecords =
+                createRecordsWithoutBaseLogOffset(
+                        TestData.DATA1_ROW_TYPE,
+                        DEFAULT_SCHEMA_ID,
+                        0L,
+                        System.currentTimeMillis(),
+                        recordBatchMagic,
+                        new ArrayList<>(), // Empty data
+                        LogFormat.ARROW);
+
+        fileLogRecords.append(emptyRecords);
+        fileLogRecords.flush();
+
+        FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache());
+        projection.setCurrentProjection(
+                1L, testingSchemaGetter, DEFAULT_COMPRESSION, new int[] {0});
+
+        // Get the batch (should be empty)
+        FileLogInputStream.FileChannelLogRecordBatch batch =
+                fileLogRecords.batchIterator(0, -1).next();
+
+        // Perform projection on empty batch
+        BytesView projectedBytes = projection.projectRecordBatch(batch);
+
+        // Should return empty bytes for empty batch
+        assertThat(projectedBytes.getBytesLength()).isEqualTo(0);
+    }
+
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1, LOG_MAGIC_VALUE_V2})
+    void testProjectRecordBatchSingleColumn(byte recordBatchMagic) throws Exception {
+        // Use schemaId=2 which corresponds to DATA2_SCHEMA (3 columns) in testingSchemaGetter
+        int schemaIdForData2 = 2;
+
+        // Test projection to single column
+        FileLogRecords fileLogRecords =
+                createFileLogRecords(
+                        schemaIdForData2,
+                        recordBatchMagic,
+                        TestData.DATA2_ROW_TYPE,
+                        TestData.DATA2);
+
+        FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache());
+        projection.setCurrentProjection(
+                1L,
+                testingSchemaGetter,
+                DEFAULT_COMPRESSION,
+                new int[] {1}); // Project only column b
+
+        FileLogInputStream.FileChannelLogRecordBatch batch =
+                fileLogRecords.batchIterator(0, -1).next();
+        BytesView projectedBytes = projection.projectRecordBatch(batch);
+
+        assertThat(projectedBytes.getBytesLength()).isGreaterThan(0);
+
+        // Verify single column projection
+        LogRecords projectedRecords = new BytesViewLogRecords(projectedBytes);
+        RowType projectedType = TestData.DATA2_ROW_TYPE.project(new int[] {1});
+
+        try (LogRecordReadContext context =
+                createArrowReadContext(projectedType, schemaIdForData2, testingSchemaGetter)) {
+            for (LogRecordBatch projectedBatch : projectedRecords.batches()) {
+                try (CloseableIterator<LogRecord> records = projectedBatch.records(context)) {
+                    int recordCount = 0;
+                    while (records.hasNext()) {
+                        LogRecord record = records.next();
+                        InternalRow row = record.getRow();
+
+                        assertThat(row.getFieldCount()).isEqualTo(1);
+                        assertThat(row.getString(0).toString())
+                                .isEqualTo(TestData.DATA2.get(recordCount)[1]); // column b
+
+                        recordCount++;
+                    }
+                    assertThat(recordCount).isEqualTo(TestData.DATA2.size());
+                }
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1, LOG_MAGIC_VALUE_V2})
+    void testProjectRecordBatchAllColumns(byte recordBatchMagic) throws Exception {
+        // Use schemaId=2 which corresponds to DATA2_SCHEMA (3 columns) in testingSchemaGetter
+        int schemaIdForData2 = 2;
+
+        // Test projection to all columns (should be equivalent to no projection)
+        FileLogRecords fileLogRecords =
+                createFileLogRecords(
+                        schemaIdForData2,
+                        recordBatchMagic,
+                        TestData.DATA2_ROW_TYPE,
+                        TestData.DATA2);
+
+        FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache());
+        projection.setCurrentProjection(
+                1L,
+                testingSchemaGetter,
+                DEFAULT_COMPRESSION,
+                new int[] {0, 1, 2}); // Project all columns
+
+        FileLogInputStream.FileChannelLogRecordBatch batch =
+                fileLogRecords.batchIterator(0, -1).next();
+        BytesView projectedBytes = projection.projectRecordBatch(batch);
+
+        assertThat(projectedBytes.getBytesLength()).isGreaterThan(0);
+
+        // Verify all columns projection
+        LogRecords projectedRecords = new BytesViewLogRecords(projectedBytes);
+        RowType projectedType = TestData.DATA2_ROW_TYPE.project(new int[] {0, 1, 2});
+
+        try (LogRecordReadContext context =
+                createArrowReadContext(projectedType, schemaIdForData2, testingSchemaGetter)) {
+            for (LogRecordBatch projectedBatch : projectedRecords.batches()) {
+                try (CloseableIterator<LogRecord> records = projectedBatch.records(context)) {
+                    int recordCount = 0;
+                    while (records.hasNext()) {
+                        LogRecord record = records.next();
+                        InternalRow row = record.getRow();
+
+                        assertThat(row.getFieldCount()).isEqualTo(3);
+                        assertThat(row.getInt(0))
+                                .isEqualTo(TestData.DATA2.get(recordCount)[0]); // column a
+                        assertThat(row.getString(1).toString())
+                                .isEqualTo(TestData.DATA2.get(recordCount)[1]); // column b
+                        assertThat(row.getString(2).toString())
+                                .isEqualTo(TestData.DATA2.get(recordCount)[2]); // column c
+
+                        recordCount++;
+                    }
+                    assertThat(recordCount).isEqualTo(TestData.DATA2.size());
+                }
+            }
+        }
+    }
+
+    @Test
+    void testProjectRecordBatchNoProjectionSet() throws Exception {
+        FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache());
+
+        // Create a file with actual data so we can get a valid batch
+        FileLogRecords fileLogRecords =
+                createFileLogRecords(LOG_MAGIC_VALUE_V2, TestData.DATA2_ROW_TYPE, TestData.DATA2);
+        FileLogInputStream.FileChannelLogRecordBatch batch =
+                fileLogRecords.batchIterator(0, -1).next();
+
+        // Should throw exception when no projection is set.
+        // getOrCreateProjectionInfo returns null because no projection has been registered,
+        // causing a NullPointerException downstream. This is an implementation detail —
+        // the key contract is that calling projectRecordBatch without setting a projection fails.
+        assertThatThrownBy(() -> projection.projectRecordBatch(batch))
+                .isInstanceOf(Exception.class);
+    }
+
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1,
LOG_MAGIC_VALUE_V2})
+ void testProjectRecordBatchMultipleBatches(byte recordBatchMagic) throws
Exception {
+ // Test projection across multiple batches
+ FileLogRecords fileLogRecords =
+ createFileLogRecords(
+ recordBatchMagic,
+ TestData.DATA1_ROW_TYPE,
+ TestData.DATA1,
+ TestData.ANOTHER_DATA1);
+
+ FileLogProjection projection = new FileLogProjection(new
ProjectionPushdownCache());
+ projection.setCurrentProjection(
+ 1L,
+ testingSchemaGetter,
+ DEFAULT_COMPRESSION,
+ new int[] {0}); // Project only column a
+
+ // Test projection on first batch
+ FileLogInputStream.FileChannelLogRecordBatch firstBatch =
+ fileLogRecords.batchIterator(0, -1).next();
+ BytesView firstProjectedBytes =
projection.projectRecordBatch(firstBatch);
+ assertThat(firstProjectedBytes.getBytesLength()).isGreaterThan(0);
+
+ // Test projection on second batch
+ FileLogInputStream.FileChannelLogRecordBatch secondBatch =
+ fileLogRecords
+ .batchIterator(firstBatch.position() +
firstBatch.sizeInBytes(), -1)
+ .next();
+ BytesView secondProjectedBytes =
projection.projectRecordBatch(secondBatch);
+ assertThat(secondProjectedBytes.getBytesLength()).isGreaterThan(0);
+
+ // Verify both projections work correctly
+ LogRecords firstProjectedRecords = new
BytesViewLogRecords(firstProjectedBytes);
+ LogRecords secondProjectedRecords = new
BytesViewLogRecords(secondProjectedBytes);
+ RowType projectedType = TestData.DATA1_ROW_TYPE.project(new int[] {0});
+
+ try (LogRecordReadContext context =
+ createArrowReadContext(projectedType, DEFAULT_SCHEMA_ID,
testingSchemaGetter)) {
+ // Verify first batch
+ int firstBatchCount = 0;
+ for (LogRecordBatch projectedBatch : firstProjectedRecords.batches()) {
+ try (CloseableIterator<LogRecord> records = projectedBatch.records(context)) {
+ while (records.hasNext()) {
+ LogRecord record = records.next();
+ InternalRow row = record.getRow();
+ assertThat(row.getInt(0)).isEqualTo(TestData.DATA1.get(firstBatchCount)[0]);
+ firstBatchCount++;
+ }
+ }
+ }
+ assertThat(firstBatchCount).isEqualTo(TestData.DATA1.size());
+
+ // Verify second batch
+ int secondBatchCount = 0;
+ for (LogRecordBatch projectedBatch : secondProjectedRecords.batches()) {
+ try (CloseableIterator<LogRecord> records = projectedBatch.records(context)) {
+ while (records.hasNext()) {
+ LogRecord record = records.next();
+ InternalRow row = record.getRow();
+ assertThat(row.getInt(0)).isEqualTo(TestData.ANOTHER_DATA1.get(secondBatchCount)[0]);
+ secondBatchCount++;
+ }
+ }
+ }
+ assertThat(secondBatchCount).isEqualTo(TestData.ANOTHER_DATA1.size());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V2})
+ void testProjectRecordBatchStatisticsClearing(byte recordBatchMagic) throws Exception {
+ // Test that statistics are properly cleared during projection for V2+ versions
+ FileLogRecords fileLogRecords =
+ createFileLogRecords(recordBatchMagic, TestData.DATA2_ROW_TYPE, TestData.DATA2);
+
+ FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache());
+ projection.setCurrentProjection(
+ 1L,
+ testingSchemaGetter,
+ DEFAULT_COMPRESSION,
+ new int[] {0, 1}); // Project columns a and b
+
+ FileLogInputStream.FileChannelLogRecordBatch batch =
+ fileLogRecords.batchIterator(0, -1).next();
+ BytesView projectedBytes = projection.projectRecordBatch(batch);
+
+ assertThat(projectedBytes.getBytesLength()).isGreaterThan(0);
+
+ // Verify the projected batch has statistics cleared
+ LogRecords projectedRecords = new BytesViewLogRecords(projectedBytes);
+ RowType projectedType = TestData.DATA2_ROW_TYPE.project(new int[] {0, 1});
+
+ try (LogRecordReadContext context =
createArrowReadContext(projectedType, DEFAULT_SCHEMA_ID, testingSchemaGetter)) {
+ for (LogRecordBatch projectedBatch : projectedRecords.batches()) {
+ // Verify that statistics are not available in projected batch
+ assertThat(projectedBatch.getStatistics(context)).isEmpty();
+
+ // Verify the projected data is correct
+ try (CloseableIterator<LogRecord> records = projectedBatch.records(context)) {
+ int recordCount = 0;
+ while (records.hasNext()) {
+ LogRecord record = records.next();
+ InternalRow row = record.getRow();
+
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ assertThat(row.getInt(0)).isEqualTo(TestData.DATA2.get(recordCount)[0]); // column a
+ assertThat(row.getString(1).toString()).isEqualTo(TestData.DATA2.get(recordCount)[1]); // column b
+
+ recordCount++;
+ }
+ assertThat(recordCount).isEqualTo(TestData.DATA2.size());
+ }
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testProjectRecordBatchNoStatisticsClearing(byte recordBatchMagic) throws Exception {
+ // Test that statistics clearing only happens for V2+ versions
Review Comment:
Since we have changed the stats to V1, we should update the tests to reflect
that. IIUC, this should test only V0 and we may also need to update other
tests.
##########
fluss-common/src/main/java/org/apache/fluss/record/ArrowNullCountReader.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.FieldNode;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.Message;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Field;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache;
+import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ArrowUtils;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utility for extracting null counts from Arrow RecordBatch FlatBuffer metadata. Used by both
+ * FileChannelLogRecordBatch and DefaultLogRecordBatch to read null counts from Arrow metadata
+ * instead of the statistics binary format (V2).
+ */
+class ArrowNullCountReader {
Review Comment:
Could you revert the commit of `get null count from arrow meta`? I think we can review it in a separate pull request. I have some concerns about the cache and deserialization performance.
##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -887,6 +893,22 @@ public static ProduceLogResponse makeProduceLogResponse(
return produceResponse;
}
+ public static @Nullable Map<Long, FilterInfo> getTableFilterInfoMap(FetchLogRequest request) {
+ Map<Long, FilterInfo> result = null;
+ for (PbFetchLogReqForTable tableReq : request.getTablesReqsList()) {
+ if (tableReq.hasFilterPredicate()) {
+ if (result == null) {
+ result = new HashMap<>();
+ }
+ int schemaId = tableReq.hasFilterSchemaId() ? tableReq.getFilterSchemaId() : -1;
Review Comment:
Is `-1` valid? If not, we should fail fast by throwing an exception.
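For illustration, the fail-fast shape could look like the stand-alone sketch below. `requireValidFilterSchemaId` and its message are illustrative names, not existing Fluss code; the point is just to reject the sentinel at request-parsing time instead of letting `-1` flow downstream.

```java
// Sketch of the fail-fast alternative: validate the filter schema id where the
// request is parsed, rather than propagating a -1 sentinel.
class SchemaIdCheck {

    // Hypothetical helper; not part of the actual Fluss codebase.
    static int requireValidFilterSchemaId(int schemaId) {
        if (schemaId < 0) {
            throw new IllegalArgumentException(
                    "FetchLogRequest carries a filter predicate but no valid filterSchemaId: "
                            + schemaId);
        }
        return schemaId;
    }

    public static void main(String[] args) {
        // A valid id passes through unchanged.
        System.out.println(requireValidFilterSchemaId(3));
        // An invalid id fails immediately with a descriptive message.
        try {
            requireValidFilterSchemaId(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The exception surfaces at the RPC boundary with a clear message, instead of a confusing failure later in the read path.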
##########
fluss-server/src/main/java/org/apache/fluss/server/log/PredicateSchemaResolver.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.utils.SchemaUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Resolves a filter predicate for a given batch schema ID, handling schema evolution transparently.
+ *
+ * <p>When the batch schema matches the predicate schema, the original predicate is returned
+ * directly. When they differ, the predicate is adapted using field index mapping and the result is
+ * cached to avoid redundant adaptation across batches and segments.
+ *
+ * <p>Adapted predicates are cached in a simple HashMap since the resolver is created per-request
+ * and does not require thread-safety or eviction policies.
+ */
+@NotThreadSafe
+public final class PredicateSchemaResolver {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PredicateSchemaResolver.class);
+
+ private final Predicate predicate;
+ private final int predicateSchemaId;
+ @Nullable private final Schema predicateSchema;
+ @Nullable private final SchemaGetter schemaGetter;
+
+ private final Map<Integer, Optional<Predicate>> cache = new HashMap<>();
+
+ public PredicateSchemaResolver(
+ Predicate predicate, int predicateSchemaId, @Nullable SchemaGetter schemaGetter) {
Review Comment:
It seems the `schemaGetter` is never null in production code; only tests set it to null. However, this makes the null/non-null handling of `schemaGetter` confusing and complex. Maybe we can force this field to be non-null and simplify the code.
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java:
##########
@@ -1494,13 +1504,91 @@ private LogReadInfo readRecords(FetchParams fetchParams, LogTablet logTablet)
// todo validate fetched epoch.
- FetchDataInfo fetchDataInfo =
- logTablet.read(
- readOffset,
- fetchParams.maxFetchBytes(),
- fetchParams.isolation(),
- fetchParams.minOneMessage(),
- fetchParams.projection());
+ // Create ReadContext for batch filtering if needed.
+ // Only ARROW format has batch-level statistics (V1+ magic) for filter evaluation.
+ // INDEXED and COMPACTED formats use V0 magic without statistics, so filter pushdown
+ // would be a no-op — skip it entirely to avoid unnecessary overhead.
+ Predicate resolvedFilter = null;
+ LogRecordReadContext readContext = null;
+ PredicateSchemaResolver predicateResolver = null;
+ FilterInfo filterInfo = fetchParams.getFilterInfo(tableBucket.getTableId());
+ if (filterInfo != null && logFormat == LogFormat.ARROW) {
+ try {
+ int filterSchemaId = filterInfo.getSchemaId();
+ RowType rowType = null;
+ int schemaIdForContext = -1;
+ if (filterSchemaId >= 0) {
+ Schema filterSchema = schemaGetter.getSchema(filterSchemaId);
+ if (filterSchema == null) {
+ LOG.warn(
+ "Filter schema not found (schemaId={}) for table {} bucket {}, falling back to unfiltered read.",
+ filterSchemaId,
+ tableInfo.getTablePath(),
+ tableBucket);
+ } else {
+ rowType = filterSchema.getRowType();
+ schemaIdForContext = filterSchemaId;
+ }
+ } else {
+ rowType = tableInfo.getSchema().getRowType();
+ schemaIdForContext = tableInfo.getSchemaId();
+ }
+ if (rowType != null) {
+ resolvedFilter = PredicateMessageUtils.toPredicate(filterInfo.getPbPredicate(), rowType);
+ if (resolvedFilter != null) {
+ readContext =
+ LogRecordReadContext.createArrowReadContext(
+ rowType, schemaIdForContext, schemaGetter);
+ predicateResolver =
+ new PredicateSchemaResolver(
+ resolvedFilter, schemaIdForContext, schemaGetter);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to initialize filter context for {}, "
+ + "falling back to unfiltered read.",
+ tableBucket,
+ e);
+ // Safe fallback: reset all variables to ensure consistent null state,
+ // so the read proceeds as if no filter was requested.
+ resolvedFilter = null;
+ if (readContext != null) {
+ try {
+ readContext.close();
+ } catch (Exception closeEx) {
+ LOG.debug("Failed to close readContext for {}", tableBucket, closeEx);
+ }
+ }
+ readContext = null;
+ predicateResolver = null;
+ }
+ }
+
+ FetchDataInfo fetchDataInfo;
+ try {
+ fetchDataInfo =
+ logTablet.read(
+ readOffset,
+ fetchParams.maxFetchBytes(),
+ fetchParams.isolation(),
+ fetchParams.minOneMessage(),
+ fetchParams.projection(),
+ resolvedFilter,
+ readContext,
+ predicateResolver);
+ } finally {
+ // Close readContext eagerly — it is only used for statistics extraction during
+ // batch filtering and is NOT referenced by the returned FetchDataInfo records.
+ if (readContext != null) {
+ try {
+ readContext.close();
+ } catch (Exception e) {
+ LOG.debug("Failed to close readContext for {}", tableBucket, e);
+ }
Review Comment:
nit: can use `IOUtils.closeQuietly(..)` to close it.
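For reference, the pattern the nit points at looks like the minimal sketch below. This is only the general shape of a close-quietly helper; the real one is the project's `IOUtils.closeQuietly` referenced above, whose exact signature and logging behavior may differ.

```java
// Minimal close-quietly sketch: close a resource and swallow any exception so
// cleanup in a finally block can never mask the primary result.
class CloseQuietlySketch {

    static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return; // tolerate never-initialized resources
        }
        try {
            closeable.close();
        } catch (Exception e) {
            // Intentionally swallowed; a real implementation would log at debug level.
        }
    }

    public static void main(String[] args) {
        // A resource whose close() throws does not propagate the failure.
        AutoCloseable failing = () -> {
            throw new IllegalStateException("boom");
        };
        closeQuietly(failing);
        closeQuietly(null); // null is tolerated too
        System.out.println("cleanup finished without exceptions");
    }
}
```

Using the helper collapses the try/catch around `readContext.close()` in the finally block to a single line.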
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java:
##########
@@ -540,19 +647,154 @@ public FetchDataInfo read(
}
}
- public void changeFileSuffixes(String oldSuffix, String newSuffix) throws IOException {
- fileLogRecords.renameTo(
- new File(
- FileUtils.replaceSuffix(
- fileLogRecords.file().getPath(), oldSuffix, newSuffix)));
- lazyOffsetIndex.renameTo(
- new File(
- FileUtils.replaceSuffix(
- lazyOffsetIndex.file().getPath(), oldSuffix, newSuffix)));
- lazyTimeIndex.renameTo(
- new File(
- FileUtils.replaceSuffix(
- lazyTimeIndex.file().getPath(), oldSuffix, newSuffix)));
+ // NOTE: Server-side filter pushdown currently only works for ARROW format log tables.
+ // INDEXED and COMPACTED formats use V0 batch magic which does not include batch-level
+ // statistics (min/max/nullCount). Without statistics, the predicate cannot be evaluated
+ // against a batch, so all batches are included as a safe fallback.
+ @Nullable
+ private FetchDataInfo readWithFilter(
+ long startOffset,
+ int maxSize,
+ long maxPosition,
+ boolean minOneMessage,
+ @Nullable FileLogProjection projection,
+ Predicate recordBatchFilter,
+ LogRecordBatch.ReadContext readContext,
+ @Nullable PredicateSchemaResolver predicateResolver)
+ throws IOException {
+
+ if (maxSize < 0) {
+ throw new IllegalArgumentException(
+ "Invalid max size " + maxSize + " for log read from segment " + fileLogRecords);
+ }
+
+ // Use translateOffset to precisely locate the starting position, same as readWithoutFilter
+ FileLogRecords.LogOffsetPosition startOffsetAndSize = translateOffset(startOffset, 0);
+ if (startOffsetAndSize == null) {
+ return null;
+ }
+ int startPosition = startOffsetAndSize.getPosition();
+
+ // Iterate batches from the translated position.
+ // Note: AbstractIterator doesn't implement AutoCloseable, so no explicit cleanup needed.
+ // The iterator's internal state will be garbage collected when this method returns.
+ AbstractIterator<FileChannelLogRecordBatch> iter =
+ fileLogRecords.batchIterator(startPosition, (int) maxPosition);
+
+ MultiBytesView.Builder builder = null;
+ int accumulatedSize = 0;
+ FileChannelLogRecordBatch firstIncludedBatch = null;
+ FileChannelLogRecordBatch lastIncludedBatch = null;
Review Comment:
`lastIncludedBatch` is not used; remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]