adevore3 commented on issue #9738: URL: https://github.com/apache/iceberg/issues/9738#issuecomment-1954798452
Hi, I was trying to create a code sample but got sidetracked. I've copied all the relevant code pieces here; the one thing I couldn't include is the protobuf file. Based on the stacktrace, it seems to be failing on the `insertIntoTable` call:

```scala
import com.indeed.osiris.iceberg.exporter.config.OutputTableDefinition
import com.indeed.osiris.iceberg.exporter.datasources.Datasource
import com.indeed.osiris.iceberg.exporter.datasources.JobArchiveOsirisDatasource.getClass
import com.indeed.spark.hivesupport.SessionBuilder
import org.apache.hadoop.fs.Path
import org.apache.iceberg.Table
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import org.springframework.boot.builder.SpringApplicationBuilder
import org.springframework.boot.{CommandLineRunner, WebApplicationType}
import org.springframework.context.annotation.Bean

class OsirisIcebergExporter(
  spark: SparkSession,
  datasource: Datasource,
  inputCheckpointBucket: Option[String],
  inputCheckpointMillis: Option[Long],
  outputTableDefinition: OutputTableDefinition,
) extends CommandLineRunner {
  import OsirisIcebergExporter._

  private val newHighWaterMark = inputCheckpointMillis.getOrElse(System.currentTimeMillis())
  private val inputCheckpointPath =
    inputCheckpointBucket.map(b => new Path(s"s3a://$b/tmp/osiris_iceberg/jatt3/$newHighWaterMark"))
  private val icebergCatalog =
    spark.sessionState.catalogManager.catalog("outbox").asInstanceOf[SparkCatalog].icebergCatalog()

  override def run(rawArgs: String*): Unit = {
    try {
      log.info(s"running OsirisIcebergExporter. new high water mark will be $newHighWaterMark")
      val df = inputDf(datasource.dataframe(spark))
      df.createOrReplaceTempView("input")
      val icebergTable = populateTable(df)
    } finally {
      // close connections
    }
  }

  def populateTable(df: DataFrame): Table = {
    // df not used in this example since we'll read from the temp view
    val t = icebergCatalog.loadTable(outputTableDefinition.icebergIdentifier)
    mergeIntoTable(spark, "input", t)
    t
  }

  def mergeIntoTable(spark: SparkSession, view: String, icebergTable: Table): Unit = {
    val currentSnapshot = icebergTable.currentSnapshot()

    // run the delete and the insert under separate WAP ids
    log.info(s"mergeIntoTable >>> SET spark.wap.id = ${newHighWaterMark}_delete")
    spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_delete")
    deleteFromTable(spark, view)

    log.info(s"mergeIntoTable >>> SET spark.wap.id = ${newHighWaterMark}_insert")
    spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_insert")
    insertIntoTable(spark, view)

    log.info("mergeIntoTable >>> RESET spark.wap.id")
    spark.sql(s"RESET spark.wap.id")

    log.info("mergeIntoTable >>> icebergTable.refresh()")
    icebergTable.refresh()
  }

  def deleteFromTable(spark: SparkSession, view: String): Unit = {
    val identifierFieldTuple = "(jobId)"
    spark.sql(
      s"""
         |DELETE FROM outbox.osiris_iceberg.jatt3
         |WHERE $identifierFieldTuple in (select $identifierFieldTuple from $view)
         |""".stripMargin)
  }

  def insertIntoTable(spark: SparkSession, view: String): Unit = {
    spark.sql(
      s"""
         |INSERT INTO (jobId)
         |SELECT * FROM $view ORDER BY outbox.bucket(512, jobId), jobId
         |""".stripMargin)
  }

  def inputDf(sourceDf: DataFrame): DataFrame = {
    inputCheckpointPath.map(b => {
      val fileSystem = b.getFileSystem(spark.sparkContext.hadoopConfiguration)
      if (!fileSystem.exists(b)) {
        // write the output, and then read it back
        // which seems crazy, but it makes recovery faster if downstream stages fail
        // and an osiris partition needs to be reread (which is super slow)
        // don't just checkpoint, because then we'd still only have 256 input partitions
        sourceDf.write
          .option("maxRecordsPerFile", 500000) // this should probably actually be configurable. assume 1k rows, goal is 512-1gb files
          .orc(b.toString)
      }
      spark.createDataFrame(spark.read.orc(b.toString).rdd, sourceDf.schema)
    })
      .getOrElse({
        // if we didn't checkpoint in s3, we should cache
        // not just an optimization, needed for correctness
        // (in case new records come into osiris between the delete and insert)
        sourceDf.cache()
        sourceDf
      })
  }
}

object OsirisIcebergExporter {
  private val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    java.security.Security.setProperty("networkaddress.cache.ttl", "60")
    new SpringApplicationBuilder(classOf[OsirisIcebergExporter])
      .web(WebApplicationType.NONE)
      .run(args: _*)
      .close()
  }
}

@Bean
def spark(): SparkSession = {
  val icebergCatalogConfig = Map(
    s"spark.sql.catalog.outbox" -> "org.apache.iceberg.spark.SparkCatalog",
    s"spark.sql.catalog.outbox.io-impl" -> "org.apache.iceberg.aws.s3.S3FileIO",
    s"spark.sql.catalog.outbox.catalog-impl" -> "org.apache.iceberg.aws.glue.GlueCatalog",
    s"spark.sql.catalog.outbox.glue.id" -> "936411429724",
    //"spark.sql.catalog.iceberg.s3.staging-dir" -> "/tmp/" //should this be configurable? is it even needed?
  )

  SessionBuilder(s"OsirisIcebergExporter-jatt3")
    .withMoreProperties(icebergCatalogConfig)
    .build
}

import org.apache.log4j.Level

case class SessionBuilder(
  appName: String,
  logLevel: Level = Level.INFO,
  properties: Map[String, String] = Map.empty,
) {
  def withMoreProperties(properties: Map[String, String]): SessionBuilder =
    copy(properties = this.properties ++ properties)

  def builder: SparkSession.Builder = {
    SparkSession.builder
      .appName(appName)
      .enableHiveSupport
      .config("spark.hadoop.hive.exec.dynamic.partition", "true")
      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.hadoop.hive.exec.max.dynamic.partitions", "2048")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .config("spark.kryoserializer.buffer.max", "1g")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.metastorePartitionPruning", "true")
      .config("spark.sql.orc.cache.stripe.details.size", "10000")
      .config("spark.sql.orc.enabled", "true")
      .config("spark.sql.orc.enableVectorizedReader", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.orc.impl", "native")
      .config("spark.sql.orc.splits.include.file.footer", "true")
      .config("spark.sql.parquet.filterPushdown", "true")
      .config("spark.sql.parquet.mergeSchema", "false")
      .config("spark.sql.session.timeZone", "-06:00")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.ui.view.acls", "*")
  }

  def build: SparkSession = {
    val sparkSession = builder.getOrCreate
    sparkSession.sparkContext.setLogLevel(logLevel.toString)
    sparkSession
  }
}

@Bean
def datasource(): Datasource = {
  val startTime: Long = DateTime.now().getMillis
  val inputPartitions = 4
  val useDfc = false
  JobArchiveOsirisDatasource(startTime, Option(inputPartitions).map(_.toInt), useDfc)
}

import com.example.proto.JobArchiveEntry
import org.apache.spark.sql.types.{StructField, StructType}

trait Datasource {
  def dataframe(spark: SparkSession): DataFrame

  def setNullable(df: DataFrame, fieldName: String, nullable: Boolean): DataFrame = {
    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) if c.equals(fieldName) => StructField(c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }
}

case class OsirisRowv2(value: Array[Byte])

case class JobArchiveOsirisDatasource(minTs: Long, inputPartitions: Option[Int], useDfc: Boolean) extends Datasource {
  def dataframe(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val dfcOptions: Map[String, String] =
      if (useDfc) Map(
        "osiris.dfc" -> "true",
        "osiris.dfc.datadir" -> "/osiris/",
        "osiris.dfc.readahead" -> "4",
        "osiris.s3.bucket" -> "cmhprod3-cdcosiris"
      )
      else Map(
        "servers" -> "osirisserver:26238"
      )

    val rawDf = spark.read
      .format("osirisv2")
      .option("keys", "jobId")
      .option("keysplitter", "vlong-1000")
      .option("table", "jobarchive_new")
      .option("osiris.s3.region", "us-east-2")
      .option("osiris.request.timeout", "180000")
      .option("osiris.retry.timeout", "360000")
      .option("minTs", minTs)
      .options(dfcOptions)
      .load()

    val df = rawDf
      .as[OsirisRowv2]
      .map(r => RowTransformer.parseRow(r))
      .toDF()

    inputPartitions
      .map(ps => df.repartition(ps))
      .getOrElse(df)
  }
}

object RowTransformer extends Serializable {
  val transcoder = new ByteArrayTranscoder()

  def parseRow(row: OsirisRowv2): JobInfo = {
    val bytes = transcoder.toBytes(row.value)
    val jobArchiveEntry = JobArchiveEntry.parseFrom(bytes)
    JobInfo(
      jobArchiveEntry.getJobId,
      jobArchiveEntry.getTitleId,
    )
  }
}

case class JobInfo(
  jobId: Long,
  titleId: Int,
)

class ByteArrayTranscoder {
  def fromBytes(bytes: Array[Byte]) = bytes
  def toBytes(bytes: Array[Byte]) = bytes

  def equals(o: AnyRef): Boolean = {
    if (this eq o) return true
    o != null && (getClass eq o.getClass)
  }

  override def toString = "byte"
}
```
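In case it helps to see the failing step without reading through the whole class, here is the sequence of statements `mergeIntoTable` ends up issuing, unrolled into a small helper. The `MergeStatements` object and the `1700000000000` high-water mark are just placeholders for this sketch; the SQL text is exactly what `deleteFromTable` and `insertIntoTable` above interpolate.

```scala
import org.apache.spark.sql.SparkSession

// Unrolled version of the statements mergeIntoTable issues, with an example
// high-water mark of 1700000000000 substituted in. The SQL strings match what
// deleteFromTable/insertIntoTable build in the sample above.
object MergeStatements {
  def run(spark: SparkSession): Unit = {
    spark.sql("SET spark.wap.id = 1700000000000_delete")
    spark.sql(
      """
        |DELETE FROM outbox.osiris_iceberg.jatt3
        |WHERE (jobId) in (select (jobId) from input)
        |""".stripMargin)

    spark.sql("SET spark.wap.id = 1700000000000_insert")
    spark.sql(
      """
        |INSERT INTO (jobId)
        |SELECT * FROM input ORDER BY outbox.bucket(512, jobId), jobId
        |""".stripMargin)

    spark.sql("RESET spark.wap.id")
  }
}
```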
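And since I couldn't include the protobuf file, a hand-written stand-in like the one below should be enough to make `RowTransformer` compile. It's a hypothetical stub, not the real generated class, and it only covers the two accessors the sample reads.

```scala
package com.example.proto

// Hypothetical stand-in for the protobuf-generated JobArchiveEntry class
// (the real .proto isn't included in this sample).
case class JobArchiveEntry(jobId: Long, titleId: Int) {
  def getJobId: Long = jobId
  def getTitleId: Int = titleId
}

object JobArchiveEntry {
  // The real generated class deserializes the protobuf payload; this stub just
  // returns fixed values, which is enough to exercise the Spark/Iceberg path.
  def parseFrom(bytes: Array[Byte]): JobArchiveEntry = JobArchiveEntry(jobId = 1L, titleId = 1)
}
```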