morrySnow commented on code in PR #60122: URL: https://github.com/apache/doris/pull/60122#discussion_r2726291878
########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageUtils.java: ########## @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.lineage; + +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.InlineTable; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import 
org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Utility methods for lineage event construction and filtering. + */ +public final class LineageUtils { + + public static final Logger LOG = LogManager.getLogger(LineageUtils.class); + private static final String EMPTY_STRING = ""; + private static final String CATALOG_TYPE_KEY = "type"; + private static final int NO_PLUGINS = 0; + private static final long UNKNOWN_START_TIME_MS = 0L; + private static final long UNKNOWN_DURATION_MS = 0L; + + private LineageUtils() { + } + + /** + * Check whether the parsed statement matches the current command type. + * + * @param executor statement executor containing parsed statement + * @param currentCommand current command class + * @return true if parsed command matches current command + */ + public static boolean isSameParsedCommand(StmtExecutor executor, Class<? extends Command> currentCommand) { + if (executor == null || currentCommand == null) { + return false; + } + StatementBase parsedStmt = executor.getParsedStmt(); + if (!(parsedStmt instanceof LogicalPlanAdapter)) { + return false; + } + Plan parsedPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + if (!(parsedPlan instanceof Command)) { + return false; + } + return parsedPlan.getClass().equals(currentCommand); + } + + /** + * Build a lineage event and compute lineage info if lineage plugins are enabled. 
+ * + * @param plan the plan to extract lineage from + * @param sourceCommand the command type for the event + * @param ctx connect context holding query metadata + * @param executor statement executor for query text + */ + public static LineageEvent buildLineageEvent(Plan plan, Class<? extends Command> sourceCommand, + ConnectContext ctx, StmtExecutor executor) { + if (plan == null || ctx == null) { + return null; + } + LineageInfo lineageInfo = LineageInfoExtractor.extractLineageInfo(plan); + LineageContext context = buildLineageContext(sourceCommand, ctx, executor); + String catalog = safeString(ctx.getDefaultCatalog()); + context.setCatalog(catalog); + context.setExternalCatalogProperties(collectExternalCatalogProperties(lineageInfo)); + lineageInfo.setContext(context); + return new LineageEvent(lineageInfo); + } + + /** + * Submit lineage event if lineage plugins are enabled and command matches parsed statement. + * + * @param executor statement executor containing parsed statement + * @param lineagePlan optional lineage plan to use instead of current plan + * @param currentPlan current logical plan + * @param currentHandleClass current command class + */ + public static void submitLineageEventIfNeeded(StmtExecutor executor, Optional<Plan> lineagePlan, + LogicalPlan currentPlan, + Class<? 
extends Command> currentHandleClass) { + if (!LineageUtils.isSameParsedCommand(executor, currentHandleClass)) { + return; + } + if (!isLineagePluginConfigured()) { + return; + } + Plan plan = lineagePlan.orElse(currentPlan); + if (shouldSkipLineage(plan)) { + return; + } + try { + LineageEvent lineageEvent = LineageUtils.buildLineageEvent(plan, currentHandleClass, + executor.getContext(), executor); + if (lineageEvent != null) { + Env.getCurrentEnv().getLineageEventProcessor().submitLineageEvent(lineageEvent); + } + } catch (Exception e) { + // Log and ignore exceptions during lineage processing to avoid impacting query execution + LOG.error("Failed to submit lineage event", e); + } + } + + public static boolean shouldSkipLineage(Plan plan) { + return isValuesOnly(plan) || isInternalSchemaTarget(plan); + } + + private static boolean isValuesOnly(Plan plan) { + if (plan.containsType(LogicalCatalogRelation.class)) { + return false; + } + return plan.containsType(InlineTable.class, LogicalUnion.class, LogicalOneRowRelation.class); + } + + private static boolean isInternalSchemaTarget(Plan plan) { + Optional<LogicalTableSink> sink = plan.collectFirst(node -> node instanceof LogicalTableSink); + if (!sink.isPresent()) { + return false; + } + TableIf targetTable = sink.get().getTargetTable(); + if (targetTable == null || targetTable.getDatabase() == null + || targetTable.getDatabase().getCatalog() == null) { + return false; + } + String catalogName = targetTable.getDatabase().getCatalog().getName(); + String dbName = targetTable.getDatabase().getFullName(); + return InternalCatalog.INTERNAL_CATALOG_NAME.equalsIgnoreCase(catalogName) + && FeConstants.INTERNAL_DB_NAME.equalsIgnoreCase(dbName); + } + + private static Map<String, Map<String, String>> collectExternalCatalogProperties(LineageInfo lineageInfo) { Review Comment: why need catalog properties? 
########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageUtils.java: ########## @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.lineage; + +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.InlineTable; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import 
org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Utility methods for lineage event construction and filtering. + */ +public final class LineageUtils { + + public static final Logger LOG = LogManager.getLogger(LineageUtils.class); + private static final String EMPTY_STRING = ""; + private static final String CATALOG_TYPE_KEY = "type"; + private static final int NO_PLUGINS = 0; + private static final long UNKNOWN_START_TIME_MS = 0L; + private static final long UNKNOWN_DURATION_MS = 0L; + + private LineageUtils() { + } + + /** + * Check whether the parsed statement matches the current command type. + * + * @param executor statement executor containing parsed statement + * @param currentCommand current command class + * @return true if parsed command matches current command + */ + public static boolean isSameParsedCommand(StmtExecutor executor, Class<? extends Command> currentCommand) { + if (executor == null || currentCommand == null) { + return false; + } + StatementBase parsedStmt = executor.getParsedStmt(); + if (!(parsedStmt instanceof LogicalPlanAdapter)) { + return false; + } + Plan parsedPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + if (!(parsedPlan instanceof Command)) { + return false; + } + return parsedPlan.getClass().equals(currentCommand); + } + + /** + * Build a lineage event and compute lineage info if lineage plugins are enabled. 
+ * + * @param plan the plan to extract lineage from + * @param sourceCommand the command type for the event + * @param ctx connect context holding query metadata + * @param executor statement executor for query text + */ + public static LineageEvent buildLineageEvent(Plan plan, Class<? extends Command> sourceCommand, + ConnectContext ctx, StmtExecutor executor) { + if (plan == null || ctx == null) { + return null; + } + LineageInfo lineageInfo = LineageInfoExtractor.extractLineageInfo(plan); + LineageContext context = buildLineageContext(sourceCommand, ctx, executor); + String catalog = safeString(ctx.getDefaultCatalog()); + context.setCatalog(catalog); + context.setExternalCatalogProperties(collectExternalCatalogProperties(lineageInfo)); + lineageInfo.setContext(context); + return new LineageEvent(lineageInfo); + } + + /** + * Submit lineage event if lineage plugins are enabled and command matches parsed statement. + * + * @param executor statement executor containing parsed statement + * @param lineagePlan optional lineage plan to use instead of current plan + * @param currentPlan current logical plan + * @param currentHandleClass current command class + */ + public static void submitLineageEventIfNeeded(StmtExecutor executor, Optional<Plan> lineagePlan, + LogicalPlan currentPlan, + Class<? extends Command> currentHandleClass) { + if (!LineageUtils.isSameParsedCommand(executor, currentHandleClass)) { + return; + } + if (!isLineagePluginConfigured()) { Review Comment: we should check this one first ########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageEventProcessor.java: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.lineage; + +import org.apache.doris.common.Config; +import org.apache.doris.plugin.Plugin; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.lineage.AbstractLineagePlugin; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Processor that queues lineage events and dispatches them to lineage plugins. + */ +public class LineageEventProcessor { + + private static final Logger LOG = LogManager.getLogger(LineageEventProcessor.class); + private static final long UPDATE_PLUGIN_INTERVAL_MS = 60 * 1000; // 1min + private final PluginMgr pluginMgr; + private List<Plugin> lineagePlugins; + private long lastUpdateTime = 0; + private final BlockingQueue<LineageEvent> eventQueue = + new LinkedBlockingDeque<>(Config.lineage_event_queue_size); + private final AtomicBoolean isInit = new AtomicBoolean(false); + private Thread workerThread; + + /** + * Create a lineage event processor. 
+ * + * @param pluginMgr plugin manager to load lineage plugins + */ + public LineageEventProcessor(PluginMgr pluginMgr) { + this.pluginMgr = pluginMgr; + } + + /** + * Start the background worker thread. + */ + public void start() { + if (!isInit.compareAndSet(false, true)) { + return; + } + workerThread = new Thread(new Worker(), "LineageEventProcessor"); + workerThread.setDaemon(true); + workerThread.start(); + } + + /** + * Submit a lineage event to the processing queue. + * + * @param lineageEvent lineage event to submit + * @return true if accepted, false otherwise + */ + public boolean submitLineageEvent(LineageEvent lineageEvent) { + if (lineageEvent == null) { + return false; + } + try { + if (!eventQueue.offer(lineageEvent)) { + String queryId = getQueryId(lineageEvent); + LOG.warn("the lineage event queue is full with size {}, discard the lineage event: {}", + eventQueue.size(), queryId); + return false; + } + return true; + } catch (Exception e) { + String queryId = getQueryId(lineageEvent); + LOG.warn("encounter exception when handle lineage event {}, discard the event", + queryId, e); + return false; + } + } + + /** + * Worker that polls events and invokes lineage plugins. + */ + public class Worker implements Runnable { + /** + * Run the lineage processing loop. + */ + @Override + public void run() { + LineageEvent lineageEvent; + while (true) { + // update lineage plugin list every UPDATE_PLUGIN_INTERVAL_MS. + if (lineagePlugins == null || System.currentTimeMillis() - lastUpdateTime > UPDATE_PLUGIN_INTERVAL_MS) { + lineagePlugins = pluginMgr.getActivePluginList(PluginType.LINEAGE); + lastUpdateTime = System.currentTimeMillis(); + if (lineagePlugins == null) { + lineagePlugins = Collections.emptyList(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("update lineage plugins. 
num: {}", lineagePlugins.size()); + } + } + + try { + lineageEvent = eventQueue.poll(5, TimeUnit.SECONDS); + if (lineageEvent == null) { + continue; + } + } catch (InterruptedException e) { + LOG.warn("encounter exception when getting lineage event from queue, ignore", e); + continue; + } + for (Plugin plugin : lineagePlugins) { + try { + AbstractLineagePlugin lineagePlugin = (AbstractLineagePlugin) plugin; + if (!lineagePlugin.eventFilter()) { Review Comment: Does eventFilter not need the event as a parameter? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/FastInsertIntoValuesPlanner.java: ########## @@ -64,10 +64,12 @@ protected void analyze(boolean showPlanProcess) { return; } CascadesContext cascadesContext = getCascadesContext(); + getStatementContext().getPlannerHooks().forEach(hook -> hook.beforeAnalyze(this)); Review Comment: If insert values does not generate a lineage event, why add a hook here? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageEvent.java: ########## @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.lineage; + +/** + * Lineage event wrapping lineage info for plugins. 
Review Comment: why need this wrapper? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageEventProcessor.java: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.lineage; + +import org.apache.doris.common.Config; +import org.apache.doris.plugin.Plugin; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.lineage.AbstractLineagePlugin; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Processor that queues lineage events and dispatches them to lineage plugins. 
+ */ +public class LineageEventProcessor { + + private static final Logger LOG = LogManager.getLogger(LineageEventProcessor.class); + private static final long UPDATE_PLUGIN_INTERVAL_MS = 60 * 1000; // 1min + private final PluginMgr pluginMgr; + private List<Plugin> lineagePlugins; + private long lastUpdateTime = 0; + private final BlockingQueue<LineageEvent> eventQueue = + new LinkedBlockingDeque<>(Config.lineage_event_queue_size); + private final AtomicBoolean isInit = new AtomicBoolean(false); + private Thread workerThread; + + /** + * Create a lineage event processor. + * + * @param pluginMgr plugin manager to load lineage plugins + */ + public LineageEventProcessor(PluginMgr pluginMgr) { + this.pluginMgr = pluginMgr; + } + + /** + * Start the background worker thread. + */ + public void start() { + if (!isInit.compareAndSet(false, true)) { + return; + } + workerThread = new Thread(new Worker(), "LineageEventProcessor"); + workerThread.setDaemon(true); + workerThread.start(); + } + + /** + * Submit a lineage event to the processing queue. + * + * @param lineageEvent lineage event to submit + * @return true if accepted, false otherwise + */ + public boolean submitLineageEvent(LineageEvent lineageEvent) { + if (lineageEvent == null) { + return false; + } + try { + if (!eventQueue.offer(lineageEvent)) { + String queryId = getQueryId(lineageEvent); + LOG.warn("the lineage event queue is full with size {}, discard the lineage event: {}", + eventQueue.size(), queryId); + return false; + } + return true; + } catch (Exception e) { + String queryId = getQueryId(lineageEvent); + LOG.warn("encounter exception when handle lineage event {}, discard the event", + queryId, e); + return false; + } + } + + /** + * Worker that polls events and invokes lineage plugins. + */ + public class Worker implements Runnable { + /** + * Run the lineage processing loop. 
+ */ + @Override + public void run() { + LineageEvent lineageEvent; + while (true) { + // update lineage plugin list every UPDATE_PLUGIN_INTERVAL_MS. + if (lineagePlugins == null || System.currentTimeMillis() - lastUpdateTime > UPDATE_PLUGIN_INTERVAL_MS) { + lineagePlugins = pluginMgr.getActivePluginList(PluginType.LINEAGE); + lastUpdateTime = System.currentTimeMillis(); + if (lineagePlugins == null) { + lineagePlugins = Collections.emptyList(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("update lineage plugins. num: {}", lineagePlugins.size()); Review Comment: should print names here ########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageEventProcessor.java: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.lineage; + +import org.apache.doris.common.Config; +import org.apache.doris.plugin.Plugin; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.lineage.AbstractLineagePlugin; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Processor that queues lineage events and dispatches them to lineage plugins. + */ +public class LineageEventProcessor { + + private static final Logger LOG = LogManager.getLogger(LineageEventProcessor.class); + private static final long UPDATE_PLUGIN_INTERVAL_MS = 60 * 1000; // 1min + private final PluginMgr pluginMgr; + private List<Plugin> lineagePlugins; + private long lastUpdateTime = 0; + private final BlockingQueue<LineageEvent> eventQueue = + new LinkedBlockingDeque<>(Config.lineage_event_queue_size); + private final AtomicBoolean isInit = new AtomicBoolean(false); + private Thread workerThread; + + /** + * Create a lineage event processor. + * + * @param pluginMgr plugin manager to load lineage plugins + */ + public LineageEventProcessor(PluginMgr pluginMgr) { + this.pluginMgr = pluginMgr; + } + + /** + * Start the background worker thread. + */ + public void start() { + if (!isInit.compareAndSet(false, true)) { + return; + } + workerThread = new Thread(new Worker(), "LineageEventProcessor"); + workerThread.setDaemon(true); + workerThread.start(); + } + + /** + * Submit a lineage event to the processing queue. 
+ * + * @param lineageEvent lineage event to submit + * @return true if accepted, false otherwise + */ + public boolean submitLineageEvent(LineageEvent lineageEvent) { + if (lineageEvent == null) { + return false; + } + try { + if (!eventQueue.offer(lineageEvent)) { + String queryId = getQueryId(lineageEvent); + LOG.warn("the lineage event queue is full with size {}, discard the lineage event: {}", + eventQueue.size(), queryId); + return false; + } + return true; + } catch (Exception e) { + String queryId = getQueryId(lineageEvent); + LOG.warn("encounter exception when handle lineage event {}, discard the event", + queryId, e); + return false; + } + } + + /** + * Worker that polls events and invokes lineage plugins. + */ + public class Worker implements Runnable { + /** + * Run the lineage processing loop. + */ + @Override + public void run() { + LineageEvent lineageEvent; + while (true) { + // update lineage plugin list every UPDATE_PLUGIN_INTERVAL_MS. + if (lineagePlugins == null || System.currentTimeMillis() - lastUpdateTime > UPDATE_PLUGIN_INTERVAL_MS) { + lineagePlugins = pluginMgr.getActivePluginList(PluginType.LINEAGE); + lastUpdateTime = System.currentTimeMillis(); + if (lineagePlugins == null) { + lineagePlugins = Collections.emptyList(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("update lineage plugins. 
num: {}", lineagePlugins.size()); + } + } + + try { + lineageEvent = eventQueue.poll(5, TimeUnit.SECONDS); + if (lineageEvent == null) { + continue; + } + } catch (InterruptedException e) { + LOG.warn("encounter exception when getting lineage event from queue, ignore", e); + continue; + } + for (Plugin plugin : lineagePlugins) { + try { + AbstractLineagePlugin lineagePlugin = (AbstractLineagePlugin) plugin; + if (!lineagePlugin.eventFilter()) { + continue; + } + LineageInfo lineageInfo = lineageEvent.getLineageInfo(); + if (lineageInfo == null) { + LOG.warn("lineage info is null for event {}, skip", getQueryId(lineageEvent)); + continue; + } Review Comment: move this null check to be the first step -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
