Hey Gustavo,

Sorry for missing your feedback, and thank you for taking the time to look at the proposal!
I think your point on throwing errors during harness building for table
arguments that are present in eval() but not configured makes sense!

On processing time, you're completely right - I added a method to handle
processing-time changes, since it would also affect how state TTL is handled.

On looking at the other harnesses, you're right that I left out parallelism
concerns. I think for now it might be out of scope and warrant a follow-up
enhancement to the harness.

On getting side outputs - would this also be relevant for table functions? I
wasn't very familiar with them, but the Flink docs suggest this is more of a
DataStream thing than a Table thing.

Kind regards,
Mika

On Tue, 17 Mar 2026, at 12:29 PM, Gustavo de Morais wrote:
> Hey Mika,
>
> Thanks for driving this! In general, this is useful for prototyping and
> for speeding up testing. So +1. Some feedback from my side:
>
> > For a PTF that reads from a table argument that hasn't been configured,
> > I think it would return null, yes.
> I think it'd be a better dev UX if the harness throws during build: "table
> argument 'purchases' is declared in eval() but was never configured".
>
> > I'm not entirely sure on the real/processing time considerations - my
> > aim here was mostly around letting users validate timer behaviour, and
> > timer registration/firing in PTFs is based on watermarks
> You've addressed event time, and we need to detail in the FLIP what the
> approach for processing time would be. The API would ideally support
> similar methods to what we have here:
> https://github.com/confluentinc/flink/blob/a7a8dba2127ad719ca7932969b2934a0955e1bba/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L790-L804.
> If you want this to be out of scope, you can mention that explicitly in
> the FLIP.
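For illustration, the processing-time semantics referenced above (advancing the clock fires every timer registered at or before the new time, in timestamp order) can be sketched in a self-contained way. All names here (`FakeProcessingTimeService` and its methods) are hypothetical and only mirror the shape of the linked `AbstractStreamOperatorTestHarness` methods, not the proposed harness API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the processing-time service a PTF harness would
// need, mirroring AbstractStreamOperatorTestHarness#setProcessingTime.
class FakeProcessingTimeService {
    private long currentTime = 0L;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();

    // A PTF (or the harness on its behalf) registers a processing-time timer.
    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advancing time fires every pending timer with timestamp <= newTime,
    // in timestamp order, before the clock settles at newTime.
    void setProcessingTime(long newTime) {
        while (!timers.isEmpty() && timers.peek() <= newTime) {
            fired.add(timers.poll());
        }
        currentTime = newTime;
    }

    long getProcessingTime() {
        return currentTime;
    }

    List<Long> firedTimers() {
        return fired;
    }
}
```

The same ordering rule is what makes state-TTL interactions testable: TTL expiry driven by processing time would observe the advanced clock.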
>
> Also, comparing this to the current harness support for operators, here
> are some other relevant things that devs might need out of a harness test,
> which you might want to include or mention as out of scope:
>
> - Side outputs: AbstractStreamOperatorTestHarness has
> getSideOutput(OutputTag<X>).
> - No parallelism config, snapshot is opaque: the existing harness takes
> maxParallelism, parallelism, and subtaskIndex. Key group assignment (and
> therefore state distribution) depends on maxParallelism.
> AbstractStreamOperatorTestHarness.snapshot() returns OperatorSubtaskState,
> which you can pass to repartitionOperatorState() to simulate parallelism
> changes.
>
> Kind regards,
> Gustavo
>
> On Thu, 12 Mar 2026 at 14:38, Mika Naylor <[email protected]> wrote:
>
> > Hey Martijn,
> >
> > Thank you for the detailed feedback!
> >
> > > 1. The FLIP has good examples of harness construction via the builder,
> > > but doesn't address lifecycle management. For comparison, the existing
> > > DataStream test harnesses (documented at [1]) have explicit open() and
> > > implement AutoCloseable. Since PTFs can acquire resources in open(),
> > > the harness needs to manage this lifecycle. Could you clarify how
> > > open()/close() on the underlying PTF is handled? Is close() called
> > > automatically, or does the user need to trigger it? An end-to-end
> > > example showing cleanup would help.
> >
> > This is a good point! I must have missed the AutoCloseable
> > implementation in the public interfaces section - but I did intend to
> > add it! So it would be closed automatically (but I suppose a user could
> > also call close() themselves). For open(), I was planning on calling it
> > when the harness is built via `Builder.build()`.
> >
> > > 2. The existing operator test harnesses support snapshot() and
> > > initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> > > cycles.
> > > This is important for catching state serialization bugs, which are a
> > > common source of production issues. The FLIP provides
> > > withInitialState() for setup, but there's no way to take a snapshot
> > > mid-test and restore into a fresh harness. Are we deliberately
> > > excluding this, or should we consider adding it?
> >
> > I would absolutely consider adding this, thank you for pointing it out.
> > I think being able to take a `.snapshot()` from a harness and then
> > initialise a new harness via a `restore()` on the builder would make
> > sense, as well as maybe supporting `.restore()` on the harness itself
> > after it has been built.
> >
> > > 3. Related to the above: PTFs with complex state (Map, List, POJO) can
> > > behave differently with heap vs. RocksDB backends due to serialization
> > > differences. The existing harnesses support setStateBackend(). Should
> > > the PTF test harness support this as well? At minimum, it would be
> > > good to document which backend is used by default.
> >
> > I had just intended to support the heap backend and document this, but
> > this is a good point - supporting `setStateBackend()` makes sense here,
> > similar to the existing harnesses. I'll add this to the spec and
> > document the default.
> >
> > > 4. withTableArgument(String tableName, List<Row> rows) is useful for
> > > testing join-like PTFs. The builder Javadoc describes when static rows
> > > are passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC)
> > > affect delivery, but a few things remain unclear: How is the schema
> > > for these rows determined: is it inferred from the Row structure, or
> > > does it need to match the eval() signature's type hints? And what
> > > happens if a PTF reads from a table argument that hasn't been
> > > configured via the builder: does it receive null, or does the harness
> > > throw at build time?
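On the build-time question, the fail-fast behaviour could look roughly like the following self-contained sketch. `HarnessBuilderSketch` and everything in it are hypothetical names for illustration, not the proposed API - only the error-message wording comes from the thread:

```java
import java.util.Set;

// Illustrative build-time check: the harness builder throws if eval()
// declares a table argument that was never configured via the builder,
// rather than silently handing the PTF a null table.
class HarnessBuilderSketch {
    // Table argument names the PTF's eval() signature declares.
    private final Set<String> declaredTableArgs;
    // Table argument names configured via withTableArgument(...).
    private final Set<String> configuredTableArgs;

    HarnessBuilderSketch(Set<String> declared, Set<String> configured) {
        this.declaredTableArgs = declared;
        this.configuredTableArgs = configured;
    }

    void build() {
        for (String arg : declaredTableArgs) {
            if (!configuredTableArgs.contains(arg)) {
                throw new IllegalStateException(
                        "table argument '" + arg
                                + "' is declared in eval() but was never configured");
            }
        }
        // ... construct the harness, call open() on the PTF, etc.
    }
}
```

The point of the sketch is only that the mismatch surfaces at `build()` time with the offending argument name, instead of as a null during the test run.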
> >
> > I wasn't quite sure what the right approach is here. I thought that
> > inferring it from the Row structure would work, but it feels odd to
> > ignore the eval() type hints. Perhaps I can try the Row structure
> > approach and, if it feels unergonomic, explore the second approach.
> >
> > For a PTF that reads from a table argument that hasn't been configured,
> > I think it would return null, yes.
> >
> > > - Timer.hasFired() is typed as boolean (primitive) but annotated
> > > @Nullable. This looks like a bug -> should it be Boolean (boxed)?
> >
> > Oops, good catch. I'm not sure why I marked this as nullable - either a
> > timer has fired or it hasn't, and I'm not sure returning null makes
> > sense. Returning a non-nullable primitive is probably fine here.
> >
> > > - getOutputByKind(RowKind) implies that output preserves RowKind
> > > metadata. Could you confirm that getOutput() also retains this? The
> > > generic <OUT> type parameter could use more specification on what's
> > > guaranteed.
> >
> > I would like getOutput() to somehow retain this, but I'm not quite sure
> > what the return type could look like in this case. Perhaps `RowData`?
> > I'm not entirely sure if we have an interface that would cleanly capture
> > this.
> >
> > > - Have you considered optional changelog consistency validation (e.g.,
> > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could
> > > be a useful debugging aid.
> >
> > I hadn't, no, but this is a useful idea. It could be toggleable on the
> > builder with a `.withChangelogValidation` method.
> >
> > > - What's the error model when eval() or a timer callback throws?
> > > Propagated directly, or wrapped?
> >
> > I would say propagated directly, unless you think wrapping them could be
> > useful here.
> >
> > > - The test plan mentions leveraging ProcessTableFunctionTestPrograms.
> > > Could you clarify whether the harness will be validated against those
> > > scenarios, or whether it's intended to replace them for certain use
> > > cases?
> >
> > I think just validated against them, as a way of making sure that the
> > harness covers the right set of features we want to capture. I don't
> > think it would replace them in this case.
> >
> > Thank you a ton for the feedback and ideas! I will update the FLIP
> > documentation based on them, it's very much appreciated.
> >
> > Kind regards,
> > Mika
> >
> > On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> > > Hey Mika,
> > >
> > > Thanks for putting this FLIP together. A dedicated test harness for
> > > PTFs is a welcome addition. The builder-pattern API and the
> > > state/timer introspection features are well thought out.
> > >
> > > I have a few questions and suggestions after reviewing the FLIP:
> > >
> > > 1. The FLIP has good examples of harness construction via the builder,
> > > but doesn't address lifecycle management. For comparison, the existing
> > > DataStream test harnesses (documented at [1]) have explicit open() and
> > > implement AutoCloseable. Since PTFs can acquire resources in open(),
> > > the harness needs to manage this lifecycle. Could you clarify how
> > > open()/close() on the underlying PTF is handled? Is close() called
> > > automatically, or does the user need to trigger it? An end-to-end
> > > example showing cleanup would help.
> > >
> > > 2. The existing operator test harnesses support snapshot() and
> > > initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> > > cycles. This is important for catching state serialization bugs, which
> > > are a common source of production issues. The FLIP provides
> > > withInitialState() for setup, but there's no way to take a snapshot
> > > mid-test and restore into a fresh harness. Are we deliberately
> > > excluding this, or should we consider adding it?
> > >
> > > 3.
> > > Related to the above: PTFs with complex state (Map, List, POJO) can
> > > behave differently with heap vs. RocksDB backends due to serialization
> > > differences. The existing harnesses support setStateBackend(). Should
> > > the PTF test harness support this as well? At minimum, it would be
> > > good to document which backend is used by default.
> > >
> > > 4. withTableArgument(String tableName, List<Row> rows) is useful for
> > > testing join-like PTFs. The builder Javadoc describes when static rows
> > > are passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC)
> > > affect delivery, but a few things remain unclear: How is the schema
> > > for these rows determined: is it inferred from the Row structure, or
> > > does it need to match the eval() signature's type hints? And what
> > > happens if a PTF reads from a table argument that hasn't been
> > > configured via the builder: does it receive null, or does the harness
> > > throw at build time?
> > >
> > > A few smaller points:
> > >
> > > - Timer.hasFired() is typed as boolean (primitive) but annotated
> > > @Nullable. This looks like a bug -> should it be Boolean (boxed)?
> > > - getOutputByKind(RowKind) implies that output preserves RowKind
> > > metadata. Could you confirm that getOutput() also retains this? The
> > > generic <OUT> type parameter could use more specification on what's
> > > guaranteed.
> > > - Have you considered optional changelog consistency validation (e.g.,
> > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could
> > > be a useful debugging aid.
> > > - What's the error model when eval() or a timer callback throws?
> > > Propagated directly, or wrapped?
> > > - The test plan mentions leveraging ProcessTableFunctionTestPrograms.
> > > Could you clarify whether the harness will be validated against those
> > > scenarios, or whether it's intended to replace them for certain use
> > > cases?
> > >
> > > Overall I'm +1 on the direction.
> > > The core API design is clean and covers the main testing needs well.
> > > Addressing the lifecycle and checkpoint/restore gaps would bring it in
> > > line with what Flink users already have for DataStream UDF testing.
> > >
> > > Thanks,
> > >
> > > Martijn
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> > >
> > > On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]> wrote:
> > >
> > > > Hey David,
> > > >
> > > > Yeah, in terms of scope I'm aiming more at providing a framework for
> > > > unit testing the behavior of custom PTFs. I'd like to include as
> > > > much validation as possible, but there might be validation steps
> > > > that aren't possible without dipping into the engine side of things.
> > > >
> > > > I'm not entirely sure on the real/processing time considerations -
> > > > my aim here was mostly around letting users validate timer
> > > > behaviour, and timer registration/firing in PTFs is based on
> > > > watermarks, if I read the docs correctly.
> > > >
> > > > Kind regards,
> > > > Mika
> > > >
> > > > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > > > Hi Mika,
> > > > > This sounds like a good idea. In terms of scope, is the idea that
> > > > > this is purely for unit tests, or is this additionally proposed as
> > > > > a validation/test harness for use when developing custom PTFs?
> > > > > I guess this allows us to create a common set of tests that all
> > > > > PTFs need to pass using this harness.
> > > > >
> > > > > I would assume there are real (not event) time considerations for
> > > > > some PTFs; it would be worth mentioning how we should handle that.
> > > > >
> > > > > Kind regards, David.
> > > > >
> > > > > From: Mika Naylor <[email protected]>
> > > > > Date: Tuesday, 3 March 2026 at 16:46
> > > > > To: [email protected] <[email protected]>
> > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a
> > > > > ProcessTableFunction Test Harness
> > > > >
> > > > > Hey everyone!
> > > > >
> > > > > I would like to kick off a discussion on FLIP-567: Introduce a
> > > > > ProcessTableFunction Test Harness [1].
> > > > >
> > > > > Currently, testing PTFs requires full integration tests against a
> > > > > running Flink cluster. This FLIP would introduce a
> > > > > developer-friendly test harness for unit testing PTFs and would
> > > > > provide introspection into output, state, timers, and watermarks
> > > > > for assertions and behaviour validation. This would let developers
> > > > > iterate on and test their PTFs without needing to run a full-scale
> > > > > integration test against a live Flink cluster.
> > > > >
> > > > > Would love any thoughts and feedback the community might have on
> > > > > this proposal.
> > > > >
> > > > > Kind regards,
> > > > > Mika Naylor
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > > > >
> > > > > Unless otherwise stated above:
> > > > >
> > > > > IBM United Kingdom Limited
> > > > > Registered in England and Wales with number 741598
> > > > > Registered office: Building C, IBM Hursley Office, Hursley Park
> > > > > Road, Winchester, Hampshire SO21 2JN
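As a postscript on the changelog-consistency idea discussed above (verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key), the check itself is small enough to sketch self-contained. The `Kind`, `Change`, and `ChangelogValidator` names below are hypothetical stand-ins, with the local enum mirroring `org.apache.flink.types.RowKind` to keep the sketch dependency-free:

```java
import java.util.List;

// Changelog row kinds; stands in for org.apache.flink.types.RowKind.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// One emitted change: the key it belongs to, and its kind.
record Change(String key, Kind kind) {}

class ChangelogValidator {
    // Fails if any UPDATE_AFTER is not immediately preceded by an
    // UPDATE_BEFORE for the same key - the consistency rule a
    // `.withChangelogValidation` toggle could enforce on harness output.
    static void validate(List<Change> log) {
        Change previous = null;
        for (Change c : log) {
            if (c.kind() == Kind.UPDATE_AFTER
                    && (previous == null
                            || previous.kind() != Kind.UPDATE_BEFORE
                            || !previous.key().equals(c.key()))) {
                throw new IllegalStateException(
                        "UPDATE_AFTER for key '" + c.key()
                                + "' not preceded by UPDATE_BEFORE");
            }
            previous = c;
        }
    }
}
```

Run against a harness's collected output, a violation would surface with the offending key rather than as a silently inconsistent changelog downstream.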
