This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 8fe5c75  [DataX] Refactor doriswriter (#6188)
8fe5c75 is described below

commit 8fe5c7587773e0bfe3f75f05c5aca874c9128f77
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Jul 13 11:36:40 2021 +0800

    [DataX] Refactor doriswriter (#6188)

    1. Use `read_json_by_line` to load data
    2. Use the FE http server as the target host of stream load
---
 extension/DataX/README                             |  39 ++-
 extension/DataX/doriswriter/doc/doriswriter.md     | 113 ++++---
 extension/DataX/doriswriter/doc/mysql2doris.json   |  44 +++
 extension/DataX/doriswriter/pom.xml                |   9 +-
 .../plugin/writer/doriswriter/DorisFlushBatch.java |  41 ++-
 .../plugin/writer/doriswriter/DorisJsonCodec.java  |  50 +--
 .../plugin/writer/doriswriter/DorisWriter.java     |  46 +--
 .../writer/doriswriter/DorisWriterEmitter.java     | 349 +++++++++++++--------
 .../datax/plugin/writer/doriswriter/Key.java       |  60 +++-
 extension/DataX/init-env.sh                        |  18 ++
 .../java/org/apache/doris/backup/RestoreJob.java   |   7 +-
 11 files changed, 514 insertions(+), 262 deletions(-)

diff --git a/extension/DataX/README b/extension/DataX/README
index 687793e..93e64e7 100644
--- a/extension/DataX/README
+++ b/extension/DataX/README
@@ -27,20 +27,45 @@ Because the doriswriter plug-in depends on some modules in the DataX code base,

 1. `doriswriter/`

-    This directory is the code directory of doriswriter, and this part of the code should be in the Doris code base.
+    This directory is the code directory of doriswriter, and this part of the code should be in the Doris code base.
+
+    The help doc can be found in `doriswriter/doc`.

 2. `init_env.sh`

-    The script mainly performs the following steps:
+    The script mainly performs the following steps:

-    1. Git clone the DataX code base to the local
-    2. Softlink the `doriswriter/` directory to `DataX/doriswriter`.
-    3. Add `<module>doriswriter</module>` to the original `DataX/pom.xml`
+    1. Git clone the DataX code base to the local machine.
+    2. Softlink the `doriswriter/` directory to `DataX/doriswriter`.
+    3. Add `<module>doriswriter</module>` to the original `DataX/pom.xml`.
+    4. Change the httpclient version from 4.5 to 4.5.13 in `DataX/core/pom.xml`.
+
+    > httpclient v4.5 cannot handle 307 redirects correctly.

-    After that, developers can enter `DataX/` for development. And the changes in the `DataX/doriswriter` directory will be reflected in the `doriswriter/` directory, which is convenient for developers to submit code.
+    After that, developers can enter `DataX/` for development. Changes in the `DataX/doriswriter` directory will be reflected in the `doriswriter/` directory, which makes it convenient for developers to submit code.

 ### How to build

 1. Run `init_env.sh`
 2. Modify code of doriswriter in `DataX/doriswriter`
-3. Commit code of doriswriter in `doriswriter`
+3. Build doriswriter
+
+    1. Build doriswriter alone:
+
+        `mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests`
+
+    2. Build DataX:
+
+        `mvn package assembly:assembly -Dmaven.test.skip=true`
+
+        The output will be in `target/datax/datax/`.
+
+        > hdfsreader, hdfswriter and oscarwriter need some extra jar packages. If you don't need these components, you can comment out the corresponding modules in `DataX/pom.xml`.
+
+4. Commit code of doriswriter in `doriswriter`
+
+### About DataX
+
+DataX is an open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform widely used within Alibaba Group. DataX implements efficient data synchronization between various heterogeneous data sources, including MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, and more.
+
+More details can be found at: `https://github.com/alibaba/DataX/`
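[Editor's note] For orientation, the end-to-end build-and-run flow implied by the README above looks roughly like the following sketch; the job file and the location of DataX's standard `datax.py` launcher inside the assembled package are illustrative:

```sh
# from the Doris source tree
cd extension/DataX
sh init-env.sh                      # clone DataX, link doriswriter, patch the poms
cd DataX
# build doriswriter alone, together with the rdbms util module it depends on
mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests
# or build the whole DataX package
mvn package assembly:assembly -Dmaven.test.skip=true
# run a job with the packaged DataX (job file path is illustrative)
python target/datax/datax/bin/datax.py ../doriswriter/doc/mysql2doris.json
```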
diff --git a/extension/DataX/doriswriter/doc/doriswriter.md b/extension/DataX/doriswriter/doc/doriswriter.md
index e58eff2..7df32f6 100644
--- a/extension/DataX/doriswriter/doc/doriswriter.md
+++ b/extension/DataX/doriswriter/doc/doriswriter.md
@@ -53,24 +53,22 @@ DorisWriter loads data through Doris's native Stream Load support. DorisWriter
         "writer": {
             "name": "doriswriter",
             "parameter": {
-                "username": "dxx",
-                "password": "123456",
-                "database": "test",
-                "table": "datax_test",
-                "column": [
-                    "k1",
-                    "k2",
-                    "v1",
-                    "v2"
-                ],
-                "preSql": [],
+                "feLoadUrl": ["127.0.0.1:8030", "127.0.0.2:8030", "127.0.0.3:8030"],
+                "beLoadUrl": ["192.168.10.1:8040", "192.168.10.2:8040", "192.168.10.3:8040"],
+                "jdbcUrl": "jdbc:mysql://127.0.0.1:9030/",
+                "database": "db1",
+                "table": "t1",
+                "column": ["k1", "k2", "v1", "v2"],
+                "username": "root",
+                "password": "",
                 "postSql": [],
-                "jdbcUrl": "jdbc:mysql://10.93.6.247:9030/",
-                "beLoadUrl": [
-                    "10.93.6.167:8041"
-                ],
+                "preSql": [],
                 "loadProps": {
-                }
+                },
+                "maxBatchRows" : 500000,
+                "maxBatchByteSize" : 104857600,
+                "labelPrefix": "my_prefix",
+                "lineDelimiter": "\n"
             }
         }
     }
@@ -79,45 +77,55 @@ DorisWriter loads data through Doris's native Stream Load support. DorisWriter
 }
 ```

-
-
 ### 3.2 Parameters

-* **username**
+* **jdbcUrl**

-    - Description: the username for accessing the Doris database
-    - Required: yes
-    - Default: none
+    - Description: the JDBC connection string of Doris, used to execute preSql and postSql.
+    - Required: yes
+    - Default: none

-* **password**
+* **feLoadUrl**

-    - Description: the password for accessing the Doris database
-    - Required: yes
+    - Description: choose either this or **beLoadUrl** as the connection target of Stream Load. Format: "ip:port", where IP is an FE node's IP and port is the FE node's http_port. Multiple addresses can be given; doriswriter accesses them in round-robin order.
+    - Required: no
     - Default: none

-* **database**
+* **beLoadUrl**

-    - Description: the database name of the destination Doris table.
-    - Required: yes
+    - Description: choose either this or **feLoadUrl** as the connection target of Stream Load. Format: "ip:port", where IP is a BE node's IP and port is the BE node's webserver_port. Multiple addresses can be given; doriswriter accesses them in round-robin order.
+    - Required: no
     - Default: none

-* **table**
+* **username**

-    - Description: the table name of the destination Doris table.
-    - Required: yes
-    - Default: none
+    - Description: the username for accessing the Doris database
+    - Required: yes
+    - Default: none
+
+* **password**
+
+    - Description: the password for accessing the Doris database
+    - Required: no
+    - Default: empty

-* **beLoadUrl**
+* **database**

-    - Description: the address of the Doris BE used for Stream Load; multiple BE addresses are allowed, in the form `BE_ip:Be_webserver_port`.
-    - Required: yes
-    - Default: none
+    - Description: the name of the Doris database to write to.
+    - Required: yes
+    - Default: none
+
+* **table**
+
+    - Description: the name of the Doris table to write to.
+    - Required: yes
+    - Default: none

 * **column**

-    - Description: the fields of the destination table **to be written**, separated by commas. For example: "column": ["id","name","age"].
-    - Required: yes
-    - Default: none
+    - Description: the fields of the destination table **to be written**; these fields are used as the field names of the generated JSON data. Fields are separated by commas. For example: "column": ["id","name","age"].
+    - Required: yes
+    - Default: none

 * **preSql**

     - Description: standard SQL statements executed before writing data to the destination table.
     - Required: no
     - Default: none

 * **postSql**

     - Description: standard SQL statements executed after writing data to the destination table.
     - Required: no
     - Default: none

-* **jdbcUrl**
-
-    - Description: the JDBC connection info of the destination database, used to execute `preSql` and `postSql`.
-    - Required: no
-    - Default: none
+* **maxBatchRows**
+
+    - Description: the maximum number of rows per load batch. Together with **maxBatchByteSize**, it controls the amount of data in each batch; as soon as either threshold is reached, the batch is loaded.
+    - Required: no
+    - Default: 500000
+
+* **maxBatchByteSize**
+
+    - Description: the maximum amount of data per load batch. Together with **maxBatchRows**, it controls the amount of data in each batch; as soon as either threshold is reached, the batch is loaded.
+    - Required: no
+    - Default: 104857600
+
+* **labelPrefix**
+
+    - Description: the label prefix of each load batch. The final label is composed of `labelPrefix + UUID + sequence number`.
+    - Required: no
+    - Default: `datax_doris_writer_`
+
+* **lineDelimiter**
+
+    - Description: each batch contains multiple rows, with each row in JSON format; lineDelimiter is the separator between rows.
+    - Required: no
+    - Default: `\n`

 * **loadProps**

     - Description: request properties passed through to Stream Load (set as HTTP headers).
     - Required: no
     - Default: none
-
-
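[Editor's note] As a concrete illustration of the `labelPrefix` rule documented above (final label = `labelPrefix + UUID + sequence number`), here is a minimal sketch mirroring `getStreamLoadLabel()` in `DorisWriter`; the prefix value is illustrative:

```java
import java.util.UUID;

public class LabelDemo {
    public static void main(String[] args) {
        String labelPrefix = "my_prefix"; // the configured labelPrefix
        int batchNum = 0;                 // per-task batch counter
        // e.g. my_prefix550e8400-e29b-41d4-a716-446655440000_0
        String label = labelPrefix + UUID.randomUUID().toString() + "_" + (batchNum++);
        System.out.println(label);
    }
}
```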
diff --git a/extension/DataX/doriswriter/doc/mysql2doris.json b/extension/DataX/doriswriter/doc/mysql2doris.json
new file mode 100644
index 0000000..f6e9d9b
--- /dev/null
+++ b/extension/DataX/doriswriter/doc/mysql2doris.json
@@ -0,0 +1,44 @@
+{
+    "job": {
+        "content": [
+            {
+                "reader": {
+                    "name": "mysqlreader",
+                    "parameter": {
+                        "column": ["k1", "k2", "k3"],
+                        "connection": [
+                            {
+                                "jdbcUrl": ["jdbc:mysql://192.168.10.10:3306/db1"],
+                                "table": ["t1"]
+                            }
+                        ],
+                        "username": "root",
+                        "password": "123456",
+                        "where": ""
+                    }
+                },
+                "writer": {
+                    "name": "doriswriter",
+                    "parameter": {
+                        "feLoadUrl": ["192.168.1.1:8030"],
+                        "jdbcUrl": "jdbc:mysql://192.168.1.1:9030/",
+                        "loadProps": {
+                        },
+                        "database": "db1",
+                        "table": "t3",
+                        "column": ["k1", "k2", "k3"],
+                        "username": "root",
+                        "password": "12345",
+                        "postSql": [],
+                        "preSql": []
+                    }
+                }
+            }
+        ],
+        "setting": {
+            "speed": {
+                "channel": "1"
+            }
+        }
+    }
+}
diff --git a/extension/DataX/doriswriter/pom.xml b/extension/DataX/doriswriter/pom.xml
index f4b4ea5..1051436 100644
--- a/extension/DataX/doriswriter/pom.xml
+++ b/extension/DataX/doriswriter/pom.xml
@@ -24,10 +24,13 @@ under the License.
         <artifactId>datax-all</artifactId>
         <groupId>com.alibaba.datax</groupId>
         <version>0.0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
+    <modelVersion>4.0.0</modelVersion>
     <artifactId>doriswriter</artifactId>
+    <name>doriswriter</name>
+    <packaging>jar</packaging>

     <dependencies>
         <dependency>
@@ -65,7 +68,7 @@ under the License.
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.5.3</version>
+            <version>4.5.13</version>
         </dependency>
     </dependencies>
     <build>
@@ -101,4 +104,4 @@ under the License.
         </plugin>
     </plugins>
 </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
index 7a9638d..7c56fbe 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
@@ -19,29 +19,42 @@
  */
 package com.alibaba.datax.plugin.writer.doriswriter;

-import java.util.List;
-
-public class DorisFlushBatch
-{
+// A wrapper class to hold a batch of rows to be loaded
+public class DorisFlushBatch {
+    private String lineDelimiter;
     private String label;
-    private Long bytes;
-    private List<String> rows;
+    private long rows = 0;
+    private StringBuilder data = new StringBuilder();
+
+    public DorisFlushBatch(String lineDelimiter) {
+        this.lineDelimiter = lineDelimiter;
+    }

-    public DorisFlushBatch(final String label, final Long bytes, final List<String> rows) {
+    public void setLabel(String label) {
         this.label = label;
-        this.bytes = bytes;
-        this.rows = rows;
     }

     public String getLabel() {
-        return this.label;
+        return label;
+    }
+
+    public long getRows() {
+        return rows;
+    }
+
+    public void putData(String row) {
+        if (data.length() > 0) {
+            data.append(lineDelimiter);
+        }
+        data.append(row);
+        rows++;
     }

-    public Long getBytes() {
-        return this.bytes;
+    public StringBuilder getData() {
+        return data;
     }

-    public List<String> getRows() {
-        return this.rows;
+    public long getSize() {
+        return data.length();
     }
 }
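[Editor's note] A minimal usage sketch of the new `DorisFlushBatch` (assuming it sits alongside the plugin classes; the rows are hand-written JSON). Rows are buffered as JSON lines joined by the configured line delimiter, which is exactly the shape `read_json_by_line` expects on the Doris side; note that `getSize()` returns the character count of the buffer, which the writer uses as an approximation of the batch's byte size:

```java
package com.alibaba.datax.plugin.writer.doriswriter;

public class FlushBatchDemo {
    public static void main(String[] args) {
        DorisFlushBatch batch = new DorisFlushBatch("\n");
        batch.setLabel("datax_doris_writer_demo_0"); // label is illustrative
        // each call appends one JSON row, separated by the line delimiter
        batch.putData("{\"k1\":\"a\",\"v1\":1}");
        batch.putData("{\"k1\":\"b\",\"v1\":2}");
        System.out.println(batch.getRows()); // 2
        System.out.println(batch.getSize()); // buffer length in chars
        System.out.println(batch.getData()); // two JSON rows on two lines
    }
}
```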
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
index c73a8d5..4db273f 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -23,6 +23,7 @@ import com.alibaba.datax.common.element.Column;
 import com.alibaba.datax.common.element.DateColumn;
 import com.alibaba.datax.common.element.Record;
 import com.alibaba.fastjson.JSON;
+
 import org.apache.commons.lang3.time.DateFormatUtils;

 import java.util.HashMap;
@@ -30,8 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;

+// Convert DataX data to json
 public class DorisJsonCodec {
-
     private static String timeZone = "GMT+8";
     private static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);

@@ -48,41 +49,46 @@ public class DorisJsonCodec {
         final Map<String, Object> rowMap = new HashMap<String, Object>(this.fieldNames.size());
         int idx = 0;
         for (final String fieldName : this.fieldNames) {
-            rowMap.put(fieldName, this.columnConvert2String(row.getColumn(idx)));
+            rowMap.put(fieldName, this.convertColumn(row.getColumn(idx)));
             ++idx;
         }
         return JSON.toJSONString(rowMap);
     }

     /**
      * convert datax internal data to string
      *
      * @param col
      * @return
      */
-    private String columnConvert2String(final Column col) {
+    private Object convertColumn(final Column col) {
         if (null == col.getRawData()) {
             return null;
         }
-        if (Column.Type.BOOL == col.getType()) {
-            return String.valueOf(col.asLong());
-        }
-        if (Column.Type.DATE != col.getType()) {
-            return col.asString();
-        }
-        final DateColumn.DateType type = ((DateColumn) col).getSubType();
-        if (type == DateColumn.DateType.DATE) {
-            return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
+        Column.Type type = col.getType();
+        switch (type) {
+            case BOOL:
+            case INT:
+            case LONG:
+                return col.asLong();
+            case DOUBLE:
+                return col.asDouble();
+            case STRING:
+                return col.asString();
+            case DATE: {
+                final DateColumn.DateType dateType = ((DateColumn) col).getSubType();
+                switch (dateType) {
+                    case DATE:
+                        return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
+                    case DATETIME:
+                        return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
+                    default:
+                        return col.asString();
+                }
+            }
+            default:
+                // BAD, NULL, BYTES
+                return null;
         }
-        if (type == DateColumn.DateType.TIME) {
-            return DateFormatUtils.format(col.asDate(), "HH:mm:ss", timeZoner);
-        }
-        if (type == DateColumn.DateType.DATETIME) {
-            return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
-        }
-        return null;
     }
-
-
-}
\ No newline at end of file
+}
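[Editor's note] The practical effect of switching from `columnConvert2String` to `convertColumn` is that numeric columns stay typed in the JSON instead of being quoted as strings. A sketch using fastjson directly, where the map stands in for a converted DataX record:

```java
import com.alibaba.fastjson.JSON;

import java.util.LinkedHashMap;
import java.util.Map;

public class CodecDemo {
    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("k1", 1L);                    // BOOL/INT/LONG -> Long
        row.put("k2", 3.14D);                 // DOUBLE -> Double
        row.put("v1", "hello");               // STRING -> String
        row.put("v2", "2021-02-02 00:00:00"); // DATETIME -> formatted string
        // prints {"k1":1,"k2":3.14,"v1":"hello","v2":"2021-02-02 00:00:00"}
        System.out.println(JSON.toJSONString(row));
    }
}
```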
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
index 5148dcd..0f4d653 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -17,6 +17,7 @@ under the License.
 -->
  */
+
 package com.alibaba.datax.plugin.writer.doriswriter;

 import com.alibaba.datax.common.element.Record;
@@ -31,9 +32,11 @@ import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
 import com.alibaba.datax.plugin.rdbms.writer.Constant;
 import com.alibaba.druid.sql.parser.ParserException;
 import com.google.common.base.Strings;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -51,7 +54,7 @@ public class DorisWriter extends Writer {
         private DorisWriterEmitter dorisWriterEmitter;
         private Key keys;
         private DorisJsonCodec rowCodec;
-
+        private int batchNum = 0;

         public Task() {
         }
@@ -69,9 +72,10 @@ public class DorisWriter extends Writer {

         @Override
         public void startWrite(RecordReceiver recordReceiver) {
+            String lineDelimiter = this.keys.getLineDelimiter();
             try {
-                List<String> buffer = new ArrayList<>();
-                int batchCount = 0;
+                DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
+                long batchCount = 0;
                 long batchByteSize = 0L;
                 Record record;
                 // loop to get record from datax
@@ -79,39 +83,44 @@ public class DorisWriter extends Writer {
                     // check column size
                     if (record.getColumnNumber() != this.keys.getColumns().size()) {
                         throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
-                                String.format("config writer column info error. because the column number of reader is :%s and the column number of writer is:%s . please check you datax job config json.", record.getColumnNumber(), this.keys.getColumns().size()));
+                                String.format("config writer column info error. because the column number of reader is :%s"
+                                        + " and the column number of writer is:%s. please check your datax job config json.",
+                                        record.getColumnNumber(), this.keys.getColumns().size()));
                     }
                     // codec record
                     final String recordStr = this.rowCodec.serialize(record);
                     // put into buffer
-                    buffer.add(recordStr);
+                    flushBatch.putData(recordStr);
                     batchCount += 1;
-                    batchByteSize += recordStr.getBytes().length;
+                    batchByteSize += recordStr.length();
                     // trigger buffer
                     if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
                         // generate doris stream load label
-                        final String label = getStreamLoadLabel();
-                        LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
-                        final DorisFlushBatch flushBatch = new DorisFlushBatch(label, batchByteSize, buffer);
-                        dorisWriterEmitter.doStreamLoad(flushBatch);
+                        flush(flushBatch, batchCount, batchByteSize);
                         // clear buffer
                         batchCount = 0;
                         batchByteSize = 0L;
-                        buffer.clear();
+                        flushBatch = new DorisFlushBatch(lineDelimiter);
                     }
-                }
-                if (buffer.size() > 0) {
-                    final DorisFlushBatch flushBatch = new DorisFlushBatch(getStreamLoadLabel(), batchByteSize, buffer);
-                    dorisWriterEmitter.doStreamLoad(flushBatch);
-                }
+                } // end of while
+                if (flushBatch.getSize() > 0) {
+                    flush(flushBatch, batchCount, batchByteSize);
+                }
             } catch (Exception e) {
                 throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
             }
         }

+        private void flush(DorisFlushBatch flushBatch, long batchCount, long batchByteSize) throws IOException {
+            final String label = getStreamLoadLabel();
+            flushBatch.setLabel(label);
+            dorisWriterEmitter.doStreamLoad(flushBatch);
+        }
+
         private String getStreamLoadLabel() {
-            return "datax_doris_writer_" + UUID.randomUUID().toString();
+            String labelPrefix = this.keys.getLabelPrefix();
+            return labelPrefix + UUID.randomUUID().toString() + "_" + (batchNum++);
         }

         @Override
@@ -159,11 +168,10 @@ public class DorisWriter extends Writer {
             List<String> renderedPreSqls = this.renderPreOrPostSqls(this.keys.getPreSqlList(), this.keys.getTable());
             if (!renderedPreSqls.isEmpty()) {
                 Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
-                LOG.info("prepare execute preSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPreSqls), jdbcUrl);
+                LOG.info("prepare execute preSqls:[{}]. doris jdbc url:{}.", String.join(";", renderedPreSqls), jdbcUrl);
                 this.executeSqls(conn, renderedPreSqls);
                 DBUtil.closeDBResources(null, null, conn);
             }
-
         }

         @Override
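[Editor's note] The batching contract in `startWrite` above reduces to one rule: flush as soon as either configured threshold is hit. A condensed sketch of that trigger, with the defaults documented for `maxBatchRows`/`maxBatchByteSize`:

```java
public class FlushTriggerDemo {
    // true when the current batch should be sent via stream load
    static boolean shouldFlush(long batchRows, long batchChars,
                               long maxBatchRows, long maxBatchByteSize) {
        return batchRows >= maxBatchRows || batchChars >= maxBatchByteSize;
    }

    public static void main(String[] args) {
        // defaults: 500000 rows or 100MB, whichever comes first
        System.out.println(shouldFlush(500000, 1024, 500000L, 104857600L));  // true
        System.out.println(shouldFlush(10, 104857600, 500000L, 104857600L)); // true
        System.out.println(shouldFlush(10, 1024, 500000L, 104857600L));      // false
    }
}
```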
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
index 70792e3..d417332 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -17,161 +17,244 @@ under the License.
 -->
  */
+
 package com.alibaba.datax.plugin.writer.doriswriter;

+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
 import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
+import org.apache.http.ProtocolException;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
 import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.Map;
-import java.util.StringJoiner;
-import java.util.UUID;

+// Used to load a batch of rows into Doris using stream load
 public class DorisWriterEmitter {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
+    private final Key keys;
+    private int hostPos = 0;
+    private List<String> targetHosts = Lists.newArrayList();
+
+    public DorisWriterEmitter(final Key keys) {
+        this.keys = keys;
+        initHostList();
+    }
+
+    // get target hosts from config
+    private void initHostList() {
+        List<String> hosts = this.keys.getBeLoadUrlList();
+        if (hosts == null || hosts.isEmpty()) {
+            hosts = this.keys.getFeLoadUrlList();
+        }
+        if (hosts == null || hosts.isEmpty()) {
+            DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                    "Either beLoadUrl or feLoadUrl must be set");
+        }
+        for (String beHost : hosts) {
+            targetHosts.add("http://" + beHost);
+        }
+    }
+
+    /**
+     * execute doris stream load
+     */
+    public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
+        long start = System.currentTimeMillis();
+        final String host = this.getAvailableHost();
+        if (null == host) {
+            throw new IOException("None of the load url can be connected.");
+        }
+        final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
+        // do http put request and get response
+        final Map<String, Object> loadResult = this.doHttpPut(loadUrl, flushData);
+
+        long cost = System.currentTimeMillis() - start;
+        LOG.info("StreamLoad response: " + JSON.toJSONString(loadResult) + ", cost(ms): " + cost);
+        final String keyStatus = "Status";
+        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+            throw new IOException("Unable to flush data to doris: unknown result status.");
+        }
+        if (loadResult.get(keyStatus).equals("Fail")) {
+            throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
+        }
+    }
+
+    /**
+     * loop to get target host
+     * @return
+     */
+    private String getAvailableHost() {
+        if (this.hostPos >= targetHosts.size()) {
+            this.hostPos = 0;
+        }
+
+        while (this.hostPos < targetHosts.size()) {
+            final String host = targetHosts.get(hostPos);
+            ++this.hostPos;
+            if (this.tryHttpConnection(host)) {
+                return host;
+            }
+        }
+
+        return null;
+    }
+
+    private boolean tryHttpConnection(final String host) {
+        try {
+            final URL url = new URL(host);
+            final HttpURLConnection co = (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(1000);
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception e) {
+            LOG.warn("Failed to connect to address:{} , Exception ={}", host, e);
+            return false;
+        }
+    }
+
+    private Map<String, Object> doHttpPut(final String loadUrl, final DorisFlushBatch flushBatch) throws IOException {
+        LOG.info(String.format("Executing stream load to: '%s', size: %s, rows: %d",
+                loadUrl, flushBatch.getSize(), flushBatch.getRows()));
+
+        final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
+            @Override
+            protected boolean isRedirectable(final String method) {
+                return true;
+            }
+
+            @Override
+            public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException {
+                URI uri = this.getLocationURI(request, response, context);
+                String method = request.getRequestLine().getMethod();
+                if (method.equalsIgnoreCase("HEAD")) {
+                    return new HttpHead(uri);
+                } else if (method.equalsIgnoreCase("GET")) {
+                    return new HttpGet(uri);
+                } else {
+                    int status = response.getStatusLine().getStatusCode();
+                    return (HttpUriRequest) (status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri));
+                }
+            }
+        });
+
+        try (final CloseableHttpClient httpclient = httpClientBuilder.build()) {
+            final HttpPut httpPut = new HttpPut(loadUrl);
+            final List<String> cols = this.keys.getColumns();
+            if (null != cols && !cols.isEmpty()) {
+                httpPut.setHeader("columns", String.join(",", cols));
+            }
+
+            // put loadProps to http header
+            final Map<String, Object> loadProps = this.keys.getLoadProps();
+            if (null != loadProps) {
+                for (final Map.Entry<String, Object> entry : loadProps.entrySet()) {
+                    httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
+                }
+            }
+
+            // set other required headers
+            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
+            httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
+            httpPut.setHeader("label", flushBatch.getLabel());
+            httpPut.setHeader("format", "json");
+            httpPut.setHeader("read_json_by_line", "true");
+            httpPut.setHeader("fuzzy_parse", "true");
+            // Use ByteArrayEntity instead of StringEntity to handle Chinese correctly
+            httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes()));
+
+            try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+                final int code = resp.getStatusLine().getStatusCode();
+                if (HttpStatus.SC_OK != code) {
+                    LOG.warn("Request failed with code:{}", code);
+                    return null;
+                }
+                final HttpEntity respEntity = resp.getEntity();
+                if (null == respEntity) {
+                    LOG.warn("Request failed with empty response.");
+                    return null;
+                }
+                return (Map<String, Object>) JSON.parse(EntityUtils.toString(respEntity));
+            }
+        }
+    }
+
+    private String getBasicAuthHeader(final String username, final String password) {
+        final String auth = username + ":" + password;
+        final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
+        return "Basic " + new String(encodedAuth);
+    }
+
+    // for test
+    public static void main(String[] args) throws IOException {
+        String json = "{\n" +
+                "    \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" +
+                "    \"column\": [\"k1\", \"k2\", \"k3\"],\n" +
+                "    \"database\": \"db1\",\n" +
+                "    \"jdbcUrl\": \"jdbc:mysql://127.0.0.1:9030/\",\n" +
+                "    \"loadProps\": {\n" +
+                "    },\n" +
+                "    \"password\": \"12345\",\n" +
+                "    \"postSql\": [],\n" +
+                "    \"preSql\": [],\n" +
+                "    \"table\": \"t1\",\n" +
+                "    \"username\": \"root\"\n" +
+                "}";
+        Configuration configuration = Configuration.from(json);
+        Key key = new Key(configuration);
+
+        DorisWriterEmitter emitter = new DorisWriterEmitter(key);
+        DorisFlushBatch flushBatch = new DorisFlushBatch("\n");
+        flushBatch.setLabel("test4");
+        Map<String, String> row1 = Maps.newHashMap();
+        row1.put("k1", "2021-02-02");
+        row1.put("k2", "2021-02-02 00:00:00");
+        row1.put("k3", "3");
+        String rowStr1 = JSON.toJSONString(row1);
+        System.out.println("rows1: " + rowStr1);
+        flushBatch.putData(rowStr1);
+
+        Map<String, String> row2 = Maps.newHashMap();
+        row2.put("k1", "2021-02-03");
+        row2.put("k2", "2021-02-03 00:00:00");
+        row2.put("k3", "4");
+        String rowStr2 = JSON.toJSONString(row2);
+        System.out.println("rows2: " + rowStr2);
+        flushBatch.putData(rowStr2);
+
+        for (int i = 0; i < 500000; ++i) {
+            flushBatch.putData(rowStr2);
+        }
-    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
-    ;
-    private final Key keys;
-    private int pos;
-
-
-    public DorisWriterEmitter(final Key keys) {
-        this.keys = keys;
-    }
-
-
-    /**
-     * execute doris stream load
-     */
-    public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
-        final String host = this.getAvailableHost();
-        if (null == host) {
-            throw new IOException("None of the host in `beLoadUrl` could be connected.");
-        }
-        final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
-        LOG.info(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
-        // do http put request
-        final Map<String, Object> loadResult = this.doHttpPut(loadUrl, flushData.getLabel(), this.mergeRows(flushData.getRows()));
-        // get response
-        final String keyStatus = "Status";
-        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
-            throw new IOException("Unable to flush data to doris: unknown result status.");
-        }
-        LOG.info("StreamLoad response:\n" + JSON.toJSONString(loadResult));
-        if (loadResult.get(keyStatus).equals("Fail")) {
-            throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
-        }
-    }
-
-    /**
-     * loop to get be host
-     * @return
-     */
-    private String getAvailableHost() {
-        final List<String> hostList = this.keys.getBeLoadUrlList();
-        if (this.pos >= hostList.size()) {
-            this.pos = 0;
-        }
-        while (this.pos < hostList.size()) {
-            final String host = "http://" + hostList.get(this.pos);
-            if (this.tryHttpConnection(host)) {
-                return host;
-            }
-            ++this.pos;
-        }
-        return null;
-    }
-
-    private boolean tryHttpConnection(final String host) {
-        try {
-            final URL url = new URL(host);
-            final HttpURLConnection co = (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(1000);
-            co.connect();
-            co.disconnect();
-            return true;
-        } catch (Exception e) {
-            LOG.warn("Failed to connect to address:{} , Exception ={}", host, e);
-            return false;
-        }
-    }
-
-    private byte[] mergeRows(final List<String> rows) {
-        final StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
-        for (final String row : rows) {
-            stringJoiner.add(row);
-        }
-        return stringJoiner.toString().getBytes(StandardCharsets.UTF_8);
-    }
-
-    private Map<String, Object> doHttpPut(final String loadUrl, final String label, final byte[] data) throws IOException {
-        LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
-        final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
-            @Override
-            protected boolean isRedirectable(final String method) {
-                return true;
-            }
-        });
-        try (final CloseableHttpClient httpclient = httpClientBuilder.build()) {
-            final HttpPut httpPut = new HttpPut(loadUrl);
-            final List<String> cols = this.keys.getColumns();
-            if (null != cols && !cols.isEmpty()) {
-                httpPut.setHeader("columns", String.join(",", cols));
-            }
-            // put loadProps to http header
-            final Map<String, Object> loadProps = this.keys.getLoadProps();
-            if (null != loadProps) {
-                for (final Map.Entry<String, Object> entry : loadProps.entrySet()) {
-                    httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            httpPut.setHeader("Expect", "100-continue");
-            httpPut.setHeader("label", label);
-            httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
-            httpPut.setHeader("Authorization", this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
-            httpPut.setHeader("format", "json");
-            httpPut.setHeader("strip_outer_array", "true");
-            httpPut.setEntity(new ByteArrayEntity(data));
-            httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
-            try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
-                final int code = resp.getStatusLine().getStatusCode();
-                if (HttpStatus.SC_OK != code) {
-                    LOG.warn("Request failed with code:{}", code);
-                    return null;
-                }
-                final HttpEntity respEntity = resp.getEntity();
-                if (null == respEntity) {
-                    LOG.warn("Request failed with empty response.");
-                    return null;
-                }
-                return (Map<String, Object>) JSON.parse(EntityUtils.toString(respEntity));
-            }
-        }
-    }
-
-    private String getBasicAuthHeader(final String username, final String password) {
-        final String auth = username + ":" + password;
-        final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
-        return "Basic " + new String(encodedAuth);
-    }
-
-
-}
\ No newline at end of file
+        emitter.doStreamLoad(flushBatch);
+    }
+}
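[Editor's note] The key behavioral change in `DorisWriterEmitter` is the redirect strategy: when the target is an FE, the stream load PUT is answered with `307 Temporary Redirect` to a BE, and the redirected request must remain a PUT with its body intact (a plain `HttpGet` would drop the payload; httpclient 4.5 mishandled this, hence the 4.5.13 bump). A standalone sketch of the same technique; host, credentials, database, and table are placeholders:

```java
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;

import java.util.Base64;

public class StreamLoadSketch {
    public static void main(String[] args) throws Exception {
        // allow PUT to be redirected; the plugin additionally overrides
        // getRedirect() (see the patch above) so that a 307 re-issues the
        // original request, body included
        try (CloseableHttpClient client = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                }).build()) {
            HttpPut put = new HttpPut("http://127.0.0.1:8030/api/db1/t1/_stream_load");
            put.setHeader("Expect", "100-continue");
            put.setHeader("Authorization", "Basic "
                    + Base64.getEncoder().encodeToString("root:".getBytes()));
            put.setHeader("label", "demo_label_0");
            put.setHeader("format", "json");
            put.setHeader("read_json_by_line", "true");
            put.setEntity(new ByteArrayEntity("{\"k1\":1}".getBytes()));
            client.execute(put).close();
        }
    }
}
```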
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
index f670e59..fcc1c15 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -22,26 +22,38 @@ package com.alibaba.datax.plugin.writer.doriswriter;
 import com.alibaba.datax.common.exception.DataXException;
 import com.alibaba.datax.common.util.Configuration;
 import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.google.common.base.Strings;

 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;

-public class Key implements Serializable
-{
+public class Key implements Serializable {
+    public static final String FE_LOAD_URL = "feLoadUrl";
+    public static final String BE_LOAD_URL = "beLoadUrl";
     public static final String JDBC_URL = "jdbcUrl";
+
     public static final String DATABASE = "database";
     public static final String TABLE = "table";
+    public static final String COLUMN = "column";
+
     public static final String USERNAME = "username";
     public static final String PASSWORD = "password";
-    public static final String BE_LOAD_URL = "beLoadUrl";
-    public static final String COLUMN = "column";
+
     public static final String PRE_SQL = "preSql";
     public static final String POST_SQL = "postSql";
+
     public static final String LOAD_PROPS = "loadProps";
     public static final String MAX_BATCH_ROWS = "maxBatchRows";
     public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
+    public static final String LABEL_PREFIX = "labelPrefix";
+    public static final String LINE_DELIMITER = "lineDelimiter";

     private final Configuration options;
+
+    private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000;
+    private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB
+    private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
+    private static final String DEFAULT_LINE_DELIMITER = "\n";

     public Key(final Configuration options) {
         this.options = options;
@@ -69,13 +81,17 @@ public class Key implements Serializable
     }

     public String getPassword() {
-        return this.options.getString(PASSWORD);
+        return Strings.nullToEmpty(this.options.getString(PASSWORD));
     }

     public List<String> getBeLoadUrlList() {
         return this.options.getList(BE_LOAD_URL, String.class);
     }

+    public List<String> getFeLoadUrlList() {
+        return this.options.getList(FE_LOAD_URL, String.class);
+    }
+
     public List<String> getColumns() {
         return this.options.getList(COLUMN, String.class);
     }
@@ -92,32 +108,44 @@ public class Key implements Serializable
         return this.options.getMap(LOAD_PROPS);
     }

-    public int getBatchRows() {
-        final Integer rows = this.options.getInt(MAX_BATCH_ROWS);
-        return (null == rows) ? 500000 : rows;
+    public long getBatchRows() {
+        return this.options.getLong(MAX_BATCH_ROWS, DEFAULT_MAX_BATCH_ROWS);
     }

     public long getBatchByteSize() {
-        final Long size = this.options.getLong(MAX_BATCH_BYTE_SIZE);
-        return (null == size) ? 94371840L : size;
+        return this.options.getLong(MAX_BATCH_BYTE_SIZE, DEFAULT_MAX_BATCH_BYTE_SIZE);
     }

+    public String getLabelPrefix() {
+        return this.options.getString(LABEL_PREFIX, DEFAULT_LABEL_PREFIX);
+    }
+
+    public String getLineDelimiter() {
+        return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
+    }

     private void validateStreamLoadUrl() {
-        final List<String> urlList = this.getBeLoadUrlList();
+        List<String> urlList = this.getBeLoadUrlList();
+        if (urlList == null) {
+            urlList = this.getFeLoadUrlList();
+        }
+        if (urlList == null || urlList.isEmpty()) {
+            throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "Either beLoadUrl or feLoadUrl must be set");
+        }
+
         for (final String host : urlList) {
             if (host.split(":").length < 2) {
-                throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "loadUrl的格式不正确,请输入 `be_ip:be_http_ip;be_ip:be_http_ip`。");
+                throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                        "Invalid load url format. If using FE hosts, should be like: fe_host:fe_http_port."
+                                + " If using BE hosts, should be like: be_host:be_webserver_port");
             }
         }
     }

     private void validateRequired() {
-        final String[] requiredOptionKeys = new String[] { USERNAME, PASSWORD, DATABASE, TABLE, COLUMN, BE_LOAD_URL };
+        final String[] requiredOptionKeys = new String[] { JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN };
         for (final String optionKey : requiredOptionKeys) {
             this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
         }
     }
-
-
-}
\ No newline at end of file
+}
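[Editor's note] A quick way to see the new defaults in action is to build a `Key` from a minimal config, as the test `main` in `DorisWriterEmitter` does; the values below are illustrative, and the class is assumed to sit alongside the plugin sources:

```java
package com.alibaba.datax.plugin.writer.doriswriter;

import com.alibaba.datax.common.util.Configuration;

public class KeyDemo {
    public static void main(String[] args) {
        String json = "{\"feLoadUrl\": [\"127.0.0.1:8030\"],"
                + " \"jdbcUrl\": \"jdbc:mysql://127.0.0.1:9030/\","
                + " \"database\": \"db1\", \"table\": \"t1\","
                + " \"column\": [\"k1\"], \"username\": \"root\"}";
        Key key = new Key(Configuration.from(json));
        // unset options fall back to the documented defaults
        System.out.println(key.getBatchRows());     // 500000
        System.out.println(key.getBatchByteSize()); // 104857600
        System.out.println(key.getLabelPrefix());   // datax_doris_writer_
        System.out.println(key.getLineDelimiter()); // "\n"
        System.out.println(key.getPassword());      // "" (nullToEmpty)
    }
}
```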
diff --git a/extension/DataX/init-env.sh b/extension/DataX/init-env.sh
index e6a4af8..5231110 100755
--- a/extension/DataX/init-env.sh
+++ b/extension/DataX/init-env.sh
@@ -30,6 +30,8 @@ DATAX_GITHUB=https://github.com/alibaba/DataX.git
 DORISWRITER_DIR=$DATAX_EXTENSION_HOME/doriswriter
 DATAX_GIT_DIR=$DATAX_EXTENSION_HOME/DataX/
 DATAX_POM=$DATAX_EXTENSION_HOME/DataX/pom.xml
+DATAX_PACKAGE=$DATAX_EXTENSION_HOME/DataX/package.xml
+DATAX_CORE_POM=$DATAX_EXTENSION_HOME/DataX/core/pom.xml

 if [ ! -d $DATAX_GIT_DIR ]; then
     echo "Clone DataX from $DATAX_GITHUB"
@@ -52,4 +54,20 @@ else
     echo "doriswriter module exists in $DATAX_POM"
 fi

+if [ `grep -c "doriswriter" $DATAX_PACKAGE` -eq 0 ]; then
+    echo "No doriswriter module in $DATAX_PACKAGE, add it"
+    cp $DATAX_PACKAGE ${DATAX_PACKAGE}.orig
+    sed -i "s/<\/fileSets>/    <fileSet>\n        <directory>doriswriter\/target\/datax\/<\/directory>\n        <includes>\n            <include>**\/*.*<\/include>\n        <\/includes>\n        <outputDirectory>datax<\/outputDirectory>\n    <\/fileSet>\n    <\/fileSets>/g" $DATAX_PACKAGE
+else
+    echo "doriswriter module exists in $DATAX_PACKAGE"
+fi
+
+if [ `grep -c "4.5.13" $DATAX_CORE_POM` -eq 0 ]; then
+    echo "No httpclient 4.5.13 in $DATAX_CORE_POM, add it"
+    cp $DATAX_CORE_POM ${DATAX_CORE_POM}.orig
+    sed -i ":a;N;s/<artifactId>httpclient<\/artifactId>\n            <version>4.5<\/version>/<artifactId>httpclient<\/artifactId>\n            <version>4.5.13<\/version>/g" $DATAX_CORE_POM
+else
+    echo "httpclient 4.5.13 exists in $DATAX_CORE_POM"
+fi
+
 echo "Finish DataX environment initialization"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 7f96fe7..a88f3c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -70,9 +70,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;

-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -83,6 +80,9 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table.Cell;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -507,7 +507,6 @@ public class RestoreJob extends AbstractJob {
             }
         }

-
         // Check and prepare meta objects.
         AgentBatchTask batchTask = new AgentBatchTask();
         db.readLock();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org