This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push: new 87f39fb9c18 [doc](ecosystem) improve spark connector doc (#1292) 87f39fb9c18 is described below commit 87f39fb9c181b4fdd0a8b1e03f4c0202ab9436c6 Author: wudi <676366...@qq.com> AuthorDate: Wed Nov 6 09:55:08 2024 +0800 [doc](ecosystem) improve spark connector doc (#1292) --- common_docs_zh/ecosystem/spark-doris-connector.md | 132 +++++++++---------- ecosystem/spark-doris-connector.md | 147 +++++++++++----------- 2 files changed, 140 insertions(+), 139 deletions(-) diff --git a/common_docs_zh/ecosystem/spark-doris-connector.md b/common_docs_zh/ecosystem/spark-doris-connector.md index c56fecbeee7..29d69d7daf9 100644 --- a/common_docs_zh/ecosystem/spark-doris-connector.md +++ b/common_docs_zh/ecosystem/spark-doris-connector.md @@ -44,11 +44,30 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | 1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 | | 1.0.1 | 3.1, 2.3 | 0.12 - 0.15 | 8 | 2.12, 2.11 | -## 编译与安装 +## 使用 -准备工作 +### Maven +``` +<dependency> + <groupId>org.apache.doris</groupId> + <artifactId>spark-doris-connector-3.4_2.12</artifactId> + <version>1.3.2</version> +</dependency> +``` + +**备注** + +1. 请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。 + +2. 也可从[这里](https://repo.maven.apache.org/maven2/org/apache/doris/)下载相关版本 jar 包。 + +### 编译 + +编译时,可直接运行 `sh build.sh`,具体可参考这里。 + +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +也可以 -1. 修改`custom_env.sh.tpl`文件,重命名为`custom_env.sh` 2. 在源码目录下执行: `sh build.sh` @@ -59,36 +78,18 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar -包路径 - +例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 +```shell 1. 上传 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 到 hdfs。 -``` hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ -``` 2. 
在集群中添加 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 依赖。 - -``` spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar -``` - -## 使用 Maven 管理 -``` -<dependency> - <groupId>org.apache.doris</groupId> - <artifactId>spark-doris-connector-3.4_2.12</artifactId> - <version>1.3.0</version> -</dependency> ``` -**注意** - -请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。 - ## 使用示例 ### 读取 @@ -112,7 +113,7 @@ FROM spark_doris; #### DataFrame -```scala +```java val dorisSparkDF = spark.read.format("doris") .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") @@ -125,7 +126,7 @@ dorisSparkDF.show(5) #### RDD -```scala +```java import org.apache.doris.spark._ val dorisSparkRDD = sc.dorisRDD( @@ -142,7 +143,7 @@ dorisSparkRDD.collect() #### pySpark -```scala +```java dorisSparkDF = spark.read.format("doris") .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") @@ -182,8 +183,8 @@ FROM YOUR_TABLE #### DataFrame(batch/stream) -```scala -## batch sink +```java +// batch sink val mockDataDF = List( (3, "440403001005", "21.cn"), (1, "4404030013005", "22.cn"), @@ -203,9 +204,9 @@ mockDataDF.write.format("doris") // .option("save_mode", SaveMode.Overwrite) .save() -## stream sink(StructuredStreaming) +// stream sink(StructuredStreaming) -### 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。 +// 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。 val sourceDf = spark.readStream. .format("your_own_stream_source") .load() @@ -222,7 +223,7 @@ resultDf.writeStream .start() .awaitTermination() -### 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值 +// 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值 val kafkaSource = spark.readStream .format("kafka") @@ -265,14 +266,14 @@ kafkaSource.selectExpr("CAST(value as STRING)") | doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | | doris.request.tablet.size | Integer.MAX_VALUE | 一个 RDD Partition 对应的 Doris Tablet 个数。<br />此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | | doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | -| doris.batch.size | 1024 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。<br />从而减轻网络延迟所带来的额外时间开销。 | +| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。<br />从而减轻网络延迟所带来的额外时间开销。 | | doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | | doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | | doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | | doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。<br />默认写入时要按照 Doris 表字段顺序写入全部字段。 | | doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数 | -| doris.sink.properties.format | csv | Stream Load 的数据格式。<br/>共支持 3 种格式:csv,json,arrow(1.4.0 版本开始支持)<br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.properties.format | csv | Stream Load 的数据格式。<br/>共支持 3 种格式:csv,json,arrow <br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.properties.* | -- | Stream Load 
的导入参数。<br/>例如:<br/>指定列分隔符:`'doris.sink.properties.column_separator' = ','`等<br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。<br/>此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | | doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。<br/>如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | @@ -303,9 +304,36 @@ kafkaSource.selectExpr("CAST(value as STRING)") | doris.request.auth.password | -- | 访问 Doris 的密码 | | doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 | -:::tip -1. 在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP` 或 `HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下: +## Doris 和 Spark 列类型映射关系 + +| Doris Type | Spark Type | +|------------|----------------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.StringType<sup>1</sup> | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | Unsupported datatype | +| Bitmap | Unsupported datatype | + +* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。 + + +## 常见问题 +1. **如何写入 Bitmap 类型?** + +在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP` 或 `HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下: > BITMAP > ```sql > CREATE TEMPORARY VIEW spark_doris @@ -333,13 +361,11 @@ kafkaSource.selectExpr("CAST(value as STRING)") > ); > ``` +2. **如何使用overwrite写入?** -2. 从 1.3.0 版本开始, `doris.sink.max-retries` 配置项的默认值为 0,即默认不进行重试。 - 当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 - -3. 
从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下 +从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下 > DataFrame -> ```scala +> ```java > resultDf.format("doris") > > .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") > // your own options @@ -353,28 +379,4 @@ kafkaSource.selectExpr("CAST(value as STRING)") > SELECT * FROM your_source_table > ``` -::: - -## Doris 和 Spark 列类型映射关系 - -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType<sup>1</sup> | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | -* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。 diff --git a/ecosystem/spark-doris-connector.md b/ecosystem/spark-doris-connector.md index 3d0058b68a8..4d392207110 100644 --- a/ecosystem/spark-doris-connector.md +++ b/ecosystem/spark-doris-connector.md @@ -44,51 +44,50 @@ Github: https://github.com/apache/doris-spark-connector | 1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 | | 1.0.1 | 3.1, 2.3 | 0.12 - 0.15 | 8 | 2.12, 2.11 | -## Build and Install +## How To Use -Ready to work +### Maven +``` +<dependency> + <groupId>org.apache.doris</groupId> + <artifactId>spark-doris-connector-3.4_2.12</artifactId> + <version>1.3.2</version> +</dependency> +``` +**Note** -1. Modify the `custom_env.sh.tpl` file and rename it to `custom_env.sh` +1. Please replace the corresponding Connector version according to different Spark and Scala versions. -2. Execute following command in source dir: - `sh build.sh` - Follow the prompts to enter the Scala and Spark versions you need to start compiling. +2. You can also download the relevant version jar package from [here](https://repo.maven.apache.org/maven2/org/apache/doris/). -After the compilation is successful, the target jar package will be generated in the `dist` directory, such -as: `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar`. -Copy this file to `ClassPath` in `Spark` to use `Spark-Doris-Connector`. For example, `Spark` running in `Local` mode, -put this file in the `jars/` folder. `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment -package. +### Compile -For example upload `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar` to hdfs and add hdfs file path in -spark.yarn.jars. +When compiling, you can directly run `sh build.sh`, for details, please refer to here. -1. Upload `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar` Jar to hdfs. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +You can also -``` -hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ -``` +2. 
Execute in the source code directory: -2. Add `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar` dependency in Cluster. +`sh build.sh` +Enter the Scala and Spark versions you need to compile according to the prompts. -``` -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar -``` +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`. +Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. -## Using Maven +For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -``` -<dependency> - <groupId>org.apache.doris</groupId> - <artifactId>spark-doris-connector-3.4_2.12</artifactId> - <version>1.3.0</version> -</dependency> -``` +For example, upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +```shell +1. Upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs. -**Notes** +hdfs dfs -mkdir /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ -Please replace the Connector version according to the different Spark and Scala versions. +2. Add the `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar + +``` ## Example @@ -113,7 +112,7 @@ FROM spark_doris; #### DataFrame -```scala +```java val dorisSparkDF = spark.read.format("doris") .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") @@ -126,7 +125,7 @@ dorisSparkDF.show(5) #### RDD -```scala +```java import org.apache.doris.spark._ val dorisSparkRDD = sc.dorisRDD( @@ -143,7 +142,7 @@ dorisSparkRDD.collect() #### pySpark -```scala +```java dorisSparkDF = spark.read.format("doris") .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") @@ -183,8 +182,8 @@ FROM YOUR_TABLE #### DataFrame(batch/stream) -```scala -## batch sink +```java +// batch sink val mockDataDF = List( (3, "440403001005", "21.cn"), (1, "4404030013005", "22.cn"), @@ -204,9 +203,9 @@ mockDataDF.write.format("doris") // .option("save_mode", SaveMode.Overwrite) .save() -## stream sink(StructuredStreaming) +// stream sink(StructuredStreaming) -### Result DataFrame with structured data, the configuration method is the same as the batch mode. +// Result DataFrame with structured data, the configuration method is the same as the batch mode. val sourceDf = spark.readStream. 
.format("your_own_stream_source") .load() @@ -223,7 +222,7 @@ resultDf.writeStream .start() .awaitTermination() -### There is a column value in the Result DataFrame that can be written directly, such as the value in the kafka message that conforms to the import format +// There is a column value in the Result DataFrame that can be written directly, such as the value in the kafka message that conforms to the import format val kafkaSource = spark.readStream .format("kafka") @@ -261,14 +260,14 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)") | doris.request.query.timeout.s | 3600 | Query the timeout time of doris, the default is 1 hour, -1 means no timeout limit [...] | doris.request.tablet.size | Integer.MAX_VALUE | The number of Doris Tablets corresponding to an RDD Partition. The smaller this value is set, the more partitions will be generated. This will increase the parallelism on the Spark side, but at the same time will cause greater pressure on Doris. [...] | doris.read.field | -- | List of column names in the Doris table, separated by commas [...] -| doris.batch.size | 1024 | The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between Spark and Doris. Thereby reducing the extra time overhead caused by network delay. [...] +| doris.batch.size | 4064 | The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between Spark and Doris. Thereby reducing the extra time overhead caused by network delay. [...] | doris.exec.mem.limit | 2147483648 | Memory limit for a single query. The default is 2GB, in bytes. [...] | doris.deserialize.arrow.async | false | Whether to support asynchronous conversion of Arrow format to RowBatch required for spark-doris-connector iteration [...] | doris.deserialize.queue.size | 64 | Asynchronous conversion of the internal processing queue in Arrow format takes effect when doris.deserialize.arrow.async is true [...] | doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, fileds separated by commas.<br/>By default, all fields are written in the order of Doris table fields. [...] | doris.sink.batch.size | 100000 | Maximum number of lines in a single write BE [...] -| doris.sink.max-retries | 0 | Number of retries after writing BE failed [...] -| doris.sink.properties.format | -- | Data format of the stream load.<br/>Supported formats: csv, json, arrow(since version 1.4.0)<br/> [More Multi-parameter details](../data-operate/import/stream-load-manual.md) | +| doris.sink.max-retries | 0 | Number of retries after writing BE, Since version 1.3.0, the default value is 0, which means no retries are performed by default. When this parameter is set greater than 0, batch-level failure retries will be performed, and data of the configured size of `doris.sink.batch.size` will be cached in the Spark Executor memory. The memory allocation may need to be appropriately increased. | +| doris.sink.properties.format | -- | Data format of the stream load.<br/>Supported formats: csv, json, arrow <br/> [More Multi-parameter details](../data-operate/import/stream-load-manual.md) | | doris.sink.properties.* | -- | Import parameters for Stream Load. 
<br/>For example:<br/>Specify column separator: `'doris.sink.properties.column_separator' = ','`.<br/>[More parameter details](../data-operate/import/stream-load-manual.md) | | doris.sink.task.partition.size | -- | The number of partitions corresponding to the Writing task. After filtering and other operations, the number of partitions written in Spark RDD may be large, but the number of records corresponding to each Partition is relatively small, resulting in increased writing frequency and waste of computing resources. The smaller this value is set, the less Doris write frequency and less Doris merge pressure. It is generally used with dori [...] | doris.sink.task.use.repartition | false | Whether to use repartition mode to control the number of partitions written by Doris. The default value is false, and coalesce is used (note: if there is no Spark action before the write, the whole computation will be less parallel). If it is set to true, then repartition is used (note: you can set the final number of partitions at the cost of shuffle). [...] @@ -299,9 +298,35 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)") | doris.request.auth.password | -- | Doris password | | doris.filter.query | -- | Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering. | -:::tip -1. In Spark SQL, when writing data through insert into, if the target table of doris contains `BITMAP` or `HLL` type data, you need to set the parameter `doris.ignore-type` to the corresponding type, and set `doris.write.fields` maps the corresponding columns, the usage is as follows: +## Doris & Spark Column Type Mapping + +| Doris Type | Spark Type | +|------------|----------------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.StringType<sup>1</sup> | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | Unsupported datatype | +| Bitmap | Unsupported datatype | + +* Note: In Connector, ` DATETIME` is mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text. + +## FAQ +1. **How to write Bitmap type** + +In Spark SQL, when writing data through insert into, if the target table of doris contains data of type `BITMAP` or `HLL`, you need to set the parameter `doris.ignore-type` to the corresponding type and map the columns through `doris.write.fields`. The usage is as follows: > BITMAP > ```sql > CREATE TEMPORARY VIEW spark_doris @@ -329,12 +354,11 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)") > ); > ``` -2. Since version 1.3.0, the default value of `doris.sink.max-retries` configuration is 0, which means no retries are performed by default. - When this parameter is set greater than 0, batch-level failure retries will be performed, and data of the configured size of `doris.sink.batch.size` will be cached in the Spark Executor memory. The memory allocation may need to be appropriately increased. +2. 
**How to use overwrite to write?**

-Starting from version 1.3.0, overwrite mode insertion is supported (only full table-level overwrite insertion is supported). The specific usage is as follows
+Starting from version 1.3.0, overwrite mode writing is supported (only supports data overwriting at the full table level). The specific usage is as follows:
 > DataFrame
-> ```scala
+> ```java
 > resultDf.format("doris")
 >
 >   .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
 >   // your own options
 >   .option("save_mode", SaveMode.Overwrite)
 >   .save()
 > ```
 >
 > SQL
 > ```sql
 > INSERT OVERWRITE your_target_table
 > SELECT * FROM your_source_table
 > ```

-:::
-
-## Doris & Spark Column Type Mapping
-
-| Doris Type | Spark Type                       |
-|------------|----------------------------------|
-| NULL_TYPE  | DataTypes.NullType               |
-| BOOLEAN    | DataTypes.BooleanType            |
-| TINYINT    | DataTypes.ByteType               |
-| SMALLINT   | DataTypes.ShortType              |
-| INT        | DataTypes.IntegerType            |
-| BIGINT     | DataTypes.LongType               |
-| FLOAT      | DataTypes.FloatType              |
-| DOUBLE     | DataTypes.DoubleType             |
-| DATE       | DataTypes.DateType               |
-| DATETIME   | DataTypes.StringType<sup>1</sup> |
-| DECIMAL    | DecimalType                      |
-| CHAR       | DataTypes.StringType             |
-| LARGEINT   | DecimalType                      |
-| VARCHAR    | DataTypes.StringType             |
-| TIME       | DataTypes.DoubleType             |
-| HLL        | Unsupported datatype             |
-| Bitmap     | Unsupported datatype             |
-
-* Note: In Connector, ` DATETIME` is mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org