This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch tracing-impl
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 45b7a83b4eb62107cedae91b7f92610832e2f2c0
Author: lahiruj <[email protected]>
AuthorDate: Fri Jun 5 00:30:16 2026 -0400

    Propagate request context across event bus so subscribers inherit trace IDs
---
 pkg/events/bus.go                                  |  64 +++++-
 pkg/events/bus_test.go                             | 225 +++++++++++++++++++++
 pkg/events/compute_allocation_diff_subscribe.go    |   9 +-
 ...ation_membership_resource_override_subscribe.go |   9 +-
 .../compute_allocation_membership_subscribe.go     |   9 +-
 ...ompute_allocation_resource_mapping_subscribe.go |   9 +-
 .../compute_allocation_resource_subscribe.go       |   9 +-
 pkg/events/compute_allocation_subscribe.go         |   9 +-
 pkg/events/compute_cluster_user_subscribe.go       |   9 +-
 pkg/events/organization_subscribe.go               |   9 +-
 pkg/events/project_subscribe.go                    |   9 +-
 pkg/events/types.go                                |   3 +-
 pkg/events/user_identity_subscribe.go              |   9 +-
 pkg/events/user_subscribe.go                       |   9 +-
 pkg/service/compute_allocation.go                  |   6 +-
 pkg/service/compute_allocation_diff.go             |   4 +-
 pkg/service/compute_allocation_membership.go       |   8 +-
 ...pute_allocation_membership_resource_override.go |   6 +-
 pkg/service/compute_allocation_resource.go         |   6 +-
 pkg/service/compute_allocation_resource_mapping.go |   6 +-
 pkg/service/compute_cluster.go                     |   6 +-
 pkg/service/compute_cluster_user.go                |   6 +-
 pkg/service/organization.go                        |   6 +-
 pkg/service/project.go                             |   8 +-
 pkg/service/user.go                                |   8 +-
 pkg/service/user_identity.go                       |   6 +-
 pkg/service/user_merge.go                          |   4 +-
 27 files changed, 382 insertions(+), 89 deletions(-)

diff --git a/pkg/events/bus.go b/pkg/events/bus.go
index e2e419b25..2044e02ca 100644
--- a/pkg/events/bus.go
+++ b/pkg/events/bus.go
@@ -1,5 +1,15 @@
 package events
 
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "runtime/debug"
+
+       "github.com/apache/airavata-custos/internal/tracing"
+       "go.opentelemetry.io/otel/codes"
+)
+
 func New() *Bus {
        return &Bus{
                subs: make(map[string][]EventSubscriberFunc),
@@ -17,21 +27,47 @@ func (b *Bus) Subscribe(topic EventType, handler 
EventSubscriberFunc) {
 
 // Publish sends an event to all subscribers of the given topic.
 // Each handler runs in its own goroutine so publishers never block.
-func (b *Bus) Publish(topic EventType, payload any) {
+func (b *Bus) Publish(ctx context.Context, topic EventType, payload any) {
+       ctx, span := tracing.Start(ctx, "bus.publish:"+string(topic))
+       defer span.End()
+
        b.mu.RLock()
        handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)]))
        copy(handlers, b.subs[string(topic)])
        b.mu.RUnlock()
 
        event := Event{Type: topic, Payload: payload}
+       detached := context.WithoutCancel(ctx)
        for _, h := range handlers {
-               go h(event, payload)
+               go safeDispatch(detached, h, event, payload)
        }
 }
 
+func safeDispatch(ctx context.Context, h EventSubscriberFunc, event Event, 
payload any) {
+       ctx, span := tracing.Start(ctx, "bus.subscribe:"+string(event.Type))
+       defer span.End()
+
+       defer func() {
+               if r := recover(); r != nil {
+                       err := fmt.Errorf("subscriber panic: %v", r)
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, "subscriber panic")
+                       slog.Error("event subscriber panicked",
+                               "topic", event.Type,
+                               "panic", r,
+                               "stack", string(debug.Stack()),
+                       )
+               }
+       }()
+       h(ctx, event, payload)
+}
+
 // PublishSync is like Publish but calls handlers in the caller's goroutine.
 // Useful when you need to guarantee ordering or want backpressure.
-func (b *Bus) PublishSync(topic EventType, payload any) {
+func (b *Bus) PublishSync(ctx context.Context, topic EventType, payload any) {
+       ctx, span := tracing.Start(ctx, "bus.publish:"+string(topic))
+       defer span.End()
+
        b.mu.RLock()
        handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)]))
        copy(handlers, b.subs[string(topic)])
@@ -39,6 +75,26 @@ func (b *Bus) PublishSync(topic EventType, payload any) {
 
        event := Event{Type: topic, Payload: payload}
        for _, h := range handlers {
-               h(event, payload)
+               dispatchSync(ctx, h, event, payload)
        }
 }
