This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch trino-435
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/trino-435 by this push:
     new d1c8da92ab9 [kafka] Support subject name mapping for Confluent Schema Registry (#378)
d1c8da92ab9 is described below

commit d1c8da92ab9b7e41a8aa409da7832fe0a07f3499
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sat Feb 7 01:07:58 2026 +0800

    [kafka] Support subject name mapping for Confluent Schema Registry (#378)
    
    Sometimes the subject name in Schema Registry does not match the Kafka
    topic name. This commit adds a new configuration property,
    kafka.confluent-schema-registry-subject-mapping, which lets users
    explicitly map a schema.table name to a topic name.
    
    The mapping is configured as a comma-separated list in the format:
    schema1.table1:topic1,schema2.table2:topic2
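    
    For example, a Kafka catalog could set (the schema, table, and topic
    names below are hypothetical):
    
        kafka.confluent-schema-registry-subject-mapping=market.orders:orders-avro,market.trades:trades-value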
    
    When a mapping is configured for a given schema.table, the topic name
    in TopicAndSubjects is overridden with the mapped value, allowing the
    connector to correctly resolve schemas for topics whose subject names
    differ from their topic names.
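    
    As a sketch of the resulting behavior (mapping values are hypothetical;
    the setter, getter, and parser are the ones added in this commit):
    
        ConfluentSchemaRegistryConfig config = new ConfluentSchemaRegistryConfig()
                .setConfluentSchemaRegistrySubjectMapping("market.orders:orders-avro");
        // Parsed into an ImmutableMap<SchemaTableName, String>,
        // keyed by new SchemaTableName("market", "orders")
        Map<SchemaTableName, String> mapping = config.getConfluentSchemaRegistrySubjectMapping();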
---
 .../confluent/ConfluentSchemaRegistryConfig.java   | 56 ++++++++++++++++++++++
 ...uentSchemaRegistryTableDescriptionSupplier.java | 23 ++++++++-
 ...uentSchemaRegistryTableDescriptionSupplier.java |  3 +-
 3 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
index 67b0f757d2e..c2810d9f422 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.kafka.schema.confluent;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
@@ -22,15 +23,20 @@ import io.airlift.units.MaxDuration;
 import io.airlift.units.MinDuration;
 import io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy;
 import io.trino.spi.HostAddress;
+import io.trino.spi.connector.SchemaTableName;
 import jakarta.validation.constraints.Max;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.Size;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Streams.stream;
 import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE;
+import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class ConfluentSchemaRegistryConfig
@@ -46,6 +52,7 @@ public class ConfluentSchemaRegistryConfig
     private int confluentSchemaRegistryClientCacheSize = 1000;
     private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
     private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS);
+    private Map<SchemaTableName, String> confluentSchemaRegistrySubjectMapping = ImmutableMap.of();
 
     @Size(min = 1)
     public Set<HostAddress> getConfluentSchemaRegistryUrls()
@@ -117,6 +124,19 @@ public class ConfluentSchemaRegistryConfig
         return this;
     }
 
+    public Map<SchemaTableName, String> getConfluentSchemaRegistrySubjectMapping()
+    {
+        return confluentSchemaRegistrySubjectMapping;
+    }
+
+    @Config("kafka.confluent-schema-registry-subject-mapping")
+    @ConfigDescription("Comma-separated list of schema.table to actual topic name mappings. Format: schema1.table1:topic1,schema2.table2:topic2")
+    public ConfluentSchemaRegistryConfig setConfluentSchemaRegistrySubjectMapping(String mapping)
+    {
+        this.confluentSchemaRegistrySubjectMapping = (mapping == null) ? ImmutableMap.of() : parseSubjectMapping(mapping);
+        return this;
+    }
+
     private static ImmutableSet<HostAddress> parseNodes(String nodes)
     {
         Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
@@ -129,4 +149,40 @@ public class ConfluentSchemaRegistryConfig
     {
         return HostAddress.fromString(value);
     }
+
+    private static ImmutableMap<SchemaTableName, String> parseSubjectMapping(String mapping)
+    {
+        requireNonNull(mapping, "mapping is null");
+
+        Splitter entrySplitter = Splitter.on(',').omitEmptyStrings().trimResults();
+        Splitter keyValueSplitter = Splitter.on(':').trimResults();
+
+        ImmutableMap.Builder<SchemaTableName, String> builder = ImmutableMap.builder();
+
+        for (String entry : entrySplitter.split(mapping)) {
+            List<String> parts = keyValueSplitter.splitToList(entry);
+            checkArgument(parts.size() == 2,
+                    "Invalid mapping format '%s'. Expected format: schema.table:topic", entry);
+
+            String schemaTable = parts.get(0);
+            String topicName = parts.get(1);
+
+            List<String> schemaTableParts = Splitter.on('.').trimResults().splitToList(schemaTable);
+            checkArgument(schemaTableParts.size() == 2,
+                    "Invalid schema.table format '%s'. Expected format: schema.table", schemaTable);
+
+            String schema = schemaTableParts.get(0);
+            String table = schemaTableParts.get(1);
+
+            checkArgument(!schema.isEmpty() && !table.isEmpty(),
+                    "Schema and table names cannot be empty in '%s'", schemaTable);
+            checkArgument(!topicName.isEmpty(),
+                    "Topic name cannot be empty in mapping '%s'", entry);
+
+            SchemaTableName schemaTableName = new SchemaTableName(schema, table);
+            builder.put(schemaTableName, topicName);
+        }
+
+        return builder.buildOrThrow();
+    }
 }
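
A note on the validation above: each malformed entry fails fast via Guava's
checkArgument, which throws IllegalArgumentException. A minimal sketch
(the entry value is hypothetical):

    ConfluentSchemaRegistryConfig config = new ConfluentSchemaRegistryConfig();
    try {
        // Missing the ':topic' part, so the entry splits into one part, not two
        config.setConfluentSchemaRegistrySubjectMapping("market.orders");
    }
    catch (IllegalArgumentException e) {
        // "Invalid mapping format 'market.orders'. Expected format: schema.table:topic"
        System.out.println(e.getMessage());
    }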
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
index b18236eb065..40f851f454c 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
@@ -76,16 +76,19 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
     private final String defaultSchema;
     private final Supplier<SetMultimap<String, TopicAndSubjects>> topicAndSubjectsSupplier;
     private final Supplier<SetMultimap<String, String>> subjectsSupplier;
+    private final Map<SchemaTableName, String> subjectMapping;
 
     public ConfluentSchemaRegistryTableDescriptionSupplier(
             SchemaRegistryClient schemaRegistryClient,
             Map<String, SchemaParser> schemaParsers,
             String defaultSchema,
-            Duration subjectsCacheRefreshInterval)
+            Duration subjectsCacheRefreshInterval,
+            Map<SchemaTableName, String> subjectMapping)
     {
         this.schemaRegistryClient = requireNonNull(schemaRegistryClient, "schemaRegistryClient is null");
         this.schemaParsers = ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null"));
         this.defaultSchema = requireNonNull(defaultSchema, "defaultSchema is null");
+        this.subjectMapping = ImmutableMap.copyOf(requireNonNull(subjectMapping, "subjectMapping is null"));
         topicAndSubjectsSupplier = memoizeWithExpiration(this::getTopicAndSubjects, subjectsCacheRefreshInterval.toMillis(), MILLISECONDS);
         subjectsSupplier = memoizeWithExpiration(this::getAllSubjects, subjectsCacheRefreshInterval.toMillis(), MILLISECONDS);
     }
@@ -97,6 +100,7 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
         private final Map<String, SchemaParser> schemaParsers;
         private final String defaultSchema;
         private final Duration subjectsCacheRefreshInterval;
+        private final Map<SchemaTableName, String> subjectMapping;
 
         @Inject
         public Factory(
@@ -109,12 +113,18 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
             this.schemaParsers = ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null"));
             this.defaultSchema = kafkaConfig.getDefaultSchema();
             this.subjectsCacheRefreshInterval = confluentConfig.getConfluentSubjectsCacheRefreshInterval();
+            this.subjectMapping = confluentConfig.getConfluentSchemaRegistrySubjectMapping();
         }
 
         @Override
         public TableDescriptionSupplier get()
         {
-            return new ConfluentSchemaRegistryTableDescriptionSupplier(schemaRegistryClient, schemaParsers, defaultSchema, subjectsCacheRefreshInterval);
+            return new ConfluentSchemaRegistryTableDescriptionSupplier(
+                    schemaRegistryClient,
+                    schemaParsers,
+                    defaultSchema,
+                    subjectsCacheRefreshInterval,
+                    subjectMapping);
         }
     }
 
@@ -195,6 +205,15 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
                     topicAndSubjects.getValueSubject().or(topicAndSubjectsFromCache::getValueSubject));
         }
 
+        // Apply subject mapping override if configured
+        if (subjectMapping.containsKey(schemaTableName)) {
+            String overrideTopic = subjectMapping.get(schemaTableName);
+            topicAndSubjects = new TopicAndSubjects(
+                    overrideTopic,
+                    topicAndSubjects.getKeySubject(),
+                    topicAndSubjects.getValueSubject());
+        }
+
         if (topicAndSubjects.getKeySubject().isEmpty() && topicAndSubjects.getValueSubject().isEmpty()) {
             return Optional.empty();
         }
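
The override rebuilds TopicAndSubjects rather than mutating it, keeping any
already-resolved key/value subjects. A minimal standalone sketch of the
pattern, assuming a simplified stand-in for TopicAndSubjects and plain
"schema.table" strings in place of SchemaTableName:

    import java.util.Map;
    import java.util.Optional;

    public class SubjectMappingSketch
    {
        // Simplified stand-in for the connector's TopicAndSubjects
        record TopicAndSubjects(String topic, Optional<String> keySubject, Optional<String> valueSubject) {}

        // Same pattern as the hunk above: override the topic, keep the subjects
        static TopicAndSubjects applyOverride(Map<String, String> mapping, String schemaTable, TopicAndSubjects resolved)
        {
            String overrideTopic = mapping.get(schemaTable);
            if (overrideTopic == null) {
                return resolved;
            }
            return new TopicAndSubjects(overrideTopic, resolved.keySubject(), resolved.valueSubject());
        }

        public static void main(String[] args)
        {
            TopicAndSubjects resolved = new TopicAndSubjects("orders", Optional.empty(), Optional.of("orders-avro-value"));
            TopicAndSubjects mapped = applyOverride(Map.of("market.orders", "orders-avro"), "market.orders", resolved);
            System.out.println(mapped.topic()); // prints "orders-avro"
        }
    }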
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
index b9b7ca6e637..b43cf869c45 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
@@ -195,7 +195,8 @@ public class TestConfluentSchemaRegistryTableDescriptionSupplier
                 SCHEMA_REGISTRY_CLIENT,
                 ImmutableMap.of("AVRO", new AvroSchemaParser(new TestingTypeManager())),
                 DEFAULT_NAME,
-                new Duration(1, SECONDS));
+                new Duration(1, SECONDS),
+                ImmutableMap.of());
     }
 
     private static Schema getAvroSchema(String topicName, String columnNamePrefix)

