wuchong commented on code in PR #8: URL: https://github.com/apache/fluss-benchmarks/pull/8#discussion_r2958397669
########## e2e-iot/fluss_flink_realtime/src/main/java/org/apache/fluss/benchmark/e2eplatformaws/producer/ProducerMetrics.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.benchmark.e2eplatformaws.producer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +import com.sun.net.httpserver.HttpServer; + +/** + * Simple Prometheus metrics server for the producer. + * Exposes metrics on port 8080 at /metrics endpoint. 
+ */ +public class ProducerMetrics { + private static final Logger LOG = LoggerFactory.getLogger(ProducerMetrics.class); + + private final LongAdder totalRecords = new LongAdder(); + private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); + private final AtomicLong lastStatsTime = new AtomicLong(System.currentTimeMillis()); + private final AtomicLong lastStatsRecords = new AtomicLong(0); + + private HttpServer server; + private final int port; + + public ProducerMetrics(int port) { + this.port = port; + } + + public void start() throws IOException { + // Bind to 0.0.0.0 to make it accessible from outside the container + server = HttpServer.create(new InetSocketAddress("0.0.0.0", port), 0); + server.createContext("/metrics", this::handleMetrics); + server.setExecutor(null); // Use default executor + server.start(); + LOG.info("Producer metrics server started on port {} (bound to 0.0.0.0)", port); + } + + public void stop() { + if (server != null) { + server.stop(0); + LOG.info("Producer metrics server stopped"); + } + } + + public void recordWrite() { + totalRecords.increment(); + } + + public void updateStats(long records) { + lastStatsRecords.set(records); + lastStatsTime.set(System.currentTimeMillis()); + } + + private void handleMetrics(com.sun.net.httpserver.HttpExchange exchange) throws IOException { + long currentTime = System.currentTimeMillis(); + long total = totalRecords.sum(); + long elapsedSeconds = (currentTime - startTime.get()) / 1000; + long windowRecords = lastStatsRecords.get(); Review Comment: `updateStats()` stores the cumulative record count, but `/metrics` divides that total by only the time since the last update. After the first stats window, `fluss_producer_records_per_second_window` keeps drifting upward even if the actual short-term rate is flat or falling. This needs to use a delta count for the current window. 
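The delta-count approach suggested above can be sketched as follows — a minimal illustration under assumed names (`WindowedRateSketch`, `windowRate`), not the PR's actual code: snapshot the cumulative total at each scrape and divide only the delta since the previous snapshot by the elapsed window time.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical sketch of the windowed-rate fix. Instead of dividing the
 * cumulative total by only the last window's length (which drifts upward),
 * the handler keeps the total and timestamp observed at the previous scrape
 * and computes the rate from the delta alone.
 */
class WindowedRateSketch {
    private final LongAdder totalRecords = new LongAdder();
    private final AtomicLong lastScrapeRecords = new AtomicLong(0);
    private final AtomicLong lastScrapeMillis = new AtomicLong(System.currentTimeMillis());

    void recordWrite() {
        totalRecords.increment();
    }

    /** Records per second over the window since the previous scrape only. */
    double windowRate(long nowMillis) {
        long total = totalRecords.sum();
        long prevTotal = lastScrapeRecords.getAndSet(total);   // delta base, not the cumulative total
        long prevMillis = lastScrapeMillis.getAndSet(nowMillis);
        long elapsedMs = Math.max(1, nowMillis - prevMillis);  // guard against division by zero
        return (total - prevTotal) * 1000.0 / elapsedMs;
    }
}
```

With this shape, a flat production rate reports a flat `fluss_producer_records_per_second_window` value instead of one that keeps climbing as the cumulative count grows.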
########## e2e-iot/fluss_flink_realtime/run_kind_demo.sh: ########## @@ -0,0 +1,175 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#!/usr/bin/env bash +set -euo pipefail + +# Automation script to deploy Fluss on Kind and run the producer + Flink aggregator demo. +# This script automates the steps in kind_cluster_demo.md + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# Navigate to project root (3 levels up from this script) +WORKDIR=$(cd "${SCRIPT_DIR}/../../.." && pwd) Review Comment: From `e2e-iot/fluss_flink_realtime`, `../../..` resolves to the parent of `fluss-benchmarks`, so this script looks for `demos/demo/fluss_flink_realtime_demo` outside the repository. In the current layout that path does not exist, so `run_kind_demo.sh` cannot build or find the jar before it starts the cluster; `test-local.sh` and `e2e-iot/high-infra/k8s/jobs/check-table-buckets.sh` still use the same stale path. ########## e2e-iot/fluss_flink_realtime/entrypoint.sh: ########## @@ -0,0 +1,126 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#!/bin/bash Review Comment: Because `Dockerfile` uses exec-form `ENTRYPOINT ["/opt/flink/bin/entrypoint.sh"]`, this file needs `#!` in its first two bytes. With the ASF header before the shebang, running the image without overriding `command` returns `exec format error` instead of launching Java. ########## e2e-iot/fluss_flink_realtime/src/main/java/org/apache/fluss/benchmark/e2eplatformaws/producer/FlussSensorProducerAppMultiInstance.java: ########## @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.benchmark.e2eplatformaws.producer; + +import org.apache.fluss.benchmark.e2eplatformaws.model.SensorDataMinimal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.types.DataTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Random; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +/** + * Multi-instance Fluss producer that supports distributed device generation across multiple producer instances. 
+ * + * Features: + * - Supports 100,000 devices total, distributed across multiple producer instances + * - Each instance handles a specific device ID range based on instance-id and total-producers + * - Multiple threads write in parallel to maximize throughput + * - Each device has its own generator with independent state + * - Device IDs are hashed to 48 buckets automatically by Fluss (via sensor_id hash) + * - Rate is distributed evenly across all devices in the instance's range + */ +public final class FlussSensorProducerAppMultiInstance { + private static final Logger LOG = LoggerFactory.getLogger(FlussSensorProducerAppMultiInstance.class); + + // Total number of devices across all producer instances + private static final int TOTAL_DEVICES = 100_000; + + // Set IPv4-only properties in static initializer to ensure they're set before any class loading + static { + System.setProperty("java.net.preferIPv4Stack", "true"); + System.setProperty("java.net.preferIPv4Addresses", "true"); + System.setProperty("java.net.useSystemProxies", "false"); + } + + public static void main(String[] args) throws Exception { + // Ensure IPv4 properties are set (redundant but safe) + System.setProperty("java.net.preferIPv4Stack", "true"); + System.setProperty("java.net.preferIPv4Addresses", "true"); + + ProducerOptions options = ProducerOptions.parse(args); + LOG.info("Starting multi-instance Fluss producer with options: {}", options); + + // Validate instance configuration + if (options.totalProducers <= 0) { + throw new IllegalArgumentException("total-producers must be > 0"); + } + if (options.instanceId < 0 || options.instanceId >= options.totalProducers) { + throw new IllegalArgumentException( + String.format("instance-id must be >= 0 and < total-producers (%d)", options.totalProducers)); + } + + // Calculate device ID range for this instance + int devicesPerInstance = TOTAL_DEVICES / options.totalProducers; + int startDeviceId = options.instanceId * devicesPerInstance; + int 
endDeviceId = (options.instanceId == options.totalProducers - 1) + ? TOTAL_DEVICES // Last instance gets any remainder + : (options.instanceId + 1) * devicesPerInstance; + int deviceCount = endDeviceId - startDeviceId; + + LOG.info("Instance {} of {}: Handling devices {} to {} ({} devices)", + options.instanceId, options.totalProducers, startDeviceId, endDeviceId - 1, deviceCount); + + Configuration flussConf = new Configuration(); + flussConf.set(ConfigOptions.BOOTSTRAP_SERVERS, Collections.singletonList(options.bootstrap)); + flussConf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, options.writerBufferSize); + flussConf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, options.writerBatchSize); + // Set batch timeout from environment variable (default: 50ms for optimal throughput) + String batchTimeoutStr = getEnv("CLIENT_WRITER_BATCH_TIMEOUT", "50ms"); + try { + // Parse duration string like "10ms", "50ms", etc. + long millis = Long.parseLong(batchTimeoutStr.replaceAll("[^0-9]", "")); + flussConf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(millis)); + LOG.info("Set CLIENT_WRITER_BATCH_TIMEOUT to {}ms", millis); + } catch (Exception e) { + LOG.warn("Failed to parse CLIENT_WRITER_BATCH_TIMEOUT '{}', using default 50ms", batchTimeoutStr); + flussConf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(50)); + } + + + // Start Prometheus metrics server + ProducerMetrics metrics = new ProducerMetrics(8080); + metrics.start(); + LOG.info("Prometheus metrics server started on port 8080"); + + TablePath tablePath = TablePath.of(options.database, options.table); + + try (Connection connection = ConnectionFactory.createConnection(flussConf)) { + ensureSchema(connection, tablePath, options.bucketCount); + + try (Table table = connection.getTable(tablePath)) { + AtomicBoolean running = new AtomicBoolean(true); + Runtime.getRuntime() + .addShutdownHook(new Thread(() -> shutdown(running), "fluss-producer-shutdown")); + + // Create thread pool 
for parallel writing + int numThreads = options.numWriterThreads; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch completionLatch = new CountDownLatch(numThreads); + + // Shared counters for statistics + LongAdder totalSent = new LongAdder(); + AtomicLong startNano = new AtomicLong(System.nanoTime()); + AtomicLong lastStatsNano = new AtomicLong(startNano.get()); + + // Rate control: distribute rate across threads + // Each thread should produce: totalRate / numThreads records per second + double ratePerThread = options.recordsPerSecond > 0 + ? (double) options.recordsPerSecond / numThreads + : 0.0; + long nanosPerRecordPerThread = ratePerThread > 0 + ? (long) (TimeUnit.SECONDS.toNanos(1) / ratePerThread) + : 0; + + LOG.info("Target rate: {} rec/s total, {} rec/s per thread, {} writer threads", + options.recordsPerSecond, String.format(Locale.ROOT, "%.2f", ratePerThread), numThreads); + + // Divide devices among threads + int devicesPerThread = (deviceCount + numThreads - 1) / numThreads; // Ceiling division + + // Start writer threads + for (int threadId = 0; threadId < numThreads; threadId++) { + int threadStartDevice = startDeviceId + threadId * devicesPerThread; + int threadEndDevice = Math.min(threadStartDevice + devicesPerThread, endDeviceId); + + if (threadStartDevice >= endDeviceId) { + // No devices for this thread + completionLatch.countDown(); + continue; + } + + final int finalThreadId = threadId; + final int finalThreadStartDevice = threadStartDevice; + final int finalThreadEndDevice = threadEndDevice; + + executor.submit(() -> { + try { + UpsertWriter writer = table.newUpsert().createWriter(); + DeviceRangeGenerator generator = new DeviceRangeGenerator( + finalThreadStartDevice, + finalThreadEndDevice); + + long threadSent = 0; + long threadStartNano = System.nanoTime(); + long stopAtCount = options.totalRecords > 0 ? options.totalRecords : Long.MAX_VALUE; + long stopAtTime = options.runDuration.isZero() + ? 
Long.MAX_VALUE + : System.nanoTime() + options.runDuration.toNanos(); + + LOG.info("[Thread {}] Starting - will stop at {} total records", finalThreadId, stopAtCount); + + while (running.get() && totalSent.sum() < stopAtCount && System.nanoTime() < stopAtTime) { + SensorDataMinimal record = generator.next(); + writeToFluss(writer, record); + threadSent++; + totalSent.increment(); + long currentTotal = totalSent.sum(); + metrics.recordWrite(); + + if (currentTotal % 10 == 0) { Review Comment: At the default `--rate 200000`, this logs about 20,000 INFO lines per second per producer instance because every tenth record emits a message. With `slf4j-simple` writing synchronously to stdout, that I/O dominates CPU and makes the benchmark throughput numbers unusable. I think we can change this to time-interval logging, or lower this message to DEBUG level by default. ########## e2e-iot/fluss_flink_realtime/TEST_LOCAL.md: ########## @@ -0,0 +1,207 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +# Local Testing Guide + +This guide explains how to test the Fluss producer and Flink job locally with the minimal schema. + +## Prerequisites + +1. **Maven** - For building the JAR +2. 
**Fluss 0.8.0** - Extracted to `demos/demo/deploy_local_kind_fluss/fluss-0.8.0-incubating/` +3. **Flink 1.20.3** (optional) - For running Flink job locally +4. **Java 11+** - For running Java applications + +## Quick Test (Automated) + +Run the automated test script: + +```bash +cd /Users/vijayabhaskarv/IOT/FLUSS +./demos/demo/fluss_flink_realtime_demo/test-local.sh +``` + +This script will: +1. Build the demo JAR +2. Start Fluss local cluster +3. Create table with 48 buckets +4. Start producer (instance 0, 100K devices) +5. Start Flink aggregation job (if Flink is available) + +## Manual Testing (Step-by-Step) + +### Step 1: Build the JAR + +```bash +cd /Users/vijayabhaskarv/IOT/FLUSS Review Comment: There are many places still using the specific directory `/Users/vijayabhaskarv/`, could you change that to a more general directory like `~/`?
+# + +apiVersion: batch/v1 +kind: Job +metadata: + name: fluss-producer + labels: + app: fluss-producer +spec: + backoffLimit: 0 # Don't retry on failure + completions: 1 # Only 1 completion needed + parallelism: 1 # Only 1 pod at a time + template: + metadata: + labels: + app: fluss-producer + spec: + restartPolicy: Never + initContainers: + - name: wait-for-fluss + image: busybox:1.36 + command: + - sh + - -c + - | + echo "Waiting for Fluss coordinator to be ready..." + COORD_HOST="coordinator-server-hs.default.svc.cluster.local" + # Use ping -4 to force IPv4 and test basic connectivity, then use nc for port check + # First ensure we can resolve to IPv4 + until ping -4 -c 1 -W 1 "$COORD_HOST" >/dev/null 2>&1; do + echo "Waiting for Fluss DNS resolution..." + sleep 2 + done + # Now check if the port is open using nc (with resolved IP, it should use IPv4) + until nc -zv "$COORD_HOST" 9124 2>&1 | grep -q "open"; do + echo "Waiting for Fluss on port 9124..." + sleep 2 + done + echo "Fluss coordinator is ready!" + containers: + - name: producer + image: fluss-demo:latest + imagePullPolicy: Never # Use local image loaded into Kind + securityContext: + runAsUser: 0 # Run as root to allow writing to /etc/hosts + command: + - /app/entrypoint.sh Review Comment: This image copies the wrapper to `/opt/flink/bin/entrypoint.sh` and the jar to `/opt/flink/usrlib/fluss-flink-realtime-demo.jar`, but this Job still executes `/app/entrypoint.sh` with `/app/fluss-flink-realtime-demo.jar`. In the `deploy_k8s_jobs.sh` flow the producer pod fails immediately with `no such file or directory` before Java starts; `k8s-flink-aggregator-job.yaml` has the same mismatch. ########## e2e-iot/fluss_flink_realtime/TEST_LOCAL.md: ########## @@ -0,0 +1,207 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +# Local Testing Guide + +This guide explains how to test the Fluss producer and Flink job locally with the minimal schema. + +## Prerequisites + +1. **Maven** - For building the JAR +2. **Fluss 0.8.0** - Extracted to `demos/demo/deploy_local_kind_fluss/fluss-0.8.0-incubating/` Review Comment: Fluss has officially released 0.9, could you upgrade it to 0.9.0? ########## e2e-iot/fluss_flink_realtime/src/main/java/org/apache/fluss/benchmark/e2eplatformaws/inspect/FlussTableBucketChecker.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.benchmark.e2eplatformaws.inspect; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TablePath; + +import java.util.Collections; + +/** + * Simple CLI utility to check the number of buckets in a Fluss table. + * Uses Fluss Admin API to query table metadata directly. + */ +public final class FlussTableBucketChecker { + + private FlussTableBucketChecker() {} + + public static void main(String[] args) throws Exception { + if (args.length != 3) { + System.err.println("Usage: FlussTableBucketChecker <bootstrap-host:port> <database> <table>"); + System.err.println("Example: FlussTableBucketChecker localhost:9124 iot sensor_readings"); + System.exit(1); + } + + String bootstrap = args[0]; + String database = args[1]; + String table = args[2]; + + Configuration conf = new Configuration(); + conf.set(ConfigOptions.BOOTSTRAP_SERVERS, Collections.singletonList(bootstrap)); + + TablePath tablePath = TablePath.of(database, table); + + try (Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin()) { + + // Check if table exists + boolean tableExists = admin.tableExists(tablePath).get(); + if (!tableExists) { + System.err.println("ERROR: Table '" + tablePath + "' does not exist"); + System.exit(1); + } + + // Get table info and bucket count + var tableInfo = admin.getTableInfo(tablePath).get(); + int bucketCount = tableInfo.getNumBuckets(); + + // Output result + System.out.println("========================================"); + System.out.println("Fluss Table Bucket Count Check"); + System.out.println("========================================"); + System.out.println("Bootstrap: " + bootstrap); + System.out.println("Database: " + database); + System.out.println("Table: " + table); + 
System.out.println("Buckets: " + bucketCount); + System.out.println("========================================"); + + if (bucketCount == 48) { Review Comment: This helper exits non-zero for anything other than 48 buckets, but the high-infra scripts and docs in this branch create `sensor_readings` with 128 buckets by default. Running `check-table-buckets.sh` against the documented deployment will therefore flag a healthy table as broken. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
