This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.12
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.12 by this push:
new d6cf16ea7d [ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved
readability and maintainability
d6cf16ea7d is described below
commit d6cf16ea7db3ed0d24c81e6c1fb36407d61145e1
Author: eunhwa99 <[email protected]>
AuthorDate: Mon Aug 4 17:01:15 2025 +0900
[ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved readability and
maintainability
### What is this PR for?
This PR refactors the `InfluxDBInterpreter` class to improve code
readability, maintainability, and adherence to modern Java practices, without
altering its runtime behavior or core logic.
- Key changes include:
- Renamed getInfluxDBClient() to getQueryApi() so that the name reflects
what the method actually returns (a QueryApi), improving semantic clarity.
- Removed unnecessary parameters (e.g., InterpreterContext) from methods
where they are unused.
- Removed the `throws InterpreterException` clause from methods like open(),
close(), cancel(), getFormType(), and getProgress(), which never throw it.
- Extracted long nested logic blocks in `internalInterpret()` into
smaller, well-named private methods.
- Replaced imperative loops with Stream operations for collection
processing.
These changes aim to make the codebase more modular, cleaner (with less
boilerplate code), and more approachable for future contributors and reviewers.
### What type of PR is it?
Refactoring
### Todos
* [ ] - Task
### What is the Jira issue?
* [ZEPPELIN-6264](https://issues.apache.org/jira/browse/ZEPPELIN-6264)
### How should this be tested?
* No functional changes; existing tests should pass as-is.
### Screenshots (if appropriate)
### Questions:
* Do the license files need to be updated? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Closes #5005 from eunhwa99/ZEPPELIN-6264.
Signed-off-by: Philipp Dallig <[email protected]>
(cherry picked from commit ab52456f84b6fe9244a8be3e8b57aeb587b22e5a)
Signed-off-by: Philipp Dallig <[email protected]>
---
.../zeppelin/influxdb/InfluxDBInterpreter.java | 119 ++++++++++++---------
1 file changed, 66 insertions(+), 53 deletions(-)
diff --git
a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
index bb8e62bd9c..3f718fabda 100644
---
a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
+++
b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
@@ -14,8 +14,8 @@
*/
package org.apache.zeppelin.influxdb;
+import com.influxdb.query.FluxRecord;
import java.util.Properties;
-import java.util.StringJoiner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -24,6 +24,7 @@ import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;
import com.influxdb.client.QueryApi;
+import java.util.stream.Collectors;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.slf4j.Logger;
@@ -83,71 +84,83 @@ public class InfluxDBInterpreter extends
AbstractInterpreter {
LOGGER.debug("Run Flux command '{}'", query);
query = query.trim();
- QueryApi queryService = getInfluxDBClient(context);
+ QueryApi queryService = getQueryApi();
- final int[] actualIndex = {-1};
+ final int[] currentTableIndex = {-1};
AtomicReference<InterpreterResult> resultRef = new AtomicReference<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
- StringBuilder result = new StringBuilder();
+ StringBuilder resultBuilder = new StringBuilder();
queryService.query(
query,
//process record
- (cancellable, fluxRecord) -> {
-
- Integer tableIndex = fluxRecord.getTable();
- if (actualIndex[0] != tableIndex) {
- result.append(NEWLINE);
- result.append(TABLE_MAGIC_TAG);
- actualIndex[0] = tableIndex;
-
- //add column names to table header
- StringJoiner joiner = new StringJoiner(TAB);
- fluxRecord.getValues().keySet().forEach(c ->
joiner.add(replaceReservedChars(c)));
- result.append(joiner.toString());
- result.append(NEWLINE);
- }
-
- StringJoiner rowsJoiner = new StringJoiner(TAB);
- for (Object value : fluxRecord.getValues().values()) {
- if (value == null) {
- value = EMPTY_COLUMN_VALUE;
- }
- rowsJoiner.add(replaceReservedChars(value.toString()));
- }
- result.append(rowsJoiner.toString());
- result.append(NEWLINE);
- },
-
- throwable -> {
-
- LOGGER.error(throwable.getMessage(), throwable);
- resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
- throwable.getMessage()));
-
- countDownLatch.countDown();
-
- }, () -> {
- //on complete
- InterpreterResult intpResult = new
InterpreterResult(InterpreterResult.Code.SUCCESS);
- intpResult.add(result.toString());
- resultRef.set(intpResult);
- countDownLatch.countDown();
- }
+ (cancellable, fluxRecord) -> handleRecord(fluxRecord,
currentTableIndex, resultBuilder),
+ throwable -> handleError(throwable, resultRef, countDownLatch),
+ () -> handleComplete(resultBuilder, resultRef, countDownLatch)
);
+
+ awaitLatch(countDownLatch);
+
+ return resultRef.get();
+ }
+
+ private void handleRecord(FluxRecord fluxRecord, int[] currentTableIndex,
+ StringBuilder resultBuilder) {
+ Integer tableIndex = fluxRecord.getTable();
+ if (currentTableIndex[0] != tableIndex) {
+ appendTableHeader(fluxRecord, resultBuilder);
+ currentTableIndex[0] = tableIndex;
+ }
+
+ appendTableRow(fluxRecord, resultBuilder);
+ }
+
+ private void appendTableHeader(FluxRecord fluxRecord, StringBuilder
resultBuilder) {
+ resultBuilder.append(NEWLINE).append(TABLE_MAGIC_TAG);
+ String headerLine = fluxRecord.getValues().keySet().stream()
+ .map(this::replaceReservedChars)
+ .collect(Collectors.joining(TAB));
+ resultBuilder.append(headerLine).append(NEWLINE);
+ }
+
+ private void appendTableRow(FluxRecord fluxRecord, StringBuilder
resultBuilder) {
+ String rowLine = fluxRecord.getValues().values().stream()
+ .map(v -> v == null ? EMPTY_COLUMN_VALUE : v.toString())
+ .map(this::replaceReservedChars)
+ .collect(Collectors.joining(TAB));
+ resultBuilder.append(rowLine).append(NEWLINE);
+ }
+
+ private static void handleError(Throwable throwable,
AtomicReference<InterpreterResult> resultRef,
+ CountDownLatch countDownLatch) {
+ LOGGER.error(throwable.getMessage(), throwable);
+ resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
+ throwable.getMessage()));
+
+ countDownLatch.countDown();
+ }
+
+ private static void handleComplete(StringBuilder resultBuilder,
+ AtomicReference<InterpreterResult> resultRef,
+ CountDownLatch countDownLatch) {
+ InterpreterResult intpResult = new
InterpreterResult(InterpreterResult.Code.SUCCESS);
+ intpResult.add(resultBuilder.toString());
+ resultRef.set(intpResult);
+ countDownLatch.countDown();
+ }
+
+ private static void awaitLatch(CountDownLatch countDownLatch) throws
InterpreterException {
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new InterpreterException(e);
}
-
- return resultRef.get();
}
- private QueryApi getInfluxDBClient(InterpreterContext context) {
+ private QueryApi getQueryApi() {
if (queryApi == null) {
queryApi = this.client.getQueryApi();
}
@@ -156,7 +169,7 @@ public class InfluxDBInterpreter extends
AbstractInterpreter {
@Override
- public void open() throws InterpreterException {
+ public void open() {
if (this.client == null) {
InfluxDBClientOptions opt = InfluxDBClientOptions.builder()
@@ -172,7 +185,7 @@ public class InfluxDBInterpreter extends
AbstractInterpreter {
}
@Override
- public void close() throws InterpreterException {
+ public void close() {
if (this.client != null) {
this.client.close();
this.client = null;
@@ -180,17 +193,17 @@ public class InfluxDBInterpreter extends
AbstractInterpreter {
}
@Override
- public void cancel(InterpreterContext context) throws InterpreterException {
+ public void cancel(InterpreterContext context) {
}
@Override
- public FormType getFormType() throws InterpreterException {
+ public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
- public int getProgress(InterpreterContext context) throws
InterpreterException {
+ public int getProgress(InterpreterContext context) {
return 0;
}