david-streamlio commented on code in PR #25967:
URL: https://github.com/apache/pulsar/pull/25967#discussion_r3389157865
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+# PIP-484: Expose Incremental Window Events via IncrementalWindowFunction
+
+# Background knowledge
+
+## Pulsar Window Functions
+
+Pulsar Window Functions are a specialized form of Pulsar Function that group
incoming messages into windows based on time or message count, and invoke the
user function with a batch of messages each time a window fires.
+
+Window types:
+
+- **Tumbling window**: adjacent windows do not overlap; each message belongs
to exactly one window.
+- **Sliding window**: adjacent windows may overlap; a message can belong to
multiple windows.
+
+Time semantics:
+
+- **Processing time**: windows are driven by the clock at which messages enter
the system.
+- **Event time**: windows are driven by timestamps embedded in messages, with
watermarks used to track event-time progress.
+
+## Existing public API
+
+```
+pulsar-functions/api-java
+└── org.apache.pulsar.functions.api
+    ├── WindowFunction<X, T>   // user-implemented window function interface
+    └── WindowContext          // context interface for window functions
+```
+
+`WindowFunction` signature:
+
+```java
+@FunctionalInterface
+public interface WindowFunction<X, T> {
+    T process(Collection<Record<X>> input, WindowContext context) throws Exception;
+}
+```
+
+On each trigger, the user function receives a `Collection<Record<X>>`
containing **all messages in the current window**.
+
+## Internal runtime pipeline
+
+```
+pulsar-functions/instance
+└── org.apache.pulsar.functions.windowing
+    ├── Window<T>                    // window view interface (internal package today)
+    ├── WindowImpl<T>                // Window implementation holding three event lists
+    ├── WindowManager<T>             // window manager; classifies events
+    ├── WindowFunctionExecutor<T,X>  // executor bridging runtime and user function
+    ├── WindowLifecycleListener<T>   // window lifecycle callbacks
+    ├── EvictionPolicy<T>            // eviction policy (decides when events expire)
+    └── TriggerPolicy<T>             // trigger policy (decides when to fire a window)
+```
+
+The internal `Window<T>` interface already exposes incremental views:
+
+```java
+public interface Window<T> {
+ List<T> get(); // all events in the current window
+ List<T> getNew(); // events added since the last trigger
+ List<T> getExpired(); // events removed since the last trigger
+ Long getStartTimestamp(); // window start timestamp
+ Long getEndTimestamp(); // window end timestamp (reference time)
+}
+```
+
+# Motivation
+
+## Problem: `getNew()` / `getExpired()` data is discarded at the public API layer
+
+`WindowManager.onTrigger()` already classifies events into three categories on
every window activation:
+
+| Category | Meaning |
+|----------|---------|
+| `tuples` | all events currently in the window |
+| `newTuples` | events newly added since the last trigger |
+| `expiredTuples` | events removed since the last trigger |
+
+These three lists are passed into `WindowImpl` and delivered to the executor
via `WindowLifecycleListener.onActivation()`. However,
`WindowFunctionExecutor.process(Window, WindowContext)` only passes
`inputWindow.get()` to the user function; newly added and expired events are
discarded:
+
+```java
+// WindowFunctionExecutor.java (current implementation)
+public X process(Window<Record<T>> inputWindow, WindowContext context) throws Exception {
+    // ...
+    return this.windowFunction.process(inputWindow.get(), context); // full window only; getNew()/getExpired() dropped
+}
+```
+
+## Impact
+
+Users cannot perform efficient incremental computation. Typical affected
scenarios:
+
+1. **Incremental aggregation** (sliding-window statistics): on each trigger
most messages in the window are unchanged; re-scanning the full collection is
wasteful.
+2. **State maintenance**: when external state must track which messages
entered or left the window, users must diff full collections manually —
inefficient and error-prone.
+3. **Expired-event handling**: side effects such as resource release or
counter decrements when messages leave the window.
+
+# Goals
+
+## In Scope
+
+- Expose the `Window<T>` interface in the public API (including `getNew()`,
`getExpired()`, and timestamp methods).
+- Add a new public `IncrementalWindowFunction<X, T>` interface so users can
receive the full `Window<Record<X>>` view.
+- Have `WindowFunctionExecutor` transparently support the new interface
without requiring configuration or deployment changes.
+- Update Functions deployment validation (`FunctionConfigUtils.doJavaChecks`)
so `IncrementalWindowFunction` implementations pass the same Java class checks
as `WindowFunction`.
+- Preserve all existing behavior for current `WindowFunction` users.
+
+## Out of Scope
+
+- Incremental support for `java.util.function.Function` (bare window
functions).
+- Equivalent capability for Python / Go Functions.
+- Changes to window state snapshot / checkpoint mechanisms.
+
+# High Level Design
+
+Introduce a new public interface `IncrementalWindowFunction<X, T>` whose
`process` method accepts `Window<Record<X>>` instead of
`Collection<Record<X>>`, giving users access to:
+
+- `window.get()` — all messages in the current window
+- `window.getNew()` — messages added since the last trigger
+- `window.getExpired()` — messages removed since the last trigger
+- `window.getStartTimestamp()` / `window.getEndTimestamp()` — window time
boundaries
+
+`WindowFunctionExecutor` detects at initialization whether the user class
implements `IncrementalWindowFunction`. If so, it passes the `Window` object
directly; otherwise it follows the existing code path.
+
+Data flow (after the change):
+
+```mermaid
+flowchart TD
+ WM["WindowManager.onTrigger()"]
+ WL["onActivation(tuples, newTuples, expiredTuples)"]
+ WE["WindowFunctionExecutor.processWindow()"]
+ WI["WindowImpl(tuples, newTuples, expiredTuples)"]
+
+ subgraph executor ["process(Window, WindowContext) dispatch"]
+ P1["IncrementalWindowFunction\n→ process(window, context)"]
+ P2["WindowFunction\n→ process(window.get(), context)"]
+ P3["bareWindowFunction\n→ apply(values)"]
+ end
+
+ WM --> WL --> WE --> WI --> executor
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Change 1: Move `Window<T>` to `api-java`
+
+**Current path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java`
+
+**New path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Window.java`
+
+The interface methods remain unchanged; only the package declaration and
license header are updated:
+
+```java
+// pulsar-functions/api-java/.../api/Window.java
+public interface Window<T> {
+ List<T> get();
+ List<T> getNew();
+ List<T> getExpired();
+ Long getEndTimestamp();
+ Long getStartTimestamp();
+}
+```
+
+The existing internal `Window.java` is replaced by a reference to the
`api-java` interface (or removed entirely, with `WindowImpl` implementing the
new public interface directly).
+
+### Change 2: Add `IncrementalWindowFunction<X, T>` interface
+
+**Path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/IncrementalWindowFunction.java`
+
+```java
+@FunctionalInterface
+public interface IncrementalWindowFunction<X, T> {
+    /**
+     * Process the triggered window.
+     *
+     * @param inputWindow the window view for this activation, providing access to
+     *                    all current events ({@link Window#get()}),
+     *                    newly added events ({@link Window#getNew()}), and
+     *                    expired events ({@link Window#getExpired()}).
Review Comment:
In addition to the list-mutability question already raised on this PR,
please document the **lifetime** of the `Window` reference: is it valid only
during the `process()` call, or may a user retain it across triggers?
Lifetime/ownership contracts matter once this interface is public.
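
To make the question concrete, here is a minimal sketch of what user code would have to do if the contract ends up being "valid only during `process()`". Everything in it is an assumption for illustration: the trimmed `Window` stand-in only mirrors the proposed interface (timestamps omitted), and `ReusedWindow` and `NewEventTracker` are hypothetical names, not Pulsar classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the proposed public interface (not the real Pulsar type).
interface Window<T> {
    List<T> get();
    List<T> getNew();
    List<T> getExpired();
}

// Hypothetical runtime-side window that reuses its mutable lists across triggers.
final class ReusedWindow<T> implements Window<T> {
    final List<T> all = new ArrayList<>();
    final List<T> added = new ArrayList<>();
    final List<T> expired = new ArrayList<>();

    public List<T> get() { return all; }
    public List<T> getNew() { return added; }
    public List<T> getExpired() { return expired; }
}

// User-side code written under the assumption that the Window is only valid during process().
final class NewEventTracker {
    private List<Integer> lastNew = List.of();

    int process(Window<Integer> window) {
        // Copy what must outlive the call instead of retaining the Window reference.
        lastNew = List.copyOf(window.getNew());
        return window.get().size();
    }

    List<Integer> lastNew() {
        return lastNew;
    }
}
```

If the runtime instead guarantees a fresh, immutable `Window` per trigger, the defensive copy becomes unnecessary, which is why the Javadoc should state the contract either way.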
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+### Change 1: Move `Window<T>` to `api-java`
+
+**Current path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java`
+
+**New path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Window.java`
+
+The interface methods remain unchanged; only the package declaration and
license header are updated:
+
+```java
+// pulsar-functions/api-java/.../api/Window.java
+public interface Window<T> {
+ List<T> get();
+ List<T> getNew();
+ List<T> getExpired();
+ Long getEndTimestamp();
+ Long getStartTimestamp();
+}
+```
+
+The existing internal `Window.java` is replaced by a reference to the
`api-java` interface (or removed entirely, with `WindowImpl` implementing the
new public interface directly).
Review Comment:
Promoting an internal type to public API is exactly the surface the PIP
process exists to scrutinize, so this shouldn't be left as an either/or
("replaced by a reference … or removed entirely"). Please commit to one
approach and spell out what happens to any existing references to the old
`org.apache.pulsar.functions.windowing.Window` (even though it's an internal
package today).
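
Purely as an illustration of what "spelling it out" could look like, the sketch below shows one of the two options: keeping the old type as a deprecated alias of the new public one, so existing internal references keep compiling. The names `ApiWindow`, `LegacyWindow`, and `ShimDemo` are hypothetical stand-ins, and this is not a recommendation of that option over removing the old type outright.

```java
import java.util.List;

// Hypothetical: stands in for the new public org.apache.pulsar.functions.api.Window.
interface ApiWindow<T> {
    List<T> get();
}

// Hypothetical: stands in for the old internal windowing.Window, kept as a deprecated alias.
@Deprecated
interface LegacyWindow<T> extends ApiWindow<T> {
}

final class ShimDemo {
    // New code is written against the public type only.
    static int size(ApiWindow<?> window) {
        return window.get().size();
    }
}
```

With the alias, anything typed against the old interface is still accepted wherever the public type is expected; with outright removal, every internal import has to change in the same PR. Either is fine, but the PIP should say which.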
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+### Change 2: Add `IncrementalWindowFunction<X, T>` interface
+
+**Path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/IncrementalWindowFunction.java`
+
+```java
+@FunctionalInterface
+public interface IncrementalWindowFunction<X, T> {
+    /**
+     * Process the triggered window.
+     *
+     * @param inputWindow the window view for this activation, providing access to
+     *                    all current events ({@link Window#get()}),
+     *                    newly added events ({@link Window#getNew()}), and
+     *                    expired events ({@link Window#getExpired()}).
+     * @param context the window function context
+     * @return the output, or {@code null} to suppress output
+     */
+    T process(Window<Record<X>> inputWindow, WindowContext context) throws Exception;
+}
+```
+
+#### Example: sliding-window sum
+
+```java
+/**
+ * Maintains the sum of integer values in the current sliding window incrementally.
+ */
+public class SlidingWindowSumFunction implements IncrementalWindowFunction<Integer, Integer> {
+
+    private static final String RUNNING_SUM_KEY = "running-sum";
+
+    @Override
+    public Integer process(Window<Record<Integer>> window, WindowContext context) throws Exception {
+        long newEventsSum = 0;
+        for (Record<Integer> record : window.getNew()) {
+            newEventsSum += record.getValue();
+        }
+        long expiredSum = 0;
+        for (Record<Integer> record : window.getExpired()) {
+            expiredSum += record.getValue();
+        }
+        long netDelta = newEventsSum - expiredSum;
+        if (netDelta != 0) {
+            context.incrCounter(RUNNING_SUM_KEY, netDelta);
+        }
+        return (int) context.getCounter(RUNNING_SUM_KEY);
+    }
+}
+```
+
+### Change 3: Update `WindowFunctionExecutor`
+
+**Path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java`
+
+#### 3a. Add field
+
+```java
+protected IncrementalWindowFunction<T, X> incrementalWindowFunction;
+```
+
+#### 3b. Extend `initializeUserFunction()`
+
+Detect `IncrementalWindowFunction` via `instanceof` in
`initializeUserFunction()`, following the same pattern used for
`WindowFunction` today:
+
+```java
+@SuppressWarnings("unchecked")
+private void initializeUserFunction(WindowConfig windowConfig) {
+    // ...
+    if (userClassObject instanceof java.util.function.Function) {
+        // existing logic, unchanged
+        bareWindowFunction = ...;
+    } else if (userClassObject instanceof IncrementalWindowFunction) {
+        incrementalWindowFunction = (IncrementalWindowFunction<T, X>) userClassObject;
+    } else if (userClassObject instanceof WindowFunction) {
+        // existing logic, unchanged
+        windowFunction = (WindowFunction<T, X>) userClassObject;
+    } else {
+        throw new IllegalArgumentException("Window function does not implement the correct interface");
+    }
+}
+```
+
+#### 3c. Update `process(Window<Record<T>>, WindowContext)`
+
+```java
+public X process(Window<Record<T>> inputWindow, WindowContext context) throws Exception {
+    if (this.bareWindowFunction != null) {
+        Collection<T> values = inputWindow.get().stream()
+                .map(Record::getValue).collect(Collectors.toList());
+        return this.bareWindowFunction.apply(values);
+    } else if (this.incrementalWindowFunction != null) {
+        // pass the full Window view; user can access getNew() / getExpired()
+        return this.incrementalWindowFunction.process(inputWindow, context);
+    } else {
+        // existing behavior: pass full message collection only
+        return this.windowFunction.process(inputWindow.get(), context);
+    }
+}
+```
+
+### Change 4: Update deployment validation (`functions/utils`)
+
+Submit-time validation must accept `IncrementalWindowFunction` the same way it
already accepts `WindowFunction`.
+
+| File | Change |
+|------|--------|
+| `FunctionConfigUtils.doJavaChecks()` | Add `IncrementalWindowFunction` to the allowed user-class interfaces. |
+| `FunctionCommon.getFunctionClassParent()` | When `windowConfig` is set, resolve `IncrementalWindowFunction` before `WindowFunction` so input/output type inference for SerDe and schema checks stays correct. |
+
Review Comment:
Not required by the template, but reviewers usually ask: a sentence on
intended test coverage (executor dispatch for each interface type, and
deployment-validation acceptance of the new interface) would strengthen the
proposal.
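
As a rough idea of the dispatch coverage I have in mind, here is a self-contained sketch that mirrors the `instanceof` order from Change 3b. The interfaces `WindowFn` and `IncrementalWindowFn`, the class `BothFn`, and the `classify` helper are simplified, hypothetical stand-ins (the real interfaces take `Record<X>` and a `WindowContext`), so treat it as a shape for the tests rather than actual executor code.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for the two user-facing interfaces.
interface WindowFn<X, T> {
    T process(Collection<X> input);
}

interface IncrementalWindowFn<X, T> {
    T process(List<X> all, List<X> added, List<X> expired);
}

// A user class implementing both interfaces, to pin down precedence.
final class BothFn implements WindowFn<Integer, Integer>, IncrementalWindowFn<Integer, Integer> {
    public Integer process(Collection<Integer> input) {
        return input.size();
    }

    public Integer process(List<Integer> all, List<Integer> added, List<Integer> expired) {
        return added.size();
    }
}

final class DispatchSketch {
    // Mirrors the instanceof order proposed in Change 3b.
    static String classify(Object userFunction) {
        if (userFunction instanceof Function) {
            return "bare";
        } else if (userFunction instanceof IncrementalWindowFn) {
            return "incremental";
        } else if (userFunction instanceof WindowFn) {
            return "window";
        }
        throw new IllegalArgumentException("Window function does not implement the correct interface");
    }
}
```

Under this ordering a class that implements both interfaces is treated as incremental; a test that asserts this explicitly would document the precedence for future maintainers.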
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+# PIP-484: Expose Incremental Window Events via IncrementalWindowFunction
+
+# Background knowledge
+
+## Pulsar Window Functions
+
+Pulsar Window Functions are a specialized form of Pulsar Function that group
incoming messages into windows based on time or message count, and invoke the
user function with a batch of messages each time a window fires.
+
+Window types:
+
+- **Tumbling window**: adjacent windows do not overlap; each message belongs
to exactly one window.
+- **Sliding window**: adjacent windows may overlap; a message can belong to
multiple windows.
+
+Time semantics:
+
+- **Processing time**: windows are driven by the clock at which messages enter
the system.
+- **Event time**: windows are driven by timestamps embedded in messages, with
watermarks used to track event-time progress.
+
+## Existing public API
+
+```
+pulsar-functions/api-java
+└── org.apache.pulsar.functions.api
+ ├── WindowFunction<X, T> // user-implemented window function
interface
+ └── WindowContext // context interface for window functions
+```
+
+`WindowFunction` signature:
+
+```java
+@FunctionalInterface
+public interface WindowFunction<X, T> {
+ T process(Collection<Record<X>> input, WindowContext context) throws
Exception;
+}
+```
+
+On each trigger, the user function receives a `Collection<Record<X>>`
containing **all messages in the current window**.
+
+## Internal runtime pipeline
+
+```
+pulsar-functions/instance
+└── org.apache.pulsar.functions.windowing
+ ├── Window<T> // window view interface (internal package
today)
+ ├── WindowImpl<T> // Window implementation holding three
event lists
+ ├── WindowManager<T> // window manager; classifies events
+ ├── WindowFunctionExecutor<T,X> // executor bridging runtime and user
function
+ ├── WindowLifecycleListener<T> // window lifecycle callbacks
+ ├── EvictionPolicy<T> // eviction policy (decides when events
expire)
+ └── TriggerPolicy<T> // trigger policy (decides when to fire a
window)
+```
+
+The internal `Window<T>` interface already exposes incremental views:
+
+```java
+public interface Window<T> {
+ List<T> get(); // all events in the current window
+ List<T> getNew(); // events added since the last trigger
+ List<T> getExpired(); // events removed since the last trigger
+ Long getStartTimestamp(); // window start timestamp
+ Long getEndTimestamp(); // window end timestamp (reference time)
+}
+```
+
+# Motivation
+
+## Problem: `getNew()` / `getExpired()` data is discarded at the public API
layer
+
+`WindowManager.onTrigger()` already classifies events into three categories on
every window activation:
+
+| Category | Meaning |
+|----------|---------|
+| `tuples` | all events currently in the window |
+| `newTuples` | events newly added since the last trigger |
+| `expiredTuples` | events removed since the last trigger |
+
+These three lists are passed into `WindowImpl` and delivered to the executor
via `WindowLifecycleListener.onActivation()`. However,
`WindowFunctionExecutor.process(Window, WindowContext)` only passes
`inputWindow.get()` to the user function; newly added and expired events are
discarded:
+
+```java
+// WindowFunctionExecutor.java (current implementation)
+public X process(Window<Record<T>> inputWindow, WindowContext context) throws
Exception {
+ // ...
+ return this.windowFunction.process(inputWindow.get(), context); // full
window only; getNew()/getExpired() dropped
+}
+```
+
+## Impact
+
+Users cannot perform efficient incremental computation. Typical affected
scenarios:
+
+1. **Incremental aggregation** (sliding-window statistics): on each trigger
most messages in the window are unchanged; re-scanning the full collection is
wasteful.
+2. **State maintenance**: when external state must track which messages
entered or left the window, users must diff full collections manually —
inefficient and error-prone.
+3. **Expired-event handling**: side effects such as resource release or
counter decrements when messages leave the window.
+
+# Goals
+
+## In Scope
+
+- Expose the `Window<T>` interface in the public API (including `getNew()`,
`getExpired()`, and timestamp methods).
+- Add a new public `IncrementalWindowFunction<X, T>` interface so users can
receive the full `Window<Record<X>>` view.
+- Have `WindowFunctionExecutor` transparently support the new interface
without requiring configuration or deployment changes.
+- Update Functions deployment validation (`FunctionConfigUtils.doJavaChecks`)
so `IncrementalWindowFunction` implementations pass the same Java class checks
as `WindowFunction`.
+- Preserve all existing behavior for current `WindowFunction` users.
+
+## Out of Scope
+
+- Incremental support for `java.util.function.Function` (bare window
functions).
+- Equivalent capability for Python / Go Functions.
+- Changes to window state snapshot / checkpoint mechanisms.
+
+# High Level Design
+
+Introduce a new public interface `IncrementalWindowFunction<X, T>` whose
`process` method accepts `Window<Record<X>>` instead of
`Collection<Record<X>>`, giving users access to:
+
+- `window.get()` — all messages in the current window
+- `window.getNew()` — messages added since the last trigger
+- `window.getExpired()` — messages removed since the last trigger
+- `window.getStartTimestamp()` / `window.getEndTimestamp()` — window time
boundaries
+
+`WindowFunctionExecutor` detects at initialization whether the user class
implements `IncrementalWindowFunction`. If so, it passes the `Window` object
directly; otherwise it follows the existing code path.
+
+Data flow (after the change):
+
+```mermaid
+flowchart TD
+ WM["WindowManager.onTrigger()"]
+ WL["onActivation(tuples, newTuples, expiredTuples)"]
+ WE["WindowFunctionExecutor.processWindow()"]
+ WI["WindowImpl(tuples, newTuples, expiredTuples)"]
+
+ subgraph executor ["process(Window, WindowContext) dispatch"]
+ P1["IncrementalWindowFunction\n→ process(window, context)"]
+ P2["WindowFunction\n→ process(window.get(), context)"]
+ P3["bareWindowFunction\n→ apply(values)"]
+ end
+
+ WM --> WL --> WE --> WI --> executor
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Change 1: Move `Window<T>` to `api-java`
+
+**Current path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java`
+
+**New path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Window.java`
+
+The interface methods remain unchanged; only the package declaration and
license header are updated:
+
+```java
+// pulsar-functions/api-java/.../api/Window.java
+public interface Window<T> {
+ List<T> get();
+ List<T> getNew();
+ List<T> getExpired();
+ Long getEndTimestamp();
+ Long getStartTimestamp();
+}
+```
+
+The existing internal `Window.java` is replaced by a reference to the
`api-java` interface (or removed entirely, with `WindowImpl` implementing the
new public interface directly).
+
+### Change 2: Add `IncrementalWindowFunction<X, T>` interface
+
+**Path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/IncrementalWindowFunction.java`
+
+```java
+@FunctionalInterface
+public interface IncrementalWindowFunction<X, T> {
+ /**
+ * Process the triggered window.
+ *
+ * @param inputWindow the window view for this activation, providing access to
+ * all current events ({@link Window#get()}),
+ * newly added events ({@link Window#getNew()}), and
+ * expired events ({@link Window#getExpired()}).
+ * @param context the window function context
+ * @return the output, or {@code null} to suppress output
+ */
+ T process(Window<Record<X>> inputWindow, WindowContext context) throws Exception;
+}
+```
+
+#### Example: sliding-window sum
+
+```java
+/**
+ * Maintains the sum of integer values in the current sliding window incrementally.
+ */
+public class SlidingWindowSumFunction implements IncrementalWindowFunction<Integer, Integer> {
+
+ private static final String RUNNING_SUM_KEY = "running-sum";
+
+ @Override
+ public Integer process(Window<Record<Integer>> window, WindowContext context) throws Exception {
+ long newEventsSum = 0;
+ for (Record<Integer> record : window.getNew()) {
+ newEventsSum += record.getValue();
+ }
+ long expiredSum = 0;
+ for (Record<Integer> record : window.getExpired()) {
+ expiredSum += record.getValue();
+ }
+ long netDelta = newEventsSum - expiredSum;
+ if (netDelta != 0) {
+ context.incrCounter(RUNNING_SUM_KEY, netDelta);
+ }
+ return (int) context.getCounter(RUNNING_SUM_KEY);
+ }
+}
+```
+
+### Change 3: Update `WindowFunctionExecutor`
+
+**Path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java`
+
+#### 3a. Add field
+
+```java
+protected IncrementalWindowFunction<T, X> incrementalWindowFunction;
+```
+
+#### 3b. Extend `initializeUserFunction()`
+
+Detect `IncrementalWindowFunction` via `instanceof` in
`initializeUserFunction()`, following the same pattern used for
`WindowFunction` today:
+
+```java
+@SuppressWarnings("unchecked")
+private void initializeUserFunction(WindowConfig windowConfig) {
+ // ...
+ if (userClassObject instanceof java.util.function.Function) {
+ // existing logic, unchanged
+ bareWindowFunction = ...;
+ } else if (userClassObject instanceof IncrementalWindowFunction) {
Review Comment:
The dispatch order here is `Function` → `IncrementalWindowFunction` →
`WindowFunction`. A user class implementing both `IncrementalWindowFunction`
and `WindowFunction` (or both `Function` and `IncrementalWindowFunction`)
resolves by this precedence. Since that becomes an observable public-API
contract, please state the ordering explicitly and confirm it's intentional.
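To make the precedence concrete, here is a minimal sketch of how the proposed `instanceof` chain resolves classes that implement more than one interface. The interfaces are simplified stand-ins (no `Record` or `WindowContext`), and `DispatchOrderSketch`, `resolve`, `Both`, and `BareAndIncremental` are hypothetical names used only for illustration; none of this is code from the PR.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins; the real interfaces live in org.apache.pulsar.functions.api.
class DispatchOrderSketch {
    interface Window<T> { List<T> get(); }
    interface WindowFunction<X, T> { T process(Collection<X> input); }
    interface IncrementalWindowFunction<X, T> { T process(Window<X> window); }

    // Mirrors the chain in 3b: Function first, then IncrementalWindowFunction, then WindowFunction.
    static String resolve(Object userClassObject) {
        if (userClassObject instanceof Function) {
            return "bareWindowFunction";
        } else if (userClassObject instanceof IncrementalWindowFunction) {
            return "incrementalWindowFunction";
        } else if (userClassObject instanceof WindowFunction) {
            return "windowFunction";
        }
        throw new IllegalArgumentException("Window function does not implement the correct interface");
    }

    // Implements both window interfaces: under this ordering only the Window overload would be selected.
    static class Both implements IncrementalWindowFunction<Integer, Integer>, WindowFunction<Integer, Integer> {
        @Override public Integer process(Window<Integer> window) { return window.get().size(); }
        @Override public Integer process(Collection<Integer> input) { return input.size(); }
    }

    // Implements Function and IncrementalWindowFunction: the bare Function branch is selected.
    static class BareAndIncremental
            implements Function<Collection<Integer>, Integer>, IncrementalWindowFunction<Integer, Integer> {
        @Override public Integer apply(Collection<Integer> values) { return values.size(); }
        @Override public Integer process(Window<Integer> window) { return window.get().size(); }
    }

    public static void main(String[] args) {
        System.out.println(resolve(new Both()));               // incrementalWindowFunction
        System.out.println(resolve(new BareAndIncremental())); // bareWindowFunction
    }
}
```

In the second case the class's incremental `process` is silently never called, which is the behavior I would like the PIP to either document or reject at validation time.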
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+#### 3b. Extend `initializeUserFunction()`
+
+Detect `IncrementalWindowFunction` via `instanceof` in
`initializeUserFunction()`, following the same pattern used for
`WindowFunction` today:
+
+```java
+@SuppressWarnings("unchecked")
+private void initializeUserFunction(WindowConfig windowConfig) {
+ // ...
+ if (userClassObject instanceof java.util.function.Function) {
+ // existing logic, unchanged
+ bareWindowFunction = ...;
+ } else if (userClassObject instanceof IncrementalWindowFunction) {
+ incrementalWindowFunction = (IncrementalWindowFunction<T, X>) userClassObject;
+ } else if (userClassObject instanceof WindowFunction) {
+ // existing logic, unchanged
+ windowFunction = (WindowFunction<T, X>) userClassObject;
+ } else {
+ throw new IllegalArgumentException("Window function does not implement the correct interface");
+ }
+}
+```
+
+#### 3c. Update `process(Window<Record<T>>, WindowContext)`
+
+```java
+public X process(Window<Record<T>> inputWindow, WindowContext context) throws Exception {
+ if (this.bareWindowFunction != null) {
+ Collection<T> values = inputWindow.get().stream()
+ .map(Record::getValue).collect(Collectors.toList());
+ return this.bareWindowFunction.apply(values);
+ } else if (this.incrementalWindowFunction != null) {
+ // pass the full Window view; user can access getNew() / getExpired()
+ return this.incrementalWindowFunction.process(inputWindow, context);
+ } else {
+ // existing behavior: pass full message collection only
+ return this.windowFunction.process(inputWindow.get(), context);
+ }
+}
+```
+
+### Change 4: Update deployment validation (`functions/utils`)
+
+Submit-time validation must accept `IncrementalWindowFunction` the same way it
already accepts `WindowFunction`.
+
+| File | Change |
+|------|--------|
+| `FunctionConfigUtils.doJavaChecks()` | Add `IncrementalWindowFunction` to the allowed user-class interfaces. |
+| `FunctionCommon.getFunctionClassParent()` | When `windowConfig` is set, resolve `IncrementalWindowFunction` before `WindowFunction` so input/output type inference for SerDe and schema checks stays correct. |
+
+
+## Public-facing Changes
+
+### Public API
+
+#### New interface: `org.apache.pulsar.functions.api.Window<T>`
+
+Promoted from the internal package.
+
+| Method | Description |
+|--------|-------------|
+| `List<T> get()` | All events in the current window |
+| `List<T> getNew()` | Events added since the last trigger |
+| `List<T> getExpired()` | Events removed since the last trigger |
+| `Long getStartTimestamp()` | Window start time (non-null for time-based windows, otherwise `null`) |
+| `Long getEndTimestamp()` | Window end time (watermark in event-time mode, system time in processing-time mode) |
Review Comment:
`getStartTimestamp()` documents its null behavior, but `getEndTimestamp()`'s
description implies it is never null. Please confirm and capture this in the
Javadoc, since both methods are now public.
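As an illustration of why the contract matters to callers, here is a minimal sketch of defensive timestamp handling. It assumes either boundary may be `null` until the Javadoc says otherwise; `WindowTimestampSketch`, `spanMillis`, and `windowOf` are hypothetical names, and the `Window` interface below is a trimmed stand-in for the proposed public one.

```java
import java.util.List;
import java.util.OptionalLong;

class WindowTimestampSketch {
    // Hypothetical stand-in for the proposed org.apache.pulsar.functions.api.Window<T>.
    interface Window<T> {
        List<T> get();
        Long getStartTimestamp();
        Long getEndTimestamp();
    }

    // Returns the window span in milliseconds, or empty when either boundary is null.
    // Unboxing a null Long directly (end - start) would throw NullPointerException.
    static OptionalLong spanMillis(Window<?> window) {
        Long start = window.getStartTimestamp();
        Long end = window.getEndTimestamp();
        if (start == null || end == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(end - start);
    }

    // Test helper: builds an empty window with the given (possibly null) boundaries.
    static Window<String> windowOf(Long start, Long end) {
        return new Window<String>() {
            @Override public List<String> get() { return List.of(); }
            @Override public Long getStartTimestamp() { return start; }
            @Override public Long getEndTimestamp() { return end; }
        };
    }

    public static void main(String[] args) {
        System.out.println(spanMillis(windowOf(1000L, 4000L))); // OptionalLong[3000]
        System.out.println(spanMillis(windowOf(null, 4000L)));  // OptionalLong.empty
    }
}
```

If `getEndTimestamp()` is guaranteed non-null, user code can drop half of this check, so it is worth stating one way or the other.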
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+#### New interface: `org.apache.pulsar.functions.api.IncrementalWindowFunction<X, T>`
+
+New public interface.
+
+| Method | Description |
+|--------|-------------|
+| `T process(Window<Record<X>> inputWindow, WindowContext context)` | User-implemented window logic with access to incremental and expired events |
+
+### Configuration
+
+No new `WindowConfig` fields or CLI options. Existing window settings
(`windowLength*`, `slidingInterval*`, event-time options, etc.) apply unchanged.
+
+At runtime, `WindowFunctionExecutor` auto-detects `IncrementalWindowFunction`
via `instanceof`. At submit time, `FunctionConfigUtils.doJavaChecks()` and
`FunctionCommon.getFunctionClassParent()` are updated to accept the new
interface (see Change 4).
+
+
+# Backward & Forward Compatibility
+
+## Existing `WindowFunction` users
+
+**Fully backward compatible.** The
`WindowFunctionExecutor.initializeUserFunction()` detection path for
`WindowFunction` is unchanged; all existing implementations behave identically
after upgrade.
+
+## Upgrade
+
+No special steps required. After upgrading to a Pulsar version that includes
this feature, the new interfaces are available immediately.
+
+## Downgrade / Rollback
+
+To roll back to a version without this feature:
+
+- User functions that implement `IncrementalWindowFunction` must be rewritten
to implement `WindowFunction`, replacing `getNew()` / `getExpired()` logic with
manual diffing over the full message collection, before they can be deployed on
the older version.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+There is no wire-protocol change between Functions Workers. No special
geo-replication considerations apply.
Review Comment:
The template requires three sections that are currently missing — could you
add them, even if brief?
- **Security Considerations** — this is a pure API addition with no new
endpoints, so a sentence confirming "no new REST/protocol surface, no new auth
or multi-tenancy implications" is enough.
- **Monitoring / Metrics** — please state explicitly "no new metrics;
runtime behavior is unchanged."
- **Alternatives** — the most important one. Why a *new* interface rather
than (a) `default` methods on `WindowFunction`, (b) an overloaded
`process(Window, ...)`, or (c) a config flag? Documenting why these were
rejected will pre-empt the obvious review questions. It's also the right place
to defend the name `IncrementalWindowFunction`, since it exposes expired events
too, not just increments.
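For reference when writing the Alternatives section, here is a rough sketch of what option (a) could look like. It is my own simplified illustration, not a proposal from the PR: the interfaces omit `Record`, `WindowContext`, and checked exceptions, and `DefaultMethodAlternativeSketch`, `DeltaSum`, and `window` are hypothetical names.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins for the Pulsar interfaces.
class DefaultMethodAlternativeSketch {
    interface Window<T> {
        List<T> get();
        List<T> getNew();
        List<T> getExpired();
    }

    // Alternative (a): keep a single interface and add a default Window overload.
    @FunctionalInterface
    interface WindowFunction<X, T> {
        T process(Collection<X> input);

        // Existing implementations inherit this and keep their full-collection behavior.
        default T process(Window<X> window) {
            return process(window.get());
        }
    }

    // An incremental implementation overrides the default,
    // but must still supply the abstract Collection overload.
    static class DeltaSum implements WindowFunction<Integer, Integer> {
        private int runningSum;

        @Override
        public Integer process(Collection<Integer> input) {
            int total = 0;
            for (int v : input) total += v;
            return total;
        }

        @Override
        public Integer process(Window<Integer> window) {
            for (int v : window.getNew()) runningSum += v;
            for (int v : window.getExpired()) runningSum -= v;
            return runningSum;
        }
    }

    // Helper that builds an immutable window view from three lists.
    static Window<Integer> window(List<Integer> all, List<Integer> added, List<Integer> expired) {
        return new Window<Integer>() {
            @Override public List<Integer> get() { return all; }
            @Override public List<Integer> getNew() { return added; }
            @Override public List<Integer> getExpired() { return expired; }
        };
    }

    public static void main(String[] args) {
        WindowFunction<Integer, Integer> legacy = input -> input.size();
        System.out.println(legacy.process(window(List.of(1, 2, 3), List.of(3), List.of())));      // 3
        DeltaSum delta = new DeltaSum();
        System.out.println(delta.process(window(List.of(1, 2, 3), List.of(1, 2, 3), List.of()))); // 6
        System.out.println(delta.process(window(List.of(2, 3, 4), List.of(4), List.of(1))));      // 9
    }
}
```

One property visible in this shape is that a class interested only in deltas still has to implement the `Collection` overload. Whether that, or something else, is the reason for preferring a separate interface is for the PIP to say.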
##########
pip/pip-484.md:
##########
@@ -0,0 +1,336 @@
+# PIP-484: Expose Incremental Window Events via IncrementalWindowFunction
+
+# Background knowledge
+
+## Pulsar Window Functions
+
+Pulsar Window Functions are a specialized form of Pulsar Function that group
incoming messages into windows based on time or message count, and invoke the
user function with a batch of messages each time a window fires.
+
+Window types:
+
+- **Tumbling window**: adjacent windows do not overlap; each message belongs
to exactly one window.
+- **Sliding window**: adjacent windows may overlap; a message can belong to
multiple windows.
+
+Time semantics:
+
+- **Processing time**: windows are driven by the clock at which messages enter
the system.
+- **Event time**: windows are driven by timestamps embedded in messages, with
watermarks used to track event-time progress.
+
+## Existing public API
+
+```
+pulsar-functions/api-java
+└── org.apache.pulsar.functions.api
+ ├── WindowFunction<X, T> // user-implemented window function
interface
+ └── WindowContext // context interface for window functions
+```
+
+`WindowFunction` signature:
+
+```java
+@FunctionalInterface
+public interface WindowFunction<X, T> {
+ T process(Collection<Record<X>> input, WindowContext context) throws
Exception;
+}
+```
+
+On each trigger, the user function receives a `Collection<Record<X>>`
containing **all messages in the current window**.
+
+## Internal runtime pipeline
+
+```
+pulsar-functions/instance
+└── org.apache.pulsar.functions.windowing
+ ├── Window<T> // window view interface (internal package
today)
+ ├── WindowImpl<T> // Window implementation holding three
event lists
+ ├── WindowManager<T> // window manager; classifies events
+ ├── WindowFunctionExecutor<T,X> // executor bridging runtime and user
function
+ ├── WindowLifecycleListener<T> // window lifecycle callbacks
+ ├── EvictionPolicy<T> // eviction policy (decides when events
expire)
+ └── TriggerPolicy<T> // trigger policy (decides when to fire a
window)
+```
+
+The internal `Window<T>` interface already exposes incremental views:
+
+```java
+public interface Window<T> {
+ List<T> get(); // all events in the current window
+ List<T> getNew(); // events added since the last trigger
+ List<T> getExpired(); // events removed since the last trigger
+ Long getStartTimestamp(); // window start timestamp
+ Long getEndTimestamp(); // window end timestamp (reference time)
+}
+```
+
+# Motivation
+
+## Problem: `getNew()` / `getExpired()` data is discarded at the public API layer
+
+`WindowManager.onTrigger()` already classifies events into three categories on
every window activation:
+
+| Category | Meaning |
+|----------|---------|
+| `tuples` | all events currently in the window |
+| `newTuples` | events newly added since the last trigger |
+| `expiredTuples` | events removed since the last trigger |
+
+These three lists are passed into `WindowImpl` and delivered to the executor
via `WindowLifecycleListener.onActivation()`. However,
`WindowFunctionExecutor.process(Window, WindowContext)` only passes
`inputWindow.get()` to the user function; newly added and expired events are
discarded:
+
+```java
+// WindowFunctionExecutor.java (current implementation)
+public X process(Window<Record<T>> inputWindow, WindowContext context) throws Exception {
+ // ...
+ return this.windowFunction.process(inputWindow.get(), context); // full window only; getNew()/getExpired() dropped
+}
+```
+
+## Impact
+
+Users cannot perform efficient incremental computation. Typical affected
scenarios:
+
+1. **Incremental aggregation** (sliding-window statistics): on each trigger
most messages in the window are unchanged; re-scanning the full collection is
wasteful.
+2. **State maintenance**: when external state must track which messages
entered or left the window, users must diff full collections manually —
inefficient and error-prone.
+3. **Expired-event handling**: side effects such as resource release or
counter decrements when messages leave the window.
+
+# Goals
+
+## In Scope
+
+- Expose the `Window<T>` interface in the public API (including `getNew()`,
`getExpired()`, and timestamp methods).
+- Add a new public `IncrementalWindowFunction<X, T>` interface so users can
receive the full `Window<Record<X>>` view.
+- Have `WindowFunctionExecutor` transparently support the new interface
without requiring configuration or deployment changes.
+- Update Functions deployment validation (`FunctionConfigUtils.doJavaChecks`)
so `IncrementalWindowFunction` implementations pass the same Java class checks
as `WindowFunction`.
+- Preserve all existing behavior for current `WindowFunction` users.
+
+## Out of Scope
+
+- Incremental support for `java.util.function.Function` (bare window
functions).
+- Equivalent capability for Python / Go Functions.
+- Changes to window state snapshot / checkpoint mechanisms.
+
+# High Level Design
+
+Introduce a new public interface `IncrementalWindowFunction<X, T>` whose
`process` method accepts `Window<Record<X>>` instead of
`Collection<Record<X>>`, giving users access to:
+
+- `window.get()` — all messages in the current window
+- `window.getNew()` — messages added since the last trigger
+- `window.getExpired()` — messages removed since the last trigger
+- `window.getStartTimestamp()` / `window.getEndTimestamp()` — window time
boundaries
+
+`WindowFunctionExecutor` detects at initialization whether the user class
implements `IncrementalWindowFunction`. If so, it passes the `Window` object
directly; otherwise it follows the existing code path.
+
+Data flow (after the change):
+
+```mermaid
+flowchart TD
+ WM["WindowManager.onTrigger()"]
+ WL["onActivation(tuples, newTuples, expiredTuples)"]
+ WE["WindowFunctionExecutor.processWindow()"]
+ WI["WindowImpl(tuples, newTuples, expiredTuples)"]
+
+ subgraph executor ["process(Window, WindowContext) dispatch"]
+ P1["IncrementalWindowFunction\n→ process(window, context)"]
+ P2["WindowFunction\n→ process(window.get(), context)"]
+ P3["bareWindowFunction\n→ apply(values)"]
+ end
+
+ WM --> WL --> WE --> WI --> executor
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Change 1: Move `Window<T>` to `api-java`
+
+**Current path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java`
+
+**New path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Window.java`
+
+The interface methods remain unchanged; only the package declaration and
license header are updated:
+
+```java
+// pulsar-functions/api-java/.../api/Window.java
+public interface Window<T> {
+ List<T> get();
+ List<T> getNew();
+ List<T> getExpired();
+ Long getEndTimestamp();
+ Long getStartTimestamp();
+}
+```
+
+The existing internal `Window.java` is replaced by a reference to the
`api-java` interface (or removed entirely, with `WindowImpl` implementing the
new public interface directly).
+
+### Change 2: Add `IncrementalWindowFunction<X, T>` interface
+
+**Path**:
`pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/IncrementalWindowFunction.java`
+
+```java
+@FunctionalInterface
+public interface IncrementalWindowFunction<X, T> {
+ /**
+ * Process the triggered window.
+ *
+ * @param inputWindow the window view for this activation, providing access to
+ * all current events ({@link Window#get()}),
+ * newly added events ({@link Window#getNew()}), and
+ * expired events ({@link Window#getExpired()}).
+ * @param context the window function context
+ * @return the output, or {@code null} to suppress output
+ */
+ T process(Window<Record<X>> inputWindow, WindowContext context) throws Exception;
+}
+```
+
+#### Example: sliding-window sum
+
+```java
+/**
+ * Maintains the sum of integer values in the current sliding window incrementally.
+ */
+public class SlidingWindowSumFunction implements IncrementalWindowFunction<Integer, Integer> {
+
+ private static final String RUNNING_SUM_KEY = "running-sum";
+
+ @Override
+ public Integer process(Window<Record<Integer>> window, WindowContext context) throws Exception {
+ long newEventsSum = 0;
+ for (Record<Integer> record : window.getNew()) {
+ newEventsSum += record.getValue();
+ }
+ long expiredSum = 0;
+ for (Record<Integer> record : window.getExpired()) {
+ expiredSum += record.getValue();
+ }
+ long netDelta = newEventsSum - expiredSum;
+ if (netDelta != 0) {
+ context.incrCounter(RUNNING_SUM_KEY, netDelta);
+ }
+ return (int) context.getCounter(RUNNING_SUM_KEY);
+ }
+}
+```
+
+### Change 3: Update `WindowFunctionExecutor`
+
+**Path**:
`pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java`
+
+#### 3a. Add field
+
+```java
+protected IncrementalWindowFunction<T, X> incrementalWindowFunction;
Review Comment:
The public interface is declared `IncrementalWindowFunction<X, T>` (X=input,
T=output), but this executor field is `<T, X>`. Inside the executor the letters
are swapped (T=input, X=output, as in `X process(Window<Record<T>> ...)`), so
the positions still line up and this matches the internal `WindowFunction<T,X>`
convention. It's defensible, but the doc shows both orderings without comment,
which will trip readers. A one-line note clarifying the convention would help.
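
To illustrate the point, here is a minimal self-contained sketch of the two naming conventions side by side. The types are simplified stand-ins, not the real Pulsar classes: `TypeParamOrderSketch`, `Executor`, and `run` are hypothetical names, and the stand-in `process` takes a plain `List` instead of `Window<Record<X>>` and `WindowContext`. The comment on the field is only a suggestion for the kind of note the doc could carry.

```java
import java.util.List;

public class TypeParamOrderSketch {

    // Stand-in for the proposed public interface: X = input, T = output.
    // (The real interface takes Window<Record<X>> and a WindowContext.)
    public interface IncrementalWindowFunction<X, T> {
        T process(List<X> window) throws Exception;
    }

    // Stand-in for the executor, which names its parameters the other way
    // round: T = input, X = output.
    public static class Executor<T, X> {

        // Note: input type first, output type second, exactly as in the public
        // interface; only the letters differ from IncrementalWindowFunction<X, T>.
        protected IncrementalWindowFunction<T, X> incrementalWindowFunction;

        public Executor(IncrementalWindowFunction<T, X> fn) {
            this.incrementalWindowFunction = fn;
        }

        public X run(List<T> window) throws Exception {
            return incrementalWindowFunction.process(window);
        }
    }

    public static void main(String[] args) throws Exception {
        // Integer input, String output: the same argument order works for both
        // the executor and the function it wraps.
        Executor<Integer, String> executor = new Executor<>(w -> "count=" + w.size());
        System.out.println(executor.run(List.of(1, 2, 3))); // prints "count=3"
    }
}
```

The sketch compiles either way because generic parameters are positional; the confusion is purely for human readers, which is why a short comment on the field seems sufficient.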