This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fc27d7a5bd0 [Fix](trino-connector) refactor code of TrinoConnectorExternalTable (#34496)
fc27d7a5bd0 is described below

commit fc27d7a5bd00566ef0427c4f40d689efc20212c1
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Fri May 24 11:45:42 2024 +0800

    [Fix](trino-connector) refactor code of TrinoConnectorExternalTable (#34496)
    
    1. Fix the issue with the trino-connector accessing DeltaLake:
    Cache `ConnectorMetadata` and `ConnectorTransactionHandle`.
    
    2. Refactor some code.
---
 .../datasource/paimon/source/PaimonScanNode.java   | 16 ++-----
 .../datasource/paimon/source/PaimonSource.java     |  9 ++--
 .../TrinoConnectorExternalTable.java               | 30 ++++++++----
 .../trinoconnector/TrinoSchemaCacheValue.java      | 54 +++++++++++++++++++---
 .../source/TrinoConnectorScanNode.java             | 38 ++++++---------
 .../source/TrinoConnectorSource.java               | 20 ++++----
 6 files changed, 100 insertions(+), 67 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 003ced7ead7..3ab5b3ec657 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -20,12 +20,10 @@ package org.apache.doris.datasource.paimon.source;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
@@ -115,17 +113,9 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     protected void doInitialize() throws UserException {
-        ExternalTable table = (ExternalTable) desc.getTable();
-        if (table.isView()) {
-            throw new AnalysisException(
-                    String.format("Querying external view '%s.%s' is not 
supported", table.getDbName(),
-                            table.getName()));
-        }
-        computeColumnsFilter();
-        initBackendPolicy();
-        source = new PaimonSource((PaimonExternalTable) table, desc, 
columnNameToRange);
+        super.doInitialize();
+        source = new PaimonSource(desc);
         Preconditions.checkNotNull(source);
-        initSchemaParams();
         PaimonPredicateConverter paimonPredicateConverter = new 
PaimonPredicateConverter(
                 source.getPaimonTable().rowType());
         predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
@@ -330,7 +320,7 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public TableIf getTargetTable() {
-        return source.getTargetTable();
+        return desc.getTable();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 9ac44537e8a..da948d2b063 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -23,12 +23,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.property.constants.PaimonProperties;
-import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileAttributes;
 
 import org.apache.paimon.table.Table;
 
-import java.util.Map;
 
 public class PaimonSource {
     private final PaimonExternalTable paimonExtTable;
@@ -36,11 +34,10 @@ public class PaimonSource {
 
     private final TupleDescriptor desc;
 
-    public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
-                            Map<String, ColumnRange> columnNameToRange) {
-        this.paimonExtTable = table;
-        this.originTable = paimonExtTable.getPaimonTable();
+    public PaimonSource(TupleDescriptor desc) {
         this.desc = desc;
+        this.paimonExtTable = (PaimonExternalTable) desc.getTable();
+        this.originTable = paimonExtTable.getPaimonTable();
     }
 
     public TupleDescriptor getDesc() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
index 3cba264861e..27f9b8086a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java
@@ -143,7 +143,8 @@ public class TrinoConnectorExternalTable extends 
ExternalTable {
         }
         Map<String, ColumnMetadata> columnMetadataMap = 
columnMetadataMapBuilder.buildOrThrow();
         return Optional.of(
-                new TrinoSchemaCacheValue(columns, connectorTableHandle, 
columnHandleMap, columnMetadataMap));
+                new TrinoSchemaCacheValue(columns, connectorMetadata, 
connectorTableHandle, connectorTransactionHandle,
+                        columnHandleMap, columnMetadataMap));
     }
 
     @Override
@@ -160,11 +161,7 @@ public class TrinoConnectorExternalTable extends 
ExternalTable {
         return tTableDescriptor;
     }
 
-    protected Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) {
-        return trinoConnectorPrimitiveTypeToDorisType(type);
-    }
-
-    private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type 
type) {
+    private Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) {
         if (type instanceof BooleanType) {
             return Type.BOOLEAN;
         } else if (type instanceof TinyintType) {
@@ -201,19 +198,19 @@ public class TrinoConnectorExternalTable extends 
ExternalTable {
             TimestampWithTimeZoneType timestampWithTimeZoneType = 
(TimestampWithTimeZoneType) type;
             return 
ScalarType.createDatetimeV2Type(timestampWithTimeZoneType.getPrecision());
         } else if (type instanceof io.trino.spi.type.ArrayType) {
-            Type elementType = trinoConnectorPrimitiveTypeToDorisType(
+            Type elementType = trinoConnectorTypeToDorisType(
                     ((io.trino.spi.type.ArrayType) type).getElementType());
             return ArrayType.create(elementType, true);
         } else if (type instanceof io.trino.spi.type.MapType) {
-            Type keyType = trinoConnectorPrimitiveTypeToDorisType(
+            Type keyType = trinoConnectorTypeToDorisType(
                     ((io.trino.spi.type.MapType) type).getKeyType());
-            Type valueType = trinoConnectorPrimitiveTypeToDorisType(
+            Type valueType = trinoConnectorTypeToDorisType(
                     ((io.trino.spi.type.MapType) type).getValueType());
             return new MapType(keyType, valueType, true, true);
         } else if (type instanceof RowType) {
             ArrayList<StructField> dorisFields = Lists.newArrayList();
             for (Field field : ((RowType) type).getFields()) {
-                Type childType = 
trinoConnectorPrimitiveTypeToDorisType(field.getType());
+                Type childType = 
trinoConnectorTypeToDorisType(field.getType());
                 if (field.getName().isPresent()) {
                     dorisFields.add(new StructField(field.getName().get(), 
childType));
                 } else {
@@ -233,6 +230,19 @@ public class TrinoConnectorExternalTable extends 
ExternalTable {
                 .orElse(null);
     }
 
+    public ConnectorMetadata getConnectorMetadata() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue) 
value).getConnectorMetadata()).orElse(null);
+    }
+
+    public ConnectorTransactionHandle getConnectorTransactionHandle() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue) 
value).getConnectorTransactionHandle())
+                .orElse(null);
+    }
+
     public Map<String, ColumnHandle> getColumnHandleMap() {
         makeSureInitialized();
         Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
index dc629190cbd..43bbe76c3b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java
@@ -22,27 +22,69 @@ import org.apache.doris.datasource.SchemaCacheValue;
 
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorTableHandle;
-import lombok.Getter;
-import lombok.Setter;
+import io.trino.spi.connector.ConnectorTransactionHandle;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-@Getter
-@Setter
 public class TrinoSchemaCacheValue extends SchemaCacheValue {
-
+    private ConnectorMetadata connectorMetadata;
     private Optional<ConnectorTableHandle> connectorTableHandle;
+    private ConnectorTransactionHandle connectorTransactionHandle;
     private Map<String, ColumnHandle> columnHandleMap;
     private Map<String, ColumnMetadata> columnMetadataMap;
 
-    public TrinoSchemaCacheValue(List<Column> schema, 
Optional<ConnectorTableHandle> connectorTableHandle,
+    public TrinoSchemaCacheValue(List<Column> schema, ConnectorMetadata 
connectorMetadata,
+            Optional<ConnectorTableHandle> connectorTableHandle, 
ConnectorTransactionHandle connectorTransactionHandle,
             Map<String, ColumnHandle> columnHandleMap, Map<String, 
ColumnMetadata> columnMetadataMap) {
         super(schema);
+        this.connectorMetadata = connectorMetadata;
+        this.connectorTableHandle = connectorTableHandle;
+        this.connectorTransactionHandle = connectorTransactionHandle;
+        this.columnHandleMap = columnHandleMap;
+        this.columnMetadataMap = columnMetadataMap;
+    }
+
+    public ConnectorMetadata getConnectorMetadata() {
+        return connectorMetadata;
+    }
+
+    public Optional<ConnectorTableHandle> getConnectorTableHandle() {
+        return connectorTableHandle;
+    }
+
+    public ConnectorTransactionHandle getConnectorTransactionHandle() {
+        return connectorTransactionHandle;
+    }
+
+    public Map<String, ColumnHandle> getColumnHandleMap() {
+        return columnHandleMap;
+    }
+
+    public Map<String, ColumnMetadata> getColumnMetadataMap() {
+        return columnMetadataMap;
+    }
+
+    public void setConnectorMetadata(ConnectorMetadata connectorMetadata) {
+        this.connectorMetadata = connectorMetadata;
+    }
+
+    public void setConnectorTableHandle(Optional<ConnectorTableHandle> 
connectorTableHandle) {
         this.connectorTableHandle = connectorTableHandle;
+    }
+
+    public void setConnectorTransactionHandle(ConnectorTransactionHandle 
connectorTransactionHandle) {
+        this.connectorTransactionHandle = connectorTransactionHandle;
+    }
+
+    public void setColumnHandleMap(Map<String, ColumnHandle> columnHandleMap) {
         this.columnHandleMap = columnHandleMap;
+    }
+
+    public void setColumnMetadataMap(Map<String, ColumnMetadata> 
columnMetadataMap) {
         this.columnMetadataMap = columnMetadataMap;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 9c61d54614a..2b09e30026c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
-import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
@@ -66,13 +65,11 @@ import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.predicate.TupleDomain;
-import io.trino.spi.transaction.IsolationLevel;
 import io.trino.spi.type.TypeManager;
 import io.trino.split.BufferingSplitSource;
 import io.trino.split.ConnectorAwareSplitSource;
@@ -106,17 +103,8 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
 
     @Override
     protected void doInitialize() throws UserException {
-        TrinoConnectorExternalTable table = (TrinoConnectorExternalTable) 
desc.getTable();
-        if (table.isView()) {
-            throw new AnalysisException(
-                    String.format("Querying external view '%s.%s' is not 
supported", table.getDbName(),
-                            table.getName()));
-        }
-
-        computeColumnsFilter();
-        initBackendPolicy();
-        source = new TrinoConnectorSource(desc, table);
-        initSchemaParams();
+        super.doInitialize();
+        source = new TrinoConnectorSource(desc);
         convertPredicate();
     }
 
@@ -144,12 +132,9 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
     public List<Split> getSplits() throws UserException {
         // 1. Get necessary objects
         Connector connector = source.getConnector();
-        ConnectorTransactionHandle connectorTransactionHandle = 
connector.beginTransaction(
-                IsolationLevel.READ_UNCOMMITTED, true, true);
-        source.setConnectorTransactionHandle(connectorTransactionHandle);
+        connectorMetadata = source.getConnectorMetadata();
         ConnectorSession connectorSession = 
source.getTrinoSession().toConnectorSession(source.getCatalogHandle());
 
-        connectorMetadata = connector.getMetadata(connectorSession, 
connectorTransactionHandle);
         // 2. Begin query
         connectorMetadata.beginQuery(connectorSession);
         applyPushDown(connectorSession);
@@ -157,7 +142,7 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
         // 3. get splitSource
         List<Split> splits = Lists.newArrayList();
         try (SplitSource splitSource = getTrinoSplitSource(connector, 
source.getTrinoSession(),
-                connectorTransactionHandle, 
source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) {
+                source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) {
             // 4. get trino.Splits and convert it to doris.Splits
             while (!splitSource.isFinished()) {
                 for (io.trino.metadata.Split split : 
getNextSplitBatch(splitSource)) {
@@ -165,6 +150,10 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
                 }
             }
         }
+
+        // 5. Clean up the query
+        // This is necessary for the Hive connector
+        connectorMetadata.cleanupQuery(connectorSession);
         return splits;
     }
 
@@ -216,8 +205,7 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
         // }
     }
 
-    private SplitSource getTrinoSplitSource(Connector connector, Session 
session,
-            ConnectorTransactionHandle connectorTransactionHandle, 
ConnectorTableHandle table,
+    private SplitSource getTrinoSplitSource(Connector connector, Session 
session, ConnectorTableHandle table,
             DynamicFilter dynamicFilter) {
         ConnectorSplitManager splitManager = connector.getSplitManager();
 
@@ -227,8 +215,8 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
 
         ConnectorSession connectorSession = 
session.toConnectorSession(source.getCatalogHandle());
         // Constraint is not used by Hive/BigQuery Connector
-        ConnectorSplitSource connectorSplitSource = 
splitManager.getSplits(connectorTransactionHandle, connectorSession,
-                table, dynamicFilter, constraint);
+        ConnectorSplitSource connectorSplitSource = 
splitManager.getSplits(source.getConnectorTransactionHandle(),
+                connectorSession, table, dynamicFilter, constraint);
 
         SplitSource splitSource = new 
ConnectorAwareSplitSource(source.getCatalogHandle(), connectorSplitSource);
         if (this.minScheduleSplitBatchSize > 1) {
@@ -386,7 +374,9 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
 
     @Override
     public TableIf getTargetTable() {
-        return source.getTargetTable();
+        // Cannot use `source.getTargetTable()` here
+        // because `source` is null when getTargetTable() is called
+        return desc.getTable();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
index 85e36517bae..20dcf996595 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java
@@ -27,6 +27,7 @@ import io.trino.Session;
 import io.trino.connector.ConnectorName;
 import io.trino.spi.connector.CatalogHandle;
 import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 
@@ -40,16 +41,19 @@ public class TrinoConnectorSource {
     private final ConnectorName connectorName;
     private ConnectorTransactionHandle connectorTransactionHandle;
     private ConnectorTableHandle trinoConnectorTableHandle;
+    private ConnectorMetadata connectorMetadata;
 
-    public TrinoConnectorSource(TupleDescriptor desc, 
TrinoConnectorExternalTable table) {
+    public TrinoConnectorSource(TupleDescriptor desc) {
         this.desc = desc;
-        this.trinoConnectorExtTable = table;
-        this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) 
table.getCatalog();
+        this.trinoConnectorExtTable = (TrinoConnectorExternalTable) 
desc.getTable();
+        this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) 
trinoConnectorExtTable.getCatalog();
         this.catalogHandle = 
trinoConnectorExternalCatalog.getTrinoCatalogHandle();
-        this.trinoConnectorTableHandle = table.getConnectorTableHandle();
+        this.trinoConnectorTableHandle = 
trinoConnectorExtTable.getConnectorTableHandle();
+        this.connectorMetadata = trinoConnectorExtTable.getConnectorMetadata();
+        this.connectorTransactionHandle = 
trinoConnectorExtTable.getConnectorTransactionHandle();
         this.trinoSession = trinoConnectorExternalCatalog.getTrinoSession();
-        this.connector = ((TrinoConnectorExternalCatalog) 
table.getCatalog()).getConnector();
-        this.connectorName = ((TrinoConnectorExternalCatalog) 
table.getCatalog()).getConnectorName();
+        this.connector = trinoConnectorExternalCatalog.getConnector();
+        this.connectorName = trinoConnectorExternalCatalog.getConnectorName();
     }
 
     public TupleDescriptor getDesc() {
@@ -88,8 +92,8 @@ public class TrinoConnectorSource {
         return connectorName;
     }
 
-    public void setConnectorTransactionHandle(ConnectorTransactionHandle 
connectorTransactionHandle) {
-        this.connectorTransactionHandle = connectorTransactionHandle;
+    public ConnectorMetadata getConnectorMetadata() {
+        return connectorMetadata;
     }
 
     public void setTrinoConnectorTableHandle(ConnectorTableHandle 
trinoConnectorExtTableHandle) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to