wuchong commented on code in PR #8: URL: https://github.com/apache/fluss-benchmarks/pull/8#discussion_r2959502213
########## e2e-iot/high-infra/helm-charts/fluss/Chart.yaml: ########## @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: v2 +appVersion: 0.8.0-incubating Review Comment: It seems these helm charts are copied from the `apache/fluss` project. Can we just reuse those helm charts to avoid duplicate work? ########## e2e-iot/fluss_flink_realtime/JDBCFlinkConsumer.java: ########## @@ -0,0 +1,731 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.iot.pipeline.flink; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.source.PulsarSource; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class JDBCFlinkConsumer { + private static final ObjectMapper mapper = new ObjectMapper(); + + /** + * AVRO Deserialization Schema for SensorData - converts to SensorRecord directly + */ + public static class 
AvroSensorDataDeserializationSchema implements DeserializationSchema<SensorRecord> { + private transient Schema avroSchema; + private transient org.apache.avro.io.DatumReader<GenericRecord> datumReader; + + @Override + public void open(InitializationContext context) throws Exception { + // Load AVRO schema from resources (now included in JAR) + try (InputStream schemaStream = getClass().getResourceAsStream("/avro/SensorData.avsc")) { + if (schemaStream == null) { + throw new RuntimeException("AVRO schema file not found: /avro/SensorData.avsc"); + } + avroSchema = new Schema.Parser().parse(schemaStream); + datumReader = new org.apache.avro.generic.GenericDatumReader<>(avroSchema); + System.out.println("✅ Loaded AVRO schema from JAR: " + avroSchema.getName()); + } catch (Exception e) { + throw new RuntimeException("Failed to load AVRO schema", e); + } + } + + @Override + public SensorRecord deserialize(byte[] message) throws IOException { + try { + // Deserialize AVRO binary message + org.apache.avro.io.Decoder decoder = org.apache.avro.io.DecoderFactory.get().binaryDecoder(message, null); + GenericRecord avroRecord = datumReader.read(null, decoder); + + // Convert directly to SensorRecord to avoid Kryo serialization issues + return new SensorRecord(avroRecord); + } catch (Exception e) { + throw new IOException("Failed to deserialize AVRO message", e); + } + } + + @Override + public boolean isEndOfStream(SensorRecord nextElement) { + return false; + } + + @Override + public TypeInformation<SensorRecord> getProducedType() { + return TypeInformation.of(SensorRecord.class); + } + } + + public static void main(String[] args) throws Exception { + String pulsarUrl = System.getenv().getOrDefault("PULSAR_URL", "pulsar://localhost:6650"); + String pulsarAdminUrl = System.getenv().getOrDefault("PULSAR_ADMIN_URL", "http://localhost:8080"); + String baseTopicName = System.getenv().getOrDefault("PULSAR_TOPIC", "persistent://public/default/iot-sensor-data"); + String clickhouseUrl 
= System.getenv().getOrDefault("CLICKHOUSE_URL", "jdbc:clickhouse://localhost:8123/benchmark"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // Use parallelism from FlinkDeployment YAML or default to 4 + // env.setParallelism(4); // REMOVED - let YAML control parallelism + + // NOTE: Checkpointing is now configured via FlinkDeployment YAML + // The config includes: interval, mode (EXACTLY_ONCE), state backend (RocksDB), etc. + // This code is checkpoint-aware and will participate in checkpointing automatically + + System.out.println("Starting JDBC Flink IoT Consumer with AVRO Support and 1-Minute Aggregation..."); + System.out.println("Pulsar URL: " + pulsarUrl); + System.out.println("Pulsar Admin URL: " + pulsarAdminUrl); + System.out.println("Consuming Topic: " + baseTopicName + " (all partitions)"); + System.out.println("ClickHouse URL: " + clickhouseUrl); Review Comment: Is this class actually being used? I haven't seen any references to it, and our benchmarks don't seem to include any joint tests with Pulsar or ClickHouse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
