Hi,
I am currently running a chaos test on a system (essentially starting nodes
that process something and randomly knocking them out). The system behaves
fine under regular tests, but under chaos I see occasional duplicate key
violations of a uniqueness constraint on one of the more complex CTE-based
queries. It only happens with concurrency, restarting nodes, and ample load.
I cannot reproduce it by taking the query out and running it manually in
pgAdmin; run that way it behaves fine and does exactly what I expect (see
the literal-substitution example after the query below).
The query looks like this (it is issued through Rust's SQLx, which is why
the parameters arrive as arrays and are unnested):
WITH unnested_inputs AS (
    SELECT * FROM (
        SELECT
            unnest($1::uuid[]) AS event_id,
            unnest($2::varchar[]) AS type,
            unnest($3::int[]) AS version,
            unnest($4::uuid[]) AS causation_id,
            unnest($5::uuid[]) AS correlation_id,
            unnest($6::text[]) AS idempotency_key,
            unnest($7::jsonb[]) AS data,
            unnest($8::jsonb[]) AS metadata,
            unnest($9::text[]) AS subscription_id,
            unnest($10::text[]) AS subscription_instance_identifier,
            unnest($11::bigint[]) AS applied_order_id
    ) AS inputs
),
to_update_subscription_logs AS (
    SELECT sl.id AS subscription_log_id, sl.node_id, sl.status, ui.*
    FROM subscription_log sl
    JOIN unnested_inputs ui
      ON sl.event_id = ui.causation_id
     AND sl.node_id = $12
     AND sl.status = 'assigned'
     AND sl.subscription_id = ui.subscription_id
     AND sl.subscription_instance_identifier = ui.subscription_instance_identifier
    -- if something else is updating it, we probably shouldn't touch it anymore.
    FOR UPDATE NOWAIT
),
updated_logs AS (
    UPDATE subscription_log sl
    SET status = 'processed',
        updated_at = CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
    WHERE sl.id = usl.subscription_log_id
      AND usl.node_id = $12
),
inserted_event_log AS (
    INSERT INTO event_log (
        event_id, type, version, causation_id, correlation_id,
        idempotency_key, data, metadata, created_at
    )
    SELECT
        event_id, type, version, usl.causation_id, correlation_id,
        idempotency_key, data, metadata, CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
),
inserted_output_routing_info AS (
    INSERT INTO output_event_routing (
        event_id, subscription_id, subscription_instance_identifier, applied_order_id
    )
    SELECT event_id, subscription_id, subscription_instance_identifier, applied_order_id
    FROM to_update_subscription_logs usl
)
SELECT * FROM to_update_subscription_logs
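When testing it by hand in pgAdmin, I substitute literal arrays for $1
through $11 in the first CTE (and a literal for $12) and leave everything
else unchanged. For a single-row input set the substitution looks roughly
like this, with made-up placeholder values:

SELECT
    unnest(ARRAY['11111111-1111-1111-1111-111111111111']::uuid[]) AS event_id,
    unnest(ARRAY['example_type']::varchar[]) AS type,
    unnest(ARRAY[1]::int[]) AS version,
    unnest(ARRAY['22222222-2222-2222-2222-222222222222']::uuid[]) AS causation_id,
    unnest(ARRAY['33333333-3333-3333-3333-333333333333']::uuid[]) AS correlation_id,
    unnest(ARRAY['example-idempotency-key']::text[]) AS idempotency_key,
    unnest(ARRAY['{"example": true}']::jsonb[]) AS data,
    unnest(ARRAY['{}']::jsonb[]) AS metadata,
    unnest(ARRAY['example-subscription']::text[]) AS subscription_id,
    unnest(ARRAY['instance-1']::text[]) AS subscription_instance_identifier,
    unnest(ARRAY[42]::bigint[]) AS applied_order_id;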
The tables look as follows:
CREATE TABLE event_log (
    event_id UUID PRIMARY KEY,
    event_order_id BIGINT REFERENCES event(order_id),
    type varchar NOT NULL,
    version int NOT NULL,
    causation_id UUID,
    correlation_id UUID,
    idempotency_key TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT constraint_event_log_unique_idempotency_key
        UNIQUE (idempotency_key) -- idempotent writes.
);
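For concreteness, the failure is PostgreSQL's ordinary duplicate-key error;
by hand it can be provoked like this (made-up values, shown here only to pin
down the error shape):

INSERT INTO event_log (event_id, type, version, idempotency_key, data)
VALUES (gen_random_uuid(), 'example_type', 1, 'example-key', '{}');
INSERT INTO event_log (event_id, type, version, idempotency_key, data)
VALUES (gen_random_uuid(), 'example_type', 1, 'example-key', '{}');
-- ERROR: duplicate key value violates unique constraint
--        "constraint_event_log_unique_idempotency_key"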
CREATE TABLE output_event_routing (
    event_id UUID REFERENCES event_log(event_id),
    subscription_id TEXT NOT NULL,
    subscription_instance_identifier TEXT,
    applied_order_id BIGINT,
    CONSTRAINT constraint_output_event_routing_uniqueness
        UNIQUE (subscription_id, subscription_instance_identifier, applied_order_id)
);
CREATE TABLE subscription_log (
    id UUID NOT NULL PRIMARY KEY,
    event_id UUID NOT NULL,
    event_order_id BIGINT NOT NULL,
    event_correlation_id UUID NOT NULL,
    subscription_instance_identifier TEXT NOT NULL,
    subscription_id TEXT NOT NULL REFERENCES subscription(name),
    status processing_status NOT NULL DEFAULT 'enqueued',
    node_id UUID REFERENCES node(id) -- is null until assigned.
);
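The processing_status type isn't shown above; only the values mentioned in
this post matter here, so think of it as roughly:

CREATE TYPE processing_status AS ENUM ('enqueued', 'assigned', 'processed');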
Since I have been asked to avoid PL/pgSQL, I tried to achieve the following
behaviour with CTEs:
- For the given events, update the subscription log to 'processed', but only
if we are still the node processing them and the status is still 'assigned'.
- Only for the events where the previous step succeeded, continue processing
by inserting into event_log and into output_event_routing.
The mechanism aims to make sure we don't insert the results of event
processing twice.
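Stripped of the batching, the guarantee I'm after is equivalent to this
single-event sketch (placeholder values; columns trimmed, and the new
event's id collapsed into the causation event's id for brevity):

WITH claimed AS (
    -- the claim succeeds only if this node still owns the entry
    -- and its status is still 'assigned'
    UPDATE subscription_log
    SET status = 'processed'
    WHERE event_id = '22222222-2222-2222-2222-222222222222'
      AND node_id = '44444444-4444-4444-4444-444444444444'
      AND status = 'assigned'
    RETURNING event_id
)
-- the inserts should happen only for entries actually claimed above
INSERT INTO event_log (event_id, type, version, idempotency_key, data)
SELECT event_id, 'example_type', 1, 'example-key', '{}'::jsonb
FROM claimed;

The real query does the same for whole batches via unnested_inputs, using
SELECT ... FOR UPDATE NOWAIT plus a separate UPDATE rather than a single
UPDATE ... RETURNING.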
When logging the input values, we can indeed see the same value set (exactly
the same) being passed twice, by different nodes. That is expected and is
exactly what this logic has to catch: same values, but another node. What we
observe is that one node succeeds while the other fails with the uniqueness
violation. From a business perspective that is actually fine, since rolling
back has the same effect, albeit with an error I didn't expect. Still, I
would love to understand this: how can one node succeed, set the log status
to 'processed', and go on to insert, while the other node also gets past the
'assigned' check far enough to attempt the same insert and only fail on the
unique constraint?