r-reis opened a new issue, #15262:
URL: https://github.com/apache/pinot/issues/15262

   I'm trying to use Pinot to stream Kafka data, but the Confluent Schema Registry integration only works for Avro and Protobuf.
   JSON messages work as long as they are not registered with a JSON Schema.
    
   How to reproduce the error:
   docker-compose:
   ```
   services:
     broker:
       image: confluentinc/cp-kafka:7.9.0
       hostname: broker
       container_name: broker
       ports:
         - "9092:9092"
         - "9101:9101"
       environment:
         KAFKA_NODE_ID: 1
         KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
         KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
         KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
         KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
         KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
         KAFKA_JMX_PORT: 9101
         KAFKA_JMX_HOSTNAME: localhost
         KAFKA_PROCESS_ROLES: "broker,controller"
         KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093"
         KAFKA_LISTENERS: "PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092"
         KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
         KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
         KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
         # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
         # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
         CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
   
     schema-registry:
       image: confluentinc/cp-schema-registry:7.9.0
       hostname: schema-registry
       container_name: schema-registry
       depends_on:
         - broker
       ports:
         - "8081:8081"
       environment:
         SCHEMA_REGISTRY_HOST_NAME: schema-registry
         SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
         SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
   
     connect:
       image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
       hostname: connect
       container_name: connect
       depends_on:
         - broker
         - schema-registry
       ports:
         - "8083:8083"
       environment:
         CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
         CONNECT_REST_ADVERTISED_HOST_NAME: connect
         CONNECT_GROUP_ID: compose-connect-group
         CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
         CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
         CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
         CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
         CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
         CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
         CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
         CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
         CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
         CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
         # CLASSPATH required due to CC-2422
         CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.9.0.jar
         CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
         CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
         CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
   
     control-center:
       image: confluentinc/cp-enterprise-control-center:7.9.0
       hostname: control-center
       container_name: control-center
       depends_on:
         - broker
         - schema-registry
         - connect
         - ksqldb-server
       ports:
         - "9021:9021"
       environment:
         CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
         CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: "connect:8083"
         CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: "/connectors"
         CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
         CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
         CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
         CONTROL_CENTER_REPLICATION_FACTOR: 1
         CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
         CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
         CONFLUENT_METRICS_TOPIC_REPLICATION: 1
         PORT: 9021
   
     ksqldb-server:
       image: confluentinc/cp-ksqldb-server:7.9.0
       hostname: ksqldb-server
       container_name: ksqldb-server
       depends_on:
         - broker
         - connect
       ports:
         - "8088:8088"
       environment:
         KSQL_CONFIG_DIR: "/etc/ksql"
         KSQL_BOOTSTRAP_SERVERS: "broker:29092"
         KSQL_HOST_NAME: ksqldb-server
         KSQL_LISTENERS: "http://0.0.0.0:8088"
         KSQL_CACHE_MAX_BYTES_BUFFERING: 0
         KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
         KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
         KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
         KSQL_KSQL_CONNECT_URL: "http://connect:8083"
         KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
         KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
         KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
   
     ksqldb-cli:
       image: confluentinc/cp-ksqldb-cli:7.9.0
       container_name: ksqldb-cli
       depends_on:
         - broker
         - connect
         - ksqldb-server
       entrypoint: /bin/sh
       tty: true
   
     ksql-datagen:
       image: confluentinc/ksqldb-examples:7.9.0
       hostname: ksql-datagen
       container_name: ksql-datagen
       depends_on:
         - ksqldb-server
         - broker
         - schema-registry
         - connect
       command: "bash -c 'echo Waiting for Kafka to be ready... && \
         cub kafka-ready -b broker:29092 1 40 && \
         echo Waiting for Confluent Schema Registry to be ready... && \
         cub sr-ready schema-registry 8081 40 && \
         echo Waiting a few seconds for topic creation to finish... && \
         sleep 11 && \
         tail -f /dev/null'"
       environment:
         KSQL_CONFIG_DIR: "/etc/ksql"
         STREAMS_BOOTSTRAP_SERVERS: broker:29092
         STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
         STREAMS_SCHEMA_REGISTRY_PORT: 8081
   
     rest-proxy:
       image: confluentinc/cp-kafka-rest:7.9.0
       depends_on:
         - broker
         - schema-registry
       ports:
         - 8082:8082
       hostname: rest-proxy
       container_name: rest-proxy
       environment:
         KAFKA_REST_HOST_NAME: rest-proxy
         KAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"
         KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
         KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
   
     zookeeper:
       image: bitnami/zookeeper:latest
       ports:
         - 2181:2181
       environment:
         - ALLOW_ANONYMOUS_LOGIN=yes
   
     pinot-controller:
       image: apachepinot/pinot:latest
       command: "StartController -zkAddress zookeeper:2181"
       container_name: "pinot-controller"
       restart: unless-stopped
       ports:
         - "9000:9000"
   
     pinot-broker:
       image: apachepinot/pinot:latest
       command: "StartBroker -zkAddress zookeeper:2181"
       restart: unless-stopped
       container_name: "pinot-broker"
       ports:
         - "8099:8099"
       depends_on:
         - pinot-controller
   
     pinot-server:
       image: apachepinot/pinot:latest
       command: "StartServer -zkAddress zookeeper:2181"
       restart: unless-stopped
       container_name: "pinot-server"
       depends_on:
         - pinot-broker
   
   networks:
     default:
       driver: bridge
   
   ```
   
   After the services start, go to the Control Center portal on localhost:9021.
   Create a topic called "topic_1" and define it with a JSON schema, e.g.:
   ```
   {
     "properties": {
       "created_at": {
         "connect.index": 4,
         "connect.type": "int64",
         "connect.version": 1,
         "default": 0,
         "title": "io.debezium.time.Timestamp",
         "type": "integer"
       },
       "id": {
         "connect.index": 0,
         "connect.type": "int32",
         "default": 0,
         "type": "integer"
       }
     },
     "title": "topic_1.Value",
     "type": "object"
   }
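   For completeness, the same schema can presumably also be registered without Control Center through the Schema Registry Java client. A minimal sketch, assuming the kafka-schema-registry-client and kafka-json-schema-provider artifacts are on the classpath (the subject name "topic_1-value" follows the default TopicNameStrategy; not taken from the setup above):
   ```
   import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.json.JsonSchema;

   public class RegisterJsonSchema {
     public static void main(String[] args) throws Exception {
       // The schema-registry container above is published on localhost:8081
       SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

       String schemaString = "{\"title\":\"topic_1.Value\",\"type\":\"object\",\"properties\":{"
           + "\"id\":{\"type\":\"integer\",\"connect.type\":\"int32\",\"default\":0},"
           + "\"created_at\":{\"type\":\"integer\",\"connect.type\":\"int64\",\"default\":0}}}";

       // "topic_1-value" is the default subject for the topic's value schema
       int schemaId = client.register("topic_1-value", new JsonSchema(schemaString));
       System.out.println("Registered JSON schema with id " + schemaId);
     }
   }
   ```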
   ```
   Post a message using the schema (this can be done before or after setting up the Pinot table):
   ```
   {
     "id": 1,
     "created_at": 1734980659034
   }
   ```
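   Note that when the message is produced through a JSON Schema aware serializer (which, as far as I can tell, is what Control Center and the REST proxy use for a JSON Schema topic), the value is not plain JSON on the wire: the Confluent serializer prefixes it with a magic byte and the 4-byte schema id, which would explain why the plain JSONMessageDecoder cannot parse it. A minimal producer sketch, assuming the confluent kafka-json-schema-serializer artifact is available (the POJO is only for illustration and the auto-registered schema may differ slightly from the one above):
   ```
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;

   public class ProduceJsonSchemaMessage {

     // Simple POJO matching the registered "topic_1.Value" schema
     public static class Topic1Value {
       public int id;
       public long created_at;
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");
       props.put("key.serializer", StringSerializer.class.getName());
       // The serializer prepends magic byte 0x0 + 4-byte schema id before the JSON payload
       props.put("value.serializer", KafkaJsonSchemaSerializer.class.getName());
       props.put("schema.registry.url", "http://localhost:8081");

       Topic1Value value = new Topic1Value();
       value.id = 1;
       value.created_at = 1734980659034L;

       try (KafkaProducer<String, Topic1Value> producer = new KafkaProducer<>(props)) {
         producer.send(new ProducerRecord<>("topic_1", "1", value));
         producer.flush();
       }
     }
   }
   ```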
   
   After that, create the schema and table in Pinot:
   ```
   {
     "schemaName": "topic_1",
     "dimensionFieldSpecs": [
       {
         "name": "id",
         "dataType": "INT"
       }
     ],
         "dateTimeFieldSpecs": [
       {
         "name": "created_at",
         "dataType": "LONG",
                        "format" : "1:MILLISECONDS:EPOCH",
                        "granularity": "1:MILLISECONDS"
       }
        ]
   }
   ```
   table config:
   ```
   {
     "tableName": "topic_1",
     "tableType": "REALTIME",
     "segmentsConfig": {
       "timeColumnName": "created_at",
       "timeType": "MILLISECONDS",
       "replicasPerPartition": "1"
     },
     "tenants": {},
     "tableIndexConfig": {
       "loadMode": "MMAP",
       "streamConfigs": {
         "stream.kafka.metadata.populate" : "true",
         "streamType": "kafka",
                        "stream.kafka.decoder.prop.format": "JSON",
         "stream.kafka.consumer.type": "lowlevel",
         "stream.kafka.topic.name": "topic_1",
                        "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
         "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
                
                        "stream.kafka.schema.registry.url": 
"http://schema-registry:8081";,
                        "stream.kafka.decoder.prop.schema.registry.rest.url": 
"http://schema-registry:8081";,
                        
         "stream.kafka.broker.list": "broker:29092",
         "realtime.segment.flush.threshold.rows": "0",
         "realtime.segment.flush.threshold.time": "24h",
         "realtime.segment.flush.threshold.segment.size": "50M",
         "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
                        
         "key.serializer": 
"shaded.org.apache.kafka.connect.storage.StringDeserializer",
         "value.serializer": 
"shaded.org.apache.kafka.connect.storage.StringDeserializer"
       }
     },
        "ingestionConfig": {
                "continueOnError": true
        },
     "metadata": {
       "customConfigs": {}
     }
   }
   
   ```
   
   My guess is that, since we already have Confluent Schema Registry decoders for Avro and Protobuf, we also need one for JSON Schema:
   https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaDeserializer.java
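   For what it's worth, here is a rough sketch of what such a decoder might look like, loosely modeled on the existing Confluent Avro decoder. The class name, package handling, and property wiring are assumptions for illustration only; this is not an existing Pinot plugin:
   ```
   import java.util.Map;
   import java.util.Set;
   import com.fasterxml.jackson.core.type.TypeReference;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
   import org.apache.pinot.spi.data.readers.GenericRow;
   import org.apache.pinot.spi.stream.StreamMessageDecoder;

   // Hypothetical decoder for Confluent JSON Schema encoded payloads,
   // analogous to the existing Confluent Avro/Protobuf decoders.
   public class KafkaConfluentSchemaRegistryJsonMessageDecoder implements StreamMessageDecoder<byte[]> {
     private static final ObjectMapper MAPPER = new ObjectMapper();

     private final KafkaJsonSchemaDeserializer<Object> _deserializer = new KafkaJsonSchemaDeserializer<>();
     private Set<String> _fieldsToRead;
     private String _topicName;

     @Override
     public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) {
       // Assumes "schema.registry.url" is present in the decoder props;
       // how Pinot forwards stream.kafka.decoder.prop.* values is glossed over here
       _deserializer.configure(props, /* isKey */ false);
       _fieldsToRead = fieldsToRead;
       _topicName = topicName;
     }

     @Override
     public GenericRow decode(byte[] payload, GenericRow destination) {
       // The Confluent deserializer strips the magic byte + 4-byte schema id and validates the JSON
       Object record = _deserializer.deserialize(_topicName, payload);
       Map<String, Object> values = MAPPER.convertValue(record, new TypeReference<Map<String, Object>>() { });
       for (Map.Entry<String, Object> entry : values.entrySet()) {
         if (_fieldsToRead == null || _fieldsToRead.isEmpty() || _fieldsToRead.contains(entry.getKey())) {
           destination.putValue(entry.getKey(), entry.getValue());
         }
       }
       return destination;
     }

     @Override
     public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
       byte[] slice = new byte[length];
       System.arraycopy(payload, offset, slice, 0, length);
       return decode(slice, destination);
     }
   }
   ```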

