geonyeongkim opened a new issue, #7568:
URL: https://github.com/apache/iceberg/issues/7568

   ### Query engine
   
   Flink
   
   ### Question
   
   Hello.
   
   I am trying to load CDC data into an Iceberg table using the Flink 
DataStream API.
   
   However, **the job produces too many small files, each only a few KB in 
size**. Is there a way to solve this problem?
   
   **For example, can the job collect as much Kafka data as possible before 
writing it?**
   
   Or is there a way to use **the Flink DataStream sink and the Flink 
RewriteDataFiles action together?**
   
   **For example, commit 10 times and then run RewriteDataFiles.**
   
   
   ```kotlin
   /**
    * Flink DataStream job that reads CDC events for `CPDEP_TACCT_L` from Kafka,
    * converts each event into a 109-column [RowData] (every column is written as a
    * string) and upserts it into a Hadoop-path Iceberg table through [FlinkSink].
    *
    * The Iceberg Flink sink commits the data files it has written when a Flink
    * checkpoint completes, so the checkpoint interval is the main knob that decides
    * how much data ends up in each file. The interval is therefore kept in a named
    * constant ([CHECKPOINT_INTERVAL_MS]) instead of being hard-coded to a few seconds.
    */
   object CpdepTacctLCowBmtApp {
   
       private const val TOPIC = "pedw.cdc.RDWOWN.CPDEP_TACCT_L"
   
       /**
        * Interval between checkpoints, and therefore between Iceberg commits.
        * The previous value of 5000 ms forced a commit (and at least one new data
        * file per writer that received data) every five seconds, which is what
        * produced the large number of KB-sized files.
        */
       private const val CHECKPOINT_INTERVAL_MS = 5 * 60 * 1000L
   
       /** Number of columns in the target table; must match the schema built in [main]. */
       private const val FIELD_COUNT = 109
   
       /** Columns used both as the schema primary key and as the sink's equality fields. */
       private val KEY_COLUMNS = listOf("ACNO", "TX_DT", "TX_SEQNO", "DTL_TX_SEQNO")
   
       private val bootstrapServers = KafkaConfigUtil.getBootstrapServers(KafkaClusterType.DP1)
   
       /**
        * Builds the Kafka source -> RowData mapping -> Iceberg sink pipeline and runs it.
        *
        * @param args command-line arguments (not used).
        */
       @JvmStatic
       fun main(args: Array<String>) {
   
           val env = StreamExecutionEnvironment.getExecutionEnvironment()
           env.enableCheckpointing(CHECKPOINT_INTERVAL_MS)
   
           val kafkaSource = KafkaSource.builder<CustomKafkaRecord>()
               .setBootstrapServers(bootstrapServers)
               .setTopics(TOPIC)
               .setGroupId(CpdepTacctLCowBmtApp::class.java.name)
               .setClientIdPrefix(UUID.randomUUID().toString())
               .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
               .setDeserializer(CustomKafkaRecordDeserializationSchema())
               .build()
   
           val tableLoader = TableLoader.fromHadoopTable("hdfs:///user/geonyeong.kim/iceberg/cpdep_tacct_l/cow")
   
           val input: DataStream<RowData> =
               env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "iceberg-cow-bmt-source")
                   .filter { it.value != null }
                   // `!!` is safe here: records with a null value were dropped by the filter above.
                   .map { DpObjectMapper.readValue(it.value!!, CpdepTacctLCdcVO::class.java) }
                   .map { it.payload }
                   .map { payload ->
                       val row = GenericRowData(FIELD_COUNT)
                       if (payload.op == "d") {
                           // NOTE(review): a delete row carries no field values at all, including
                           // the KEY_COLUMNS that the schema declares NOT NULL and that the sink
                           // uses as equality fields. Presumably these should be filled from the
                           // event's before-image - verify against the CDC payload definition.
                           row.rowKind = RowKind.DELETE
                       } else {
                           val vo = checkNotNull(payload.after) {
                               "CDC event with op=${payload.op} has no 'after' image"
                           }
                           row.setField(0, StringData.fromString(vo.ACNO))
                           row.setField(1, StringData.fromString(vo.TX_DT))
                           row.setField(2, StringData.fromString("${vo.TX_SEQNO}"))
                           row.setField(3, StringData.fromString("${vo.DTL_TX_SEQNO}"))
                           row.setField(4, StringData.fromString("${vo.FRMW_CHNG_TMST}"))
                           row.setField(5, StringData.fromString(vo.TX_GUID))
                           row.setField(6, StringData.fromString(vo.SLIP_NO))
                           row.setField(7, StringData.fromString(vo.DEP_TX_STCD))
                           row.setField(8, StringData.fromString(vo.DEP_CANC_DVCD))
                           row.setField(9, StringData.fromString(vo.TX_TIME))
                           row.setField(10, StringData.fromString(vo.RECKN_DT))
                           row.setField(11, StringData.fromString(vo.SYS_DT))
                           row.setField(12, StringData.fromString(vo.SYS_TIME))
                           row.setField(13, StringData.fromString(vo.DEP_TRRC_KNCD))
                           row.setField(14, StringData.fromString(vo.TX_CHNL_DVCD))
                           row.setField(15, StringData.fromString(vo.TX_FRMN_CD))
                           row.setField(16, StringData.fromString(vo.TX_CD))
                           row.setField(17, StringData.fromString(vo.TX_BRCD))
                           row.setField(18, StringData.fromString(vo.TX_TLRNO))
                           row.setField(19, StringData.fromString(vo.ACCO_MGMT_BRCD))
                           row.setField(20, StringData.fromString(vo.XN_PB_DVCD))
                           row.setField(21, StringData.fromString(vo.CURN_CD))
                           row.setField(22, StringData.fromString(vo.ACCO_SBCD))
                           row.setField(23, StringData.fromString(vo.GDS_CD))
                           row.setField(24, StringData.fromString(vo.AFTR_BAL_CHNG_TX_YN))
                           row.setField(25, StringData.fromString(vo.TX_AMT))
                           row.setField(26, StringData.fromString(vo.AFTR_BAL))
                           row.setField(27, StringData.fromString(vo.CASH_AMT))
                           row.setField(28, StringData.fromString(vo.SMPL_TRFAMT))
                           row.setField(29, StringData.fromString(vo.TOT_CBPC_AMT))
                           row.setField(30, StringData.fromString(vo.ITLK_ALTR_AMT))
                           row.setField(31, StringData.fromString(vo.CSCHQ_ISSU_AMT))
                           row.setField(32, StringData.fromString(vo.CSCHQ_PRVS_AMT))
                           row.setField(33, StringData.fromString(vo.OJBB_MNRC_AMT))
                           row.setField(34, StringData.fromString(vo.OBNK_MNRC_AMT))
                           row.setField(35, StringData.fromString(vo.DEP_TRRC_CBPC_CD))
                           row.setField(36, StringData.fromString(vo.DRAF_CHQ_NO))
                           row.setField(37, StringData.fromString(vo.CBPC_AMT))
                           row.setField(38, StringData.fromString(vo.FEE_PRFR_RSCD))
                           row.setField(39, StringData.fromString(vo.DEP_FEE_RCV_DVCD))
                           row.setField(40, StringData.fromString("${vo.FEE_TX_SEQNO}"))
                           row.setField(41, StringData.fromString(vo.INSTL_MM))
                           row.setField(42, StringData.fromString("${vo.INSTL_NTH}"))
                           row.setField(43, StringData.fromString("${vo.OVRD_PPAY_DAYS}"))
                           row.setField(44, StringData.fromString(vo.CNGT_MSG_CD))
                           row.setField(45, StringData.fromString(vo.BBPR_CTNT))
                           row.setField(46, StringData.fromString(vo.PB_ARR_TRGT_YN))
                           row.setField(47, StringData.fromString(vo.PSWD_NSTUP_PRVS_YN))
                           row.setField(48, StringData.fromString(vo.NPSWD_RSCD))
                           row.setField(49, StringData.fromString(vo.ACCO_UZ_DVCD))
                           row.setField(50, StringData.fromString(vo.INPUT_ACNO))
                           row.setField(51, StringData.fromString(vo.BTCH_NO))
                           row.setField(52, StringData.fromString(vo.AFCL_TX_YN))
                           row.setField(53, StringData.fromString(vo.PRCG_APRV_TLRNO))
                           row.setField(54, StringData.fromString(vo.ITLK_TX_DVCD))
                           row.setField(55, StringData.fromString(vo.DEP_TRRC_TYCD))
                           row.setField(56, StringData.fromString(vo.RBF_SRVC_ID))
                           row.setField(57, StringData.fromString(vo.REAL_ACCP_AMT))
                           row.setField(58, StringData.fromString(vo.INT_CCAM))
                           row.setField(59, StringData.fromString(vo.UNPAID_BAL))
                           row.setField(60, StringData.fromString(vo.FRUP_BAL_TX_DT))
                           row.setField(61, StringData.fromString(vo.BNCF_TX_ANNT))
                           row.setField(62, StringData.fromString(vo.BNCF_AFTR_ANNT))
                           row.setField(63, StringData.fromString(vo.SLE_PAY_AMT))
                           row.setField(64, StringData.fromString(vo.MDCL_FEE))
                           row.setField(65, StringData.fromString(vo.BNLN_INT))
                           row.setField(66, StringData.fromString(vo.FX_ITLK_TX_NO))
                           row.setField(67, StringData.fromString(vo.FX_PRFR_XRT_APPC_NO))
                           row.setField(68, StringData.fromString(vo.FX_APLY_XRT))
                           row.setField(69, StringData.fromString("${vo.FX_XRT_ANOUN_NTH}"))
                           row.setField(70, StringData.fromString(vo.BOK_RPT_RSCD))
                           row.setField(71, StringData.fromString(vo.EXPRT_CPT_YN))
                           row.setField(72, StringData.fromString(vo.WCUC_AMT))
                           row.setField(73, StringData.fromString(vo.WCUC_SMPL_TRFAMT))
                           row.setField(74, StringData.fromString(vo.WCUC_ITLK_TRFAMT))
                           row.setField(75, StringData.fromString(vo.WCUC_CASH_AMT))
                           row.setField(76, StringData.fromString(vo.CNTR_MOVE_FRST_PYMN_YN))
                           row.setField(77, StringData.fromString(vo.SCFC_TCFND_BZWK_DVCD))
                           row.setField(78, StringData.fromString(vo.SCFC_TCFND_MESG_ID))
                           row.setField(79, StringData.fromString(vo.SCFC_TCFND_PL_INT))
                           row.setField(80, StringData.fromString(vo.MNRC_RQER_RLNM_DVCD))
                           row.setField(81, StringData.fromString(vo.MNRC_RQER_RNNO))
                           row.setField(82, StringData.fromString(vo.MNRC_RQER_NM))
                           row.setField(83, StringData.fromString(vo.CMS_CD))
                           row.setField(84, StringData.fromString(vo.CMS_RCPR_NM))
                           row.setField(85, StringData.fromString(vo.ORTR_DT))
                           row.setField(86, StringData.fromString("${vo.ORTR_SEQNO}"))
                           row.setField(87, StringData.fromString(vo.INT_CLCL_DTL_HIST_CRTN_YN))
                           row.setField(88, StringData.fromString(vo.SEIZ_DMAN_MGMT_NO))
                           row.setField(89, StringData.fromString(vo.AIQRY_CSCHQ_AMT))
                           row.setField(90, StringData.fromString(vo.OSDCH_TX_UNQ_NO))
                           row.setField(91, StringData.fromString(vo.BANK_GIRCD))
                           row.setField(92, StringData.fromString(vo.GDS_DTLS_CD))
                           row.setField(93, StringData.fromString(vo.WCUC_DEAL_PFLS_AMT))
                           row.setField(94, StringData.fromString(vo.WCUC_ANTT_AMT))
                           row.setField(95, StringData.fromString(vo.DMAN_NO))
                           row.setField(96, StringData.fromString(vo.TX_RSN))
                           row.setField(97, StringData.fromString(vo.FUND_ITMS_CD))
                           row.setField(98, StringData.fromString(vo.MDCL_CTRINT))
                           row.setField(99, StringData.fromString(vo.ITLK_TX_LDNG_YN))
                           row.setField(100, StringData.fromString(vo.FRST_TRNM_IPAD))
                           row.setField(101, StringData.fromString(vo.GUID))
                           row.setField(102, StringData.fromString(vo.SYS_FRST_REG_DTTM))
                           row.setField(103, StringData.fromString(vo.SYS_FRST_REG_EMPNO))
                           row.setField(104, StringData.fromString(vo.SYS_LAST_CHNG_DTTM))
                           row.setField(105, StringData.fromString(vo.SYS_LAST_CHNG_EMPNO))
                           row.setField(106, StringData.fromString(vo.TRSF_BANK_CD))
                           row.setField(107, StringData.fromString(vo.TRSF_ACNO))
                           row.setField(108, StringData.fromString(vo.TRSF_ACCO_DEPR_NM))
                       }
                       row
                   }
   
           FlinkSink.forRowData(input)
               .tableLoader(tableLoader)
               .tableSchema(
                   // Column order must match the setField indices used in the mapping above.
                   TableSchema.builder()
                       .field("ACNO", DataTypes.STRING().notNull())
                       .field("TX_DT", DataTypes.STRING().notNull())
                       .field("TX_SEQNO", DataTypes.STRING().notNull())
                       .field("DTL_TX_SEQNO", DataTypes.STRING().notNull())
                       .field("FRMW_CHNG_TMST", DataTypes.STRING())
                       .field("TX_GUID", DataTypes.STRING())
                       .field("SLIP_NO", DataTypes.STRING())
                       .field("DEP_TX_STCD", DataTypes.STRING())
                       .field("DEP_CANC_DVCD", DataTypes.STRING())
                       .field("TX_TIME", DataTypes.STRING())
                       .field("RECKN_DT", DataTypes.STRING())
                       .field("SYS_DT", DataTypes.STRING())
                       .field("SYS_TIME", DataTypes.STRING())
                       .field("DEP_TRRC_KNCD", DataTypes.STRING())
                       .field("TX_CHNL_DVCD", DataTypes.STRING())
                       .field("TX_FRMN_CD", DataTypes.STRING())
                       .field("TX_CD", DataTypes.STRING())
                       .field("TX_BRCD", DataTypes.STRING())
                       .field("TX_TLRNO", DataTypes.STRING())
                       .field("ACCO_MGMT_BRCD", DataTypes.STRING())
                       .field("XN_PB_DVCD", DataTypes.STRING())
                       .field("CURN_CD", DataTypes.STRING())
                       .field("ACCO_SBCD", DataTypes.STRING())
                       .field("GDS_CD", DataTypes.STRING())
                       .field("AFTR_BAL_CHNG_TX_YN", DataTypes.STRING())
                       .field("TX_AMT", DataTypes.STRING())
                       .field("AFTR_BAL", DataTypes.STRING())
                       .field("CASH_AMT", DataTypes.STRING())
                       .field("SMPL_TRFAMT", DataTypes.STRING())
                       .field("TOT_CBPC_AMT", DataTypes.STRING())
                       .field("ITLK_ALTR_AMT", DataTypes.STRING())
                       .field("CSCHQ_ISSU_AMT", DataTypes.STRING())
                       .field("CSCHQ_PRVS_AMT", DataTypes.STRING())
                       .field("OJBB_MNRC_AMT", DataTypes.STRING())
                       .field("OBNK_MNRC_AMT", DataTypes.STRING())
                       .field("DEP_TRRC_CBPC_CD", DataTypes.STRING())
                       .field("DRAF_CHQ_NO", DataTypes.STRING())
                       .field("CBPC_AMT", DataTypes.STRING())
                       .field("FEE_PRFR_RSCD", DataTypes.STRING())
                       .field("DEP_FEE_RCV_DVCD", DataTypes.STRING())
                       .field("FEE_TX_SEQNO", DataTypes.STRING())
                       .field("INSTL_MM", DataTypes.STRING())
                       .field("INSTL_NTH", DataTypes.STRING())
                       .field("OVRD_PPAY_DAYS", DataTypes.STRING())
                       .field("CNGT_MSG_CD", DataTypes.STRING())
                       .field("BBPR_CTNT", DataTypes.STRING())
                       .field("PB_ARR_TRGT_YN", DataTypes.STRING())
                       .field("PSWD_NSTUP_PRVS_YN", DataTypes.STRING())
                       .field("NPSWD_RSCD", DataTypes.STRING())
                       .field("ACCO_UZ_DVCD", DataTypes.STRING())
                       .field("INPUT_ACNO", DataTypes.STRING())
                       .field("BTCH_NO", DataTypes.STRING())
                       .field("AFCL_TX_YN", DataTypes.STRING())
                       .field("PRCG_APRV_TLRNO", DataTypes.STRING())
                       .field("ITLK_TX_DVCD", DataTypes.STRING())
                       .field("DEP_TRRC_TYCD", DataTypes.STRING())
                       .field("RBF_SRVC_ID", DataTypes.STRING())
                       .field("REAL_ACCP_AMT", DataTypes.STRING())
                       .field("INT_CCAM", DataTypes.STRING())
                       .field("UNPAID_BAL", DataTypes.STRING())
                       .field("FRUP_BAL_TX_DT", DataTypes.STRING())
                       .field("BNCF_TX_ANNT", DataTypes.STRING())
                       .field("BNCF_AFTR_ANNT", DataTypes.STRING())
                       .field("SLE_PAY_AMT", DataTypes.STRING())
                       .field("MDCL_FEE", DataTypes.STRING())
                       .field("BNLN_INT", DataTypes.STRING())
                       .field("FX_ITLK_TX_NO", DataTypes.STRING())
                       .field("FX_PRFR_XRT_APPC_NO", DataTypes.STRING())
                       .field("FX_APLY_XRT", DataTypes.STRING())
                       .field("FX_XRT_ANOUN_NTH", DataTypes.STRING())
                       .field("BOK_RPT_RSCD", DataTypes.STRING())
                       .field("EXPRT_CPT_YN", DataTypes.STRING())
                       .field("WCUC_AMT", DataTypes.STRING())
                       .field("WCUC_SMPL_TRFAMT", DataTypes.STRING())
                       .field("WCUC_ITLK_TRFAMT", DataTypes.STRING())
                       .field("WCUC_CASH_AMT", DataTypes.STRING())
                       .field("CNTR_MOVE_FRST_PYMN_YN", DataTypes.STRING())
                       .field("SCFC_TCFND_BZWK_DVCD", DataTypes.STRING())
                       .field("SCFC_TCFND_MESG_ID", DataTypes.STRING())
                       .field("SCFC_TCFND_PL_INT", DataTypes.STRING())
                       .field("MNRC_RQER_RLNM_DVCD", DataTypes.STRING())
                       .field("MNRC_RQER_RNNO", DataTypes.STRING())
                       .field("MNRC_RQER_NM", DataTypes.STRING())
                       .field("CMS_CD", DataTypes.STRING())
                       .field("CMS_RCPR_NM", DataTypes.STRING())
                       .field("ORTR_DT", DataTypes.STRING())
                       .field("ORTR_SEQNO", DataTypes.STRING())
                       .field("INT_CLCL_DTL_HIST_CRTN_YN", DataTypes.STRING())
                       .field("SEIZ_DMAN_MGMT_NO", DataTypes.STRING())
                       .field("AIQRY_CSCHQ_AMT", DataTypes.STRING())
                       .field("OSDCH_TX_UNQ_NO", DataTypes.STRING())
                       .field("BANK_GIRCD", DataTypes.STRING())
                       .field("GDS_DTLS_CD", DataTypes.STRING())
                       .field("WCUC_DEAL_PFLS_AMT", DataTypes.STRING())
                       .field("WCUC_ANTT_AMT", DataTypes.STRING())
                       .field("DMAN_NO", DataTypes.STRING())
                       .field("TX_RSN", DataTypes.STRING())
                       .field("FUND_ITMS_CD", DataTypes.STRING())
                       .field("MDCL_CTRINT", DataTypes.STRING())
                       .field("ITLK_TX_LDNG_YN", DataTypes.STRING())
                       .field("FRST_TRNM_IPAD", DataTypes.STRING())
                       .field("GUID", DataTypes.STRING())
                       .field("SYS_FRST_REG_DTTM", DataTypes.STRING())
                       .field("SYS_FRST_REG_EMPNO", DataTypes.STRING())
                       .field("SYS_LAST_CHNG_DTTM", DataTypes.STRING())
                       .field("SYS_LAST_CHNG_EMPNO", DataTypes.STRING())
                       .field("TRSF_BANK_CD", DataTypes.STRING())
                       .field("TRSF_ACNO", DataTypes.STRING())
                       .field("TRSF_ACCO_DEPR_NM", DataTypes.STRING())
                       .primaryKey(*KEY_COLUMNS.toTypedArray())
                       .build()
               )
               .equalityFieldColumns(KEY_COLUMNS)
               // NOTE(review): the three write.*.mode keys are Iceberg table properties;
               // confirm that the Flink sink actually honors them when passed as write options.
               .set("write.delete.mode", "copy-on-write")
               .set("write.update.mode", "copy-on-write")
               .set("write.merge.mode", "copy-on-write")
               // Target data file size of 1 GiB, passed as a decimal byte count.
               .set(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES.key(), "${1024 * 1024 * 1024L}")
               .upsert(true)
               .append()
   
           env.execute("ICEBERG COW BMT")
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to