huyuanfeng2018 commented on issue #9349: URL: https://github.com/apache/iceberg/issues/9349#issuecomment-2162903308
> @huyuanfeng2018
>
> I see that the "closeable" trait was added in this commit; however, with the implementation below, the close() method of the MetricsReporter never gets called.
>
> The spark-submit command looks like this:
>
> ```
> spark-submit --name test_metrics --packages org.apache.iceberg:iceberg-aws-bundle:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
>   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
>   --conf spark.sql.catalog.prod=org.apache.iceberg.spark.SparkCatalog \
>   --conf spark.sql.catalog.prod.type=hive \
>   --conf spark.sql.catalog.prod.uri=thrift://<host>:9083 \
>   --conf spark.sql.catalog.prod.warehouse=s3a://<mybucket>/jarSparkJobs/iceberg-metrics-test/ \
>   --conf spark.sql.catalog.prod.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
>   --conf iceberg.engine.hive.enabled=true \
>   --conf spark.dynamicAllocation.enabled=false \
>   --conf spark.sql.catalog.prod.metrics-reporter-impl=package.MyCustomMetricsReporter \
>   --class in.app.TestMetrics /home/hadoop/abhishek/my_jar.jar
> ```
>
> The MetricsReporter implementation:
>
> ```scala
> class MyCustomMetricsReporter extends MetricsReporter {
>   override def report(report: MetricsReport): Unit = {
>     // gets called
>   }
>
>   override def close(): Unit = {
>     // never gets called
>   }
> }
> ```
>
> The main class:
>
> ```scala
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
> import scala.util.Random
>
> object TestMetrics extends App {
>
>   val spark: SparkSession = SparkSession.builder()
>     .enableHiveSupport()
>     .getOrCreate()
>
>   val schema = StructType(Array(
>     StructField("vendor_id", LongType, true),
>     StructField("trip_id", LongType, true),
>     StructField("trip_distance", FloatType, true),
>     StructField("fare_amount", DoubleType, true),
>     StructField("store_and_fwd_flag", StringType, true)
>   ))
>
>   val randData = Random.nextDouble()
>   println(randData)
>
>   val data = Seq(
>     Row(1L, 1000371L, 1.8f, 15.32, "N"),
>     Row(2L, 1000372L, 2.5f, 22.15, "N"),
>     Row(2L, 1000373L, 0.9f, randData, "N"),
>     Row(1L, 1000374L, 8.4f, 42.13, "Y")
>   )
>
>   val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
>   df.createTempView("latestRecords")
>   val mergeQuery =
>     """
>       |MERGE INTO prod.iceberg_sanity_test.taxis t USING latestRecords s
>       | ON s.vendor_id = t.vendor_id AND s.trip_id = t.trip_id
>       | WHEN MATCHED THEN UPDATE SET *
>       | WHEN NOT MATCHED THEN INSERT *
>       |""".stripMargin
>   println(System.currentTimeMillis())
>   spark.sql(mergeQuery)
> }
> ```

The reporter's close() is invoked when the Iceberg catalog itself is closed. Spark keeps the catalog open for the lifetime of the application and does not close it on its own, so this should be a behavior of Spark rather than an Iceberg issue.
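If you need deterministic cleanup even though Spark never closes the catalog, one possible workaround is to register a JVM shutdown hook from the reporter itself. A minimal sketch, assuming Scala 2.12+ and an Iceberg version where MetricsReporter declares close(); the cleanup() helper and its body are hypothetical, not part of the Iceberg API:

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.iceberg.metrics.{MetricsReport, MetricsReporter}

class MyCustomMetricsReporter extends MetricsReporter {

  // Guard so cleanup runs at most once, whether triggered by close()
  // or by the shutdown hook.
  private val closed = new AtomicBoolean(false)

  // Workaround: run cleanup on JVM exit, since Spark keeps the catalog
  // (and this reporter) open for the whole application lifetime.
  Runtime.getRuntime.addShutdownHook(new Thread(() => cleanup()))

  override def report(report: MetricsReport): Unit = {
    // forward the report to your metrics backend here (hypothetical)
  }

  // Still honor an explicit close() in case the catalog is ever closed.
  override def close(): Unit = cleanup()

  private def cleanup(): Unit =
    if (closed.compareAndSet(false, true)) {
      // flush buffers, close clients, etc. (hypothetical)
    }
}
```

Note that shutdown hooks run on normal JVM termination (e.g., when the spark-submit application finishes) but not on SIGKILL, so this is a best-effort fallback rather than a guarantee.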