Hi Yang,

Thanks for driving this excellent work! I left some comments below:

*1. "If we use a filter() like Fluent API, we may lead users to
misunderstand the real semantics of the interface."*

I don't think this will give users the impression that the server side
performs row-by-row filtering, because we already provide similar methods
like "project()" and "limit()", and none of them are row-by-row operations.
We can add the necessary Javadocs to the method to make it clear that it
performs server-side, record-batch-level filtering.

Personally, I don't like "createLogScanner(Predicate recordBatchFilter)"
because the number of method overloads will explode once we have more
filter types and parameters. The existing builder API pattern has better
extensibility. Besides, the filter pushdown is record-batch-level today but
may become file-level in the future; that optimization should be
transparent to users and shouldn't require an API change.
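
To illustrate the extensibility point, here is a minimal sketch of the
builder pattern. ScanSketch and its String-based filter are hypothetical
stand-ins I made up for this mail, not the real Fluss client classes; the
point is only that a new pushdown option becomes one more chained method
rather than another createLogScanner(...) overload.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a scan builder; not the real Fluss API.
final class ScanSketch {
    private final List<Integer> projectedFields = new ArrayList<>();
    private int limit = -1;            // -1 means "no limit"
    private String batchFilter = null; // placeholder for a predicate expression

    ScanSketch project(int... fieldIndexes) {
        projectedFields.clear();
        for (int index : fieldIndexes) {
            projectedFields.add(index);
        }
        return this;
    }

    ScanSketch limit(int rowCount) {
        this.limit = rowCount;
        return this;
    }

    /**
     * Best-effort pushdown: the server may skip whole record batches whose
     * statistics cannot match. Rows in returned batches are NOT filtered.
     */
    ScanSketch filter(String predicate) {
        this.batchFilter = predicate;
        return this;
    }

    String describe() {
        return "project=" + projectedFields + ", limit=" + limit + ", filter=" + batchFilter;
    }
}
```

With this shape, the Javadoc on filter() is the place to state the
batch-level semantics, and a later file-level optimization would not
change the method signature.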

*2. The "Predicate.proto" file*

The FIP introduces a dedicated "Predicate.proto" file for the predicate
expressions, but this breaks the single-file RPC definition of Fluss. I
don't see any benefit in splitting it into multiple proto files; it only
makes it harder for third parties to understand the Fluss RPC definition.
Therefore, I suggest putting the predicate definitions in "FlussApi.proto".

*3. Suggestions on some Predicate proto definitions*
(1) There is no definition of PbLiteralValue. This is a public API and
shouldn't be omitted from the FIP. The API is the most important part to
discuss in a FIP, even if it makes the FIP large.
(2) Can we use an int index to represent the PbFieldRef? The server side
has the schema information of the table, so it can derive the data types
and column names itself. We also do this for projection pushdown.
(3)
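
For point (2), a rough sketch of what I mean by index-based resolution.
The Column class and the resolve() helper are hypothetical names for
illustration only; the idea is that the request carries just an int and
the server looks up the name and type in the schema it already holds.

```java
import java.util.List;

// Hypothetical sketch: resolving a field reference by index on the server.
final class FieldRefSketch {

    /** Minimal stand-in for a schema column (name plus a type label). */
    static final class Column {
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Looks up the column for a field index, rejecting out-of-range indexes. */
    static Column resolve(List<Column> schema, int fieldIndex) {
        if (fieldIndex < 0 || fieldIndex >= schema.size()) {
            throw new IllegalArgumentException("Field index out of range: " + fieldIndex);
        }
        return schema.get(fieldIndex);
    }
}
```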

*4. Statistics Data format*

The statistics data format is the most critical part of this FIP and should
be illustrated like the RecordBatch format. I think there are several
topics to discuss about the statistics data format:
(1) Should the statistics data come before or after the records data? I
think putting the statistics together with the header can make filtering
record batches faster, as it reduces the read to a single IO.
(2) Can the statistics format support lazy deserialization of specific
columns? Consider a table with 1000 columns and a filter on only one
column: the deserialization cost may be larger than a direct read without
filtering.
(3) How will the statistics format evolve to support future statistics?
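
To make point (2) concrete, here is one possible shape, purely as an
assumption for discussion and not what the FIP or the PoC defines: a
column count, then a per-column offset table, then the per-column
entries. A reader that filters on one column reads one offset and one
min/max pair and never decodes the other columns. The sketch uses
fixed-width long min/max only; the offset table is what would allow
variable-length entries later.

```java
import java.nio.ByteBuffer;

// Hypothetical layout, for discussion only:
//   int columnCount | int offset[columnCount] | per column: long min, long max
final class LazyStatsSketch {

    /** Serializes min/max pairs with an offset table in front of the entries. */
    static ByteBuffer write(long[] mins, long[] maxs) {
        int n = mins.length;
        int headerSize = 4 + 4 * n;
        ByteBuffer buf = ByteBuffer.allocate(headerSize + 16 * n);
        buf.putInt(n);
        for (int i = 0; i < n; i++) {
            buf.putInt(headerSize + 16 * i); // absolute offset of column i's entry
        }
        for (int i = 0; i < n; i++) {
            buf.putLong(mins[i]);
            buf.putLong(maxs[i]);
        }
        buf.flip();
        return buf;
    }

    /** Reads only the requested column's entry via absolute gets. */
    static long[] readMinMax(ByteBuffer buf, int column) {
        int n = buf.getInt(0);
        if (column < 0 || column >= n) {
            throw new IndexOutOfBoundsException("No statistics for column " + column);
        }
        int offset = buf.getInt(4 + 4 * column);
        return new long[] {buf.getLong(offset), buf.getLong(offset + 8)};
    }

    /** Returns false only when the batch certainly has no row with this value. */
    static boolean mightContain(ByteBuffer buf, int column, long value) {
        long[] minMax = readMinMax(buf, column);
        return value >= minMax[0] && value <= minMax[1];
    }
}
```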


*5. FilteredLogRecordBatchIterator reads each RecordBatch*

You said the FilteredLogRecordBatchIterator will try to read the statistics
information from each RecordBatch. I'm afraid this will introduce a
performance regression, because by default (when there is no projection
pushdown) Fluss reads chunks without deserializing each record batch
header. However, with a filter pushdown, even if the filter doesn't filter
anything out, and even if the log table doesn't have statistics, it falls
back to reading each batch header, which will be much slower than before.

*6. Migration Strategy: Deploy new version with feature flag disabled*

The migration strategy you described in your reply to Yuxia sounds very
complex and error-prone to me.

I think an easier approach to migration and backward compatibility is to
introduce a table-level statistics configuration, such as
"table.log.statistic.enabled". Every table created in a new-version cluster
will have this configuration set to "true" unless users disable it
explicitly. For old tables, we keep the old behavior of not generating
statistics, since generating them requires a client upgrade. For new tables
with the config enabled, a client upgrade is required; otherwise, the
client or server should throw an exception about the missing statistics.
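
A minimal sketch of the check I have in mind. Only the config key comes
from the suggestion above; the class, the method names, the exception
type, and the "clientSupportsStatistics" flag are assumptions for
illustration. Old tables simply lack the key, so they default to the
old behavior.

```java
import java.util.Map;

// Hypothetical sketch of a table-level gate for statistics.
final class StatisticConfigSketch {
    static final String KEY = "table.log.statistic.enabled";

    /** Tables without the key (created before the upgrade) are treated as disabled. */
    static boolean statisticsEnabled(Map<String, String> tableProperties) {
        return Boolean.parseBoolean(tableProperties.getOrDefault(KEY, "false"));
    }

    /** Fails fast when an old client touches a table that requires statistics. */
    static void checkClient(Map<String, String> tableProperties, boolean clientSupportsStatistics) {
        if (statisticsEnabled(tableProperties) && !clientSupportsStatistics) {
            throw new UnsupportedOperationException(
                    "Table has " + KEY + "=true; please upgrade the client.");
        }
    }
}
```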


Best,
Jark



On Fri, 8 Aug 2025 at 13:46, Yang Wang <[email protected]> wrote:

> From: Yang Wang <[email protected]>
> Date: Fri, Aug 8, 2025, 11:17
> Subject: Re: [DISCUSS] FIP-10: Support Log RecordBatch Filter Pushdown
> To: <[email protected]>
>
>
> Hi Cheng,
>
> > Can we clarify that this filter evaluation works on a best-effort basis
> > at the beginning of the FIP document? Specifically, it only performs
> > coarse-grained block skipping by leveraging RecordBatch statistics. To
> > be honest, the table.newScan().filter(recordBatchFilter) API gave me the
> > impression that the server side performs row-by-row filtering.
>
> This question relates to something I want to apologize to @HongShun for
> again, as my reply to his review yesterday was not well considered. To
> clarify, the previously designed API:
>
> > LogScanner createLogScanner(Predicate recordBatchFilter);
>
> clearly hints to the user that the filter is responsible for filtering
> record batches only (not at the row level) for log tables. If we use
> a filter() like Fluent API, we may lead users to misunderstand the real
> semantics of the interface.
>
> Best regards,
> Yang
>
>
> On Fri, 8 Aug 2025 at 08:38, Wang Cheng <[email protected]> wrote:
>
> > Thanks Yang for driving this work.
> >
> >
> > Can we clarify that this filter evaluation works on a best-effort basis
> > at the beginning of the FIP document? Specifically, it only performs
> > coarse-grained block skipping by leveraging RecordBatch statistics. To
> > be honest, the table.newScan().filter(recordBatchFilter) API gave me the
> > impression that the server side performs row-by-row filtering.
> >
> >
> >
> > Regards,
> > Cheng
> >
> > ------------------ Original ------------------
> > From: "dev" <[email protected]>;
> > Date: Thu, Aug 7, 2025 11:11 AM
> > To: "dev" <[email protected]>;
> >
> > Subject: [DISCUSS] FIP-10: Support Log RecordBatch Filter Pushdown
> >
> >
> >
> > Hello Fluss Community,
> >
> > I propose initiating discussion on FIP-10: Support Log RecordBatch Filter
> > Pushdown
> > (https://cwiki.apache.org/confluence/display/FLUSS/FIP-10%3A+Support+Log+RecordBatch+Filter+Pushdown).
> > This optimization aims to improve the performance of Log table queries
> > and is now ready for community feedback.
> >
> > This FIP introduces RecordBatch-level filter pushdown to enable early
> > filtering at the storage layer, thereby optimizing CPU, memory, and
> > network resources by skipping non-matching log record batches.
> >
> > A proof-of-concept (PoC) has been implemented in the logfilter branch in
> > https://github.com/platinumhamburg/fluss and is ready for testing and
> > preview.
>