+
+func dispatchSync(ctx context.Context, h EventSubscriberFunc, event Event, 
payload any) {
+       ctx, span := tracing.Start(ctx, "bus.subscribe:"+string(event.Type))
+       defer span.End()
+
+       defer func() {
+               if r := recover(); r != nil {
+                       err := fmt.Errorf("subscriber panic: %v", r)
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, "subscriber panic")
+                       slog.Error("event subscriber panicked",
+                               "topic", event.Type,
+                               "panic", r,
+                               "stack", string(debug.Stack()),
+                       )
+                       panic(r)
+               }
+       }()
+       h(ctx, event, payload)
+}
diff --git a/pkg/events/bus_test.go b/pkg/events/bus_test.go
new file mode 100644
index 000000000..46af671a6
--- /dev/null
+++ b/pkg/events/bus_test.go
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package events
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "go.opentelemetry.io/otel"
+       "go.opentelemetry.io/otel/codes"
+       sdktrace "go.opentelemetry.io/otel/sdk/trace"
+)
+
+type ctxKey string
+
+const testCtxKey ctxKey = "trace-id"
+
+const topicTest EventType = "test::topic"
+
+func TestPublishSyncPropagatesContext(t *testing.T) {
+       bus := New()
+
+       got := make(chan string, 1)
+       bus.Subscribe(topicTest, func(ctx context.Context, _ Event, _ 
interface{}) {
+               v, _ := ctx.Value(testCtxKey).(string)
+               got <- v
+       })
+
+       ctx := context.WithValue(context.Background(), testCtxKey, "abc123")
+       bus.PublishSync(ctx, topicTest, "payload")
+
+       select {
+       case v := <-got:
+               if v != "abc123" {
+                       t.Fatalf("PublishSync did not propagate ctx value, got 
%q", v)
+               }
+       case <-time.After(time.Second):
+               t.Fatalf("subscriber never ran")
+       }
+}
+
+func TestPublishAsyncPropagatesContext(t *testing.T) {
+       bus := New()
+
+       got := make(chan string, 1)
+       bus.Subscribe(topicTest, func(ctx context.Context, _ Event, _ 
interface{}) {
+               v, _ := ctx.Value(testCtxKey).(string)
+               got <- v
+       })
+
+       ctx := context.WithValue(context.Background(), testCtxKey, "def456")
+       bus.Publish(ctx, topicTest, "payload")
+
+       select {
+       case v := <-got:
+               if v != "def456" {
+                       t.Fatalf("Publish did not propagate ctx value, got %q", 
v)
+               }
+       case <-time.After(time.Second):
+               t.Fatalf("subscriber never ran")
+       }
+}
+
+func TestPublishAsyncDetachesCancellation(t *testing.T) {
+       bus := New()
+
+       started := make(chan struct{})
+       done := make(chan error, 1)
+       bus.Subscribe(topicTest, func(ctx context.Context, _ Event, _ 
interface{}) {
+               close(started)
+               select {
+               case <-ctx.Done():
+                       done <- ctx.Err()
+               case <-time.After(200 * time.Millisecond):
+                       done <- nil
+               }
+       })
+
+       ctx, cancel := context.WithCancel(context.Background())
+       bus.Publish(ctx, topicTest, "payload")
+
+       <-started
+       cancel()
+
+       select {
+       case err := <-done:
+               if err != nil {
+                       t.Fatalf("expected subscriber ctx to be detached from 
cancellation, got err=%v", err)
+               }
+       case <-time.After(time.Second):
+               t.Fatalf("subscriber never finished")
+       }
+}
+
+func TestPublishSyncPanicPropagatesToCaller(t *testing.T) {
+       bus := New()
+
+       bus.Subscribe(topicTest, func(context.Context, Event, interface{}) {
+               panic("boom")
+       })
+
+       var recovered any
+       func() {
+               defer func() { recovered = recover() }()
+               bus.PublishSync(context.Background(), topicTest, nil)
+       }()
+
+       if recovered == nil {
+               t.Fatalf("expected sync publish to surface subscriber panic to 
caller")
+       }
+}
+
+func TestPublishAsyncSubscriberPanicDoesNotKillOthers(t *testing.T) {
+       bus := New()
+
+       var ran atomic.Int32
+       var wg sync.WaitGroup
+       wg.Add(2)
+       bus.Subscribe(topicTest, func(context.Context, Event, interface{}) {
+               defer wg.Done()
+               ran.Add(1)
+               panic("boom")
+       })
+       bus.Subscribe(topicTest, func(context.Context, Event, interface{}) {
+               defer wg.Done()
+               ran.Add(1)
+       })
+
+       bus.Publish(context.Background(), topicTest, nil)
+
+       finished := make(chan struct{})
+       go func() {
+               wg.Wait()
+               close(finished)
+       }()
+
+       select {
+       case <-finished:
+       case <-time.After(time.Second):
+       }
+
+       if got := ran.Load(); got != 2 {
+               t.Fatalf("expected both subscribers to run, got %d", got)
+       }
+}
+
+type recordingProcessor struct {
+       mu    sync.Mutex
+       spans []sdktrace.ReadOnlySpan
+}
+
+func (p *recordingProcessor) OnStart(context.Context, sdktrace.ReadWriteSpan) 
{}
+func (p *recordingProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       p.spans = append(p.spans, s)
+}
+func (p *recordingProcessor) Shutdown(context.Context) error   { return nil }
+func (p *recordingProcessor) ForceFlush(context.Context) error { return nil }
+
+func (p *recordingProcessor) findByName(name string) sdktrace.ReadOnlySpan {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       for _, s := range p.spans {
+               if s.Name() == name {
+                       return s
+               }
+       }
+       return nil
+}
+
+func TestSafeDispatchSetsSpanErrorOnPanic(t *testing.T) {
+       rec := &recordingProcessor{}
+       tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec))
+       prev := otel.GetTracerProvider()
+       otel.SetTracerProvider(tp)
+       t.Cleanup(func() { otel.SetTracerProvider(prev) })
+
+       bus := New()
+       bus.Subscribe(topicTest, func(context.Context, Event, interface{}) {
+               panic("boom")
+       })
+
+       bus.Publish(context.Background(), topicTest, nil)
+
+       deadline := time.Now().Add(time.Second)
+       var sub sdktrace.ReadOnlySpan
+       for time.Now().Before(deadline) {
+               _ = tp.ForceFlush(context.Background())
+               sub = rec.findByName("bus.subscribe:" + string(topicTest))
+               if sub != nil {
+                       break
+               }
+               time.Sleep(5 * time.Millisecond)
+       }
+
+       if sub == nil {
+               var names []string
+               for _, s := range rec.spans {
+                       names = append(names, s.Name())
+               }
+               t.Fatalf("did not see bus.subscribe span; got names=%v", names)
+       }
+       if got := sub.Status().Code; got != codes.Error {
+               t.Fatalf("expected bus.subscribe span status=Error, got %v", 
got)
+       }
+}
diff --git a/pkg/events/compute_allocation_diff_subscribe.go 
b/pkg/events/compute_allocation_diff_subscribe.go
index 126aa2b4c..83c1b723c 100644
--- a/pkg/events/compute_allocation_diff_subscribe.go
+++ b/pkg/events/compute_allocation_diff_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // ComputeAllocationDiffHandler handles compute allocation diff lifecycle 
events with a typed payload.
-type ComputeAllocationDiffHandler func(diff models.ComputeAllocationDiff)
+type ComputeAllocationDiffHandler func(ctx context.Context, diff 
models.ComputeAllocationDiff)
 
 // SubscribeComputeAllocationDiffCreated registers a typed handler invoked 
whenever a
 // compute_allocation_diff::create event is published. Events with payloads 
that are
@@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationDiffDeleted(handler 
ComputeAllocationDif
 }
 
 func (b *Bus) subscribeComputeAllocationDiff(topic EventType, handler 
ComputeAllocationDiffHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch d := value.(type) {
                case models.ComputeAllocationDiff:
-                       handler(d)
+                       handler(ctx, d)
                case *models.ComputeAllocationDiff:
                        if d != nil {
-                               handler(*d)
+                               handler(ctx, *d)
                        }
                default:
                        slog.Warn("compute allocation diff event payload has 
unexpected type",
diff --git 
a/pkg/events/compute_allocation_membership_resource_override_subscribe.go 
b/pkg/events/compute_allocation_membership_resource_override_subscribe.go
index ea77026fa..6a381a524 100644
--- a/pkg/events/compute_allocation_membership_resource_override_subscribe.go
+++ b/pkg/events/compute_allocation_membership_resource_override_subscribe.go
@@ -18,6 +18,7 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
@@ -25,7 +26,7 @@ import (
 
 // ComputeAllocationMembershipResourceOverrideHandler handles lifecycle
 // events for membership resource overrides with a typed payload.
-type ComputeAllocationMembershipResourceOverrideHandler func(o 
models.ComputeAllocationMembershipResourceOverride)
+type ComputeAllocationMembershipResourceOverrideHandler func(ctx 
context.Context, o models.ComputeAllocationMembershipResourceOverride)
 
 // SubscribeComputeAllocationMembershipResourceOverrideCreated registers a
 // typed handler invoked whenever a
@@ -49,13 +50,13 @@ func (b *Bus) 
SubscribeComputeAllocationMembershipResourceOverrideDeleted(handle
 }
 
 func (b *Bus) subscribeMembershipResourceOverride(topic EventType, handler 
ComputeAllocationMembershipResourceOverrideHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch o := value.(type) {
                case models.ComputeAllocationMembershipResourceOverride:
-                       handler(o)
+                       handler(ctx, o)
                case *models.ComputeAllocationMembershipResourceOverride:
                        if o != nil {
-                               handler(*o)
+                               handler(ctx, *o)
                        }
                default:
                        slog.Warn("compute allocation membership resource 
override event payload has unexpected type",
diff --git a/pkg/events/compute_allocation_membership_subscribe.go 
b/pkg/events/compute_allocation_membership_subscribe.go
index 79c3f6cf0..13c1256e2 100644
--- a/pkg/events/compute_allocation_membership_subscribe.go
+++ b/pkg/events/compute_allocation_membership_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // ComputeAllocationMembershipHandler handles compute allocation membership 
lifecycle events with a typed payload.
-type ComputeAllocationMembershipHandler func(membership 
models.ComputeAllocationMembership)
+type ComputeAllocationMembershipHandler func(ctx context.Context, membership 
models.ComputeAllocationMembership)
 
 // SubscribeComputeAllocationMembershipCreated registers a typed handler 
invoked whenever a
 // compute_allocation_membership::create event is published. Events with 
payloads that are
@@ -30,13 +31,13 @@ func (b *Bus) 
SubscribeComputeAllocationMembershipDeleted(handler ComputeAllocat
 }
 
 func (b *Bus) subscribeComputeAllocationMembership(topic EventType, handler 
ComputeAllocationMembershipHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch m := value.(type) {
                case models.ComputeAllocationMembership:
-                       handler(m)
+                       handler(ctx, m)
                case *models.ComputeAllocationMembership:
                        if m != nil {
-                               handler(*m)
+                               handler(ctx, *m)
                        }
                default:
                        slog.Warn("compute allocation membership event payload 
has unexpected type",
diff --git a/pkg/events/compute_allocation_resource_mapping_subscribe.go 
b/pkg/events/compute_allocation_resource_mapping_subscribe.go
index 25522204a..e8e6be175 100644
--- a/pkg/events/compute_allocation_resource_mapping_subscribe.go
+++ b/pkg/events/compute_allocation_resource_mapping_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // ComputeAllocationResourceMappingHandler handles compute allocation resource 
mapping lifecycle events with a typed payload.
-type ComputeAllocationResourceMappingHandler func(mapping 
models.ComputeAllocationResourceMapping)
+type ComputeAllocationResourceMappingHandler func(ctx context.Context, mapping 
models.ComputeAllocationResourceMapping)
 
 // SubscribeComputeAllocationResourceMappingCreated registers a typed handler 
invoked whenever a
 // compute_allocation_resource_mapping::create event is published. Events with 
payloads that are
@@ -30,13 +31,13 @@ func (b *Bus) 
SubscribeComputeAllocationResourceMappingDeleted(handler ComputeAl
 }
 
 func (b *Bus) subscribeComputeAllocationResourceMapping(topic EventType, 
handler ComputeAllocationResourceMappingHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch m := value.(type) {
                case models.ComputeAllocationResourceMapping:
-                       handler(m)
+                       handler(ctx, m)
                case *models.ComputeAllocationResourceMapping:
                        if m != nil {
-                               handler(*m)
+                               handler(ctx, *m)
                        }
                default:
                        slog.Warn("compute allocation resource mapping event 
payload has unexpected type",
diff --git a/pkg/events/compute_allocation_resource_subscribe.go 
b/pkg/events/compute_allocation_resource_subscribe.go
index bca3308b5..e45d89072 100644
--- a/pkg/events/compute_allocation_resource_subscribe.go
+++ b/pkg/events/compute_allocation_resource_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // ComputeAllocationResourceHandler handles compute allocation resource 
lifecycle events with a typed payload.
-type ComputeAllocationResourceHandler func(resource 
models.ComputeAllocationResource)
+type ComputeAllocationResourceHandler func(ctx context.Context, resource 
models.ComputeAllocationResource)
 
 // SubscribeComputeAllocationResourceCreated registers a typed handler invoked 
whenever a
 // compute_allocation_resource::create event is published. Events with 
payloads that are
@@ -30,13 +31,13 @@ func (b *Bus) 
SubscribeComputeAllocationResourceDeleted(handler ComputeAllocatio
 }
 
 func (b *Bus) subscribeComputeAllocationResource(topic EventType, handler 
ComputeAllocationResourceHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch r := value.(type) {
                case models.ComputeAllocationResource:
-                       handler(r)
+                       handler(ctx, r)
                case *models.ComputeAllocationResource:
                        if r != nil {
-                               handler(*r)
+                               handler(ctx, *r)
                        }
                default:
                        slog.Warn("compute allocation resource event payload 
has unexpected type",
diff --git a/pkg/events/compute_allocation_subscribe.go 
b/pkg/events/compute_allocation_subscribe.go
index 93fc2e3c8..93500c816 100644
--- a/pkg/events/compute_allocation_subscribe.go
+++ b/pkg/events/compute_allocation_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // ComputeAllocationHandler handles compute allocation lifecycle events with a 
typed payload.
-type ComputeAllocationHandler func(allocation models.ComputeAllocation)
+type ComputeAllocationHandler func(ctx context.Context, allocation 
models.ComputeAllocation)
 
 // SubscribeComputeAllocationCreated registers a typed handler invoked 
whenever a
 // compute_allocation::create event is published. Events with payloads that are
@@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationDeleted(handler 
ComputeAllocationHandler
 }
 
 func (b *Bus) subscribeComputeAllocation(topic EventType, handler 
ComputeAllocationHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch a := value.(type) {
                case models.ComputeAllocation:
-                       handler(a)
+                       handler(ctx, a)
                case *models.ComputeAllocation:
                        if a != nil {
-                               handler(*a)
+                               handler(ctx, *a)
                        }
                default:
                        slog.Warn("compute allocation event payload has 
unexpected type",
diff --git a/pkg/events/compute_cluster_user_subscribe.go 
b/pkg/events/compute_cluster_user_subscribe.go
index 5ba7a11a7..1ef7d60d1 100644
--- a/pkg/events/compute_cluster_user_subscribe.go
+++ b/pkg/events/compute_cluster_user_subscribe.go
@@ -18,6 +18,7 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
@@ -25,7 +26,7 @@ import (
 
 // ComputeClusterUserHandler handles compute-cluster user lifecycle events
 // with a typed payload.
-type ComputeClusterUserHandler func(user models.ComputeClusterUser)
+type ComputeClusterUserHandler func(ctx context.Context, user 
models.ComputeClusterUser)
 
 // SubscribeComputeClusterUserCreated registers a typed handler invoked
 // whenever a compute_cluster_user::create event is published.
@@ -46,13 +47,13 @@ func (b *Bus) SubscribeComputeClusterUserDeleted(handler 
ComputeClusterUserHandl
 }
 
 func (b *Bus) subscribeComputeClusterUser(topic EventType, handler 
ComputeClusterUserHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch u := value.(type) {
                case models.ComputeClusterUser:
-                       handler(u)
+                       handler(ctx, u)
                case *models.ComputeClusterUser:
                        if u != nil {
-                               handler(*u)
+                               handler(ctx, *u)
                        }
                default:
                        slog.Warn("compute cluster user event payload has 
unexpected type",
diff --git a/pkg/events/organization_subscribe.go 
b/pkg/events/organization_subscribe.go
index 68a6b528e..5b5674957 100644
--- a/pkg/events/organization_subscribe.go
+++ b/pkg/events/organization_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // OrganizationHandler handles organization lifecycle events with a typed 
payload.
-type OrganizationHandler func(organization models.Organization)
+type OrganizationHandler func(ctx context.Context, organization 
models.Organization)
 
 // SubscribeOrganizationCreated registers a typed handler invoked whenever an
 // organization::create event is published. Events with payloads that are not a
@@ -29,13 +30,13 @@ func (b *Bus) SubscribeOrganizationDeleted(handler 
OrganizationHandler) {
 }
 
 func (b *Bus) subscribeOrganization(topic EventType, handler 
OrganizationHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch o := value.(type) {
                case models.Organization:
-                       handler(o)
+                       handler(ctx, o)
                case *models.Organization:
                        if o != nil {
-                               handler(*o)
+                               handler(ctx, *o)
                        }
                default:
                        slog.Warn("organization event payload has unexpected 
type",
diff --git a/pkg/events/project_subscribe.go b/pkg/events/project_subscribe.go
index da7cb7819..874c3c08d 100644
--- a/pkg/events/project_subscribe.go
+++ b/pkg/events/project_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // ProjectHandler handles project lifecycle events with a typed payload.
-type ProjectHandler func(project models.Project)
+type ProjectHandler func(ctx context.Context, project models.Project)
 
 // SubscribeProjectCreated registers a typed handler invoked whenever a
 // project::create event is published. Events with payloads that are not a
@@ -31,13 +32,13 @@ func (b *Bus) SubscribeProjectDeleted(handler 
ProjectHandler) {
 }
 
 func (b *Bus) subscribeProject(topic EventType, handler ProjectHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch p := value.(type) {
                case models.Project:
-                       handler(p)
+                       handler(ctx, p)
                case *models.Project:
                        if p != nil {
-                               handler(*p)
+                               handler(ctx, *p)
                        }
                default:
                        slog.Warn("project event payload has unexpected type",
diff --git a/pkg/events/types.go b/pkg/events/types.go
index 4110782c6..4d9cb2b62 100644
--- a/pkg/events/types.go
+++ b/pkg/events/types.go
@@ -20,6 +20,7 @@
 package events
 
 import (
+       "context"
        "sync"
 )
 
@@ -126,7 +127,7 @@ type Event struct {
 }
 
 // EventSubscriberFunc is a function type that can be registered to receive 
events from the bus.
-type EventSubscriberFunc func(event Event, value interface{})
+type EventSubscriberFunc func(ctx context.Context, event Event, value 
interface{})
 
 // Bus is a lightweight, in-memory, topic-based pub/sub event bus.
 // Modules publish and subscribe by topic without knowing about each other.
diff --git a/pkg/events/user_identity_subscribe.go 
b/pkg/events/user_identity_subscribe.go
index dfbf7df47..06159522a 100644
--- a/pkg/events/user_identity_subscribe.go
+++ b/pkg/events/user_identity_subscribe.go
@@ -18,13 +18,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // UserIdentityHandler handles user-identity lifecycle events with a typed 
payload.
-type UserIdentityHandler func(identity models.UserIdentity)
+type UserIdentityHandler func(ctx context.Context, identity 
models.UserIdentity)
 
 // SubscribeUserIdentityCreated registers a typed handler invoked whenever a
 // user_identity::create event is published.
@@ -45,13 +46,13 @@ func (b *Bus) SubscribeUserIdentityDeleted(handler 
UserIdentityHandler) {
 }
 
 func (b *Bus) subscribeUserIdentity(topic EventType, handler 
UserIdentityHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch e := value.(type) {
                case models.UserIdentity:
-                       handler(e)
+                       handler(ctx, e)
                case *models.UserIdentity:
                        if e != nil {
-                               handler(*e)
+                               handler(ctx, *e)
                        }
                default:
                        slog.Warn("user identity event payload has unexpected 
type",
diff --git a/pkg/events/user_subscribe.go b/pkg/events/user_subscribe.go
index 7c08a49c5..c9bc92b07 100644
--- a/pkg/events/user_subscribe.go
+++ b/pkg/events/user_subscribe.go
@@ -1,13 +1,14 @@
 package events
 
 import (
+       "context"
        "log/slog"
 
        "github.com/apache/airavata-custos/pkg/models"
 )
 
 // UserHandler handles user lifecycle events with a typed payload.
-type UserHandler func(user models.User)
+type UserHandler func(ctx context.Context, user models.User)
 
 // SubscribeUserCreated registers a typed handler invoked whenever a
 // user::create event is published. Events with payloads that are not a
@@ -29,13 +30,13 @@ func (b *Bus) SubscribeUserDeleted(handler UserHandler) {
 }
 
 func (b *Bus) subscribeUser(topic EventType, handler UserHandler) {
-       b.Subscribe(topic, func(event Event, value interface{}) {
+       b.Subscribe(topic, func(ctx context.Context, event Event, value 
interface{}) {
                switch u := value.(type) {
                case models.User:
-                       handler(u)
+                       handler(ctx, u)
                case *models.User:
                        if u != nil {
-                               handler(*u)
+                               handler(ctx, *u)
                        }
                default:
                        slog.Warn("user event payload has unexpected type",
diff --git a/pkg/service/compute_allocation.go 
b/pkg/service/compute_allocation.go
index 47e010a48..dbc817ea4 100644
--- a/pkg/service/compute_allocation.go
+++ b/pkg/service/compute_allocation.go
@@ -67,7 +67,7 @@ func (s *Service) CreateComputeAllocation(ctx 
context.Context, alloc *models.Com
                return nil, fmt.Errorf("create compute allocation: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationCreateEvent, alloc)
+       s.eventBus.Publish(ctx, events.ComputeAllocationCreateEvent, alloc)
        return alloc, nil
 }
 
@@ -113,7 +113,7 @@ func (s *Service) UpdateComputeAllocation(ctx 
context.Context, alloc *models.Com
                return fmt.Errorf("update compute allocation: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationUpdateEvent, alloc)
+       s.eventBus.Publish(ctx, events.ComputeAllocationUpdateEvent, alloc)
        return nil
 }
 
@@ -135,6 +135,6 @@ func (s *Service) DeleteComputeAllocation(ctx 
context.Context, id string) error
                return fmt.Errorf("delete compute allocation: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationDeleteEvent, alloc)
+       s.eventBus.Publish(ctx, events.ComputeAllocationDeleteEvent, alloc)
        return nil
 }
diff --git a/pkg/service/compute_allocation_diff.go 
b/pkg/service/compute_allocation_diff.go
index 0375e1fee..ac250b905 100644
--- a/pkg/service/compute_allocation_diff.go
+++ b/pkg/service/compute_allocation_diff.go
@@ -63,7 +63,7 @@ func (s *Service) CreateComputeAllocationDiff(ctx 
context.Context, diff *models.
                return nil, fmt.Errorf("create compute allocation diff: %w", 
err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationDiffCreateEvent, diff)
+       s.eventBus.Publish(ctx, events.ComputeAllocationDiffCreateEvent, diff)
        return diff, nil
 }
 
@@ -129,6 +129,6 @@ func (s *Service) DeleteComputeAllocationDiff(ctx 
context.Context, id string) er
                return fmt.Errorf("delete compute allocation diff: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationDiffDeleteEvent, diff)
+       s.eventBus.Publish(ctx, events.ComputeAllocationDiffDeleteEvent, diff)
        return nil
 }
diff --git a/pkg/service/compute_allocation_membership.go 
b/pkg/service/compute_allocation_membership.go
index f61bbdc06..cbf60cc5a 100644
--- a/pkg/service/compute_allocation_membership.go
+++ b/pkg/service/compute_allocation_membership.go
@@ -71,7 +71,7 @@ func (s *Service) CreateComputeAllocationMembership(ctx 
context.Context, m *mode
                return nil, fmt.Errorf("create compute allocation membership: 
%w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationMembershipCreateEvent, m)
+       s.eventBus.Publish(ctx, events.ComputeAllocationMembershipCreateEvent, 
m)
        return m, nil
 }
 
@@ -150,7 +150,7 @@ func (s *Service) UpdateComputeAllocationMembership(ctx 
context.Context, m *mode
                return nil, fmt.Errorf("update compute allocation membership: 
%w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, m)
+       s.eventBus.Publish(ctx, events.ComputeAllocationMembershipUpdateEvent, 
m)
        return m, nil
 }
 
@@ -177,7 +177,7 @@ func (s *Service) UpdateMembershipStatus(ctx 
context.Context, id string, status
                return nil, fmt.Errorf("update compute allocation membership 
status: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, 
existing)
+       s.eventBus.Publish(ctx, events.ComputeAllocationMembershipUpdateEvent, 
existing)
        return existing, nil
 }
 
@@ -199,6 +199,6 @@ func (s *Service) DeleteComputeAllocationMembership(ctx 
context.Context, id stri
                return fmt.Errorf("delete compute allocation membership: %w", 
err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationMembershipDeleteEvent, 
existing)
+       s.eventBus.Publish(ctx, events.ComputeAllocationMembershipDeleteEvent, 
existing)
        return nil
 }
diff --git a/pkg/service/compute_allocation_membership_resource_override.go 
b/pkg/service/compute_allocation_membership_resource_override.go
index e3148cd00..d4e6cc355 100644
--- a/pkg/service/compute_allocation_membership_resource_override.go
+++ b/pkg/service/compute_allocation_membership_resource_override.go
@@ -77,7 +77,7 @@ func (s *Service) 
CreateComputeAllocationMembershipResourceOverride(ctx context.
                return nil, fmt.Errorf("create membership resource override: 
%w", err)
        }
 
-       
s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideCreateEvent,
 o)
+       s.eventBus.Publish(ctx, 
events.ComputeAllocationMembershipResourceOverrideCreateEvent, o)
        return o, nil
 }
 
@@ -170,7 +170,7 @@ func (s *Service) 
UpdateComputeAllocationMembershipResourceOverride(ctx context.
                return nil, fmt.Errorf("update membership resource override: 
%w", err)
        }
 
-       
s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideUpdateEvent,
 o)
+       s.eventBus.Publish(ctx, 
events.ComputeAllocationMembershipResourceOverrideUpdateEvent, o)
        return o, nil
 }
 
@@ -192,6 +192,6 @@ func (s *Service) 
DeleteComputeAllocationMembershipResourceOverride(ctx context.
                return fmt.Errorf("delete membership resource override: %w", 
err)
        }
 
-       
s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideDeleteEvent,
 existing)
+       s.eventBus.Publish(ctx, 
events.ComputeAllocationMembershipResourceOverrideDeleteEvent, existing)
        return nil
 }
diff --git a/pkg/service/compute_allocation_resource.go 
b/pkg/service/compute_allocation_resource.go
index 4efb28b51..f8ee2c182 100644
--- a/pkg/service/compute_allocation_resource.go
+++ b/pkg/service/compute_allocation_resource.go
@@ -48,7 +48,7 @@ func (s *Service) CreateComputeAllocationResource(ctx 
context.Context, resource
                return nil, fmt.Errorf("create compute allocation resource: 
%w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationResourceCreateEvent, 
resource)
+       s.eventBus.Publish(ctx, events.ComputeAllocationResourceCreateEvent, 
resource)
        return resource, nil
 }
 
@@ -85,7 +85,7 @@ func (s *Service) UpdateComputeAllocationResource(ctx 
context.Context, resource
                return fmt.Errorf("update compute allocation resource: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationResourceUpdateEvent, 
resource)
+       s.eventBus.Publish(ctx, events.ComputeAllocationResourceUpdateEvent, 
resource)
        return nil
 }
 
@@ -107,6 +107,6 @@ func (s *Service) DeleteComputeAllocationResource(ctx 
context.Context, id string
                return fmt.Errorf("delete compute allocation resource: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationResourceDeleteEvent, 
resource)
+       s.eventBus.Publish(ctx, events.ComputeAllocationResourceDeleteEvent, 
resource)
        return nil
 }
diff --git a/pkg/service/compute_allocation_resource_mapping.go 
b/pkg/service/compute_allocation_resource_mapping.go
index ce5f4ef18..e6d154ec0 100644
--- a/pkg/service/compute_allocation_resource_mapping.go
+++ b/pkg/service/compute_allocation_resource_mapping.go
@@ -76,7 +76,7 @@ func (s *Service) AttachResourceToAllocation(ctx 
context.Context, allocationID,
                return nil, fmt.Errorf("attach resource to allocation: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationResourceMappingCreateEvent, 
mapping)
+       s.eventBus.Publish(ctx, 
events.ComputeAllocationResourceMappingCreateEvent, mapping)
        return mapping, nil
 }
 
@@ -109,7 +109,7 @@ func (s *Service) UpdateAllocationResourceMapping(ctx 
context.Context, allocatio
                return nil, fmt.Errorf("update allocation resource mapping: 
%w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationResourceMappingUpdateEvent, 
existing)
+       s.eventBus.Publish(ctx, 
events.ComputeAllocationResourceMappingUpdateEvent, existing)
        return existing, nil
 }
 
@@ -135,7 +135,7 @@ func (s *Service) DetachResourceFromAllocation(ctx 
context.Context, allocationID
                return fmt.Errorf("detach resource from allocation: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeAllocationResourceMappingDeleteEvent, 
existing)
+       s.eventBus.Publish(ctx, 
events.ComputeAllocationResourceMappingDeleteEvent, existing)
        return nil
 }
 
diff --git a/pkg/service/compute_cluster.go b/pkg/service/compute_cluster.go
index a9d5b3091..382f22efc 100644
--- a/pkg/service/compute_cluster.go
+++ b/pkg/service/compute_cluster.go
@@ -51,7 +51,7 @@ func (s *Service) CreateComputeCluster(ctx context.Context, 
cluster *models.Comp
                return nil, fmt.Errorf("create compute cluster: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeClusterCreateEvent, cluster)
+       s.eventBus.Publish(ctx, events.ComputeClusterCreateEvent, cluster)
        return cluster, nil
 }
 
@@ -103,7 +103,7 @@ func (s *Service) UpdateComputeCluster(ctx context.Context, 
cluster *models.Comp
                return fmt.Errorf("update compute cluster: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeClusterUpdateEvent, cluster)
+       s.eventBus.Publish(ctx, events.ComputeClusterUpdateEvent, cluster)
        return nil
 }
 
@@ -125,6 +125,6 @@ func (s *Service) DeleteComputeCluster(ctx context.Context, 
id string) error {
                return fmt.Errorf("delete compute cluster: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeClusterDeleteEvent, cluster)
+       s.eventBus.Publish(ctx, events.ComputeClusterDeleteEvent, cluster)
        return nil
 }
diff --git a/pkg/service/compute_cluster_user.go 
b/pkg/service/compute_cluster_user.go
index 0aeca1f27..e97d33a8c 100644
--- a/pkg/service/compute_cluster_user.go
+++ b/pkg/service/compute_cluster_user.go
@@ -79,7 +79,7 @@ func (s *Service) CreateComputeClusterUser(ctx 
context.Context, cu *models.Compu
                }
        }
 
-       s.eventBus.Publish(events.ComputeClusterUserCreateEvent, cu)
+       s.eventBus.Publish(ctx, events.ComputeClusterUserCreateEvent, cu)
        return cu, nil
 }
 
@@ -177,7 +177,7 @@ func (s *Service) UpdateComputeClusterUser(ctx 
context.Context, cu *models.Compu
                return fmt.Errorf("update compute cluster user: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeClusterUserUpdateEvent, cu)
+       s.eventBus.Publish(ctx, events.ComputeClusterUserUpdateEvent, cu)
        return nil
 }
 
@@ -199,6 +199,6 @@ func (s *Service) DeleteComputeClusterUser(ctx 
context.Context, id string) error
                return fmt.Errorf("delete compute cluster user: %w", err)
        }
 
-       s.eventBus.Publish(events.ComputeClusterUserDeleteEvent, cu)
+       s.eventBus.Publish(ctx, events.ComputeClusterUserDeleteEvent, cu)
        return nil
 }
diff --git a/pkg/service/organization.go b/pkg/service/organization.go
index 50effd1e7..15bb88925 100644
--- a/pkg/service/organization.go
+++ b/pkg/service/organization.go
@@ -53,7 +53,7 @@ func (s *Service) CreateOrganization(ctx context.Context, org 
*models.Organizati
                return nil, fmt.Errorf("create organization: %w", err)
        }
 
-       s.eventBus.Publish(events.OrganizationCreateEvent, org)
+       s.eventBus.Publish(ctx, events.OrganizationCreateEvent, org)
        return org, nil
 }
 
@@ -93,7 +93,7 @@ func (s *Service) UpdateOrganization(ctx context.Context, org 
*models.Organizati
                return fmt.Errorf("update organization: %w", err)
        }
 
-       s.eventBus.Publish(events.OrganizationUpdateEvent, org)
+       s.eventBus.Publish(ctx, events.OrganizationUpdateEvent, org)
        return nil
 }
 
@@ -115,6 +115,6 @@ func (s *Service) DeleteOrganization(ctx context.Context, 
id string) error {
                return fmt.Errorf("delete organization: %w", err)
        }
 
-       s.eventBus.Publish(events.OrganizationDeleteEvent, org)
+       s.eventBus.Publish(ctx, events.OrganizationDeleteEvent, org)
        return nil
 }
diff --git a/pkg/service/project.go b/pkg/service/project.go
index 338542473..53d57617f 100644
--- a/pkg/service/project.go
+++ b/pkg/service/project.go
@@ -69,7 +69,7 @@ func (s *Service) CreateProject(ctx context.Context, project 
*models.Project) (*
                return nil, fmt.Errorf("create project: %w", err)
        }
 
-       s.eventBus.Publish(events.ProjectCreateEvent, project)
+       s.eventBus.Publish(ctx, events.ProjectCreateEvent, project)
        return project, nil
 }
 
@@ -143,7 +143,7 @@ func (s *Service) UpdateProject(ctx context.Context, 
project *models.Project) er
                return fmt.Errorf("update project: %w", err)
        }
 
-       s.eventBus.Publish(events.ProjectUpdateEvent, project)
+       s.eventBus.Publish(ctx, events.ProjectUpdateEvent, project)
        return nil
 }
 
@@ -170,7 +170,7 @@ func (s *Service) UpdateProjectStatus(ctx context.Context, 
id string, status mod
        }
        existing.Status = status
 
-       s.eventBus.Publish(events.ProjectUpdateEvent, existing)
+       s.eventBus.Publish(ctx, events.ProjectUpdateEvent, existing)
        return existing, nil
 }
 
@@ -192,6 +192,6 @@ func (s *Service) DeleteProject(ctx context.Context, id 
string) error {
                return fmt.Errorf("delete project: %w", err)
        }
 
-       s.eventBus.Publish(events.ProjectDeleteEvent, project)
+       s.eventBus.Publish(ctx, events.ProjectDeleteEvent, project)
        return nil
 }
diff --git a/pkg/service/user.go b/pkg/service/user.go
index adc7f6a60..1da52c807 100644
--- a/pkg/service/user.go
+++ b/pkg/service/user.go
@@ -64,7 +64,7 @@ func (s *Service) CreateUser(ctx context.Context, user 
*models.User) (*models.Us
                return nil, fmt.Errorf("create user: %w", err)
        }
 
-       s.eventBus.Publish(events.UserCreateEvent, user)
+       s.eventBus.Publish(ctx, events.UserCreateEvent, user)
        return user, nil
 }
 
@@ -167,7 +167,7 @@ func (s *Service) UpdateUser(ctx context.Context, user 
*models.User) error {
                return fmt.Errorf("update user: %w", err)
        }
 
-       s.eventBus.Publish(events.UserUpdateEvent, user)
+       s.eventBus.Publish(ctx, events.UserUpdateEvent, user)
        return nil
 }
 
@@ -194,7 +194,7 @@ func (s *Service) UpdateUserStatus(ctx context.Context, id 
string, status models
        }
        existing.Status = status
 
-       s.eventBus.Publish(events.UserUpdateEvent, existing)
+       s.eventBus.Publish(ctx, events.UserUpdateEvent, existing)
        return existing, nil
 }
 
@@ -216,6 +216,6 @@ func (s *Service) DeleteUser(ctx context.Context, id 
string) error {
                return fmt.Errorf("delete user: %w", err)
        }
 
-       s.eventBus.Publish(events.UserDeleteEvent, existing)
+       s.eventBus.Publish(ctx, events.UserDeleteEvent, existing)
        return nil
 }
diff --git a/pkg/service/user_identity.go b/pkg/service/user_identity.go
index e9d9a7ce7..2fc218a55 100644
--- a/pkg/service/user_identity.go
+++ b/pkg/service/user_identity.go
@@ -64,7 +64,7 @@ func (s *Service) CreateUserIdentity(ctx context.Context, e 
*models.UserIdentity
                return nil, fmt.Errorf("create user identity: %w", err)
        }
 
-       s.eventBus.Publish(events.UserIdentityCreateEvent, e)
+       s.eventBus.Publish(ctx, events.UserIdentityCreateEvent, e)
        return e, nil
 }
 
@@ -163,7 +163,7 @@ func (s *Service) UpdateUserIdentity(ctx context.Context, e 
*models.UserIdentity
                return fmt.Errorf("update user identity: %w", err)
        }
 
-       s.eventBus.Publish(events.UserIdentityUpdateEvent, e)
+       s.eventBus.Publish(ctx, events.UserIdentityUpdateEvent, e)
        return nil
 }
 
@@ -185,6 +185,6 @@ func (s *Service) DeleteUserIdentity(ctx context.Context, 
id string) error {
                return fmt.Errorf("delete user identity: %w", err)
        }
 
-       s.eventBus.Publish(events.UserIdentityDeleteEvent, e)
+       s.eventBus.Publish(ctx, events.UserIdentityDeleteEvent, e)
        return nil
 }
diff --git a/pkg/service/user_merge.go b/pkg/service/user_merge.go
index f0a65263d..083d63f91 100644
--- a/pkg/service/user_merge.go
+++ b/pkg/service/user_merge.go
@@ -97,7 +97,7 @@ func (s *Service) MergeUsers(ctx context.Context, 
survivingID, retiringID string
        }
 
        retiring.Status = models.UserMerged
-       s.eventBus.Publish(events.UserUpdateEvent, retiring)
-       s.eventBus.Publish(events.UserUpdateEvent, survivor)
+       s.eventBus.Publish(ctx, events.UserUpdateEvent, retiring)
+       s.eventBus.Publish(ctx, events.UserUpdateEvent, survivor)
        return survivor, nil
 }


Reply via email to