huyuanfeng2018 commented on issue #9349:
URL: https://github.com/apache/iceberg/issues/9349#issuecomment-2162903308

   > @huyuanfeng2018
   > 
   > I see that the "closeable" trait was added in this commit; however, with the implementation below, the close() method of the MetricsReporter never gets called.
   > 
   > The spark-submit command looks like this:
   > 
   > ```
   > spark-submit --name test_metrics \
   >   --packages org.apache.iceberg:iceberg-aws-bundle:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
   >   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
   >   --conf spark.sql.catalog.prod=org.apache.iceberg.spark.SparkCatalog \
   >   --conf spark.sql.catalog.prod.type=hive \
   >   --conf spark.sql.catalog.prod.uri=thrift://<host>:9083 \
   >   --conf spark.sql.catalog.prod.warehouse=s3a://<mybucket>/jarSparkJobs/iceberg-metrics-test/ \
   >   --conf spark.sql.catalog.prod.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
   >   --conf iceberg.engine.hive.enabled=true \
   >   --conf spark.dynamicAllocation.enabled=false \
   >   --conf spark.sql.catalog.prod.metrics-reporter-impl=package.MyCustomMetricsReporter \
   >   --class in.app.TestMetrics /home/hadoop/abhishek/my_jar.jar
   > ```
   > 
   > The MetricsReporter implementation is as follows:
   > 
   > ```
   > class MyCustomMetricsReporter extends MetricsReporter {
   >   override def report(report: MetricsReport): Unit = {
   >     // gets called
   >   }
   > 
   >   override def close(): Unit = {
   >     // never gets called
   >   }
   > }
   > ```
   > 
   > The main class:
   > 
   > ```
   > import org.apache.spark.sql.SparkSession
   > import org.apache.spark.sql.types._
   > import org.apache.spark.sql.Row
   > 
   > import scala.util.Random
   > 
   > object TestMetrics extends App {
   > 
   >   val spark: SparkSession = SparkSession.builder()
   >     .enableHiveSupport()
   >     .getOrCreate()
   > 
   >   val schema = StructType(Array(
   >     StructField("vendor_id", LongType, true),
   >     StructField("trip_id", LongType, true),
   >     StructField("trip_distance", FloatType, true),
   >     StructField("fare_amount", DoubleType, true),
   >     StructField("store_and_fwd_flag", StringType, true)
   >   ))
   > 
   >   val randData = Random.nextDouble()
   >   println(randData)
   > 
   >   val data = Seq(
   >     Row(1L, 1000371L, 1.8f, 15.32, "N"),
   >     Row(2L, 1000372L, 2.5f, 22.15, "N"),
   >     Row(2L, 1000373L, 0.9f, randData, "N"),
   >     Row(1L, 1000374L, 8.4f, 42.13, "Y")
   >   )
   > 
   >   val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
   >   df.createTempView("latestRecords")
   >   val mergeQuery =
   >     """
   >       |MERGE INTO prod.iceberg_sanity_test.taxis t USING latestRecords s
   >       |        ON s.vendor_id = t.vendor_id AND s.trip_id = t.trip_id
   >       |   WHEN MATCHED THEN UPDATE SET *
   >       |   WHEN NOT MATCHED THEN INSERT *
   >       |""".stripMargin
   >   println(System.currentTimeMillis())
   >   spark.sql(mergeQuery)
   > }
   > ```
   
   The reporter's close() method is invoked when the Iceberg catalog itself is closed. Whether and when Spark closes that catalog is Spark's behavior, not something Iceberg controls.
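   
   If you need close() to run reliably when the application exits, one workaround is to register a JVM shutdown hook from the reporter itself, since a normal Spark exit may never close the catalog. A minimal sketch, assuming the reporter buffers metrics itself; the flush() helper is hypothetical and not part of the Iceberg API (sys.addShutdownHook is the standard Scala wrapper around Runtime.addShutdownHook):
   
   ```
   import org.apache.iceberg.metrics.{MetricsReport, MetricsReporter}
   
   class MyCustomMetricsReporter extends MetricsReporter {
   
     // Flush on JVM shutdown, because Spark may never call close().
     sys.addShutdownHook(flush())
   
     override def report(report: MetricsReport): Unit = {
       // buffer or emit the report here
     }
   
     // Still implemented for engines that do close the catalog.
     override def close(): Unit = flush()
   
     // Hypothetical helper: an idempotent flush of any buffered metrics,
     // synchronized so close() and the shutdown hook cannot race.
     private def flush(): Unit = synchronized {
       // write buffered metrics to your sink
     }
   }
   ```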
   

