r-reis opened a new issue, #15262: URL: https://github.com/apache/pinot/issues/15262
I'm trying to use Pinot to ingest streaming data from Kafka, but the Confluent Schema Registry integration only works for Avro and Protobuf. JSON messages ingest fine as long as you do not attach a JSON schema to the topic.

How to reproduce the error:

docker-compose:
```
services:
  broker:
    image: confluentinc/cp-kafka:7.9.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093"
      KAFKA_LISTENERS: "PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.9.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.9.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.9.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: "connect:8083"
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: "/connectors"
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.9.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.9.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.9.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.9.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  pinot-controller:
    image: apachepinot/pinot:latest
    command: "StartController -zkAddress zookeeper:2181"
    container_name: "pinot-controller"
    restart: unless-stopped
    ports:
      - "9000:9000"

  pinot-broker:
    image: apachepinot/pinot:latest
    command: "StartBroker -zkAddress zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller

  pinot-server:
    image: apachepinot/pinot:latest
    command: "StartServer -zkAddress zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    depends_on:
      - pinot-broker

networks:
  default:
    driver: bridge
```

After the services start, go to the Control Center portal on localhost:9021 and create a topic called "topic_1", defining it with a JSON schema (an example schema follows the sketch below).
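As context for why attaching the schema changes things: once a value schema is registered, records produced against it go through Confluent's JSON Schema serializer, which frames each payload in the Confluent wire format (a 0x00 magic byte and a 4-byte schema ID, followed by the JSON bytes), so what lands on the topic is no longer plain JSON. A minimal producer sketch that goes through the same serializer, assuming the host ports from the compose file above (Control Center's internal produce path may differ, but the framing on the wire is the same):

```
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JsonSchemaProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", KafkaJsonSchemaSerializer.class.getName());
    // Reuse the schema already registered for the topic instead of deriving
    // and registering one from the Java type (exact validation behavior
    // depends on the serializer config).
    props.put("auto.register.schemas", "false");
    props.put("use.latest.version", "true");

    Map<String, Object> value = new HashMap<>();
    value.put("id", 1);
    value.put("created_at", 1734980659034L);

    // The serializer prefixes the JSON bytes with the magic byte and schema
    // ID, which is what a plain JSON decoder later trips over.
    try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("topic_1", value));
    }
  }
}
```

Producing the same payload without a registered schema leaves it as plain JSON on the topic, which matches the behavior described above (JSON works until the schema is attached).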
Example schema:
```
{
  "properties": {
    "created_at": {
      "connect.index": 4,
      "connect.type": "int64",
      "connect.version": 1,
      "default": 0,
      "title": "io.debezium.time.Timestamp",
      "type": "integer"
    },
    "id": {
      "connect.index": 0,
      "connect.type": "int32",
      "default": 0,
      "type": "integer"
    }
  },
  "title": "topic_1.Value",
  "type": "object"
}
```

Post a message using the schema (this can be done before or after setting up the Pinot table):
```
{
  "id": 1,
  "created_at": 1734980659034
}
```

After that, create the schema and table on Pinot.

Pinot schema:
```
{
  "schemaName": "topic_1",
  "dimensionFieldSpecs": [
    {
      "name": "id",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "created_at",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```

Table config:
```
{
  "tableName": "topic_1",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "created_at",
    "timeType": "MILLISECONDS",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "stream.kafka.metadata.populate": "true",
      "streamType": "kafka",
      "stream.kafka.decoder.prop.format": "JSON",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "topic_1",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
      "stream.kafka.schema.registry.url": "http://schema-registry:8081",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
      "stream.kafka.broker.list": "broker:29092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "key.serializer": "shaded.org.apache.kafka.connect.storage.StringDeserializer",
      "value.serializer": "shaded.org.apache.kafka.connect.storage.StringDeserializer"
    }
  },
  "ingestionConfig": {
    "continueOnError": true
  },
  "metadata": {
    "customConfigs": {}
  }
}
```

My guess is that, since we already have the Avro and Protobuf decoders for Confluent, we also need a JSON one, wrapping: https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaDeserializer.java
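For what it's worth, here is a rough sketch of what such a decoder might look like, modeled on the existing Confluent Avro decoder. Everything in it is illustrative, not existing Pinot code: the class name and package are hypothetical, the `json.value.type` wiring is an assumption about the Confluent deserializer config, and a real implementation would presumably go through Pinot's JSON record extraction rather than the naive field copy shown here.

```
package org.apache.pinot.plugin.inputformat.json.confluent; // hypothetical package

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamMessageDecoder;

// Hypothetical decoder: hands the Confluent wire format (magic byte + schema ID)
// to KafkaJsonSchemaDeserializer and copies the decoded fields into a GenericRow.
public class KafkaConfluentSchemaRegistryJsonMessageDecoder implements StreamMessageDecoder<byte[]> {
  private KafkaJsonSchemaDeserializer<JsonNode> _deserializer;
  private String _topic;

  @Override
  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
      throws Exception {
    Map<String, Object> config = new HashMap<>(props);
    // "schema.registry.url" is expected to arrive via stream.kafka.decoder.prop.*
    config.put("json.value.type", JsonNode.class.getName()); // decode to a Jackson tree
    _deserializer = new KafkaJsonSchemaDeserializer<>();
    _deserializer.configure(config, /* isKey */ false);
    _topic = topicName;
  }

  @Override
  public GenericRow decode(byte[] payload, GenericRow destination) {
    JsonNode node = _deserializer.deserialize(_topic, payload);
    node.fields().forEachRemaining(
        entry -> destination.putValue(entry.getKey(), toPinotValue(entry.getValue())));
    return destination;
  }

  @Override
  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
  }

  // Naive scalar mapping, enough for the two-column repro above.
  private Object toPinotValue(JsonNode node) {
    if (node.isInt()) {
      return node.intValue();
    }
    if (node.isLong()) {
      return node.longValue();
    }
    if (node.isNumber()) {
      return node.doubleValue();
    }
    if (node.isBoolean()) {
      return node.booleanValue();
    }
    return node.asText();
  }
}
```

With something along these lines available, `stream.kafka.decoder.class.name` in the table config above could point at it and the rest of the stream config would stay the same, mirroring how the Avro and Protobuf schema-registry decoders are wired up today.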