This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 22bf2889e5d [feature](tvf)(jni-avro)jni-avro scanner add complex data types (#26236) 22bf2889e5d is described below commit 22bf2889e5d47175d18942fa02db47d1f6128763 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Nov 9 13:58:49 2023 +0800 [feature](tvf)(jni-avro)jni-avro scanner add complex data types (#26236) Support avro's enum, record, union data types --- be/src/vec/exec/format/avro/avro_jni_reader.cpp | 44 +++--- .../avro/avro_all_types/all_type.avro | Bin 0 -> 1155 bytes .../org/apache/doris/avro/AvroColumnValue.java | 34 ++++- .../java/org/apache/doris/avro/AvroJNIScanner.java | 56 +------- .../java/org/apache/doris/avro/AvroTypeUtils.java | 122 ++++++++++++++++ .../org/apache/doris/avro/AvroTypeUtilsTest.java | 105 ++++++++++++++ .../apache/doris/common/jni/vec/TableSchema.java | 10 +- .../data/external_table_p0/tvf/test_tvf_avro.out | 73 ++++++++++ .../external_table_p0/tvf/test_tvf_avro.groovy | 154 +++++++++++++++++++++ 9 files changed, 513 insertions(+), 85 deletions(-) diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index e682ff9886d..f3cff19c04d 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -86,18 +86,10 @@ Status AvroJNIReader::init_fetch_table_reader( {"file_type", std::to_string(type)}, {"is_get_table_schema", "false"}, {"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}}; - switch (type) { - case TFileType::FILE_HDFS: - required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value)); - break; - case TFileType::FILE_S3: - required_param.insert(std::make_pair("uri", _range.path)); + if (type == TFileType::FILE_S3) { required_param.insert(_params.properties.begin(), _params.properties.end()); - break; - default: - return Status::InternalError("unsupported file reader type: {}", std::to_string(type)); } - required_param.insert(_params.properties.begin(), _params.properties.end()); + required_param.insert(std::make_pair("uri", _range.path)); _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner", required_param, column_names); RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range)); @@ -144,8 +136,7 @@ Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names, } TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) { - ::doris::TPrimitiveType::type schema_type = - static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt()); + auto schema_type = static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt()); switch (schema_type) { case TPrimitiveType::INT: case TPrimitiveType::STRING: @@ -153,30 +144,35 @@ TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& colu case TPrimitiveType::BOOLEAN: case TPrimitiveType::DOUBLE: case TPrimitiveType::FLOAT: - return TypeDescriptor(thrift_to_type(schema_type)); + case TPrimitiveType::BINARY: + return {thrift_to_type(schema_type)}; case TPrimitiveType::ARRAY: { TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY); - list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); + const rapidjson::Value& childColumns = column_schema["childColumns"]; + list_type.add_sub_type(convert_to_doris_type(childColumns[0])); return list_type; } case TPrimitiveType::MAP: { TypeDescriptor map_type(PrimitiveType::TYPE_MAP); - + const rapidjson::Value& childColumns = column_schema["childColumns"]; // The default type of AVRO MAP structure key is STRING map_type.add_sub_type(PrimitiveType::TYPE_STRING); - map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); + map_type.add_sub_type(convert_to_doris_type(childColumns[1])); return map_type; } + case TPrimitiveType::STRUCT: { + TypeDescriptor struct_type(PrimitiveType::TYPE_STRUCT); + const rapidjson::Value& childColumns = column_schema["childColumns"]; + for (auto i = 0; i < childColumns.Size(); i++) { + const rapidjson::Value& child = childColumns[i]; + struct_type.add_sub_type(convert_to_doris_type(childColumns[i]), + std::string(child["name"].GetString())); + } + return struct_type; + } default: - return TypeDescriptor(PrimitiveType::INVALID_TYPE); + return {PrimitiveType::INVALID_TYPE}; } } -TypeDescriptor AvroJNIReader::convert_complex_type( - const rapidjson::Document::ConstObject child_schema) { - ::doris::TPrimitiveType::type child_schema_type = - static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt()); - return TypeDescriptor(thrift_to_type(child_schema_type)); -} - } // namespace doris::vectorized diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro new file mode 100644 index 00000000000..c66017bb624 Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro differ diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java index dd72c9aad50..77c6fba37dc 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java @@ -19,17 +19,25 @@ package org.apache.doris.avro; import org.apache.doris.common.jni.vec.ColumnValue; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Fixed; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; public class AvroColumnValue implements ColumnValue { @@ -42,6 +50,12 @@ public class AvroColumnValue implements ColumnValue { } private Object inspectObject() { + if (fieldData instanceof ByteBuffer) { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(((ByteBuffer) fieldData).array()); + } else if (fieldData instanceof Fixed) { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject( + ((GenericFixed) fieldData).bytes()); + } return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); } @@ -162,6 +176,24 @@ public class AvroColumnValue implements ColumnValue { @Override public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) { - + StructObjectInspector inspector = (StructObjectInspector) fieldInspector; + List<? extends StructField> fields = inspector.getAllStructFieldRefs(); + for (Integer idx : structFieldIndex) { + AvroColumnValue cv = null; + if (idx != null) { + StructField sf = fields.get(idx); + Object o; + if (fieldData instanceof GenericData.Record) { + GenericRecord record = (GenericRecord) inspector.getStructFieldData(fieldData, sf); + o = record.get(idx); + } else { + o = inspector.getStructFieldData(fieldData, sf); + } + if (Objects.nonNull(o)) { + cv = new AvroColumnValue(sf.getFieldObjectInspector(), o); + } + } + values.add(cv); + } } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 11bce610d70..75cbc721e31 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -21,12 +21,9 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; -import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn; import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TPrimitiveType; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; @@ -40,10 +37,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -193,54 +187,6 @@ public class AvroJNIScanner extends JniScanner { @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { Schema schema = avroReader.getSchema(); - List<Field> schemaFields = schema.getFields(); - List<SchemaColumn> schemaColumns = new ArrayList<>(); - for (Field schemaField : schemaFields) { - Schema avroSchema = schemaField.schema(); - String columnName = schemaField.name().toLowerCase(Locale.ROOT); - - SchemaColumn schemaColumn = new SchemaColumn(); - TPrimitiveType tPrimitiveType = serializeSchemaType(avroSchema, schemaColumn); - schemaColumn.setName(columnName); - schemaColumn.setType(tPrimitiveType); - schemaColumns.add(schemaColumn); - } - return new TableSchema(schemaColumns); - } - - private TPrimitiveType serializeSchemaType(Schema avroSchema, SchemaColumn schemaColumn) - throws UnsupportedOperationException { - Schema.Type type = avroSchema.getType(); - switch (type) { - case NULL: - return TPrimitiveType.NULL_TYPE; - case STRING: - return TPrimitiveType.STRING; - case INT: - return TPrimitiveType.INT; - case BOOLEAN: - return TPrimitiveType.BOOLEAN; - case LONG: - return TPrimitiveType.BIGINT; - case FLOAT: - return TPrimitiveType.FLOAT; - case BYTES: - return TPrimitiveType.BINARY; - case DOUBLE: - return TPrimitiveType.DOUBLE; - case ARRAY: - SchemaColumn arrayChildColumn = new SchemaColumn(); - schemaColumn.addChildColumn(arrayChildColumn); - arrayChildColumn.setType(serializeSchemaType(avroSchema.getElementType(), arrayChildColumn)); - return TPrimitiveType.ARRAY; - case MAP: - SchemaColumn mapChildColumn = new SchemaColumn(); - schemaColumn.addChildColumn(mapChildColumn); - mapChildColumn.setType(serializeSchemaType(avroSchema.getValueType(), mapChildColumn)); - return TPrimitiveType.MAP; - default: - throw new UnsupportedOperationException("avro format: " + type.getName() + " is not supported."); - } + return AvroTypeUtils.parseTableSchema(schema); } - } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java new file mode 100644 index 00000000000..cd597fa4cfc --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn; +import org.apache.doris.thrift.TPrimitiveType; + +import com.google.common.base.Preconditions; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.commons.compress.utils.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class AvroTypeUtils { + + protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException { + List<Field> schemaFields = schema.getFields(); + List<SchemaColumn> schemaColumns = new ArrayList<>(); + for (Field schemaField : schemaFields) { + Schema avroSchema = schemaField.schema(); + String columnName = schemaField.name(); + + SchemaColumn schemaColumn = new SchemaColumn(); + TPrimitiveType tPrimitiveType = typeFromAvro(avroSchema, schemaColumn); + schemaColumn.setName(columnName); + schemaColumn.setType(tPrimitiveType); + schemaColumns.add(schemaColumn); + } + return new TableSchema(schemaColumns); + } + + private static TPrimitiveType typeFromAvro(Schema avroSchema, SchemaColumn schemaColumn) + throws UnsupportedOperationException { + Schema.Type type = avroSchema.getType(); + switch (type) { + case ENUM: + case STRING: + return TPrimitiveType.STRING; + case INT: + return TPrimitiveType.INT; + case BOOLEAN: + return TPrimitiveType.BOOLEAN; + case LONG: + return TPrimitiveType.BIGINT; + case FLOAT: + return TPrimitiveType.FLOAT; + case FIXED: + case BYTES: + return TPrimitiveType.BINARY; + case DOUBLE: + return TPrimitiveType.DOUBLE; + case ARRAY: + SchemaColumn arrayChildColumn = new SchemaColumn(); + schemaColumn.addChildColumns(Collections.singletonList(arrayChildColumn)); + arrayChildColumn.setType(typeFromAvro(avroSchema.getElementType(), arrayChildColumn)); + return TPrimitiveType.ARRAY; + case MAP: + // The default type of AVRO MAP structure key is STRING + SchemaColumn keyChildColumn = new SchemaColumn(); + keyChildColumn.setType(TPrimitiveType.STRING); + SchemaColumn valueChildColumn = new SchemaColumn(); + valueChildColumn.setType(typeFromAvro(avroSchema.getValueType(), valueChildColumn)); + + schemaColumn.addChildColumns(Arrays.asList(keyChildColumn, valueChildColumn)); + return TPrimitiveType.MAP; + case RECORD: + List<Field> fields = avroSchema.getFields(); + List<SchemaColumn> childSchemaColumn = Lists.newArrayList(); + for (Field field : fields) { + SchemaColumn structChildColumn = new SchemaColumn(); + structChildColumn.setName(field.name()); + structChildColumn.setType(typeFromAvro(field.schema(), structChildColumn)); + childSchemaColumn.add(structChildColumn); + } + schemaColumn.addChildColumns(childSchemaColumn); + return TPrimitiveType.STRUCT; + case UNION: + List<Schema> nonNullableMembers = filterNullableUnion(avroSchema); + Preconditions.checkArgument(!nonNullableMembers.isEmpty(), + avroSchema.getName() + "Union child type not all nullAble type"); + List<SchemaColumn> childSchemaColumns = Lists.newArrayList(); + for (Schema nullableMember : nonNullableMembers) { + SchemaColumn childColumn = new SchemaColumn(); + childColumn.setName(nullableMember.getName()); + childColumn.setType(typeFromAvro(nullableMember, childColumn)); + childSchemaColumns.add(childColumn); + } + schemaColumn.addChildColumns(childSchemaColumns); + return TPrimitiveType.STRUCT; + default: + throw new UnsupportedOperationException( + "avro format: " + avroSchema.getName() + type.getName() + " is not supported."); + } + } + + private static List<Schema> filterNullableUnion(Schema schema) { + Preconditions.checkArgument(schema.isUnion(), "Schema must be union"); + return schema.getTypes().stream().filter(s -> !s.isNullable()).collect(Collectors.toList()); + } + +} diff --git a/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java b/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java new file mode 100644 index 00000000000..04f5fd217bf --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.doris.common.jni.vec.TableSchema; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class AvroTypeUtilsTest { + private Schema allTypesRecordSchema; + private final ObjectMapper objectMapper = new ObjectMapper(); + private String result; + + @Before + public void setUp() { + result = "[{\"name\":\"aBoolean\",\"type\":2,\"childColumns\":null},{\"name\":\"aInt\",\"type\":5," + + "\"childColumns\":null},{\"name\":\"aLong\",\"type\":6,\"childColumns\":null},{\"name\":\"" + + "aFloat\",\"type\":7,\"childColumns\":null},{\"name\":\"aDouble\",\"type\":8,\"childColumns\"" + + ":null},{\"name\":\"aString\",\"type\":23,\"childColumns\":null},{\"name\":\"aBytes\",\"type\"" + + ":11,\"childColumns\":null},{\"name\":\"aFixed\",\"type\":11,\"childColumns\":null},{\"name\"" + + ":\"anArray\",\"type\":20,\"childColumns\":[{\"name\":null,\"type\":5,\"childColumns\":null}]}" + + ",{\"name\":\"aMap\",\"type\":21,\"childColumns\":[{\"name\":null,\"type\":23,\"childColumns\"" + + ":null},{\"name\":null,\"type\":5,\"childColumns\":null}]},{\"name\":\"anEnum\",\"type\":23" + + ",\"childColumns\":null},{\"name\":\"aRecord\",\"type\":22,\"childColumns\":[{\"name\":\"a\"," + + "\"type\":5,\"childColumns\":null},{\"name\":\"b\",\"type\":8,\"childColumns\":null},{\"name\":" + + "\"c\",\"type\":23,\"childColumns\":null}]},{\"name\":\"aUnion\",\"type\":22,\"childColumns\":" + + "[{\"name\":\"string\",\"type\":23,\"childColumns\":null}]}]\n"; + + Schema simpleEnumSchema = SchemaBuilder.enumeration("myEnumType").symbols("A", "B", "C"); + Schema simpleRecordSchema = SchemaBuilder.record("simpleRecord") + .fields() + .name("a") + .type().intType().noDefault() + .name("b") + .type().doubleType().noDefault() + .name("c") + .type().stringType().noDefault() + .endRecord(); + + allTypesRecordSchema = SchemaBuilder.builder() + .record("all") + .fields() + .name("aBoolean") + .type().booleanType().noDefault() + .name("aInt") + .type().intType().noDefault() + .name("aLong") + .type().longType().noDefault() + .name("aFloat") + .type().floatType().noDefault() + .name("aDouble") + .type().doubleType().noDefault() + .name("aString") + .type().stringType().noDefault() + .name("aBytes") + .type().bytesType().noDefault() + .name("aFixed") + .type().fixed("myFixedType").size(16).noDefault() + .name("anArray") + .type().array().items().intType().noDefault() + .name("aMap") + .type().map().values().intType().noDefault() + .name("anEnum") + .type(simpleEnumSchema).noDefault() + .name("aRecord") + .type(simpleRecordSchema).noDefault() + .name("aUnion") + .type().optional().stringType() + .endRecord(); + } + + @Test + public void testParseTableSchema() throws IOException { + TableSchema tableSchema = AvroTypeUtils.parseTableSchema(allTypesRecordSchema); + String tableSchemaTableSchema = tableSchema.getTableSchema(); + JsonNode tableSchemaTree = objectMapper.readTree(tableSchemaTableSchema); + + JsonNode resultSchemaTree = objectMapper.readTree(result); + Assert.assertEquals(resultSchemaTree, tableSchemaTree); + } + +} diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java index 421feb55a3f..9e223d0435f 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java @@ -49,7 +49,7 @@ public class TableSchema { public static class SchemaColumn { private String name; private int type; - private SchemaColumn childColumn; + private List<SchemaColumn> childColumns; public SchemaColumn() { @@ -59,8 +59,8 @@ public class TableSchema { return name; } - public SchemaColumn getChildColumn() { - return childColumn; + public List<SchemaColumn> getChildColumns() { + return childColumns; } public int getType() { @@ -75,8 +75,8 @@ public class TableSchema { this.type = type.getValue(); } - public void addChildColumn(SchemaColumn childColumn) { - this.childColumn = childColumn; + public void addChildColumns(List<SchemaColumn> childColumns) { + this.childColumns = childColumns; } } diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out new file mode 100644 index 00000000000..8f39bd410c9 --- /dev/null +++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out @@ -0,0 +1,73 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +aBoolean BOOLEAN Yes false \N NONE +aInt INT Yes false \N NONE +aLong BIGINT Yes false \N NONE +aFloat FLOAT Yes false \N NONE +aDouble DOUBLE Yes false \N NONE +aString TEXT Yes false \N NONE +anArray ARRAY<INT> Yes false \N NONE +aMap MAP<TEXT,INT> Yes false \N NONE +anEnum TEXT Yes false \N NONE +aRecord STRUCT<a:INT,b:DOUBLE,c:TEXT> Yes false \N NONE +aUnion STRUCT<string:TEXT> Yes false \N NONE +mapArrayLong MAP<TEXT,ARRAY<BIGINT>> Yes false \N NONE +arrayMapBoolean ARRAY<MAP<TEXT,BOOLEAN>> Yes false \N NONE + +-- !2 -- +2 + +-- !1 -- +false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] +true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] + +-- !2 -- +[1, 2, 3, 4] +[5, 6, 7] + +-- !3 -- +{"k1":1, "k2":2} +{"key1":1, "key2":2} + +-- !4 -- +A +B + +-- !5 -- +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} + +-- !6 -- +\N +\N + +-- !7 -- +{"k1":[99, 88, 77], "k2":[10, 20]} +{"k11":[77, 11, 33], "k22":[10, 20]} + +-- !8 -- +[{"Key11":1}, {"Key22":0}] +[{"arrayMapKey1":0}, {"arrayMapKey2":1}] + +-- !3 -- +aBoolean BOOLEAN Yes false \N NONE +aInt INT Yes false \N NONE +aLong BIGINT Yes false \N NONE +aFloat FLOAT Yes false \N NONE +aDouble DOUBLE Yes false \N NONE +aString TEXT Yes false \N NONE +anArray ARRAY<INT> Yes false \N NONE +aMap MAP<TEXT,INT> Yes false \N NONE +anEnum TEXT Yes false \N NONE +aRecord STRUCT<a:INT,b:DOUBLE,c:TEXT> Yes false \N NONE +aUnion STRUCT<string:TEXT> Yes false \N NONE +mapArrayLong MAP<TEXT,ARRAY<BIGINT>> Yes false \N NONE +arrayMapBoolean ARRAY<MAP<TEXT,BOOLEAN>> Yes false \N NONE + +-- !9 -- +false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] +true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] + +-- !4 -- +2 + diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy new file mode 100644 index 00000000000..6f9b4f98b49 --- /dev/null +++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { + + def all_type_file = "all_type.avro"; + def format = "avro" + + // s3 config + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + def s3Uri = "https://${bucket}.${s3_endpoint}/regression/datalake/pipeline_data/tvf/${all_type_file}"; + + // hdfs config + String hdfs_port = context.config.otherConfigs.get("hdfs_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def hdfsUserName = "doris" + def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}" + def hdfsUri = "${defaultFS}" + "/user/doris/preinstalled_data/avro/avro_all_types/${all_type_file}" + + // TVF s3() + qt_1 """ + desc function s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_2 """ + select count(*) from s3( + "uri" ="${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_1 """ + select * from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_2 """ + select anArray from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_3 """ + select aMap from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_4 """ + select anEnum from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_5 """ + select aRecord from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_6 """ + select aUnion from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_7 """ + select mapArrayLong from s3( + "uri" ="${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_8 """ + select arrayMapBoolean from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + // TVF hdfs() + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + qt_3 """ + desc function HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "FORMAT" = "${format}"); """ + + order_qt_9 """ select * from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}")""" + + qt_4 """ select count(*) from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}"); """ + } finally { + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org