This is an automated email from the ASF dual-hosted git repository.
joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f64c253 Fix a couple of issues related to CASSGO-101
5f64c253 is described below
commit 5f64c2530c9baa3b2d7b0c089f6ba0d9d2c7b7d5
Author: Bohdan Siryk <[email protected]>
AuthorDate: Thu Mar 19 18:30:22 2026 +0200
Fix a couple of issues related to CASSGO-101
There were a couple of things that had to be remediated:
- Fixed a bug where session initialization failed when both the
KeyspaceOnly cache mode and a KeyspaceChangeListener were set
- Fixed misleading debug logs
- Changed HostListenersMux methods from pointer receivers to value
receivers to be consistent with other multiplexers
Patch by Bohdan Siryk; reviewed by João Reis for CASSGO-114
---
CHANGELOG.md | 1 +
cassandra_test.go | 142 ++++++++++++++++++++++++++++++++++++++++++++++++
event_listeners.go | 11 ++--
event_listeners_test.go | 14 ++---
metadata.go | 20 +++----
session.go | 6 +-
6 files changed, 168 insertions(+), 26 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc8295d1..dcbbca73 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- Add options to shuffle replicas for token-aware policy and log warning when
the default behavior is used (CASSGO-106)
- Bump Go version support from 1.22 and 1.23 to 1.25 and 1.26 (CASSGO-110)
- Upgraded Github actions dependencies versions (CASSGO-111)
+- Fix a couple of issues related to CASSGO-101 (CASSGO-114)
### Fixed
diff --git a/cassandra_test.go b/cassandra_test.go
index 4f61cd74..d33de21c 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -4475,3 +4475,145 @@ func TestSessionReadyEvent(t *testing.T) {
require.Equal(t, 1, listener.readyCount)
require.Equal(t, session, listener.gotSession)
}
+
+func TestNewSession_SchemaListenersValidation(t *testing.T) {
+ // No better way to test this rather than creating a session and
checking the error
+ tests := []struct {
+ name string
+ metadata MetadataConfig
+ shouldFail bool
+ }{
+ {
+ name: "KeyspaceOnly metadata cache mode with schema
change listeners",
+ metadata: MetadataConfig{
+ CacheMode: KeyspaceOnly,
+ SchemaListener: SchemaListenersConfig{
+ KeyspaceChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: false,
+ },
+ {
+ name: "KeyspaceOnly metadata cache mode with
non-keyspace schema change listeners",
+ metadata: MetadataConfig{
+ CacheMode: KeyspaceOnly,
+ SchemaListener: SchemaListenersConfig{
+ TableChangeListener:
&schemaChangesTestListener{},
+ UserTypeChangeListener:
&schemaChangesTestListener{},
+ FunctionChangeListener:
&schemaChangesTestListener{},
+ AggregateChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: true,
+ },
+ {
+ name: "KeyspaceOnly metadata cache mode with both
keyspace and non-keyspace schema change listeners",
+ metadata: MetadataConfig{
+ CacheMode: KeyspaceOnly,
+ SchemaListener: SchemaListenersConfig{
+ KeyspaceChangeListener:
&schemaChangesTestListener{},
+ TableChangeListener:
&schemaChangesTestListener{},
+ UserTypeChangeListener:
&schemaChangesTestListener{},
+ FunctionChangeListener:
&schemaChangesTestListener{},
+ AggregateChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: true,
+ },
+ {
+ name: "Disabled metadata cache mode with no schema
change listeners",
+ metadata: MetadataConfig{
+ CacheMode: Disabled,
+ },
+ shouldFail: false,
+ },
+ {
+ name: "Disabled metadata cache mode with keyspace
change listener",
+ metadata: MetadataConfig{
+ CacheMode: Disabled,
+ SchemaListener: SchemaListenersConfig{
+ KeyspaceChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: true,
+ },
+ {
+ name: "Disabled metadata cache mode with non-keyspace
schema change listeners",
+ metadata: MetadataConfig{
+ CacheMode: Disabled,
+ SchemaListener: SchemaListenersConfig{
+ TableChangeListener:
&schemaChangesTestListener{},
+ UserTypeChangeListener:
&schemaChangesTestListener{},
+ FunctionChangeListener:
&schemaChangesTestListener{},
+ AggregateChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: true,
+ },
+ {
+ name: "Disabled metadata cache mode with all schema
change listeners",
+ metadata: MetadataConfig{
+ CacheMode: Disabled,
+ SchemaListener: SchemaListenersConfig{
+ KeyspaceChangeListener:
&schemaChangesTestListener{},
+ TableChangeListener:
&schemaChangesTestListener{},
+ UserTypeChangeListener:
&schemaChangesTestListener{},
+ FunctionChangeListener:
&schemaChangesTestListener{},
+ AggregateChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: true,
+ },
+ {
+ name: "Full metadata cache mode with keyspace change
listener",
+ metadata: MetadataConfig{
+ CacheMode: Full,
+ SchemaListener: SchemaListenersConfig{
+ KeyspaceChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: false,
+ },
+ {
+ name: "Full metadata cache mode with non-keyspace
schema change listeners",
+ metadata: MetadataConfig{
+ CacheMode: Full,
+ SchemaListener: SchemaListenersConfig{
+ TableChangeListener:
&schemaChangesTestListener{},
+ UserTypeChangeListener:
&schemaChangesTestListener{},
+ FunctionChangeListener:
&schemaChangesTestListener{},
+ AggregateChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: false,
+ },
+ {
+ name: "Full metadata cache mode with all schema change
listeners",
+ metadata: MetadataConfig{
+ CacheMode: Full,
+ SchemaListener: SchemaListenersConfig{
+ KeyspaceChangeListener:
&schemaChangesTestListener{},
+ TableChangeListener:
&schemaChangesTestListener{},
+ UserTypeChangeListener:
&schemaChangesTestListener{},
+ FunctionChangeListener:
&schemaChangesTestListener{},
+ AggregateChangeListener:
&schemaChangesTestListener{},
+ },
+ },
+ shouldFail: false,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ cluster := createCluster()
+ cluster.Metadata = test.metadata
+ session, err := cluster.CreateSession()
+ if test.shouldFail {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ session.Close()
+ }
+ })
+ }
+}
diff --git a/event_listeners.go b/event_listeners.go
index 14412ad5..e96c4c8f 100644
--- a/event_listeners.go
+++ b/event_listeners.go
@@ -266,25 +266,25 @@ type HostListenersMux struct {
TopologyChangeListeners []TopologyChangeListener
}
-func (mux *HostListenersMux) OnHostUp(event HostUpEvent) {
+func (mux HostListenersMux) OnHostUp(event HostUpEvent) {
for _, listener := range mux.HostStateChangeListeners {
listener.OnHostUp(event)
}
}
-func (mux *HostListenersMux) OnHostDown(event HostDownEvent) {
+func (mux HostListenersMux) OnHostDown(event HostDownEvent) {
for _, listener := range mux.HostStateChangeListeners {
listener.OnHostDown(event)
}
}
-func (mux *HostListenersMux) OnNewHost(event NewHostEvent) {
+func (mux HostListenersMux) OnNewHost(event NewHostEvent) {
for _, listener := range mux.TopologyChangeListeners {
listener.OnNewHost(event)
}
}
-func (mux *HostListenersMux) OnRemovedHost(event RemovedHostEvent) {
+func (mux HostListenersMux) OnRemovedHost(event RemovedHostEvent) {
for _, listener := range mux.TopologyChangeListeners {
listener.OnRemovedHost(event)
}
@@ -483,8 +483,7 @@ func (l internalSchemaListeners) hasSchemaChangeListeners()
bool {
}
func (l internalSchemaListeners) hasNonKeyspaceSchemaChangeListeners() bool {
- return l.hasKeyspace() ||
- l.hasTable() ||
+ return l.hasTable() ||
l.hasUserType() ||
l.hasFunction() ||
l.hasAggregate()
diff --git a/event_listeners_test.go b/event_listeners_test.go
index 48707e0b..ade043cd 100644
--- a/event_listeners_test.go
+++ b/event_listeners_test.go
@@ -393,14 +393,14 @@ func
TestSchemaListenersMux_OnlyTargetCategoryReceivesEvents(t *testing.T) {
func TestHostListenersMux_HostStatus(t *testing.T) {
t.Run("no listeners", func(t *testing.T) {
- mux := &HostListenersMux{}
+ mux := HostListenersMux{}
mux.OnHostUp(HostUpEvent{Host: &HostInfo{hostId: "h1"}})
mux.OnHostDown(HostDownEvent{Host: &HostInfo{hostId: "h1"}})
})
t.Run("single listener", func(t *testing.T) {
l := &mockHostStatusChangeListener{}
- mux := &HostListenersMux{
+ mux := HostListenersMux{
HostStateChangeListeners: []HostStatusChangeListener{l},
}
@@ -415,7 +415,7 @@ func TestHostListenersMux_HostStatus(t *testing.T) {
l1 := &mockHostStatusChangeListener{}
l2 := &mockHostStatusChangeListener{}
l3 := &mockHostStatusChangeListener{}
- mux := &HostListenersMux{
+ mux := HostListenersMux{
HostStateChangeListeners:
[]HostStatusChangeListener{l1, l2, l3},
}
@@ -431,14 +431,14 @@ func TestHostListenersMux_HostStatus(t *testing.T) {
func TestHostListenersMux_Topology(t *testing.T) {
t.Run("no listeners", func(t *testing.T) {
- mux := &HostListenersMux{}
+ mux := HostListenersMux{}
mux.OnNewHost(NewHostEvent{Host: &HostInfo{hostId: "h1"}})
mux.OnRemovedHost(RemovedHostEvent{Host: &HostInfo{hostId:
"h1"}})
})
t.Run("single listener", func(t *testing.T) {
l := &mockTopologyChangeListener{}
- mux := &HostListenersMux{
+ mux := HostListenersMux{
TopologyChangeListeners: []TopologyChangeListener{l},
}
@@ -453,7 +453,7 @@ func TestHostListenersMux_Topology(t *testing.T) {
l1 := &mockTopologyChangeListener{}
l2 := &mockTopologyChangeListener{}
l3 := &mockTopologyChangeListener{}
- mux := &HostListenersMux{
+ mux := HostListenersMux{
TopologyChangeListeners: []TopologyChangeListener{l1,
l2, l3},
}
@@ -471,7 +471,7 @@ func
TestHostListenersMux_OnlyTargetCategoryReceivesEvents(t *testing.T) {
statusListener := &mockHostStatusChangeListener{}
topoListener := &mockTopologyChangeListener{}
- mux := &HostListenersMux{
+ mux := HostListenersMux{
HostStateChangeListeners:
[]HostStatusChangeListener{statusListener},
TopologyChangeListeners:
[]TopologyChangeListener{topoListener},
}
diff --git a/metadata.go b/metadata.go
index 0753f6ed..6e93f769 100644
--- a/metadata.go
+++ b/metadata.go
@@ -2574,10 +2574,10 @@ func handleSchemaAggregateChanges(session *Session,
oldKeyspace, newKeyspace *Ke
}
}
- session.logger.Debug("Computed schema table change events",
- NewLogFieldInt("created_table_events_count",
len(createdEvents)),
- NewLogFieldInt("updated_table_events_count",
len(updatedEvents)),
- NewLogFieldInt("dropped_table_events_count",
len(droppedEvents)),
+ session.logger.Debug("Computed schema aggregate change events",
+ NewLogFieldInt("created_aggregate_events_count",
len(createdEvents)),
+ NewLogFieldInt("updated_aggregate_events_count",
len(updatedEvents)),
+ NewLogFieldInt("dropped_aggregate_events_count",
len(droppedEvents)),
)
for _, updatedEvent := range updatedEvents {
@@ -2619,9 +2619,9 @@ func handleSchemaUserTypeChanges(session *Session,
oldKeyspace, newKeyspace *Key
}
session.logger.Debug("Computed schema user type change events",
- NewLogFieldInt("created_table_events_count",
len(createdEvents)),
- NewLogFieldInt("updated_table_events_count",
len(updatedEvents)),
- NewLogFieldInt("dropped_table_events_count",
len(droppedEvents)),
+ NewLogFieldInt("created_user_type_events_count",
len(createdEvents)),
+ NewLogFieldInt("updated_user_type_events_count",
len(updatedEvents)),
+ NewLogFieldInt("dropped_user_type_events_count",
len(droppedEvents)),
)
for _, updatedEvent := range updatedEvents {
@@ -2663,9 +2663,9 @@ func handleSchemaFunctionChanges(session *Session,
oldKeyspace, newKeyspace *Key
}
session.logger.Debug("Computed schema function change events",
- NewLogFieldInt("created_table_events_count",
len(createdEvents)),
- NewLogFieldInt("updated_table_events_count",
len(updatedEvents)),
- NewLogFieldInt("dropped_table_events_count",
len(droppedEvents)),
+ NewLogFieldInt("created_function_events_count",
len(createdEvents)),
+ NewLogFieldInt("updated_function_events_count",
len(updatedEvents)),
+ NewLogFieldInt("dropped_function_events_count",
len(droppedEvents)),
)
for _, updatedEvent := range updatedEvents {
diff --git a/session.go b/session.go
index 2895719f..fcbdd703 100644
--- a/session.go
+++ b/session.go
@@ -187,14 +187,14 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
s.frameObserver = cfg.FrameHeaderObserver
s.streamObserver = cfg.StreamObserver
- // Propogate node status, topology and schema change listeners
+ // Propagate node status, topology and schema change listeners
s.hostListeners = newInternalHostStateListeners(
s,
cfg.Metadata.HostListener.HostStateChangeListener,
cfg.Metadata.HostListener.TopologyChangeListener,
)
- // Propogate schema change listeners
+ // Propagate schema change listeners
s.schemaListeners = newInternalSchemaChangeListeners(
cfg.Metadata.SchemaListener.KeyspaceChangeListener,
cfg.Metadata.SchemaListener.TableChangeListener,
@@ -211,7 +211,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
return nil, errors.New("Schema change listeners are not
supported in KeyspaceOnly metadata cache mode")
}
- // Propogate session ready listener
+ // Propagate session ready listener
s.sessionReadyListeners =
newInternalSessionReadyListener(cfg.Metadata.SessionReadyListener)
//Check the TLS Config before trying to connect to anything external
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]