This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fc27d7a5bd0 [Fix](trino-connector) refactor code of TrinoConnectorExternalTable (#34496) fc27d7a5bd0 is described below commit fc27d7a5bd00566ef0427c4f40d689efc20212c1 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Fri May 24 11:45:42 2024 +0800 [Fix](trino-connector) refactor code of TrinoConnectorExternalTable (#34496) 1. Fix the issue with the trino-connector accessing DeltaLake: Cache `ConnectorMetadata` and `ConnectorTransactionHandle`. 2. refactor some code --- .../datasource/paimon/source/PaimonScanNode.java | 16 ++----- .../datasource/paimon/source/PaimonSource.java | 9 ++-- .../TrinoConnectorExternalTable.java | 30 ++++++++---- .../trinoconnector/TrinoSchemaCacheValue.java | 54 +++++++++++++++++++--- .../source/TrinoConnectorScanNode.java | 38 ++++++--------- .../source/TrinoConnectorSource.java | 20 ++++---- 6 files changed, 100 insertions(+), 67 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 003ced7ead7..3ab5b3ec657 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -20,12 +20,10 @@ package org.apache.doris.datasource.paimon.source; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LocationPath; -import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import 
org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; @@ -115,17 +113,9 @@ public class PaimonScanNode extends FileQueryScanNode { @Override protected void doInitialize() throws UserException { - ExternalTable table = (ExternalTable) desc.getTable(); - if (table.isView()) { - throw new AnalysisException( - String.format("Querying external view '%s.%s' is not supported", table.getDbName(), - table.getName())); - } - computeColumnsFilter(); - initBackendPolicy(); - source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange); + super.doInitialize(); + source = new PaimonSource(desc); Preconditions.checkNotNull(source); - initSchemaParams(); PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter( source.getPaimonTable().rowType()); predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts); @@ -330,7 +320,7 @@ public class PaimonScanNode extends FileQueryScanNode { @Override public TableIf getTargetTable() { - return source.getTargetTable(); + return desc.getTable(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 9ac44537e8a..da948d2b063 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -23,12 +23,10 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; -import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileAttributes; import org.apache.paimon.table.Table; -import java.util.Map; public class PaimonSource { private final PaimonExternalTable 
paimonExtTable; @@ -36,11 +34,10 @@ public class PaimonSource { private final TupleDescriptor desc; - public PaimonSource(PaimonExternalTable table, TupleDescriptor desc, - Map<String, ColumnRange> columnNameToRange) { - this.paimonExtTable = table; - this.originTable = paimonExtTable.getPaimonTable(); + public PaimonSource(TupleDescriptor desc) { this.desc = desc; + this.paimonExtTable = (PaimonExternalTable) desc.getTable(); + this.originTable = paimonExtTable.getPaimonTable(); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java index 3cba264861e..27f9b8086a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java @@ -143,7 +143,8 @@ public class TrinoConnectorExternalTable extends ExternalTable { } Map<String, ColumnMetadata> columnMetadataMap = columnMetadataMapBuilder.buildOrThrow(); return Optional.of( - new TrinoSchemaCacheValue(columns, connectorTableHandle, columnHandleMap, columnMetadataMap)); + new TrinoSchemaCacheValue(columns, connectorMetadata, connectorTableHandle, connectorTransactionHandle, + columnHandleMap, columnMetadataMap)); } @Override @@ -160,11 +161,7 @@ public class TrinoConnectorExternalTable extends ExternalTable { return tTableDescriptor; } - protected Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) { - return trinoConnectorPrimitiveTypeToDorisType(type); - } - - private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type type) { + private Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) { if (type instanceof BooleanType) { return Type.BOOLEAN; } else if (type instanceof TinyintType) { @@ -201,19 +198,19 @@ public class 
TrinoConnectorExternalTable extends ExternalTable { TimestampWithTimeZoneType timestampWithTimeZoneType = (TimestampWithTimeZoneType) type; return ScalarType.createDatetimeV2Type(timestampWithTimeZoneType.getPrecision()); } else if (type instanceof io.trino.spi.type.ArrayType) { - Type elementType = trinoConnectorPrimitiveTypeToDorisType( + Type elementType = trinoConnectorTypeToDorisType( ((io.trino.spi.type.ArrayType) type).getElementType()); return ArrayType.create(elementType, true); } else if (type instanceof io.trino.spi.type.MapType) { - Type keyType = trinoConnectorPrimitiveTypeToDorisType( + Type keyType = trinoConnectorTypeToDorisType( ((io.trino.spi.type.MapType) type).getKeyType()); - Type valueType = trinoConnectorPrimitiveTypeToDorisType( + Type valueType = trinoConnectorTypeToDorisType( ((io.trino.spi.type.MapType) type).getValueType()); return new MapType(keyType, valueType, true, true); } else if (type instanceof RowType) { ArrayList<StructField> dorisFields = Lists.newArrayList(); for (Field field : ((RowType) type).getFields()) { - Type childType = trinoConnectorPrimitiveTypeToDorisType(field.getType()); + Type childType = trinoConnectorTypeToDorisType(field.getType()); if (field.getName().isPresent()) { dorisFields.add(new StructField(field.getName().get(), childType)); } else { @@ -233,6 +230,19 @@ public class TrinoConnectorExternalTable extends ExternalTable { .orElse(null); } + public ConnectorMetadata getConnectorMetadata() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue) value).getConnectorMetadata()).orElse(null); + } + + public ConnectorTransactionHandle getConnectorTransactionHandle() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue) value).getConnectorTransactionHandle()) + .orElse(null); + } + public Map<String, 
ColumnHandle> getColumnHandleMap() { makeSureInitialized(); Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java index dc629190cbd..43bbe76c3b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java @@ -22,27 +22,69 @@ import org.apache.doris.datasource.SchemaCacheValue; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorTableHandle; -import lombok.Getter; -import lombok.Setter; +import io.trino.spi.connector.ConnectorTransactionHandle; import java.util.List; import java.util.Map; import java.util.Optional; -@Getter -@Setter public class TrinoSchemaCacheValue extends SchemaCacheValue { - + private ConnectorMetadata connectorMetadata; private Optional<ConnectorTableHandle> connectorTableHandle; + private ConnectorTransactionHandle connectorTransactionHandle; private Map<String, ColumnHandle> columnHandleMap; private Map<String, ColumnMetadata> columnMetadataMap; - public TrinoSchemaCacheValue(List<Column> schema, Optional<ConnectorTableHandle> connectorTableHandle, + public TrinoSchemaCacheValue(List<Column> schema, ConnectorMetadata connectorMetadata, + Optional<ConnectorTableHandle> connectorTableHandle, ConnectorTransactionHandle connectorTransactionHandle, Map<String, ColumnHandle> columnHandleMap, Map<String, ColumnMetadata> columnMetadataMap) { super(schema); + this.connectorMetadata = connectorMetadata; + this.connectorTableHandle = connectorTableHandle; + this.connectorTransactionHandle = connectorTransactionHandle; + this.columnHandleMap = columnHandleMap; + 
this.columnMetadataMap = columnMetadataMap; + } + + public ConnectorMetadata getConnectorMetadata() { + return connectorMetadata; + } + + public Optional<ConnectorTableHandle> getConnectorTableHandle() { + return connectorTableHandle; + } + + public ConnectorTransactionHandle getConnectorTransactionHandle() { + return connectorTransactionHandle; + } + + public Map<String, ColumnHandle> getColumnHandleMap() { + return columnHandleMap; + } + + public Map<String, ColumnMetadata> getColumnMetadataMap() { + return columnMetadataMap; + } + + public void setConnectorMetadata(ConnectorMetadata connectorMetadata) { + this.connectorMetadata = connectorMetadata; + } + + public void setConnectorTableHandle(Optional<ConnectorTableHandle> connectorTableHandle) { this.connectorTableHandle = connectorTableHandle; + } + + public void setConnectorTransactionHandle(ConnectorTransactionHandle connectorTransactionHandle) { + this.connectorTransactionHandle = connectorTransactionHandle; + } + + public void setColumnHandleMap(Map<String, ColumnHandle> columnHandleMap) { this.columnHandleMap = columnHandleMap; + } + + public void setColumnMetadataMap(Map<String, ColumnMetadata> columnMetadataMap) { this.columnMetadataMap = columnMetadataMap; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java index 9c61d54614a..2b09e30026c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java @@ -27,7 +27,6 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; -import 
org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; @@ -66,13 +65,11 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.ConnectorTableHandle; -import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.transaction.IsolationLevel; import io.trino.spi.type.TypeManager; import io.trino.split.BufferingSplitSource; import io.trino.split.ConnectorAwareSplitSource; @@ -106,17 +103,8 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { @Override protected void doInitialize() throws UserException { - TrinoConnectorExternalTable table = (TrinoConnectorExternalTable) desc.getTable(); - if (table.isView()) { - throw new AnalysisException( - String.format("Querying external view '%s.%s' is not supported", table.getDbName(), - table.getName())); - } - - computeColumnsFilter(); - initBackendPolicy(); - source = new TrinoConnectorSource(desc, table); - initSchemaParams(); + super.doInitialize(); + source = new TrinoConnectorSource(desc); convertPredicate(); } @@ -144,12 +132,9 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { public List<Split> getSplits() throws UserException { // 1. 
Get necessary objects Connector connector = source.getConnector(); - ConnectorTransactionHandle connectorTransactionHandle = connector.beginTransaction( - IsolationLevel.READ_UNCOMMITTED, true, true); - source.setConnectorTransactionHandle(connectorTransactionHandle); + connectorMetadata = source.getConnectorMetadata(); ConnectorSession connectorSession = source.getTrinoSession().toConnectorSession(source.getCatalogHandle()); - connectorMetadata = connector.getMetadata(connectorSession, connectorTransactionHandle); // 2. Begin query connectorMetadata.beginQuery(connectorSession); applyPushDown(connectorSession); @@ -157,7 +142,7 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { // 3. get splitSource List<Split> splits = Lists.newArrayList(); try (SplitSource splitSource = getTrinoSplitSource(connector, source.getTrinoSession(), - connectorTransactionHandle, source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) { + source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) { // 4. get trino.Splits and convert it to doris.Splits while (!splitSource.isFinished()) { for (io.trino.metadata.Split split : getNextSplitBatch(splitSource)) { @@ -165,6 +150,10 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { } } } + + // 4. 
Clear query + // It is necessary for hive connector + connectorMetadata.cleanupQuery(connectorSession); return splits; } @@ -216,8 +205,7 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { // } } - private SplitSource getTrinoSplitSource(Connector connector, Session session, - ConnectorTransactionHandle connectorTransactionHandle, ConnectorTableHandle table, + private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorTableHandle table, DynamicFilter dynamicFilter) { ConnectorSplitManager splitManager = connector.getSplitManager(); @@ -227,8 +215,8 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { ConnectorSession connectorSession = session.toConnectorSession(source.getCatalogHandle()); // Constraint is not used by Hive/BigQuery Connector - ConnectorSplitSource connectorSplitSource = splitManager.getSplits(connectorTransactionHandle, connectorSession, - table, dynamicFilter, constraint); + ConnectorSplitSource connectorSplitSource = splitManager.getSplits(source.getConnectorTransactionHandle(), + connectorSession, table, dynamicFilter, constraint); SplitSource splitSource = new ConnectorAwareSplitSource(source.getCatalogHandle(), connectorSplitSource); if (this.minScheduleSplitBatchSize > 1) { @@ -386,7 +374,9 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { @Override public TableIf getTargetTable() { - return source.getTargetTable(); + // can not use `source.getTargetTable()` + // because source is null when called getTargetTable + return desc.getTable(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java index 85e36517bae..20dcf996595 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java @@ -27,6 +27,7 @@ import io.trino.Session; import io.trino.connector.ConnectorName; import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -40,16 +41,19 @@ public class TrinoConnectorSource { private final ConnectorName connectorName; private ConnectorTransactionHandle connectorTransactionHandle; private ConnectorTableHandle trinoConnectorTableHandle; + private ConnectorMetadata connectorMetadata; - public TrinoConnectorSource(TupleDescriptor desc, TrinoConnectorExternalTable table) { + public TrinoConnectorSource(TupleDescriptor desc) { this.desc = desc; - this.trinoConnectorExtTable = table; - this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) table.getCatalog(); + this.trinoConnectorExtTable = (TrinoConnectorExternalTable) desc.getTable(); + this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) trinoConnectorExtTable.getCatalog(); this.catalogHandle = trinoConnectorExternalCatalog.getTrinoCatalogHandle(); - this.trinoConnectorTableHandle = table.getConnectorTableHandle(); + this.trinoConnectorTableHandle = trinoConnectorExtTable.getConnectorTableHandle(); + this.connectorMetadata = trinoConnectorExtTable.getConnectorMetadata(); + this.connectorTransactionHandle = trinoConnectorExtTable.getConnectorTransactionHandle(); this.trinoSession = trinoConnectorExternalCatalog.getTrinoSession(); - this.connector = ((TrinoConnectorExternalCatalog) table.getCatalog()).getConnector(); - this.connectorName = ((TrinoConnectorExternalCatalog) table.getCatalog()).getConnectorName(); + this.connector = trinoConnectorExternalCatalog.getConnector(); + this.connectorName = trinoConnectorExternalCatalog.getConnectorName(); } public TupleDescriptor getDesc() { 
@@ -88,8 +92,8 @@ public class TrinoConnectorSource { return connectorName; } - public void setConnectorTransactionHandle(ConnectorTransactionHandle connectorTransactionHandle) { - this.connectorTransactionHandle = connectorTransactionHandle; + public ConnectorMetadata getConnectorMetadata() { + return connectorMetadata; } public void setTrinoConnectorTableHandle(ConnectorTableHandle trinoConnectorExtTableHandle) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org