adevore3 commented on issue #9738:
URL: https://github.com/apache/iceberg/issues/9738#issuecomment-1954798452

   Hi, I was trying to create a code sample but got distracted. I've copied all the relevant code pieces here; the one thing I couldn't include is the protobuf file. Based on the stack trace, it seems to be failing on the `insertIntoTable` function call.
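   Since the protobuf file is the missing piece, here's a rough hand-written stand-in for the generated class, just the two accessors `RowTransformer` uses plus a dummy `parseFrom` (not the real generated code or wire format), so the sample can compile without it:
   ```
   // Hypothetical stand-in for the generated protobuf class, only so this sample
   // compiles without the real .proto; the actual JobArchiveEntry has more fields
   // and uses the real protobuf wire format.
   package com.example.proto

   import java.nio.ByteBuffer

   case class JobArchiveEntry(jobId: Long, titleId: Int) {
     def getJobId: Long = jobId
     def getTitleId: Int = titleId
   }

   object JobArchiveEntry {
     // Fixed-layout decode (8-byte jobId followed by 4-byte titleId), not protobuf
     // parsing; just enough to satisfy RowTransformer.parseRow in this sample.
     def parseFrom(bytes: Array[Byte]): JobArchiveEntry = {
       val buf = ByteBuffer.wrap(bytes)
       JobArchiveEntry(buf.getLong, buf.getInt)
     }
   }
   ```
   The rest of the relevant code: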
   ```
   import com.indeed.osiris.iceberg.exporter.config.OutputTableDefinition
   import com.indeed.osiris.iceberg.exporter.datasources.Datasource
   import 
com.indeed.osiris.iceberg.exporter.datasources.JobArchiveOsirisDatasource.getClass
   import com.indeed.spark.hivesupport.SessionBuilder
   import org.apache.hadoop.fs.Path
   import org.apache.iceberg.Table
   import org.apache.iceberg.spark.SparkCatalog
   import org.apache.spark.sql.{DataFrame, SparkSession}
   import org.joda.time.DateTime
   import org.slf4j.LoggerFactory
   import org.springframework.boot.builder.SpringApplicationBuilder
   import org.springframework.boot.{CommandLineRunner, WebApplicationType}
   import org.springframework.context.annotation.Bean
   
   class OsirisIcebergExporter(
                                spark: SparkSession,
                                datasource: Datasource,
                                inputCheckpointBucket: Option[String],
                                inputCheckpointMillis: Option[Long],
                                outputTableDefinition: OutputTableDefinition,
                              ) extends CommandLineRunner {
   
     import OsirisIcebergExporter._
   
     private val newHighWaterMark = 
inputCheckpointMillis.getOrElse(System.currentTimeMillis())
     private val inputCheckpointPath = inputCheckpointBucket.map(b => new 
Path(s"s3a://$b/tmp/osiris_iceberg/jatt3/$newHighWaterMark"))
     private val icebergCatalog = 
spark.sessionState.catalogManager.catalog("outbox").asInstanceOf[SparkCatalog].icebergCatalog()
   
     override def run(rawArgs: String*): Unit = {
       try {
      log.info(s"running OsirisIcebergExporter. new high water mark will be $newHighWaterMark")
         val df = inputDf(datasource.dataframe(spark))
         df.createOrReplaceTempView("input")
         val icebergTable = populateTable(df)
   
       } finally {
         // close connections
       }
     }
   
     def populateTable(df: DataFrame): Table = {
       // df not used in this example since we'll read from the temp view
       val t = icebergCatalog.loadTable(outputTableDefinition.icebergIdentifier)
       mergeIntoTable(spark, "input", t)
       t
     }
   
     def mergeIntoTable(spark: SparkSession, view: String, icebergTable: 
Table): Unit = {
       val currentSnapshot = icebergTable.currentSnapshot()
   
       log.info(s"mergeIntoTable >>> SET spark.wap.id = 
${newHighWaterMark}_delete")
       spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_delete")
       deleteFromTable(spark, view)
   
       log.info(s"mergeIntoTable >>> SET spark.wap.id = 
${newHighWaterMark}_insert")
       spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_insert")
       insertIntoTable(spark, view)
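       // Intent of the two spark.wap.id settings above: write-audit-publish style
       // staging, so that (with write.wap.enabled=true on the table) the delete and
       // the insert land as two staged snapshots instead of being published straight
       // to the current table state.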
   
       log.info("mergeIntoTable >>> RESET spark.wap.id")
       spark.sql(s"RESET spark.wap.id")
   
       log.info("mergeIntoTable >>> icebergTable.refresh()")
       icebergTable.refresh()
     }
   
     def deleteFromTable(spark: SparkSession, view: String): Unit = {
       val identifierFieldTuple = "(jobId)"
       spark.sql(
         s"""
            |DELETE FROM outbox.osiris_iceberg.jatt3
            |WHERE $identifierFieldTuple in (select $identifierFieldTuple from 
$view)
            |""".stripMargin)
     }
   
     def insertIntoTable(spark: SparkSession, view: String): Unit = {
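       // The ORDER BY below is meant to cluster the incoming rows by the table's
       // bucket(512, jobId) partition transform before the write, so each task
       // writes to a limited number of bucket partitions at a time.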
       spark.sql(
         s"""
         |INSERT INTO outbox.osiris_iceberg.jatt3
            |SELECT * FROM $view ORDER BY outbox.bucket(512, jobId), jobId
            |""".stripMargin)
     }
   
     def inputDf(sourceDf: DataFrame): DataFrame = {
       inputCheckpointPath.map(b => {
           val fileSystem = 
b.getFileSystem(spark.sparkContext.hadoopConfiguration)
           if (!fileSystem.exists(b)) {
             //write the output, and then read it back
             //which seems crazy, but it makes recovery faster if downstream 
stages fail
             //and an osiris partition needs to be reread (which is super slow)
          //don't just checkpoint, because then we'd still only have 256 input partitions
             sourceDf.write
            .option("maxRecordsPerFile", 500000) //this should probably be configurable; assuming ~1 KB per row, the goal is 512 MB to 1 GB files
               .orc(b.toString)
           }
   
           spark.createDataFrame(spark.read.orc(b.toString).rdd, 
sourceDf.schema)
         })
         .getOrElse({
        //if we didn't checkpoint in s3, we should cache
           //not just optimization, needed for correctness
           // (in case new records come into osiris between the delete and 
insert)
           sourceDf.cache()
           sourceDf
         })
     }
   }
   
   object OsirisIcebergExporter {
   
     private val log = LoggerFactory.getLogger(getClass)
   
     def main(args: Array[String]): Unit = {
       java.security.Security.setProperty("networkaddress.cache.ttl", "60")
       new SpringApplicationBuilder(classOf[OsirisIcebergExporter])
         .web(WebApplicationType.NONE)
         .run(args: _*)
         .close()
     }
   }
   
   @Bean
   def spark(): SparkSession = {
     val icebergCatalogConfig = Map(
       s"spark.sql.catalog.outbox" -> "org.apache.iceberg.spark.SparkCatalog",
       s"spark.sql.catalog.outbox.io-impl" -> 
"org.apache.iceberg.aws.s3.S3FileIO",
       s"spark.sql.catalog.outbox.catalog-impl" -> 
"org.apache.iceberg.aws.glue.GlueCatalog",
       s"spark.sql.catalog.outbox.glue.id" -> "936411429724",
       //"spark.sql.catalog.iceberg.s3.staging-dir" -> "/tmp/" //should this be 
configurable? is it even needed?
     )
     SessionBuilder(s"OsirisIcebergExporter-jatt3")
       .withMoreProperties(icebergCatalogConfig)
       .build
   }
   
   import org.apache.log4j.Level
   
   case class SessionBuilder(
                              appName: String,
                              logLevel: Level = Level.INFO,
                              properties: Map[String, String] = Map.empty,
                            ) {
   
     def withMoreProperties(properties: Map[String, String]): SessionBuilder = 
copy(properties = this.properties ++ properties)
   
     def builder: SparkSession.Builder = {
       SparkSession.builder
         .appName(appName)
         .enableHiveSupport
         .config("spark.hadoop.hive.exec.dynamic.partition", "true")
         .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
         .config("spark.hadoop.hive.exec.max.dynamic.partitions", "2048")
         
.config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", 
"false")
         .config("spark.kryoserializer.buffer.max", "1g")
         .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
         .config("spark.sql.hive.metastorePartitionPruning", "true")
         .config("spark.sql.orc.cache.stripe.details.size", "10000")
         .config("spark.sql.orc.enabled", "true")
         .config("spark.sql.orc.enableVectorizedReader", "true")
         .config("spark.sql.orc.filterPushdown", "true")
         .config("spark.sql.orc.impl", "native")
         .config("spark.sql.orc.splits.include.file.footer", "true")
         .config("spark.sql.parquet.filterPushdown", "true")
         .config("spark.sql.parquet.mergeSchema", "false")
         .config("spark.sql.session.timeZone", "-06:00")
         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
         .config("spark.ui.view.acls", "*")
     }
   
     def build: SparkSession = {
       val sparkSession = builder.getOrCreate
   
       sparkSession.sparkContext.setLogLevel(logLevel.toString)
   
       sparkSession
     }
   
   }
   
   @Bean
   def datasource(): Datasource = {
     val startTime: Long = DateTime.now().getMillis
     val inputPartitions = 4
     val useDfc = false
     JobArchiveOsirisDatasource(startTime, 
Option(inputPartitions).map(_.toInt), useDfc)
   }
   
   import com.example.proto.JobArchiveEntry
   
   
   import org.apache.spark.sql.types.{StructField, StructType}
   
   trait Datasource {
     def dataframe(spark: SparkSession): DataFrame
   
     def setNullable(df: DataFrame, fieldName: String, nullable: Boolean) : 
DataFrame = {
       val schema = df.schema
       val newSchema = StructType(schema.map {
         case StructField( c, t, _, m) if c.equals(fieldName) => StructField( 
c, t, nullable = nullable, m)
         case y: StructField => y
       })
       df.sqlContext.createDataFrame( df.rdd, newSchema )
     }
   }
   
   case class OsirisRowv2(value: Array[Byte])
   
   case class JobArchiveOsirisDatasource(minTs: Long, inputPartitions: 
Option[Int], useDfc: Boolean) extends Datasource {
     def dataframe(spark: SparkSession): DataFrame = {
       import spark.implicits._
   
       val dfcOptions: Map[String, String] =
         if (useDfc)
           Map(
             "osiris.dfc"-> "true",
             "osiris.dfc.datadir"-> "/osiris/",
             "osiris.dfc.readahead"-> "4",
             "osiris.s3.bucket" -> "cmhprod3-cdcosiris"
           )
         else
           Map(
             "servers" -> "osirisserver:26238"
           )
   
       val rawDf = spark.read
         .format("osirisv2")
         .option("keys", "jobId")
         .option("keysplitter", "vlong-1000")
         .option("table", "jobarchive_new")
         .option("osiris.s3.region", "us-east-2")
         .option("osiris.request.timeout", "180000")
         .option("osiris.retry.timeout", "360000")
         .option("minTs", minTs)
         .options(dfcOptions)
         .load()
   
       val df = rawDf
         .as[OsirisRowv2]
         .map(r => RowTransformer.parseRow(r))
         .toDF()
       inputPartitions
         .map(ps => df.repartition(ps))
         .getOrElse(df)
     }
   }
   
   object RowTransformer extends Serializable {
   
     val transcoder = new ByteArrayTranscoder()
   
     def parseRow(row: OsirisRowv2): JobInfo = {
       val bytes = transcoder.toBytes(row.value)
       val jobArchiveEntry = JobArchiveEntry.parseFrom(bytes)
   
       JobInfo(
         jobArchiveEntry.getJobId,
         jobArchiveEntry.getTitleId,
       )
     }
   
   }
   case class JobInfo(
                       jobId: Long,
                       titleId: Int,
                     )
   
   class ByteArrayTranscoder {
     def fromBytes(bytes: Array[Byte]) = bytes
   
     def toBytes(bytes: Array[Byte]) = bytes
   
     def equals(o: AnyRef): Boolean = {
       if (this eq o) return true
       o != null && (getClass eq o.getClass)
     }
   
     override def toString = "byte"
   }
   ```
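
   For what it's worth, the snapshots that the two staged writes produce can be checked through the snapshots metadata table (assuming a `spark` session configured with the same `outbox` catalog as above; the `wap.id` used for a staged write shows up in the `summary` column):
   ```
   // Inspect the snapshots committed by the staged delete/insert; the wap.id for
   // each staged write is recorded in the snapshot summary map.
   spark.sql(
     """SELECT snapshot_id, committed_at, operation, summary
       |FROM outbox.osiris_iceberg.jatt3.snapshots
       |ORDER BY committed_at""".stripMargin)
     .show(truncate = false)
   ```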

