Hi Hongshun Wang, thanks for your interest in this topic!

> To mitigate this, I suggest embedding transaction abort logic directly into
> the sink's shutdown path

Flink already has abort logic in the shutdown path, but it is not enough to prevent lingering transactions in the scenarios described in the FLIP, where the Flink process terminates unexpectedly. For example, when the JVM crashes or is OOM-killed by the OS, no shutdown hook runs and no cleanup occurs.

On top of that, the shutdown path can only abort, not commit. The data may have been successfully checkpointed by Flink, but the checkpoint completion notification may not have been received yet, so the shutdown path cannot safely execute a commit. Aborting this transaction means losing data that Flink considers successfully delivered. The CLI tool's commit operation is the only way to recover this data.

The tool is designed as a complement to Flink's built-in cleanup, not a replacement: it covers the gap that no in-process mechanism can fully close.

> the 15-minute transaction timeout window makes it operationally fragile.

You are probably referring to the broker's transaction.max.timeout.ms, which defaults to 15 minutes. Flink's default transaction.timeout.ms is 1 hour (see KafkaSinkBuilder.DEFAULT_KAFKA_TRANSACTION_TIMEOUT). The KafkaSink docs highly recommend tweaking the Kafka transaction timeout, otherwise data loss may happen. If data loss is unacceptable, operators may increase it further, making the window where downstream consumers are blocked even longer (hours or days), which strengthens the case for a tool that can resolve the situation immediately.

On Tue, 31 Mar 2026 at 04:17, Hongshun Wang <[email protected]> wrote:
>
> Hi Aleksandr,
>
> > This scenario occurs when a job with exactly-once KafkaSink is
> > permanently stopped between a checkpoint and the subsequent commit.
> > The lingering transaction blocks downstream read_committed consumers
> > until the transaction timeout fires, and the transaction data is lost
> > upon abort.
>
> To mitigate this, I suggest embedding transaction abort logic directly into
> the sink's shutdown path.
> While a separate cleanup job might seem like a
> workaround, the 15-minute transaction timeout window makes it operationally
> fragile. Integrating this natively would ensure deterministic cleanup
> without relying on external triggers or manual intervention.
>
> Best,
> Hongshun
>
> On Mon, Mar 30, 2026 at 8:13 PM Aleksandr Savonin <[email protected]>
> wrote:
>
> > Hi Hongshun,
> > Thank you for the feedback.
> >
> > > Heise's work under FLIP-511 has already addressed most client-side
> > > transaction issues.
> >
> > FLIP-511 improves Flink's built-in recovery. But Flink's native
> > recovery only helps when the job restarts. This tool targets scenarios
> > described in the FLIP's motivation - any scenario where Flink fails to
> > commit or abort a transaction and Flink job is no longer running. In
> > these cases there is no Flink runtime to execute the recovery logic,
> > and operators currently have no supported tool.
> >
> > > Are you still hitting blockers in production?
> >
> > This scenario occurs when a job with exactly-once KafkaSink is
> > permanently stopped between a checkpoint and the subsequent commit.
> > The lingering transaction blocks downstream read_committed consumers
> > until the transaction timeout fires, and the transaction data is lost
> > upon abort.
> >
> > > The CLI logic seems to duplicate what already runs during Flink Writer
> > > restart/recovery. Could you clarify why a separate CLI is needed instead of
> > > relying on the native recovery path?
> >
> > Flink's committer does resume and commit transactions, but only within
> > a running job. When the job is permanently gone, there is no Flink
> > process to execute that logic. The CLI gives operators the ability to
> > manually commit an abandoned transaction, preserving data that would
> > otherwise be lost.
> >
> > > And I also don't think flink dist needs a cli about a connector's cli
> > > which is better to add in flink kafka connector' repo.
> >
> > Agreed - this is already proposed as a module within
> > flink-connector-kafka, packaged as a standalone uber-jar. It has no
> > dependency on flink dist.
> >
> > > This cannot be fixed client-side and only a Kafka broker update (KIP-890
> > > [2]) will resolve it.
> >
> > Fully agreed, that is a different problem. This tool targets the
> > complementary case where the transaction is properly tracked by Kafka
> > but no Flink process exists to commit or abort it.
> >
> > Kind regards,
> > Aleksandr Savonin
> >
> > On Mon, 30 Mar 2026 at 04:44, Hongshun Wang <[email protected]>
> > wrote:
> > >
> > > Hi Aleksandr,
> > >
> > > Thanks for your hard work on this CLI and for proactively surfacing these
> > > production scenarios. I really appreciate your contributions.
> > >
> > > Heise's work under FLIP-511 has already addressed most client-side
> > > transaction issues. Are you still hitting blockers in production?
> > >
> > > The CLI logic seems to duplicate what already runs during Flink Writer
> > > restart/recovery. Could you clarify why a separate CLI is needed instead of
> > > relying on the native recovery path? And I also don't think flink dist
> > > needs a cli about a connector's cli which is better to add in flink kafka
> > > connector' repo.
> > >
> > > One known gap remains: server-side transaction timeouts without
> > > ADD_PARTITIONS_TO_TXN, and server-side terminates the transaction. This
> > > cannot be fixed client-side and only a Kafka broker update (KIP-890 [2])
> > > will resolve it.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector?src=contextnavpagetreemode
> > > [2]
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> > >
> > > On Sat, Mar 28, 2026 at 8:12 PM Shekhar Rajak <[email protected]>
> > > wrote:
> > >
> > > > Hi,
> > > > Curious to know if we debug and analyse the root cause and try to
> > > > fix/enhance in the Kafka transaction coordinator side, will this issue be
> > > > resolved ? Do we really need to think about transaction management tool
> > > > specifically for kafka ?
> > > > Regards,
> > > > Shekhar Rajak,
> > > >
> > > > On Wednesday 25 March 2026 at 05:51:20 pm GMT+5:30, Aleksandr Savonin <
> > > > [email protected]> wrote:
> > > >
> > > > Hi everyone,
> > > > I'd like to start a discussion on FLIP-572 [1].
> > > > When a Flink job using exactly-once KafkaSink fails and does not recover,
> > > > Kafka transactions can remain in the ONGOING state, blocking all downstream
> > > > read_committed consumers at the Last Stable Offset until the broker timeout
> > > > expires.
> > > > There is currently no built-in tooling to resolve this, Kafka's own
> > > > kafka-transactions.sh cannot commit Flink transactions since that requires
> > > > Flink-specific internals.
> > > > This FLIP proposes a standalone CLI tool, that allows operators to abort or
> > > > commit lingering transactions without a running Flink cluster.
> > > > Looking forward to your feedback.
> > > >
> > > > Kind regards,
> > > > Aleksandr Savonin
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-572%3A+Introduce+Flink-Kafka+Transactions+Management+Tool
> >
> > --
> > Kind regards,
> > Aleksandr
> >

--
Kind regards,
Aleksandr
