twalthr commented on code in PR #27502: URL: https://github.com/apache/flink/pull/27502#discussion_r2788511874
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java: ########## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.InsertConflictStrategy; +import 
org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; + +/** + * A sink materializer that buffers records and compacts them on watermark progression. + * + * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling + * changelog disorder when the upsert key differs from the sink's primary key. + */ +public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class); + + private static final String STATE_CLEARED_WARN_MSG = + "The state is cleared because of state TTL. This will result in incorrect result. 
" + + "You can increase the state TTL to avoid this."; + private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst"); + + private final InsertConflictStrategy conflictStrategy; + private final TypeSerializer<RowData> serializer; + private final GeneratedRecordEqualiser generatedRecordEqualiser; + private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser; + private final int[] inputUpsertKey; + private final boolean hasUpsertKey; + private final RowType keyType; + private final String[] primaryKeyNames; + + private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor; + private transient MapState<Long, List<RowData>> buffer; Review Comment: please document what is in this buffer ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java: ########## @@ -390,6 +406,36 @@ private OneInputStreamOperator<RowData, RowData> createSumOperator( GeneratedRecordEqualiser rowEqualiser, GeneratedHashFunction rowHashFunction) { + // Check if we should use the watermark-compacting materializer for ERROR/NOTHING strategies + if (isErrorOrNothingConflictStrategy()) { + // Extract primary key column names and key type + int[] primaryKeys = Review Comment: nit: we have primaryKeys and pkFieldNames available on the caller side, we could pass them instead of recompute ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java: ########## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import 
org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; + +/** + * A sink materializer that buffers records and compacts them on watermark progression. + * + * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling + * changelog disorder when the upsert key differs from the sink's primary key. + */ +public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class); + + private static final String STATE_CLEARED_WARN_MSG = + "The state is cleared because of state TTL. This will result in incorrect result. 
" + + "You can increase the state TTL to avoid this."; + private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst"); + + private final InsertConflictStrategy conflictStrategy; + private final TypeSerializer<RowData> serializer; + private final GeneratedRecordEqualiser generatedRecordEqualiser; + private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser; + private final int[] inputUpsertKey; + private final boolean hasUpsertKey; + private final RowType keyType; + private final String[] primaryKeyNames; + + private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor; + private transient MapState<Long, List<RowData>> buffer; + private transient ValueState<RowData> currentValue; Review Comment: please document what is in this currentValue ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java: ########## @@ -0,0 +1,743 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBefore; +import static 
org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link WatermarkCompactingSinkMaterializer}. */ +class WatermarkCompactingSinkMaterializerTest { + + private static final int PRIMARY_KEY_INDEX = 1; + private static final String PRIMARY_KEY_NAME = "pk"; + + private static final LogicalType[] LOGICAL_TYPES = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; + + private static final GeneratedRecordEqualiser RECORD_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert first record (watermark is MIN_VALUE) + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); // Buffered, waiting for watermark + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // Update with same upsert key (this is the expected pattern for single-source updates) + harness.processElement(updateAfterRecord(1L, 1, "a2")); + assertEmitsNothing(harness); + + // Advance watermark again + harness.processWatermark(200L); + assertEmits(harness, updateAfter(1L, 1, "a2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void 
testDeleteAfterInsert(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // Delete and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, delete(1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and delete before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void testDoNothingKeepsFirstRecord() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should keep the first record + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "first")); + } + } + + @Test + void testDoErrorThrowsOnConflict() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + 
harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should throw exception with key info + assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation") + .hasMessageContaining("[pk=1]"); + } + } + + @Test + void testDoErrorAllowsSameUpsertKey() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with same upsert key (updates to same source) + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processElement(updateAfterRecord(1L, 1, "v2")); + + // Compact - should not throw, just keep the latest + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "v2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Simulate changelog disorder from FLIP-558 example: + // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1) + // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1) + // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1) Review Comment: Do we want to test the second disorder case as well, or is this already tested somewhere? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java: ########## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import 
org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; + +/** + * A sink materializer that buffers records and compacts them on watermark progression. + * + * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling + * changelog disorder when the upsert key differs from the sink's primary key. + */ +public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class); + + private static final String STATE_CLEARED_WARN_MSG = + "The state is cleared because of state TTL. This will result in incorrect result. " Review Comment: ```suggestion "The state is cleared because of state TTL. This will lead to incorrect results. " ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
