allen-abhishekkr commented on issue #9349:
URL: https://github.com/apache/iceberg/issues/9349#issuecomment-2153099416

   @huyuanfeng2018 
   
   I see that the "closeable" trait was added in this commit, however in case 
of below implementations, the close() method of the MetricsReporter never gets 
called. 
   
    spark-submit command looks like below:
   `
   spark-submit --name test_metrics --packages 
org.apache.iceberg:iceberg-aws-bundle:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
  \
   --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 \
   --conf spark.sql.catalog.prod=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.prod.type=hive \
   --conf spark.sql.catalog.prod.uri=thrift://<host>:9083 \
   --conf 
spark.sql.catalog.prod.warehouse=s3a://<mybucket>/jarSparkJobs/iceberg-metrics-test/
 \
   --conf spark.sql.catalog.prod.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
   --conf iceberg.engine.hive.enabled=true \
   --conf spark.dynamicAllocation.enabled=false \
   --conf 
spark.sql.catalog.prod.metrics-reporter-impl=package.MyCustomMetricsReporter \
   --class in.app.TestMetrics /home/hadoop/abhishek/my_jar.jar
   `
   
   The MetricsReporter implementation as below:
   `
   class MyCustomMetricsReporter extends MetricsReporter{
    override def report(report: MetricsReport): Unit = {
     //gets called
     }
   
    override def close(): Unit = {
     //Never Gets Called
     }
   }
   `
   
   The Main Class
   `
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.Row
   
   import scala.util.Random
   
   object TestMetrics extends App {
   
     val spark: SparkSession = SparkSession.builder()
       .enableHiveSupport()
       .getOrCreate()
   
     val schema = StructType( Array(
       StructField("vendor_id", LongType,true),
       StructField("trip_id", LongType,true),
       StructField("trip_distance", FloatType,true),
       StructField("fare_amount", DoubleType,true),
       StructField("store_and_fwd_flag", StringType,true)
     ))
   
     val randData = Random.nextDouble()
     println(randData)
   
     val data = Seq(
       Row(1: Long, 1000371: Long, 1.8f: Float, 15.32: Double, "N": String),
       Row(2: Long, 1000372: Long, 2.5f: Float, 22.15: Double, "N": String),
       Row(2: Long, 1000373: Long, 0.9f: Float, randData: Double, "N": String),
       Row(1: Long, 1000374: Long, 8.4f: Float, 42.13: Double, "Y": String)
     )
   
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), 
schema)
     df.createTempView("latestRecords")
     val mergeQuery =
       """
         |MERGE INTO prod.iceberg_sanity_test.taxis t USING latestRecords s
         |        ON s.vendor_id=t.vendor_id and s.trip_id=t.trip_id
         |   WHEN MATCHED THEN UPDATE SET *
         |   WHEN NOT MATCHED THEN INSERT *
         |""".stripMargin
     println(System.currentTimeMillis())
     spark.sql(mergeQuery)
   
   }
   `
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to