This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 1d86a67  [Improve] Use Spark's Logging instead of explicit usage of log4j (#84)
1d86a67 is described below

commit 1d86a6794ab9b2fbd94930deb522250fb8e6103a
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Fri Mar 31 18:16:14 2023 +0800

    [Improve] Use Spark's Logging instead of explicit usage of log4j (#84)

    * replace org.apache.log4j.Logger by using spark's Logging
---
 .../apache/doris/spark/rdd/ScalaValueReader.scala | 23 +++++++++++-----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 03643b2..b196355 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -34,7 +34,7 @@ import org.apache.doris.spark.sql.SchemaUtils
 import org.apache.doris.spark.util.ErrorMessages
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging

 import scala.util.control.Breaks

@@ -43,8 +43,7 @@ import scala.util.control.Breaks
  * @param partition Doris RDD partition
  * @param settings request configuration
  */
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
-  private val logger = Logger.getLogger(classOf[ScalaValueReader])
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging{

   protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
   protected val clientLock =
@@ -57,7 +56,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
     settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
+    logWarning(String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)))
     DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
   }

@@ -65,7 +64,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   val blockingQueueSize = Try {
     settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))
+    logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)))
     DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
   }

@@ -89,21 +88,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   val batchSize = Try {
     settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))
-    DORIS_BATCH_SIZE_DEFAULT
+    logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE)))
+    DORIS_BATCH_SIZE_DEFAULT
   }

   val queryDorisTimeout = Try {
     settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
+    logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)))
     DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
   }

   val execMemLimit = Try {
     settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
+    logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)))
     DORIS_EXEC_MEM_LIMIT_DEFAULT
   }

@@ -113,7 +112,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
     params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))

-    logger.debug(s"Open scan params is, " +
+    logDebug(s"Open scan params is, " +
       s"cluster: ${params.getCluster}, " +
       s"database: ${params.getDatabase}, " +
       s"table: ${params.getTable}, " +
@@ -159,7 +158,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     started
   }

-  logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
+  logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.")

   /**
    * read data and cached in rowBatch.
@@ -213,7 +212,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
    */
   def next: AnyRef = {
     if (!hasNext) {
-      logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
+      logError(SHOULD_NOT_HAPPEN_MESSAGE)
       throw new ShouldNeverHappenException
     }
     rowBatch.next

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
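
For readers who only want the shape of the change: the commit mixes Spark's org.apache.spark.internal.Logging trait into the reader class and calls its logWarning/logDebug/logError helpers, pre-rendering messages with String.format, instead of holding an org.apache.log4j.Logger field. Below is a minimal, self-contained sketch of that pattern, assuming spark-core is on the classpath; the class name, input parameter, and default value are hypothetical stand-ins, not part of the connector.

    import scala.util.Try
    import org.apache.spark.internal.Logging

    // Hypothetical reader, used only to illustrate the logging pattern adopted by this commit.
    class ExampleValueReader(rawBatchSize: String) extends Logging {

      // Spark's Logging helpers take a single message string, so the format string is
      // rendered with String.format before being logged, rather than passing format
      // arguments to the logger itself.
      val batchSize: Int = Try {
        rawBatchSize.toInt
      } getOrElse {
        logWarning(String.format("Parse '%s' to number failed, falling back to %s.",
          rawBatchSize, "1024"))
        1024 // hypothetical default, standing in for DORIS_BATCH_SIZE_DEFAULT
      }

      logDebug(s"Resolved batch size: $batchSize.")
    }

One practical effect: the Logging helpers take the message by name and check the log level first, so the String.format call above is only evaluated when the corresponding level is enabled.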