wuchong commented on code in PR #8:
URL: https://github.com/apache/fluss-benchmarks/pull/8#discussion_r2959502213


##########
e2e-iot/high-infra/helm-charts/fluss/Chart.yaml:
##########
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: v2
+appVersion: 0.8.0-incubating

Review Comment:
   It seems this helm chart is copied from the `apache/fluss` project. Can we 
just reuse that helm chart to avoid duplicate work?



##########
e2e-iot/fluss_flink_realtime/JDBCFlinkConsumer.java:
##########
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.iot.pipeline.flink;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class JDBCFlinkConsumer {
+    private static final ObjectMapper mapper = new ObjectMapper();
+    
+    /**
+     * AVRO Deserialization Schema for SensorData - converts to SensorRecord 
directly
+     */
+    public static class AvroSensorDataDeserializationSchema implements 
DeserializationSchema<SensorRecord> {
+        private transient Schema avroSchema;
+        private transient org.apache.avro.io.DatumReader<GenericRecord> 
datumReader;
+        
+        @Override
+        public void open(InitializationContext context) throws Exception {
+            // Load AVRO schema from resources (now included in JAR)
+            try (InputStream schemaStream = 
getClass().getResourceAsStream("/avro/SensorData.avsc")) {
+                if (schemaStream == null) {
+                    throw new RuntimeException("AVRO schema file not found: 
/avro/SensorData.avsc");
+                }
+                avroSchema = new Schema.Parser().parse(schemaStream);
+                datumReader = new 
org.apache.avro.generic.GenericDatumReader<>(avroSchema);
+                System.out.println("✅ Loaded AVRO schema from JAR: " + 
avroSchema.getName());
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to load AVRO schema", e);
+            }
+        }
+        
+        @Override
+        public SensorRecord deserialize(byte[] message) throws IOException {
+            try {
+                // Deserialize AVRO binary message
+                org.apache.avro.io.Decoder decoder = 
org.apache.avro.io.DecoderFactory.get().binaryDecoder(message, null);
+                GenericRecord avroRecord = datumReader.read(null, decoder);
+                
+                // Convert directly to SensorRecord to avoid Kryo 
serialization issues
+                return new SensorRecord(avroRecord);
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize AVRO message", e);
+            }
+        }
+        
+        @Override
+        public boolean isEndOfStream(SensorRecord nextElement) {
+            return false;
+        }
+        
+        @Override
+        public TypeInformation<SensorRecord> getProducedType() {
+            return TypeInformation.of(SensorRecord.class);
+        }
+    }
+    
+    public static void main(String[] args) throws Exception {
+        String pulsarUrl = System.getenv().getOrDefault("PULSAR_URL", 
"pulsar://localhost:6650");
+        String pulsarAdminUrl = 
System.getenv().getOrDefault("PULSAR_ADMIN_URL", "http://localhost:8080");
+        String baseTopicName = System.getenv().getOrDefault("PULSAR_TOPIC", 
"persistent://public/default/iot-sensor-data");
+        String clickhouseUrl = System.getenv().getOrDefault("CLICKHOUSE_URL", 
"jdbc:clickhouse://localhost:8123/benchmark");
+        
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // Use parallelism from FlinkDeployment YAML or default to 4
+        // env.setParallelism(4);  // REMOVED - let YAML control parallelism
+        
+        // NOTE: Checkpointing is now configured via FlinkDeployment YAML
+        // The config includes: interval, mode (EXACTLY_ONCE), state backend 
(RocksDB), etc.
+        // This code is checkpoint-aware and will participate in checkpointing 
automatically
+        
+        System.out.println("Starting JDBC Flink IoT Consumer with AVRO Support 
and 1-Minute Aggregation...");
+        System.out.println("Pulsar URL: " + pulsarUrl);
+        System.out.println("Pulsar Admin URL: " + pulsarAdminUrl);
+        System.out.println("Consuming Topic: " + baseTopicName + " (all 
partitions)");
+        System.out.println("ClickHouse URL: " + clickhouseUrl);

Review Comment:
   Is this class actually being used? I haven't seen any references to it, and 
our benchmarks don't seem to include any joint tests with Pulsar or ClickHouse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to