lengkristy commented on issue #8636:
URL: https://github.com/apache/iceberg/issues/8636#issuecomment-1734696190

   @nastra Sure, these are the Gradle dependencies:
   
    implementation 'org.apache.iceberg:iceberg-core:1.3.1'
    implementation 'org.apache.iceberg:iceberg-parquet:1.3.1'
    implementation 'org.apache.iceberg:iceberg-data:1.3.1'
    implementation 'org.apache.iceberg:iceberg-hive-runtime:1.3.1'
    implementation('org.apache.hadoop:hadoop-aws:3.3.0') {
        exclude(group: 'com.amazonaws', module: 'aws-java-sdk-bundle')
    }
    implementation 'org.apache.hadoop:hadoop-common:3.3.0'
    implementation 'software.amazon.awssdk:s3:2.20.131'
    implementation 'software.amazon.awssdk:glue:2.17.122'
    implementation 'software.amazon.awssdk:sts:2.17.122'
    implementation 'software.amazon.awssdk:dynamodb:2.17.122'
   
   and we write Iceberg table data with 4 concurrent writers. The code is below:
    // write data
    PartitionedFanoutWriter<Record> partitionedFanoutWriter = null;
    try {
        PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());
        InternalRecordWrapper recordWrapper = new InternalRecordWrapper(table.schema().asStruct());
        // PartitionedFanoutWriter partitions each record automatically and
        // creates one writer per partition
        partitionedFanoutWriter = new PartitionedFanoutWriter<Record>(table.spec(), FileFormat.PARQUET,
                appenderFactory, outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) {
            @Override
            protected PartitionKey partition(Record record) {
                try {
                    partitionKey.partition(recordWrapper.wrap(record));
                } catch (Exception e) {
                    System.out.println("write partition data error:" + e.getMessage()
                            + ExceptionUtil.convertStackToString(e));
                }
                return partitionKey;
            }
        };
        for (PipelineData data : records) {
            try {
                if (data.getRecord() != null) {
                    partitionedFanoutWriter.write(data.getRecord());
                }
            } catch (Exception e) {
                System.out.println("write data error:" + e.getMessage()
                        + ExceptionUtil.convertStackToString(e));
                if (logger != null) {
                    logger.log("can not write iceberg table:" + data.getRecord().toString());
                    logger.log(ExceptionUtil.convertStackToString(e));
                    // record the data
                    logger.log(records.toString());
                }
                ProcessKinesisRecords.errRecords.add(data);
            }
        }
    } catch (Exception e) {
        System.out.println("write data error:" + e.getMessage()
                + ExceptionUtil.convertStackToString(e));
        if (logger != null) {
            logger.log("can not write data to iceberg table:");
            logger.log(ExceptionUtil.convertStackToString(e));
        }
        throw e;
    } finally {
        if (partitionedFanoutWriter != null) {
            partitionedFanoutWriter.close();
        }
    }
   
    // commit data
    try {
        DataFile[] dataFiles = partitionedFanoutWriter.dataFiles();
        // guard against an empty write, which would otherwise throw on dataFiles[0]
        if (dataFiles.length > 0) {
            System.out.println("start commit, first file record count:" + dataFiles[0].recordCount());
        }
        AppendFiles appendFiles = table.newAppend();
        // add the data files to the pending append
        Arrays.stream(dataFiles).forEach(appendFiles::appendFile);
        // commit the new snapshot (apply() is optional here; commit() applies the update itself)
        appendFiles.apply();
        appendFiles.commit();
        System.out.println("committed file");
    } catch (Throwable e) {
        if (logger != null) {
            logger.log("ERROR: can not commit data to iceberg table:" + e.getMessage());
        }
        System.out.println("can not commit");
        ProcessKinesisRecords.errRecords.addAll(records);
        e.printStackTrace();
        throw e;
    }
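
   For context on the concurrency side: if the four writers each run this write-and-commit path in parallel (an assumption based on "4 concurrent" above), the `AppendFiles.commit()` calls race, and Iceberg resolves the race with optimistic concurrency, re-applying a losing commit against the refreshed table state and retrying. A minimal self-contained sketch of that retry pattern, using plain Java with an `AtomicLong` as a hypothetical stand-in for the table's snapshot pointer (no Iceberg types involved):

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicLong;

   public class ConcurrentCommitSketch {
       // Hypothetical stand-in for the table's current snapshot id.
       static final AtomicLong snapshotId = new AtomicLong(0);
       static final AtomicInteger committedFiles = new AtomicInteger(0);

       // Each writer: build its data files, then retry the commit until its
       // compare-and-set wins, mirroring Iceberg's optimistic-concurrency loop.
       static void writeAndCommit(int files) {
           while (true) {
               long base = snapshotId.get();      // read current table state
               // ... re-apply the pending append against `base` here ...
               if (snapshotId.compareAndSet(base, base + 1)) { // commit won
                   committedFiles.addAndGet(files);
                   return;
               }
               // lost the race: refresh and retry
           }
       }

       public static void main(String[] args) throws Exception {
           ExecutorService pool = Executors.newFixedThreadPool(4);
           for (int i = 0; i < 4; i++) {
               pool.submit(() -> writeAndCommit(1));
           }
           pool.shutdown();
           pool.awaitTermination(10, TimeUnit.SECONDS);
           // all four commits succeed, each producing one snapshot
           System.out.println("snapshots=" + snapshotId.get() + " files=" + committedFiles.get());
       }
   }
   ```

   The point of the sketch is that every writer eventually commits, but each retry costs work; with many concurrent writers the real Iceberg retry loop can still exhaust its attempts, which is why commit-conflict exceptions can show up under load.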


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

