allen-abhishekkr commented on issue #9349: URL: https://github.com/apache/iceberg/issues/9349#issuecomment-2153099416
@huyuanfeng2018 I see that the `Closeable` trait was added in this commit; however, with the implementation below, the `close()` method of the `MetricsReporter` never gets called.

The spark-submit command looks like this:

```sh
spark-submit --name test_metrics \
  --packages org.apache.iceberg:iceberg-aws-bundle:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.prod=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.prod.type=hive \
  --conf spark.sql.catalog.prod.uri=thrift://<host>:9083 \
  --conf spark.sql.catalog.prod.warehouse=s3a://<mybucket>/jarSparkJobs/iceberg-metrics-test/ \
  --conf spark.sql.catalog.prod.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf iceberg.engine.hive.enabled=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.sql.catalog.prod.metrics-reporter-impl=package.MyCustomMetricsReporter \
  --class in.app.TestMetrics /home/hadoop/abhishek/my_jar.jar
```

The `MetricsReporter` implementation:

```scala
import org.apache.iceberg.metrics.{MetricsReport, MetricsReporter}

class MyCustomMetricsReporter extends MetricsReporter {
  override def report(report: MetricsReport): Unit = {
    // gets called
  }

  override def close(): Unit = {
    // never gets called
  }
}
```

The main class:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import scala.util.Random

object TestMetrics extends App {
  val spark: SparkSession = SparkSession.builder()
    .enableHiveSupport()
    .getOrCreate()

  val schema = StructType(
    Array(
      StructField("vendor_id", LongType, true),
      StructField("trip_id", LongType, true),
      StructField("trip_distance", FloatType, true),
      StructField("fare_amount", DoubleType, true),
      StructField("store_and_fwd_flag", StringType, true)
    ))

  val randData = Random.nextDouble()
  println(randData)

  val data = Seq(
    Row(1L, 1000371L, 1.8f, 15.32, "N"),
    Row(2L, 1000372L, 2.5f, 22.15, "N"),
    Row(2L, 1000373L, 0.9f, randData, "N"),
    Row(1L, 1000374L, 8.4f, 42.13, "Y")
  )

  val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
  df.createTempView("latestRecords")

  val mergeQuery =
    """
      |MERGE INTO prod.iceberg_sanity_test.taxis t USING latestRecords s
      |  ON s.vendor_id = t.vendor_id AND s.trip_id = t.trip_id
      |  WHEN MATCHED THEN UPDATE SET *
      |  WHEN NOT MATCHED THEN INSERT *
      |""".stripMargin

  println(System.currentTimeMillis())
  spark.sql(mergeQuery)
}
```
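As a stopgap on my side (not an Iceberg-provided mechanism, just a sketch under the assumption that the reporter lives for the duration of the JVM), one option is to register a JVM shutdown hook inside the reporter so the cleanup logic still runs even when the catalog never invokes `close()`. The `AtomicBoolean` guard only exists to avoid double-closing if the catalog ever does start calling it:

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.iceberg.metrics.{MetricsReport, MetricsReporter}

class MyCustomMetricsReporter extends MetricsReporter {
  // Tracks whether cleanup already ran, so the shutdown hook and a
  // catalog-initiated close() cannot both release the same resources.
  private val closed = new AtomicBoolean(false)

  // Hypothetical workaround: run close() on JVM shutdown, since the
  // catalog does not appear to call it in this setup.
  Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

  override def report(report: MetricsReport): Unit = {
    // handle the report as before
  }

  override def close(): Unit = {
    if (closed.compareAndSet(false, true)) {
      // flush buffers / release resources here
    }
  }
}
```

This doesn't answer why the catalog skips `close()`, of course, so I'd still like to understand the intended lifecycle.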