This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 9f30144  [feature] support struct and map type (#212)
9f30144 is described below

commit 9f30144120964eeaf9c2757afcfc1242e588a8f0
Author: wudi <676366...@qq.com>
AuthorDate: Thu Oct 19 10:05:09 2023 +0800

    [feature] support struct and map type (#212)

    Support reading and writing the Doris MAP and STRUCT types.

```java
// Doris: create the target table
CREATE TABLE `simple_map2` (
  `id` int(11) NULL,
  `m` MAP<text,int(11)> NULL,
  `s_info` STRUCT<s_id:int(11),s_name:text,s_address:text> NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1",
  "is_being_synced" = "false",
  "storage_format" = "V2",
  "light_schema_change" = "true",
  "disable_auto_compaction" = "false",
  "enable_single_replica_compaction" = "false"
);

// datagen -> doris
tEnv.executeSql(
        "CREATE TABLE doris_test ("
                + " id int,\n"
                + " task Map<String,int>,\n"
                + " buyer ROW<s_id int,s_name string, s_address string>\n"
                + ") "
                + "WITH (\n"
                + " 'connector' = 'datagen', \n"
                + " 'number-of-rows' = '11' \n"
                + ")");
tEnv.executeSql(
        "CREATE TABLE blackhole_table ("
                + "id int,"
                + "m Map<String,int>,"
                + "s_info Row<s_id int,s_name string, s_address string>"
                + ") WITH ("
                + " 'connector' = 'doris',\n"
                + " 'fenodes' = '127.0.0.1:8030',\n"
                + " 'table.identifier' = 'test.simple_map2',\n"
                + " 'sink.enable-2pc' = 'false',\n"
                + " 'username' = 'root',\n"
                + " 'password' = '',\n"
                + " 'sink.properties.format' = 'json',\n"
                + " 'sink.properties.read_json_by_line' = 'true'\n"
                + ");");
tEnv.executeSql("INSERT INTO blackhole_table select * from doris_test");

// doris -> doris
tEnv.executeSql(
        "CREATE TABLE doris_source ("
                + "id int,"
                + "m Map<String,int>,"
                + "s_info Row<s_id int,s_name string, s_address string>"
                + ") "
                + "WITH (\n"
                + " 'connector' = 'doris',\n"
                + " 'fenodes' = '127.0.0.1:8030',\n"
                + " 'table.identifier' = 'test.simple_map',\n"
                + " 'username' = 'root',\n"
                + " 'password' = ''\n"
                + ")");
tEnv.executeSql(
        "CREATE TABLE blackhole_table ("
                + "id int,"
                + "m Map<String,int>,"
                + "s_info Row<s_id int,s_name string, s_address string>"
                + ") WITH ("
                + " 'connector' = 'doris',\n"
                + " 'fenodes' = '127.0.0.1:8030',\n"
                + " 'table.identifier' = 'test.simple_map2',\n"
                + " 'sink.enable-2pc' = 'false',\n"
                + " 'username' = 'root',\n"
                + " 'password' = '',\n"
                + " 'sink.properties.format' = 'json',\n"
                + " 'sink.properties.read_json_by_line' = 'true'\n"
                + ");");
tEnv.executeSql("insert into blackhole_table select * from doris_source");
```
---
 .../converter/DorisRowConverter.java               |  29 +++++
 .../apache/doris/flink/serialization/RowBatch.java |  30 +++++
 .../doris/flink/serialization/TestRowBatch.java    | 144 +++++++++++++++++++++
 3 files changed, 203 insertions(+)
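In the mapping this commit enables, a Doris MAP<text,int(11)> column surfaces in Flink SQL as MAP<STRING, INT>, and a Doris STRUCT surfaces as a ROW with one field per struct member, as the DDL above shows. The same types can be built with the programmatic Table API; a minimal sketch of that mapping, assuming only Flink's standard DataTypes factory (the class name is hypothetical, not part of the commit):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Hypothetical illustration class; not part of the commit.
public class TypeMappingSketch {
    public static void main(String[] args) {
        // Doris `m` MAP<text,int(11)> surfaces in Flink as MAP<STRING, INT>.
        DataType m = DataTypes.MAP(DataTypes.STRING(), DataTypes.INT());

        // Doris `s_info` STRUCT<s_id:int(11),s_name:text,s_address:text>
        // surfaces as a ROW with one field per struct member.
        DataType sInfo = DataTypes.ROW(
                DataTypes.FIELD("s_id", DataTypes.INT()),
                DataTypes.FIELD("s_name", DataTypes.STRING()),
                DataTypes.FIELD("s_address", DataTypes.STRING()));

        System.out.println(m);
        System.out.println(sInfo);
    }
}
```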
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 6fa3be9..ebc0ff6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -220,7 +220,9 @@ public class DorisRowConverter implements Serializable {
             case ARRAY:
                 return val -> convertArrayData(((List<?>) val).toArray(), type);
             case ROW:
+                return val -> convertRowData((Map<String, ?>) val, type);
             case MAP:
+                return val -> convertMapData((Map) val, type);
             case MULTISET:
             case RAW:
             default:
@@ -298,6 +300,33 @@ public class DorisRowConverter implements Serializable {
         return arrayData;
     }
 
+    private MapData convertMapData(Map<Object, Object> map, LogicalType type){
+        MapType mapType = (MapType) type;
+        DeserializationConverter keyConverter = createNullableInternalConverter(mapType.getKeyType());
+        DeserializationConverter valueConverter = createNullableInternalConverter(mapType.getValueType());
+        Map<Object, Object> result = new HashMap<>();
+        for(Map.Entry<Object, Object> entry : map.entrySet()){
+            Object key = keyConverter.deserialize(entry.getKey());
+            Object value = valueConverter.deserialize(entry.getValue());
+            result.put(key, value);
+        }
+        GenericMapData mapData = new GenericMapData(result);
+        return mapData;
+    }
+
+    private RowData convertRowData(Map<String, ?> row, LogicalType type) {
+        RowType rowType = (RowType) type;
+        GenericRowData rowData = new GenericRowData(row.size());
+        int index = 0;
+        for(Map.Entry<String, ?> entry : row.entrySet()){
+            DeserializationConverter converter = createNullableInternalConverter(rowType.getTypeAt(index));
+            Object value = converter.deserialize(entry.getValue());
+            rowData.setField(index, value);
+            index++;
+        }
+        return rowData;
+    }
+
     private List<Object> convertArrayData(ArrayData array, LogicalType type){
         if(array instanceof GenericArrayData){
             return Arrays.asList(((GenericArrayData)array).toObjectArray());
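On the read path, the new converters receive a Doris MAP cell as a plain java.util.Map and a STRUCT cell as a Map keyed by field name, run each key/value (or field) through a per-type DeserializationConverter, and wrap the result in Flink's internal GenericMapData / GenericRowData. A minimal self-contained sketch of the map case for a MAP<STRING, INT> column, with the key conversion (String -> StringData) inlined; the class name is hypothetical, not part of the commit:

```java
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.StringData;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the commit.
public class MapConversionSketch {
    public static void main(String[] args) {
        // What the source hands the converter for one MAP<text,int(11)> cell.
        Map<Object, Object> raw = new LinkedHashMap<>();
        raw.put("k1", 0);

        // convertMapData does this per entry via key/value converters;
        // for a STRING key that means java.lang.String -> StringData.
        Map<Object, Object> internal = new HashMap<>();
        for (Map.Entry<Object, Object> entry : raw.entrySet()) {
            internal.put(StringData.fromString((String) entry.getKey()), entry.getValue());
        }

        // Flink's internal map representation, as returned by the converter.
        MapData mapData = new GenericMapData(internal);
        System.out.println(mapData.size()); // 1
    }
}
```

One design point visible in the patch: convertRowData pairs map entries with row fields positionally (rowType.getTypeAt(index) against the map's iteration order), so it assumes the struct's keys arrive in declared field order.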
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index de63a6e..ad8bb72 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -32,6 +32,9 @@ import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.doris.flink.exception.DorisException;
@@ -50,7 +53,9 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
@@ -329,6 +334,31 @@ public class RowBatch {
                         //todo: when the subtype of array is date, conversion is required
                         addValueToRow(rowIndex, listValue);
                         break;
+                    case "MAP":
+                        if (!minorType.equals(Types.MinorType.MAP)) return false;
+                        MapVector mapVector = (MapVector) fieldVector;
+                        UnionMapReader reader = mapVector.getReader();
+                        if (mapVector.isNull(rowIndex)) {
+                            addValueToRow(rowIndex, null);
+                            break;
+                        }
+                        reader.setPosition(rowIndex);
+                        Map<String, Object> mapValue = new HashMap<>();
+                        while (reader.next()) {
+                            mapValue.put(reader.key().readObject().toString(), reader.value().readObject());
+                        }
+                        addValueToRow(rowIndex, mapValue);
+                        break;
+                    case "STRUCT":
+                        if (!minorType.equals(Types.MinorType.STRUCT)) return false;
+                        StructVector structVector = (StructVector) fieldVector;
+                        if (structVector.isNull(rowIndex)) {
+                            addValueToRow(rowIndex, null);
+                            break;
+                        }
+                        Map<String, ?> structValue = structVector.getObject(rowIndex);
+                        addValueToRow(rowIndex, structValue);
+                        break;
                     default:
                         String errMsg = "Unsupported type " + schema.get(col).getType();
                         logger.error(errMsg);
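The new MAP branch walks one row of an Arrow MapVector with a UnionMapReader, one next() per key/value entry, while the STRUCT branch relies on StructVector.getObject(rowIndex) already returning a Map<String, ?> keyed by child-field name. A condensed sketch of the map read loop, assuming an already-populated vector (the class and method names are hypothetical; the writing side appears in the testMap test below):

```java
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.UnionMapReader;

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the commit.
public class MapReadSketch {

    // Reads one row of an Arrow MAP column the way the patched RowBatch does.
    static Map<String, Object> readMapCell(MapVector mapVector, int rowIndex) {
        if (mapVector.isNull(rowIndex)) {
            return null; // RowBatch stores a null cell for a null map
        }
        UnionMapReader reader = mapVector.getReader();
        reader.setPosition(rowIndex);
        Map<String, Object> value = new HashMap<>();
        while (reader.next()) { // advances one key/value entry at a time
            value.put(reader.key().readObject().toString(), reader.value().readObject());
        }
        return value;
    }
}
```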
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index e2ee827..47071f5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.serialization;
 
+import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
@@ -30,17 +31,25 @@ import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.NullableStructWriter;
+import org.apache.arrow.vector.complex.impl.UnionMapWriter;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.Text;
+import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
 import org.apache.doris.sdk.thrift.TStatus;
 import org.apache.doris.sdk.thrift.TStatusCode;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.table.data.DecimalData;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -50,7 +59,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -441,4 +452,137 @@ public class TestRowBatch {
         thrown.expectMessage(startsWith("Get row offset:"));
         rowBatch.next();
     }
+
+    @Test
+    public void testMap() throws IOException, DorisException {
+
+        ImmutableList<Field> mapChildren = ImmutableList.of(
+                new Field("child", new FieldType(false, new ArrowType.Struct(), null),
+                        ImmutableList.of(
+                                new Field("key", new FieldType(false, new ArrowType.Utf8(), null), null),
+                                new Field("value", new FieldType(false, new ArrowType.Int(32, true), null),
+                                        null)
+                        )
+                ));
+
+        ImmutableList<Field> fields = ImmutableList.of(
+                new Field("col_map", new FieldType(false, new ArrowType.Map(false), null),
+                        mapChildren)
+        );
+
+        RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+        VectorSchemaRoot root = VectorSchemaRoot.create(
+                new org.apache.arrow.vector.types.pojo.Schema(fields, null), allocator);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+                root,
+                new DictionaryProvider.MapDictionaryProvider(),
+                outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        MapVector mapVector = (MapVector) root.getVector("col_map");
+        mapVector.allocateNew();
+        UnionMapWriter mapWriter = mapVector.getWriter();
+        for (int i = 0; i < 3; i++) {
+            mapWriter.setPosition(i);
+            mapWriter.startMap();
+            mapWriter.startEntry();
+            String key = "k" + (i + 1);
+            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
+            ArrowBuf buffer = allocator.buffer(bytes.length);
+            buffer.setBytes(0, bytes);
+            mapWriter.key().varChar().writeVarChar(0, bytes.length, buffer);
+            buffer.close();
+            mapWriter.value().integer().writeInt(i);
+            mapWriter.endEntry();
+            mapWriter.endMap();
+        }
+        mapWriter.setValueCount(3);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr = "{\"properties\":[{\"type\":\"MAP\",\"name\":\"col_map\",\"comment\":\"\"}"
+                + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+        Assert.assertTrue(rowBatch.hasNext());
+        Assert.assertTrue(ImmutableMap.of("k1", 0).equals(rowBatch.next().get(0)));
+        Assert.assertTrue(rowBatch.hasNext());
+        Assert.assertTrue(ImmutableMap.of("k2", 1).equals(rowBatch.next().get(0)));
+        Assert.assertTrue(rowBatch.hasNext());
+        Assert.assertTrue(ImmutableMap.of("k3", 2).equals(rowBatch.next().get(0)));
+        Assert.assertFalse(rowBatch.hasNext());
+
+    }
+
+    @Test
+    public void testStruct() throws IOException, DorisException {
+
+        ImmutableList<Field> fields = ImmutableList.of(
+                new Field("col_struct", new FieldType(false, new ArrowType.Struct(), null),
+                        ImmutableList.of(new Field("a", new FieldType(false, new ArrowType.Utf8(), null), null),
+                                new Field("b", new FieldType(false, new ArrowType.Int(32, true), null), null))
+                ));
+
+        RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+        VectorSchemaRoot root = VectorSchemaRoot.create(
+                new org.apache.arrow.vector.types.pojo.Schema(fields, null), allocator);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+                root,
+                new DictionaryProvider.MapDictionaryProvider(),
+                outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        StructVector structVector = (StructVector) root.getVector("col_struct");
+        structVector.allocateNew();
+        NullableStructWriter writer = structVector.getWriter();
+        writer.setPosition(0);
+        writer.start();
+        byte[] bytes = "a1".getBytes(StandardCharsets.UTF_8);
+        ArrowBuf buffer = allocator.buffer(bytes.length);
+        buffer.setBytes(0, bytes);
+        writer.varChar("a").writeVarChar(0, bytes.length, buffer);
+        buffer.close();
+        writer.integer("b").writeInt(1);
+        writer.end();
+        writer.setValueCount(1);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr = "{\"properties\":[{\"type\":\"STRUCT\",\"name\":\"col_struct\",\"comment\":\"\"}"
+                + "], \"status\":200}";
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+        Assert.assertTrue(rowBatch.hasNext());
+        Assert.assertTrue(ImmutableMap.of("a", new Text("a1"), "b", 1).equals(rowBatch.next().get(0)));
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org