geonyeongkim opened a new issue, #7568:
URL: https://github.com/apache/iceberg/issues/7568
### Query engine
Flink
### Question
Hello.
I am trying to load CDC data into an Iceberg table using the Flink DataStream API.
However, **the job produces too many small files (only a few KB each)**. Is there a way
to solve this problem?
**For example, can the sink buffer as much Kafka data as possible before writing it?**
Or is there a way to use **the Flink DataStream sink and the Flink RewriteDataFiles
action together?**
**For example, write 10 times and then run RewriteDataFiles.**
```kotlin
/**
 * Flink job that reads CDC records for `CPDEP_TACCT_L` from Kafka and upserts them into an
 * Iceberg Hadoop table.
 *
 * Every column is written as a string. The first four columns ([KEY_COLUMNS]) form the primary
 * key and are also used as the sink's equality-field columns.
 *
 * Small-file note: the Iceberg Flink sink commits the files it has written on every Flink
 * checkpoint, so the checkpoint interval bounds how much data can end up in a single file.
 * A very short interval (the job previously used 5 seconds) therefore produces many tiny files.
 * [CHECKPOINT_INTERVAL_MS] trades end-to-end latency for larger files; tune it to the workload.
 */
object CpdepTacctLCowBmtApp {
    private const val TOPIC = "pedw.cdc.RDWOWN.CPDEP_TACCT_L"

    /** Checkpoint (and therefore Iceberg commit) interval: 5 minutes. */
    private const val CHECKPOINT_INTERVAL_MS = 5 * 60 * 1000L

    /** Target data file size handed to the sink: 1 GiB. */
    private const val TARGET_FILE_SIZE = 1024L * 1024 * 1024

    private val bootstrapServers =
        KafkaConfigUtil.getBootstrapServers(KafkaClusterType.DP1)

    /** Primary-key columns, in table order (row positions 0..3). Declared NOT NULL. */
    private val KEY_COLUMNS = listOf("ACNO", "TX_DT", "TX_SEQNO", "DTL_TX_SEQNO")

    /** Remaining nullable string columns, in table order (row positions 4..108). */
    private val VALUE_COLUMNS = listOf(
        "FRMW_CHNG_TMST",
        "TX_GUID",
        "SLIP_NO",
        "DEP_TX_STCD",
        "DEP_CANC_DVCD",
        "TX_TIME",
        "RECKN_DT",
        "SYS_DT",
        "SYS_TIME",
        "DEP_TRRC_KNCD",
        "TX_CHNL_DVCD",
        "TX_FRMN_CD",
        "TX_CD",
        "TX_BRCD",
        "TX_TLRNO",
        "ACCO_MGMT_BRCD",
        "XN_PB_DVCD",
        "CURN_CD",
        "ACCO_SBCD",
        "GDS_CD",
        "AFTR_BAL_CHNG_TX_YN",
        "TX_AMT",
        "AFTR_BAL",
        "CASH_AMT",
        "SMPL_TRFAMT",
        "TOT_CBPC_AMT",
        "ITLK_ALTR_AMT",
        "CSCHQ_ISSU_AMT",
        "CSCHQ_PRVS_AMT",
        "OJBB_MNRC_AMT",
        "OBNK_MNRC_AMT",
        "DEP_TRRC_CBPC_CD",
        "DRAF_CHQ_NO",
        "CBPC_AMT",
        "FEE_PRFR_RSCD",
        "DEP_FEE_RCV_DVCD",
        "FEE_TX_SEQNO",
        "INSTL_MM",
        "INSTL_NTH",
        "OVRD_PPAY_DAYS",
        "CNGT_MSG_CD",
        "BBPR_CTNT",
        "PB_ARR_TRGT_YN",
        "PSWD_NSTUP_PRVS_YN",
        "NPSWD_RSCD",
        "ACCO_UZ_DVCD",
        "INPUT_ACNO",
        "BTCH_NO",
        "AFCL_TX_YN",
        "PRCG_APRV_TLRNO",
        "ITLK_TX_DVCD",
        "DEP_TRRC_TYCD",
        "RBF_SRVC_ID",
        "REAL_ACCP_AMT",
        "INT_CCAM",
        "UNPAID_BAL",
        "FRUP_BAL_TX_DT",
        "BNCF_TX_ANNT",
        "BNCF_AFTR_ANNT",
        "SLE_PAY_AMT",
        "MDCL_FEE",
        "BNLN_INT",
        "FX_ITLK_TX_NO",
        "FX_PRFR_XRT_APPC_NO",
        "FX_APLY_XRT",
        "FX_XRT_ANOUN_NTH",
        "BOK_RPT_RSCD",
        "EXPRT_CPT_YN",
        "WCUC_AMT",
        "WCUC_SMPL_TRFAMT",
        "WCUC_ITLK_TRFAMT",
        "WCUC_CASH_AMT",
        "CNTR_MOVE_FRST_PYMN_YN",
        "SCFC_TCFND_BZWK_DVCD",
        "SCFC_TCFND_MESG_ID",
        "SCFC_TCFND_PL_INT",
        "MNRC_RQER_RLNM_DVCD",
        "MNRC_RQER_RNNO",
        "MNRC_RQER_NM",
        "CMS_CD",
        "CMS_RCPR_NM",
        "ORTR_DT",
        "ORTR_SEQNO",
        "INT_CLCL_DTL_HIST_CRTN_YN",
        "SEIZ_DMAN_MGMT_NO",
        "AIQRY_CSCHQ_AMT",
        "OSDCH_TX_UNQ_NO",
        "BANK_GIRCD",
        "GDS_DTLS_CD",
        "WCUC_DEAL_PFLS_AMT",
        "WCUC_ANTT_AMT",
        "DMAN_NO",
        "TX_RSN",
        "FUND_ITMS_CD",
        "MDCL_CTRINT",
        "ITLK_TX_LDNG_YN",
        "FRST_TRNM_IPAD",
        "GUID",
        "SYS_FRST_REG_DTTM",
        "SYS_FRST_REG_EMPNO",
        "SYS_LAST_CHNG_DTTM",
        "SYS_LAST_CHNG_EMPNO",
        "TRSF_BANK_CD",
        "TRSF_ACNO",
        "TRSF_ACCO_DEPR_NM",
    )

    /**
     * Builds the Flink table schema handed to the sink: [KEY_COLUMNS] as non-null strings,
     * followed by [VALUE_COLUMNS] as nullable strings, with [KEY_COLUMNS] as the primary key.
     */
    private fun buildTableSchema(): TableSchema {
        val builder = TableSchema.builder()
        KEY_COLUMNS.forEach { column -> builder.field(column, DataTypes.STRING().notNull()) }
        VALUE_COLUMNS.forEach { column -> builder.field(column, DataTypes.STRING()) }
        return builder
            .primaryKey(*KEY_COLUMNS.toTypedArray())
            .build()
    }

    /**
     * Job entry point: wires Kafka source -> CDC payload -> [RowData] -> Iceberg upsert sink
     * and blocks in `env.execute`.
     *
     * @param args command-line arguments (unused).
     */
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        // The sink commits on each checkpoint; a longer interval yields fewer, larger files.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS)

        val kafkaSource = KafkaSource.builder<CustomKafkaRecord>()
            .setBootstrapServers(bootstrapServers)
            .setTopics(TOPIC)
            .setGroupId(CpdepTacctLCowBmtApp::class.java.name)
            .setClientIdPrefix(UUID.randomUUID().toString())
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setDeserializer(CustomKafkaRecordDeserializationSchema())
            .build()

        val tableLoader =
            TableLoader.fromHadoopTable("hdfs:///user/geonyeong.kim/iceberg/cpdep_tacct_l/cow")

        // Row arity is derived from the schema column lists (4 + 105 = 109) and computed here,
        // outside the lambda, so the map function only captures a plain Int.
        val fieldCount = KEY_COLUMNS.size + VALUE_COLUMNS.size

        val input: DataStream<RowData> =
            env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "iceberg-cow-bmt-source")
                .filter { it.value != null }
                // `!!` is safe here: records with a null value were dropped by the filter above.
                .map { DpObjectMapper.readValue(it.value!!, CpdepTacctLCdcVO::class.java) }
                .map { it.payload }
                .map { payload ->
                    val row = GenericRowData(fieldCount)
                    if (payload.op == "d") {
                        // NOTE(review): a DELETE row leaves every field null, including the
                        // equality key columns. Confirm the sink can match such a row to the
                        // record it should delete; the key presumably has to be filled from the
                        // CDC "before" image.
                        row.rowKind = RowKind.DELETE
                    } else {
                        val vo = checkNotNull(payload.after) {
                            "CDC record with op='${payload.op}' has no 'after' image"
                        }
                        // Positions must follow KEY_COLUMNS then VALUE_COLUMNS order.
                        // Fields wrapped in "${...}" are rendered through a string template
                        // (presumably because they are not String-typed on the VO — confirm).
                        row.setField(0, StringData.fromString(vo.ACNO))
                        row.setField(1, StringData.fromString(vo.TX_DT))
                        row.setField(2, StringData.fromString("${vo.TX_SEQNO}"))
                        row.setField(3, StringData.fromString("${vo.DTL_TX_SEQNO}"))
                        row.setField(4, StringData.fromString("${vo.FRMW_CHNG_TMST}"))
                        row.setField(5, StringData.fromString(vo.TX_GUID))
                        row.setField(6, StringData.fromString(vo.SLIP_NO))
                        row.setField(7, StringData.fromString(vo.DEP_TX_STCD))
                        row.setField(8, StringData.fromString(vo.DEP_CANC_DVCD))
                        row.setField(9, StringData.fromString(vo.TX_TIME))
                        row.setField(10, StringData.fromString(vo.RECKN_DT))
                        row.setField(11, StringData.fromString(vo.SYS_DT))
                        row.setField(12, StringData.fromString(vo.SYS_TIME))
                        row.setField(13, StringData.fromString(vo.DEP_TRRC_KNCD))
                        row.setField(14, StringData.fromString(vo.TX_CHNL_DVCD))
                        row.setField(15, StringData.fromString(vo.TX_FRMN_CD))
                        row.setField(16, StringData.fromString(vo.TX_CD))
                        row.setField(17, StringData.fromString(vo.TX_BRCD))
                        row.setField(18, StringData.fromString(vo.TX_TLRNO))
                        row.setField(19, StringData.fromString(vo.ACCO_MGMT_BRCD))
                        row.setField(20, StringData.fromString(vo.XN_PB_DVCD))
                        row.setField(21, StringData.fromString(vo.CURN_CD))
                        row.setField(22, StringData.fromString(vo.ACCO_SBCD))
                        row.setField(23, StringData.fromString(vo.GDS_CD))
                        row.setField(24, StringData.fromString(vo.AFTR_BAL_CHNG_TX_YN))
                        row.setField(25, StringData.fromString(vo.TX_AMT))
                        row.setField(26, StringData.fromString(vo.AFTR_BAL))
                        row.setField(27, StringData.fromString(vo.CASH_AMT))
                        row.setField(28, StringData.fromString(vo.SMPL_TRFAMT))
                        row.setField(29, StringData.fromString(vo.TOT_CBPC_AMT))
                        row.setField(30, StringData.fromString(vo.ITLK_ALTR_AMT))
                        row.setField(31, StringData.fromString(vo.CSCHQ_ISSU_AMT))
                        row.setField(32, StringData.fromString(vo.CSCHQ_PRVS_AMT))
                        row.setField(33, StringData.fromString(vo.OJBB_MNRC_AMT))
                        row.setField(34, StringData.fromString(vo.OBNK_MNRC_AMT))
                        row.setField(35, StringData.fromString(vo.DEP_TRRC_CBPC_CD))
                        row.setField(36, StringData.fromString(vo.DRAF_CHQ_NO))
                        row.setField(37, StringData.fromString(vo.CBPC_AMT))
                        row.setField(38, StringData.fromString(vo.FEE_PRFR_RSCD))
                        row.setField(39, StringData.fromString(vo.DEP_FEE_RCV_DVCD))
                        row.setField(40, StringData.fromString("${vo.FEE_TX_SEQNO}"))
                        row.setField(41, StringData.fromString(vo.INSTL_MM))
                        row.setField(42, StringData.fromString("${vo.INSTL_NTH}"))
                        row.setField(43, StringData.fromString("${vo.OVRD_PPAY_DAYS}"))
                        row.setField(44, StringData.fromString(vo.CNGT_MSG_CD))
                        row.setField(45, StringData.fromString(vo.BBPR_CTNT))
                        row.setField(46, StringData.fromString(vo.PB_ARR_TRGT_YN))
                        row.setField(47, StringData.fromString(vo.PSWD_NSTUP_PRVS_YN))
                        row.setField(48, StringData.fromString(vo.NPSWD_RSCD))
                        row.setField(49, StringData.fromString(vo.ACCO_UZ_DVCD))
                        row.setField(50, StringData.fromString(vo.INPUT_ACNO))
                        row.setField(51, StringData.fromString(vo.BTCH_NO))
                        row.setField(52, StringData.fromString(vo.AFCL_TX_YN))
                        row.setField(53, StringData.fromString(vo.PRCG_APRV_TLRNO))
                        row.setField(54, StringData.fromString(vo.ITLK_TX_DVCD))
                        row.setField(55, StringData.fromString(vo.DEP_TRRC_TYCD))
                        row.setField(56, StringData.fromString(vo.RBF_SRVC_ID))
                        row.setField(57, StringData.fromString(vo.REAL_ACCP_AMT))
                        row.setField(58, StringData.fromString(vo.INT_CCAM))
                        row.setField(59, StringData.fromString(vo.UNPAID_BAL))
                        row.setField(60, StringData.fromString(vo.FRUP_BAL_TX_DT))
                        row.setField(61, StringData.fromString(vo.BNCF_TX_ANNT))
                        row.setField(62, StringData.fromString(vo.BNCF_AFTR_ANNT))
                        row.setField(63, StringData.fromString(vo.SLE_PAY_AMT))
                        row.setField(64, StringData.fromString(vo.MDCL_FEE))
                        row.setField(65, StringData.fromString(vo.BNLN_INT))
                        row.setField(66, StringData.fromString(vo.FX_ITLK_TX_NO))
                        row.setField(67, StringData.fromString(vo.FX_PRFR_XRT_APPC_NO))
                        row.setField(68, StringData.fromString(vo.FX_APLY_XRT))
                        row.setField(69, StringData.fromString("${vo.FX_XRT_ANOUN_NTH}"))
                        row.setField(70, StringData.fromString(vo.BOK_RPT_RSCD))
                        row.setField(71, StringData.fromString(vo.EXPRT_CPT_YN))
                        row.setField(72, StringData.fromString(vo.WCUC_AMT))
                        row.setField(73, StringData.fromString(vo.WCUC_SMPL_TRFAMT))
                        row.setField(74, StringData.fromString(vo.WCUC_ITLK_TRFAMT))
                        row.setField(75, StringData.fromString(vo.WCUC_CASH_AMT))
                        row.setField(76, StringData.fromString(vo.CNTR_MOVE_FRST_PYMN_YN))
                        row.setField(77, StringData.fromString(vo.SCFC_TCFND_BZWK_DVCD))
                        row.setField(78, StringData.fromString(vo.SCFC_TCFND_MESG_ID))
                        row.setField(79, StringData.fromString(vo.SCFC_TCFND_PL_INT))
                        row.setField(80, StringData.fromString(vo.MNRC_RQER_RLNM_DVCD))
                        row.setField(81, StringData.fromString(vo.MNRC_RQER_RNNO))
                        row.setField(82, StringData.fromString(vo.MNRC_RQER_NM))
                        row.setField(83, StringData.fromString(vo.CMS_CD))
                        row.setField(84, StringData.fromString(vo.CMS_RCPR_NM))
                        row.setField(85, StringData.fromString(vo.ORTR_DT))
                        row.setField(86, StringData.fromString("${vo.ORTR_SEQNO}"))
                        row.setField(87, StringData.fromString(vo.INT_CLCL_DTL_HIST_CRTN_YN))
                        row.setField(88, StringData.fromString(vo.SEIZ_DMAN_MGMT_NO))
                        row.setField(89, StringData.fromString(vo.AIQRY_CSCHQ_AMT))
                        row.setField(90, StringData.fromString(vo.OSDCH_TX_UNQ_NO))
                        row.setField(91, StringData.fromString(vo.BANK_GIRCD))
                        row.setField(92, StringData.fromString(vo.GDS_DTLS_CD))
                        row.setField(93, StringData.fromString(vo.WCUC_DEAL_PFLS_AMT))
                        row.setField(94, StringData.fromString(vo.WCUC_ANTT_AMT))
                        row.setField(95, StringData.fromString(vo.DMAN_NO))
                        row.setField(96, StringData.fromString(vo.TX_RSN))
                        row.setField(97, StringData.fromString(vo.FUND_ITMS_CD))
                        row.setField(98, StringData.fromString(vo.MDCL_CTRINT))
                        row.setField(99, StringData.fromString(vo.ITLK_TX_LDNG_YN))
                        row.setField(100, StringData.fromString(vo.FRST_TRNM_IPAD))
                        row.setField(101, StringData.fromString(vo.GUID))
                        row.setField(102, StringData.fromString(vo.SYS_FRST_REG_DTTM))
                        row.setField(103, StringData.fromString(vo.SYS_FRST_REG_EMPNO))
                        row.setField(104, StringData.fromString(vo.SYS_LAST_CHNG_DTTM))
                        row.setField(105, StringData.fromString(vo.SYS_LAST_CHNG_EMPNO))
                        row.setField(106, StringData.fromString(vo.TRSF_BANK_CD))
                        row.setField(107, StringData.fromString(vo.TRSF_ACNO))
                        row.setField(108, StringData.fromString(vo.TRSF_ACCO_DEPR_NM))
                    }
                    row
                }

        FlinkSink.forRowData(input)
            .tableLoader(tableLoader)
            .tableSchema(buildTableSchema())
            .equalityFieldColumns(KEY_COLUMNS)
            // NOTE(review): the three `write.*.mode` keys look like table-level row-operation
            // properties rather than Flink sink write options — confirm the sink honours them.
            .set("write.delete.mode", "copy-on-write")
            .set("write.update.mode", "copy-on-write")
            .set("write.merge.mode", "copy-on-write")
            .set(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES.key(), TARGET_FILE_SIZE.toString())
            .upsert(true)
            .append()

        env.execute("ICEBERG COW BMT")
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]