This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b0e2cb575390 [SPARK-48623][CORE] Structured Logging Migrations
b0e2cb575390 is described below

commit b0e2cb575390d9dabb1142a78f4ceed48c059212
Author: Amanda Liu <[email protected]>
AuthorDate: Tue Jun 18 23:51:28 2024 -0700

    [SPARK-48623][CORE] Structured Logging Migrations
    
    ### What changes were proposed in this pull request?
    This PR migrates Scala logging to comply with the Scala style changes in 
[#46947](https://github.com/apache/spark/pull/46947)
    
    ### Why are the changes needed?
    This makes development and PR review of the structured logging migration 
easier.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested by ensuring `dev/scalastyle` checks pass
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46980 from asl3/logging-migrationscala.
    
    Authored-by: Amanda Liu <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 26 +++++++++++++++
 .../sql/streaming/StreamingQueryListenerBus.scala  |  9 +++--
 ...SparkConnectStreamingQueryListenerHandler.scala | 26 ++++++++++-----
 .../org/apache/spark/broadcast/Broadcast.scala     |  5 +--
 .../spark/deploy/ExternalShuffleService.scala      |  4 +--
 .../apache/spark/deploy/worker/WorkerWatcher.scala |  4 +--
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  6 ++--
 ...AbortableStreamBasedCheckpointFileManager.scala |  4 +--
 .../ml/classification/LogisticRegression.scala     | 12 +++----
 .../spark/deploy/yarn/ApplicationMaster.scala      | 13 +++++---
 .../org/apache/spark/deploy/yarn/Client.scala      |  2 +-
 .../cluster/YarnClientSchedulerBackend.scala       |  4 +--
 .../apache/spark/sql/types/UDTRegistration.scala   |  5 +--
 .../expressions/codegen/CodeGenerator.scala        | 10 +++---
 .../execution/adaptive/ShufflePartitionsUtil.scala |  8 +++--
 .../PythonStreamingPartitionReaderFactory.scala    |  5 +--
 .../python/PythonStreamingSourceRunner.scala       |  5 +--
 .../sql/execution/streaming/ProgressReporter.scala |  9 ++---
 .../sql/execution/streaming/StreamExecution.scala  |  8 +++--
 .../streaming/continuous/ContinuousExecution.scala |  3 +-
 .../streaming/continuous/ContinuousWriteRDD.scala  |  8 +++--
 .../streaming/state/StateStoreChangelog.scala      |  8 +++--
 .../SparkExecuteStatementOperation.scala           | 16 ++++-----
 .../thriftserver/SparkGetFunctionsOperation.scala  |  4 +--
 .../org/apache/spark/streaming/Checkpoint.scala    | 38 +++++++++++++--------
 .../apache/spark/streaming/StreamingContext.scala  |  5 +--
 .../apache/spark/streaming/dstream/DStream.scala   | 39 +++++++++++++---------
 .../streaming/dstream/SocketInputDStream.scala     |  8 ++---
 .../streaming/receiver/ReceiverSupervisor.scala    |  9 ++---
 .../receiver/ReceiverSupervisorImpl.scala          |  8 ++---
 .../scheduler/ExecutorAllocationManager.scala      | 24 ++++++++-----
 .../streaming/scheduler/InputInfoTracker.scala     |  5 +--
 .../streaming/scheduler/ReceivedBlockTracker.scala | 17 ++++++----
 .../streaming/scheduler/ReceiverTracker.scala      | 19 ++++++-----
 .../scheduler/rate/PIDRateEstimator.scala          |  9 +++--
 .../streaming/util/BatchedWriteAheadLog.scala      |  5 +--
 .../streaming/util/FileBasedWriteAheadLog.scala    | 19 +++++++----
 37 files changed, 257 insertions(+), 152 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 40d3f67a48a7..f90eb4a77071 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -65,7 +65,10 @@ private[spark] object LogKeys {
   case object ADDED_JARS extends LogKey
   case object ADMIN_ACLS extends LogKey
   case object ADMIN_ACL_GROUPS extends LogKey
+  case object ADVISORY_TARGET_SIZE extends LogKey
   case object AGGREGATE_FUNCTIONS extends LogKey
+  case object ALIGNED_FROM_TIME extends LogKey
+  case object ALIGNED_TO_TIME extends LogKey
   case object ALPHA extends LogKey
   case object ANALYSIS_ERROR extends LogKey
   case object APP_ATTEMPT_ID extends LogKey
@@ -77,8 +80,10 @@ private[spark] object LogKeys {
   case object APP_STATE extends LogKey
   case object ARCHIVE_NAME extends LogKey
   case object ARGS extends LogKey
+  case object ARTIFACT_ID extends LogKey
   case object ATTRIBUTE_MAP extends LogKey
   case object AUTH_ENABLED extends LogKey
+  case object AVG_BATCH_PROC_TIME extends LogKey
   case object BACKUP_FILE extends LogKey
   case object BARRIER_EPOCH extends LogKey
   case object BARRIER_ID extends LogKey
@@ -99,6 +104,8 @@ private[spark] object LogKeys {
   case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
   case object BUCKET extends LogKey
   case object BYTECODE_SIZE extends LogKey
+  case object BYTE_BUFFER extends LogKey
+  case object BYTE_SIZE extends LogKey
   case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
   case object CACHE_AUTO_REMOVED_SIZE extends LogKey
   case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
@@ -109,6 +116,7 @@ private[spark] object LogKeys {
   case object CATALOG_NAME extends LogKey
   case object CATEGORICAL_FEATURES extends LogKey
   case object CHECKPOINT_FILE extends LogKey
+  case object CHECKPOINT_INTERVAL extends LogKey
   case object CHECKPOINT_LOCATION extends LogKey
   case object CHECKPOINT_PATH extends LogKey
   case object CHECKPOINT_ROOT extends LogKey
@@ -186,6 +194,7 @@ private[spark] object LogKeys {
   case object DELEGATE extends LogKey
   case object DELTA extends LogKey
   case object DEPRECATED_KEY extends LogKey
+  case object DERIVATIVE extends LogKey
   case object DESCRIPTION extends LogKey
   case object DESIRED_NUM_PARTITIONS extends LogKey
   case object DESIRED_TREE_DEPTH extends LogKey
@@ -197,6 +206,7 @@ private[spark] object LogKeys {
   case object DRIVER_MEMORY_SIZE extends LogKey
   case object DRIVER_STATE extends LogKey
   case object DROPPED_PARTITIONS extends LogKey
+  case object DSTREAM extends LogKey
   case object DURATION extends LogKey
   case object EARLIEST_LOADED_VERSION extends LogKey
   case object EFFECTIVE_STORAGE_LEVEL extends LogKey
@@ -251,6 +261,7 @@ private[spark] object LogKeys {
   case object FEATURE_NAME extends LogKey
   case object FETCH_SIZE extends LogKey
   case object FIELD_NAME extends LogKey
+  case object FILES extends LogKey
   case object FILE_ABSOLUTE_PATH extends LogKey
   case object FILE_END_OFFSET extends LogKey
   case object FILE_FORMAT extends LogKey
@@ -307,6 +318,7 @@ private[spark] object LogKeys {
   case object INIT_MODE extends LogKey
   case object INPUT extends LogKey
   case object INPUT_SPLIT extends LogKey
+  case object INTEGRAL extends LogKey
   case object INTERVAL extends LogKey
   case object ISOLATION_LEVEL extends LogKey
   case object ISSUE_DATE extends LogKey
@@ -394,6 +406,7 @@ private[spark] object LogKeys {
   case object MIN_COMPACTION_BATCH_ID extends LogKey
   case object MIN_NUM_FREQUENT_PATTERN extends LogKey
   case object MIN_POINT_PER_CLUSTER extends LogKey
+  case object MIN_RATE extends LogKey
   case object MIN_SHARE extends LogKey
   case object MIN_SIZE extends LogKey
   case object MIN_TIME extends LogKey
@@ -490,6 +503,7 @@ private[spark] object LogKeys {
   case object NUM_PREFIXES extends LogKey
   case object NUM_PRUNED extends LogKey
   case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey
+  case object NUM_RECEIVERS extends LogKey
   case object NUM_RECORDS_READ extends LogKey
   case object NUM_RELEASED_LOCKS extends LogKey
   case object NUM_REMAINED extends LogKey
@@ -547,6 +561,7 @@ private[spark] object LogKeys {
   case object PARTITIONER extends LogKey
   case object PARTITION_ID extends LogKey
   case object PARTITION_IDS extends LogKey
+  case object PARTITION_SIZE extends LogKey
   case object PARTITION_SPECIFICATION extends LogKey
   case object PARTITION_SPECS extends LogKey
   case object PATH extends LogKey
@@ -575,6 +590,7 @@ private[spark] object LogKeys {
   case object PROCESSING_TIME extends LogKey
   case object PRODUCER_ID extends LogKey
   case object PROPERTY_NAME extends LogKey
+  case object PROPORTIONAL extends LogKey
   case object PROTOCOL_VERSION extends LogKey
   case object PROVIDER extends LogKey
   case object PUSHED_FILTERS extends LogKey
@@ -595,6 +611,8 @@ private[spark] object LogKeys {
   case object QUERY_PLAN_LENGTH_MAX extends LogKey
   case object QUERY_RUN_ID extends LogKey
   case object RANGE extends LogKey
+  case object RATE_LIMIT extends LogKey
+  case object RATIO extends LogKey
   case object RDD_CHECKPOINT_DIR extends LogKey
   case object RDD_DEBUG_STRING extends LogKey
   case object RDD_DESCRIPTION extends LogKey
@@ -646,6 +664,8 @@ private[spark] object LogKeys {
   case object RULE_NAME extends LogKey
   case object RUN_ID extends LogKey
   case object SCALA_VERSION extends LogKey
+  case object SCALING_DOWN_RATIO extends LogKey
+  case object SCALING_UP_RATIO extends LogKey
   case object SCHEDULER_POOL_NAME extends LogKey
   case object SCHEDULING_MODE extends LogKey
   case object SCHEMA extends LogKey
@@ -671,12 +691,14 @@ private[spark] object LogKeys {
   case object SHUFFLE_SERVICE_NAME extends LogKey
   case object SIGMAS_LENGTH extends LogKey
   case object SIGNAL extends LogKey
+  case object SINK extends LogKey
   case object SIZE extends LogKey
   case object SLEEP_TIME extends LogKey
   case object SLIDE_DURATION extends LogKey
   case object SMALLEST_CLUSTER_INDEX extends LogKey
   case object SNAPSHOT_VERSION extends LogKey
   case object SOCKET_ADDRESS extends LogKey
+  case object SOURCE extends LogKey
   case object SOURCE_PATH extends LogKey
   case object SPARK_BRANCH extends LogKey
   case object SPARK_BUILD_DATE extends LogKey
@@ -708,6 +730,7 @@ private[spark] object LogKeys {
   case object STORAGE_LEVEL_REPLICATION extends LogKey
   case object STORAGE_MEMORY_SIZE extends LogKey
   case object STORE_ID extends LogKey
+  case object STREAMING_CONTEXT extends LogKey
   case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey
   case object STREAMING_DATA_SOURCE_NAME extends LogKey
   case object STREAMING_OFFSETS_END extends LogKey
@@ -729,6 +752,7 @@ private[spark] object LogKeys {
   case object TARGET_NUM_EXECUTOR extends LogKey
   case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
   case object TARGET_PATH extends LogKey
+  case object TARGET_SIZE extends LogKey
   case object TASK_ATTEMPT_ID extends LogKey
   case object TASK_ID extends LogKey
   case object TASK_INDEX extends LogKey
@@ -752,6 +776,7 @@ private[spark] object LogKeys {
   case object THREAD_POOL_SIZE extends LogKey
   case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
   case object THRESHOLD extends LogKey
+  case object THRESH_TIME extends LogKey
   case object TIME extends LogKey
   case object TIMEOUT extends LogKey
   case object TIMER extends LogKey
@@ -814,4 +839,5 @@ private[spark] object LogKeys {
   case object XML_SCHEDULING_MODE extends LogKey
   case object XSD_PATH extends LogKey
   case object YOUNG_GENERATION_GC extends LogKey
+  case object ZERO_TIME extends LogKey
 }
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
index 56a9e19a1b78..c3c23740e2fe 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan, 
StreamingQueryEventType}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.client.CloseableIterator
 import org.apache.spark.sql.streaming.StreamingQueryListener.{Event, 
QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
@@ -115,7 +115,7 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) 
extends Logging {
             case StreamingQueryEventType.QUERY_TERMINATED_EVENT =>
               postToAll(QueryTerminatedEvent.fromJson(event.getEventJson))
             case _ =>
-              logWarning(s"Unknown StreamingQueryListener event: $event")
+              logWarning(log"Unknown StreamingQueryListener event: 
${MDC(LogKeys.EVENT, event)}")
           }
         })
       }
@@ -144,7 +144,10 @@ class StreamingQueryListenerBus(sparkSession: 
SparkSession) extends Logging {
             listener.onQueryIdle(t)
           case t: QueryTerminatedEvent =>
             listener.onQueryTerminated(t)
-          case _ => logWarning(s"Unknown StreamingQueryListener event: $event")
+          case _ =>
+            logWarning(
+              log"Unknown StreamingQueryListener event: " +
+                log"${MDC(LogKeys.EVENT, event)}")
         }
       } catch {
         case e: Exception =>
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
index 94f01026b7a5..d072b56e022a 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
 import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.connect.service.ExecuteHolder
 
 /**
@@ -83,20 +83,30 @@ class 
SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
             } catch {
               case NonFatal(e) =>
                 logError(
-                  s"[SessionId: $sessionId][UserId: $userId][operationId: " +
-                    s"${executeHolder.operationId}] Error sending listener 
added response.",
+                  log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+                    log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+                    log"[operationId: " +
+                    log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, 
executeHolder.operationId)}] " +
+                    log"Error sending listener added response.",
                   e)
                 listenerHolder.cleanUp()
                 return
             }
         }
-        logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
-          s"${executeHolder.operationId}] Server side listener added. Now 
blocking until " +
-          "all client side listeners are removed or there is error 
transmitting the event back.")
+        logInfo(
+          log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
+            log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
+            log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, 
executeHolder.operationId)}] " +
+            log"Server side listener added. Now blocking until all client side 
listeners are " +
+            log"removed or there is error transmitting the event back.")
         // Block the handling thread, and have serverListener continuously 
send back new events
         listenerHolder.streamingQueryListenerLatch.await()
-        logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
-          s"${executeHolder.operationId}] Server side listener long-running 
handling thread ended.")
+        logInfo(
+          log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
+            log"${MDC(LogKeys.USER_ID, userId)}]" +
+            log"[operationId: " +
+            log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, 
executeHolder.operationId)}] " +
+            log"Server side listener long-running handling thread ended.")
       case 
StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
         listenerHolder.isServerSideListenerRegistered match {
           case true =>
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 445b7d4d7aa0..3adb540a7ad1 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,7 +22,7 @@ import java.io.Serializable
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.util.Utils
 
 /**
@@ -106,7 +106,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends 
Serializable with Lo
     assertValid()
     _isValid = false
     _destroySite = Utils.getCallSite().shortForm
-    logInfo("Destroying %s (from %s)".format(toString, _destroySite))
+    logInfo(log"Destroying ${MDC(LogKeys.BROADCAST, toString)} " +
+      log"(from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, _destroySite)})")
     doDestroy(blocking)
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 3ce5e2d62b6a..851fb453fd09 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -71,8 +71,8 @@ class ExternalShuffleService(sparkConf: SparkConf, 
securityManager: SecurityMana
     if (localDirs.length >= 1) {
       new File(localDirs.find(new File(_, 
dbName).exists()).getOrElse(localDirs(0)), dbName)
     } else {
-      logWarning(s"'spark.local.dir' should be set first when we use db in " +
-        s"ExternalShuffleService. Note that this only affects standalone 
mode.")
+      logWarning("'spark.local.dir' should be set first when we use db in " +
+        "ExternalShuffleService. Note that this only affects standalone mode.")
       null
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index be11c23f306e..bd07a0ade523 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
 
 import java.util.concurrent.atomic.AtomicBoolean
 
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.WORKER_URL
 import org.apache.spark.rpc._
 
@@ -64,7 +64,7 @@ private[spark] class WorkerWatcher(
     }
 
   override def receive: PartialFunction[Any, Unit] = {
-    case e => logWarning(s"Received unexpected message: $e")
+    case e => logWarning(log"Received unexpected message: ${MDC(LogKeys.ERROR, 
e)}")
   }
 
   override def onConnected(remoteAddress: RpcAddress): Unit = {
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d0a202cb7951..dde9b541b62f 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -28,7 +28,7 @@ import com.google.common.cache.CacheBuilder
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
@@ -436,8 +436,8 @@ private[spark] class IndexShuffleBlockResolver(
         if (checksumTmp.exists()) {
           try {
             if (!checksumTmp.delete()) {
-              logError(s"Failed to delete temporary checksum file " +
-                s"at ${checksumTmp.getAbsolutePath}")
+              logError(log"Failed to delete temporary checksum file at " +
+                log"${MDC(LogKeys.PATH, checksumTmp.getAbsolutePath)}")
             }
           } catch {
             case e: Exception =>
diff --git 
a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
 
b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
index 2afab01ec7b0..20b3d9444884 100644
--- 
a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
+++ 
b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import 
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
 import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 
@@ -36,7 +36,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, 
hadoopConf: Configur
       s" an fs (path: $path) with abortable stream support")
   }
 
-  logInfo(s"Writing atomically to $path based on abortable stream")
+  logInfo(log"Writing atomically to ${MDC(LogKeys.PATH, path)} based on 
abortable stream")
 
   class AbortableStreamBasedFSDataOutputStream(
       fsDataOutputStream: FSDataOutputStream,
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index b523bd750836..b3c48f13591f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -503,8 +503,8 @@ class LogisticRegression @Since("1.2.0") (
       tol, fitIntercept, maxBlockSizeInMB)
 
     if (dataset.storageLevel != StorageLevel.NONE) {
-      instr.logWarning(s"Input instances will be standardized, blockified to 
blocks, and " +
-        s"then cached during training. Be careful of double caching!")
+      instr.logWarning("Input instances will be standardized, blockified to 
blocks, and " +
+        "then cached during training. Be careful of double caching!")
     }
 
     val instances = dataset.select(
@@ -569,8 +569,8 @@ class LogisticRegression @Since("1.2.0") (
 
     val isConstantLabel = histogram.count(_ != 0.0) == 1
     if ($(fitIntercept) && isConstantLabel && 
!usingBoundConstrainedOptimization) {
-      instr.logWarning(s"All labels are the same value and fitIntercept=true, 
so the " +
-        s"coefficients will be zeros. Training is not needed.")
+      instr.logWarning("All labels are the same value and fitIntercept=true, 
so the " +
+        "coefficients will be zeros. Training is not needed.")
       val constantLabelIndex = Vectors.dense(histogram).argmax
       val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures,
         new Array[Int](numCoefficientSets + 1), Array.emptyIntArray, 
Array.emptyDoubleArray,
@@ -584,8 +584,8 @@ class LogisticRegression @Since("1.2.0") (
     }
 
     if (!$(fitIntercept) && isConstantLabel) {
-      instr.logWarning(s"All labels belong to a single class and 
fitIntercept=false. It's a " +
-        s"dangerous ground, so the algorithm may not converge.")
+      instr.logWarning("All labels belong to a single class and 
fitIntercept=false. It's a " +
+        "dangerous ground, so the algorithm may not converge.")
     }
 
     val featuresMean = summarizer.mean.toArray
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b5f9be3193f..2523941e0bff 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -528,8 +528,9 @@ private[spark] class ApplicationMaster(
     } catch {
       case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
         logError(
-          s"SparkContext did not initialize after waiting for $totalWaitTime 
ms. " +
-           "Please check earlier log output for errors. Failing the 
application.")
+          log"""SparkContext did not initialize after waiting for
+          |${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms.
+          | Please check earlier log output for errors. Failing the 
application.""".stripMargin)
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_SC_NOT_INITED,
           "Timed out waiting for SparkContext.")
@@ -690,7 +691,7 @@ private[spark] class ApplicationMaster(
       }
     } catch {
       case ioe: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+        logError(log"Failed to cleanup staging dir ${MDC(LogKeys.PATH, 
stagingDirPath)}", ioe)
     }
   }
 
@@ -736,7 +737,8 @@ private[spark] class ApplicationMaster(
       override def run(): Unit = {
         try {
           if (!Modifier.isStatic(mainMethod.getModifiers)) {
-            logError(s"Could not find static main method in object 
${args.userClass}")
+            logError(log"Could not find static main method in object " +
+              log"${MDC(LogKeys.CLASS_NAME, args.userClass)}")
             finish(FinalApplicationStatus.FAILED, 
ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
           } else {
             mainMethod.invoke(null, userArgs.toArray)
@@ -866,7 +868,8 @@ private[spark] class ApplicationMaster(
             finish(FinalApplicationStatus.FAILED, exitCode)
           }
         } else {
-          logError(s"Application Master lost connection with driver! Shutting 
down. $remoteAddress")
+          logError(log"Application Master lost connection with driver! 
Shutting down. " +
+            log"${MDC(LogKeys.REMOTE_ADDRESS, remoteAddress)}")
           finish(FinalApplicationStatus.FAILED, 
ApplicationMaster.EXIT_DISCONNECTED)
         }
       }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0567d8efb85e..bf31e03ba9a8 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1385,7 +1385,7 @@ private[spark] class Client(
       val YarnAppReport(appState, finalState, diags) = monitorApplication()
       if (appState == YarnApplicationState.FAILED || finalState == 
FinalApplicationStatus.FAILED) {
         diags.foreach { err =>
-          logError(s"Application diagnostics message: $err")
+          logError(log"Application diagnostics message: ${MDC(LogKeys.ERROR, 
err)}")
         }
         throw new SparkException(s"Application $appId finished with failed 
status")
       }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 26be1ff89314..8032d782cf4f 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -26,7 +26,7 @@ import 
org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicati
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{APP_ID, APP_STATE}
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -120,7 +120,7 @@ private[spark] class YarnClientSchedulerBackend(
         logError(log"YARN application has exited unexpectedly with state " +
           log"${MDC(APP_STATE, state)}! Check the YARN application logs for 
more details.")
         diags.foreach { err =>
-          logError(s"Diagnostics message: $err")
+          logError(log"Diagnostics message: ${MDC(LogKeys.ERROR, err)}")
         }
         allowInterrupt = false
         sc.stop()
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
index 42c8c783e54c..9219c1d139b9 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.types
 import scala.collection.mutable
 
 import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.util.SparkClassUtils
 
@@ -58,7 +58,8 @@ object UDTRegistration extends Serializable with Logging {
    */
   def register(userClass: String, udtClass: String): Unit = {
     if (udtMap.contains(userClass)) {
-      logWarning(s"Cannot register UDT for ${userClass}, which is already 
registered.")
+      logWarning(log"Cannot register UDT for ${MDC(LogKeys.CLASS_NAME, 
userClass)}, " +
+        log"which is already registered.")
     } else {
       // When register UDT with class name, we can't check if the UDT class is 
an UserDefinedType,
       // or not. The check is deferred.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ba3f1df22bc4..a39c10866984 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -32,9 +32,8 @@ import org.codehaus.janino.util.ClassFile
 
 import org.apache.spark.{SparkException, SparkIllegalArgumentException, 
TaskContext, TaskKilledException}
 import org.apache.spark.executor.InputMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.MDC
 import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.HashableWeakReference
@@ -1593,9 +1592,10 @@ object CodeGenerator extends Logging {
             
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
 
             if (byteCodeSize > DEFAULT_JVM_HUGE_METHOD_LIMIT) {
-              logInfo("Generated method too long to be JIT compiled: " +
-                log"${MDC(CLASS_NAME, cf.getThisClassName)}.${MDC(METHOD_NAME, 
method.getName)} " +
-                log"is ${MDC(BYTECODE_SIZE, byteCodeSize)} bytes")
+              logInfo(log"Generated method too long to be JIT compiled: " +
+                log"${MDC(LogKeys.CLASS_NAME, cf.getThisClassName)}." +
+                log"${MDC(LogKeys.METHOD_NAME, method.getName)} is " +
+                log"${MDC(LogKeys.BYTECODE_SIZE, byteCodeSize)} bytes")
             }
 
             byteCodeSize
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 9370b3d8d1d7..bb7d904402de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.adaptive
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, 
PartialReducerPartitionSpec, ShufflePartitionSpec}
 
 object ShufflePartitionsUtil extends Logging {
@@ -61,8 +61,10 @@ object ShufflePartitionsUtil extends Logging {
     val targetSize = 
maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
 
     val shuffleIds = 
mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
-    logInfo(s"For shuffle($shuffleIds), advisory target size: 
$advisoryTargetSize, " +
-      s"actual target size $targetSize, minimum partition size: 
$minPartitionSize")
+    logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_ID, shuffleIds)}), advisory 
target size: " +
+      log"${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisoryTargetSize)}, actual 
target size " +
+      log"${MDC(LogKeys.TARGET_SIZE, targetSize)}, minimum partition size: " +
+      log"${MDC(LogKeys.PARTITION_SIZE, minPartitionSize)}")
 
     // If `inputPartitionSpecs` are all empty, it means skew join optimization 
is not applied.
     if (inputPartitionSpecs.forall(_.isEmpty)) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
index 75a38b8ea622..7d80cc272810 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.python
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
@@ -52,7 +52,8 @@ class PythonStreamingPartitionReaderFactory(
       val block = SparkEnv.get.blockManager.get[InternalRow](part.blockId.get)
         .map(_.data.asInstanceOf[Iterator[InternalRow]])
       if (block.isEmpty) {
-        logWarning(s"Prefetched block ${part.blockId} for Python data source 
not found.")
+        logWarning(log"Prefetched block ${MDC(LogKeys.BLOCK_ID, part.blockId)} 
" +
+          log"for Python data source not found.")
       }
       block
     } else None
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
index a512b34db345..33612b6947f2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
@@ -27,7 +27,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.PYTHON_EXEC
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
@@ -214,7 +214,8 @@ class PythonStreamingSourceRunner(
    * Stop the python worker process and invoke stop() on stream reader.
    */
   def stop(): Unit = {
-    logInfo(s"Stopping streaming runner for module: $workerModule.")
+    logInfo(log"Stopping streaming runner for module: " +
+      log"${MDC(LogKeys.MODULE_NAME, workerModule)}.")
     try {
       pythonWorkerFactory.foreach { factory =>
         pythonWorker.foreach { worker =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 3842ed574355..c440ec451b72 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -352,8 +352,9 @@ abstract class ProgressContext(
           metrics = sourceMetrics
         )
       }
-      logInfo(s"Extracting source progress metrics for 
source=${source.toString} took " +
-        s"duration_ms=$duration")
+      logInfo(log"Extracting source progress metrics for source=" +
+        log"${MDC(LogKeys.SOURCE, source.toString)} " +
+        log"took duration_ms=${MDC(LogKeys.DURATION, duration)}")
       result
     }
   }
@@ -368,8 +369,8 @@ abstract class ProgressContext(
 
       SinkProgress(sink.toString, sinkOutput, sinkMetrics)
     }
-    logInfo(s"Extracting sink progress metrics for sink=${sink.toString} took 
" +
-      s"duration_ms=$duration")
+    logInfo(log"Extracting sink progress metrics for sink=${MDC(LogKeys.SINK, 
sink.toString)} " +
+      log"took duration_ms=${MDC(LogKeys.DURATION, duration)}")
     result
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 420deda3e017..4198d7367fe2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,7 +32,7 @@ import 
com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, 
SparkThrowable}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT, 
PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -322,7 +322,8 @@ abstract class StreamExecution(
         if (state.compareAndSet(INITIALIZING, ACTIVE)) {
           // Log logical plan at the start of the query to help debug issues 
related to
           // plan changes.
-          logInfo(s"Finish initializing with logical plan:\n$logicalPlan")
+          logInfo(log"Finish initializing with logical plan:\n" +
+            log"${MDC(LogKeys.QUERY_PLAN, logicalPlan)}")
 
           // Unblock `awaitInitialization`
           initializationLatch.countDown()
@@ -372,7 +373,8 @@ abstract class StreamExecution(
           case _ => None
         }
 
-        logError(s"Query $prettyIdString terminated with error", e)
+        logError(log"Query ${MDC(LogKeys.PRETTY_ID_STRING, prettyIdString)} " +
+          log"terminated with error", e)
         getLatestExecutionContext().updateStatusMessage(s"Terminated with 
exception: $message")
         // Rethrow the fatal errors to allow the user using 
`Thread.UncaughtExceptionHandler` to
         // handle them
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index fa49da5feeed..633aaf2682db 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -445,7 +445,8 @@ class ContinuousExecution(
    */
   def stopInNewThread(error: Throwable): Unit = {
     if (failure.compareAndSet(null, error)) {
-      logError(s"Query $prettyIdString received exception $error")
+      logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} received 
exception " +
+        log"${MDC(ERROR, error)}")
       stopInNewThread()
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
index 5d54b5754915..d5daa9a875f8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.DataWriter
@@ -89,9 +89,11 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], 
writerFactory: StreamingDat
       })(catchBlock = {
         // If there is an error, abort this writer. We enter this callback in 
the middle of
         // rethrowing an exception, so compute() will stop executing at this 
point.
-        logError(s"Writer for partition ${context.partitionId()} is aborting.")
+        logError(log"Writer for partition ${MDC(LogKeys.PARTITION_ID, 
context.partitionId())} " +
+          log"is aborting.")
         if (dataWriter != null) dataWriter.abort()
-        logError(s"Writer for partition ${context.partitionId()} aborted.")
+        logError(log"Writer for partition ${MDC(LogKeys.PARTITION_ID, 
context.partitionId())} " +
+          log"aborted.")
       }, finallyBlock = {
         if (dataWriter != null) dataWriter.close()
       })
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 23f867d3e6c0..20df67b25bfe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -25,7 +25,7 @@ import com.google.common.io.ByteStreams
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FSError, Path}
 
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -176,7 +176,8 @@ class StateStoreChangelogWriterV1(
     } catch {
       case e: Throwable =>
         abort()
-        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME, 
file)} " +
+          log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
         throw e
     } finally {
       backingFileStream = null
@@ -255,7 +256,8 @@ class StateStoreChangelogWriterV2(
     } catch {
       case e: Throwable =>
         abort()
-        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME, 
file)} " +
+          log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
         throw e
     } finally {
       backingFileStream = null
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index b86e996a3408..51a5e88aa633 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -30,7 +30,7 @@ import 
org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, 
TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, 
TTypeQualifiers, TTypeQualifierValue}
 
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -83,7 +83,7 @@ private[hive] class SparkExecuteStatementOperation(
       val sparkType = new StructType().add("Result", "string")
       SparkExecuteStatementOperation.toTTableSchema(sparkType)
     } else {
-      logInfo(s"Result Schema: ${result.schema.sql}")
+      logInfo(log"Result Schema: ${MDC(LogKeys.SCHEMA, result.schema.sql)}")
       SparkExecuteStatementOperation.toTTableSchema(result.schema)
     }
   }
@@ -126,8 +126,8 @@ private[hive] class SparkExecuteStatementOperation(
   override def runInternal(): Unit = {
     setState(OperationState.PENDING)
     logInfo(
-      log"Submitting query '${MDC(REDACTED_STATEMENT, redactedStatement)}' 
with " +
-        log"${MDC(STATEMENT_ID, statementId)}")
+      log"Submitting query '${MDC(LogKeys.REDACTED_STATEMENT, 
redactedStatement)}' with " +
+      log"${MDC(LogKeys.STATEMENT_ID, statementId)}")
     HiveThriftServer2.eventManager.onStatementStart(
       statementId,
       parentSession.getSessionHandle.getSessionId.toString,
@@ -215,8 +215,8 @@ private[hive] class SparkExecuteStatementOperation(
       synchronized {
         if (getStatus.getState.isTerminal) {
           logInfo(
-            log"Query with ${MDC(STATEMENT_ID, statementId)} in terminal state 
" +
-              log"before it started running")
+            log"Query with ${MDC(LogKeys.STATEMENT_ID, statementId)} in 
terminal state " +
+            log"before it started running")
           return
         } else {
           logInfo(log"Running query with ${MDC(STATEMENT_ID, statementId)}")
@@ -289,8 +289,8 @@ private[hive] class SparkExecuteStatementOperation(
     synchronized {
       if (!getStatus.getState.isTerminal) {
         logInfo(
-          log"Query with ${MDC(STATEMENT_ID, statementId)} timed out after " +
-            log"${MDC(TIMEOUT, timeout)} seconds")
+          log"Query with ${MDC(LogKeys.STATEMENT_ID, statementId)} timed out " 
+
+          log"after ${MDC(LogKeys.TIMEOUT, timeout)} seconds")
         setState(OperationState.TIMEDOUT)
         cleanup()
         HiveThriftServer2.eventManager.onStatementTimeout(statementId)
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 53a94a128c0e..9cf31d99ccfa 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -27,7 +27,7 @@ import 
org.apache.hive.service.cli.operation.GetFunctionsOperation
 import 
org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG
 import org.apache.hive.service.cli.session.HiveSession
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.SQLContext
 
 /**
@@ -53,7 +53,7 @@ private[hive] class SparkGetFunctionsOperation(
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
-    logInfo(s"$logMsg with $statementId")
+    logInfo(log"${MDC(LogKeys.MESSAGE, logMsg)} with 
${MDC(LogKeys.STATEMENT_ID, statementId)}")
     setState(OperationState.RUNNING)
     // Always use the latest class loader provided by executionHive's state.
     val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 6cbc74a75a06..f09f9caf129b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE, 
CHECKPOINT_TIME, NUM_RETRY, PATH, TEMP_FILE}
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
@@ -102,7 +102,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: 
Time)
     assert(framework != null, "Checkpoint.framework is null")
     assert(graph != null, "Checkpoint.graph is null")
     assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
-    logInfo(s"Checkpoint for time $checkpointTime validated")
+    logInfo(log"Checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME, 
checkpointTime)} validated")
   }
 }
 
@@ -242,7 +242,8 @@ class CheckpointWriter(
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
-          logInfo(s"Saving checkpoint for time $checkpointTime to file 
'$checkpointFile'")
+          logInfo(log"Saving checkpoint for time 
${MDC(LogKeys.CHECKPOINT_TIME, checkpointTime)} " +
+            log"to file '${MDC(LogKeys.CHECKPOINT_FILE, checkpointFile)}'")
           if (fs == null) {
             fs = new Path(checkpointDir).getFileSystem(hadoopConf)
           }
@@ -275,15 +276,19 @@ class CheckpointWriter(
           val allCheckpointFiles = 
Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
           if (allCheckpointFiles.size > 10) {
             allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { 
file =>
-              logInfo(s"Deleting $file")
+              logInfo(log"Deleting ${MDC(LogKeys.FILE_NAME, file)}")
               fs.delete(file, true)
             }
           }
 
           // All done, print success
-          logInfo(s"Checkpoint for time $checkpointTime saved to file 
'$checkpointFile'" +
-            s", took ${bytes.length} bytes and " +
-            s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs)} ms")
+          logInfo(
+            log"Checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME, 
checkpointTime)} " +
+              log"saved to file " +
+            log"'${MDC(LogKeys.CHECKPOINT_FILE, checkpointFile)}', took " +
+            log"${MDC(LogKeys.BYTE_SIZE, bytes.length)} bytes and " +
+            log"${MDC(LogKeys.TIME, 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
+              - startTimeNs))} ms")
           jobGenerator.onCheckpointCompletion(checkpointTime, 
clearCheckpointDataLater)
           return
         } catch {
@@ -304,7 +309,8 @@ class CheckpointWriter(
       val bytes = Checkpoint.serialize(checkpoint, conf)
       executor.execute(new CheckpointWriteHandler(
         checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
-      logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to 
writer queue")
+      logInfo(log"Submitted checkpoint of time ${MDC(LogKeys.CHECKPOINT_TIME,
+        checkpoint.checkpointTime)} to writer queue")
     } catch {
       case rej: RejectedExecutionException =>
         logError("Could not submit checkpoint task to the thread pool 
executor", rej)
@@ -316,8 +322,10 @@ class CheckpointWriter(
 
     val startTimeNs = System.nanoTime()
     ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS))
-    logInfo(s"CheckpointWriter executor terminated? ${executor.isTerminated}," 
+
-      s" waited for ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs)} ms.")
+    logInfo(log"CheckpointWriter executor terminated? " +
+      log"${MDC(LogKeys.EXECUTOR_STATE, executor.isTerminated)}, waited for " +
+      log"${MDC(LogKeys.DURATION, TimeUnit.NANOSECONDS.toMillis(
+        System.nanoTime() - startTimeNs))} ms.")
     stopped = true
   }
 }
@@ -357,15 +365,17 @@ object CheckpointReader extends Logging {
     }
 
     // Try to read the checkpoint files in the order
-    logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
+    logInfo(log"Checkpoint files found: " +
+      log"${MDC(LogKeys.CHECKPOINT_FILE, checkpointFiles.mkString(","))}")
     var readError: Exception = null
     checkpointFiles.foreach { file =>
-      logInfo(s"Attempting to load checkpoint from file $file")
+      logInfo(log"Attempting to load checkpoint from file 
${MDC(LogKeys.FILE_NAME, file)}")
       try {
         val fis = fs.open(file)
         val cp = Checkpoint.deserialize(fis, conf)
-        logInfo(s"Checkpoint successfully loaded from file $file")
-        logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
+        logInfo(log"Checkpoint successfully loaded from file 
${MDC(LogKeys.FILE_NAME, file)}")
+        logInfo(log"Checkpoint was generated at time " +
+          log"${MDC(LogKeys.CHECKPOINT_TIME, cp.checkpointTime)}")
         return Some(cp)
       } catch {
         case e: Exception =>
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 30bd30329283..94b695e6452e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -36,7 +36,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.SerializationDebugger
@@ -725,7 +725,8 @@ class StreamingContext private[streaming] (
 
   private def stopOnShutdown(): Unit = {
     val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN)
-    logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown 
hook")
+    logInfo(log"Invoking stop(stopGracefully=" +
+      log"${MDC(LogKeys.VALUE, stopGracefully)}) from shutdown hook")
     // Do not stop SparkContext, let its own shutdown hook stop it
     stop(stopSparkContext = false, stopGracefully = stopGracefully)
   }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 38f55f80657b..87d6a4909fdd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{FROM_TIME, SLIDE_DURATION, TO_TIME}
 import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
@@ -201,7 +201,8 @@ abstract class DStream[T: ClassTag] (
     // Set the checkpoint interval to be slideDuration or 10 seconds, which 
ever is larger
     if (mustCheckpoint && checkpointDuration == null) {
       checkpointDuration = slideDuration * math.ceil(Seconds(10) / 
slideDuration).toInt
-      logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
+      logInfo(log"Checkpoint interval automatically set to " +
+        log"${MDC(LogKeys.CHECKPOINT_INTERVAL, checkpointDuration)}")
     }
 
     // Set the minimum value of the rememberDuration if not already set
@@ -277,11 +278,11 @@ abstract class DStream[T: ClassTag] (
 
     dependencies.foreach(_.validateAtStart())
 
-    logInfo(s"Slide time = $slideDuration")
-    logInfo(s"Storage level = ${storageLevel.description}")
-    logInfo(s"Checkpoint interval = $checkpointDuration")
-    logInfo(s"Remember interval = $rememberDuration")
-    logInfo(s"Initialized and validated $this")
+    logInfo(log"Slide time = ${MDC(LogKeys.SLIDE_DURATION, slideDuration)}")
+    logInfo(log"Storage level = ${MDC(LogKeys.STORAGE_LEVEL, 
storageLevel.description)}")
+    logInfo(log"Checkpoint interval = ${MDC(LogKeys.CHECKPOINT_INTERVAL, 
checkpointDuration)}")
+    logInfo(log"Remember interval = ${MDC(LogKeys.INTERVAL, 
rememberDuration)}")
+    logInfo(log"Initialized and validated ${MDC(LogKeys.DSTREAM, this)}")
   }
 
   private[streaming] def setContext(s: StreamingContext): Unit = {
@@ -289,7 +290,7 @@ abstract class DStream[T: ClassTag] (
       throw new SparkException(s"Context must not be set again for $this")
     }
     ssc = s
-    logInfo(s"Set context for $this")
+    logInfo(log"Set context for ${MDC(LogKeys.STREAMING_CONTEXT, this)}")
     dependencies.foreach(_.setContext(ssc))
   }
 
@@ -304,7 +305,9 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def remember(duration: Duration): Unit = {
     if (duration != null && (rememberDuration == null || duration > 
rememberDuration)) {
       rememberDuration = duration
-      logInfo(s"Duration for remembering RDDs set to $rememberDuration for 
$this")
+      logInfo(log"Duration for remembering RDDs set to " +
+        log"${MDC(LogKeys.DURATION, rememberDuration)} for " +
+        log"${MDC(LogKeys.DSTREAM, this.toString)}")
     }
     dependencies.foreach(_.remember(parentRememberDuration))
   }
@@ -314,8 +317,10 @@ abstract class DStream[T: ClassTag] (
     if (!isInitialized) {
       throw new SparkException (this.toString + " has not been initialized")
     } else if (time <= zeroTime || ! (time - 
zeroTime).isMultipleOf(slideDuration)) {
-      logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
-        s" , slideDuration is $slideDuration and difference is ${time - 
zeroTime}")
+      logInfo(log"Time ${MDC(LogKeys.TIME, time)} is invalid as zeroTime is " +
+        log"${MDC(LogKeys.ZERO_TIME, zeroTime)}, slideDuration is " +
+        log"${MDC(LogKeys.SLIDE_DURATION, slideDuration)} and difference is " +
+        log"${MDC(LogKeys.DURATION, time - zeroTime)}")
       false
     } else {
       logDebug(s"Time $time is valid")
@@ -353,7 +358,8 @@ abstract class DStream[T: ClassTag] (
           }
           if (checkpointDuration != null && (time - 
zeroTime).isMultipleOf(checkpointDuration)) {
             newRDD.checkpoint()
-            logInfo(s"Marking RDD ${newRDD.id} for time $time for 
checkpointing")
+            logInfo(log"Marking RDD ${MDC(LogKeys.RDD_ID, newRDD.id)} for time 
" +
+              log"${MDC(LogKeys.TIME, time)} for checkpointing")
           }
           generatedRDDs.put(time, newRDD)
         }
@@ -461,7 +467,8 @@ abstract class DStream[T: ClassTag] (
         // Explicitly remove blocks of BlockRDD
         rdd match {
           case b: BlockRDD[_] =>
-            logInfo(s"Removing blocks of RDD $b of time $time")
+            logInfo(log"Removing blocks of RDD ${MDC(LogKeys.RDD_ID, b)} " +
+              log"of time ${MDC(LogKeys.TIME, time)}")
             b.removeBlocks()
           case _ =>
         }
@@ -898,8 +905,10 @@ abstract class DStream[T: ClassTag] (
       fromTime.floor(slideDuration, zeroTime)
     }
 
-    logInfo(s"Slicing from $fromTime to $toTime" +
-      s" (aligned to $alignedFromTime and $alignedToTime)")
+    logInfo(log"Slicing from ${MDC(LogKeys.FROM_TIME, fromTime)} to " +
+      log"${MDC(LogKeys.TO_TIME, toTime)}" +
+      log" (aligned to ${MDC(LogKeys.ALIGNED_FROM_TIME, alignedFromTime)} and 
" +
+      log"${MDC(LogKeys.ALIGNED_TO_TIME, alignedToTime)})")
 
     alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
       if (time >= zeroTime) getOrCompute(time) else None
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 883d56c012f6..34b079219c99 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
@@ -56,7 +56,7 @@ class SocketReceiver[T: ClassTag](
 
   def onStart(): Unit = {
 
-    logInfo(s"Connecting to $host:$port")
+    logInfo(log"Connecting to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, 
port)}")
     try {
       socket = new Socket(host, port)
     } catch {
@@ -64,7 +64,7 @@ class SocketReceiver[T: ClassTag](
         restart(s"Error connecting to $host:$port", e)
         return
     }
-    logInfo(s"Connected to $host:$port")
+    logInfo(log"Connected to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, 
port)}")
 
     // Start the thread that receives data over a connection
     new Thread("Socket Receiver") {
@@ -79,7 +79,7 @@ class SocketReceiver[T: ClassTag](
       if (socket != null) {
         socket.close()
         socket = null
-        logInfo(s"Closed socket to $host:$port")
+        logInfo(log"Closed socket to ${MDC(LogKeys.HOST, 
host)}:${MDC(LogKeys.PORT, port)}")
       }
     }
   }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 79bfd8674b44..7cc08b421f78 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -25,7 +25,7 @@ import scala.concurrent._
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{DELAY, ERROR, MESSAGE, STREAM_ID}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -145,10 +145,10 @@ private[streaming] abstract class ReceiverSupervisor(
   def startReceiver(): Unit = synchronized {
     try {
       if (onReceiverStart()) {
-        logInfo(s"Starting receiver $streamId")
+        logInfo(log"Starting receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
         receiverState = Started
         receiver.onStart()
-        logInfo(s"Called receiver $streamId onStart")
+        logInfo(log"Called receiver ${MDC(LogKeys.STREAM_ID, streamId)} onStart")
       } else {
         // The driver refused us
         stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
@@ -162,7 +162,8 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Stop receiver */
   def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
     try {
-      logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
+      logInfo(log"Stopping receiver with message: ${MDC(LogKeys.MESSAGE, message)}: " +
+        log"${MDC(LogKeys.ERROR, error.getOrElse(""))}")
       receiverState match {
         case Initialized =>
           logWarning("Skip stopping receiver because it has not yet stared")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 47beb4521950..aafa99bd5285 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -28,7 +28,7 @@ import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{ERROR, MESSAGE}
 import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.storage.StreamBlockId
@@ -85,7 +85,7 @@ private[streaming] class ReceiverSupervisorImpl(
           logDebug("Received delete old batch signal")
           cleanupOldBlocks(threshTime)
         case UpdateRateLimit(eps) =>
-          logInfo(s"Received a new rate limit: $eps.")
+          logInfo(log"Received a new rate limit: ${MDC(LogKeys.RATE_LIMIT, eps)}.")
           registeredBlockGenerators.asScala.foreach { bg =>
             bg.updateRate(eps)
           }
@@ -195,10 +195,10 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
   override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = {
-    logInfo("Deregistering receiver " + streamId)
+    logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
     trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
-    logInfo("Stopped receiver " + streamId)
+    logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
   }
 
   override def createBlockGenerator(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 5aa2a9df3ba8..903cde8082db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -21,7 +21,7 @@ package org.apache.spark.streaming.scheduler
 import scala.util.Random
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Streaming._
 import org.apache.spark.resource.ResourceProfile
@@ -75,8 +75,10 @@ private[streaming] class ExecutorAllocationManager(
 
   def start(): Unit = {
     timer.start()
-    logInfo(s"ExecutorAllocationManager started with " +
-      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
+    logInfo(log"ExecutorAllocationManager started with ratios = " +
+      log"[${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " +
+      log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}] and interval = " +
+      log"${MDC(LogKeys.INTERVAL, scalingIntervalSecs)} sec")
   }
 
   def stop(): Unit = {
@@ -89,11 +91,14 @@ private[streaming] class ExecutorAllocationManager(
    * batch statistics.
    */
   private def manageAllocation(): Unit = synchronized {
-    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
+    logInfo(log"Managing executor allocation with ratios = [" +
+      log"${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " +
+      log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}]")
     if (batchProcTimeCount > 0) {
       val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
       val ratio = averageBatchProcTime.toDouble / batchDurationMs
-      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
+      logInfo(log"Average: ${MDC(LogKeys.AVG_BATCH_PROC_TIME, averageBatchProcTime)}, " +
+        log"ratio = ${MDC(LogKeys.RATIO, ratio)}")
       if (ratio >= scalingUpRatio) {
         logDebug("Requesting executors")
         val numNewExecutors = math.max(math.round(ratio).toInt, 1)
@@ -119,7 +124,8 @@ private[streaming] class ExecutorAllocationManager(
       Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> targetTotalExecutors),
       Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
       Map.empty)
-    logInfo(s"Requested total $targetTotalExecutors executors")
+    logInfo(log"Requested total ${MDC(LogKeys.NUM_EXECUTORS,
+      targetTotalExecutors)} executors")
   }
 
   /** Kill an executor that is not running any receiver, if possible */
@@ -129,7 +135,9 @@ private[streaming] class ExecutorAllocationManager(
 
     if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
       val execIdsWithReceivers = receiverTracker.allocatedExecutors().values.flatten.toSeq
-      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
+      logInfo(log"Executors with receivers (${MDC(LogKeys.NUM_EXECUTORS,
+        execIdsWithReceivers.size)}): " +
+        log"${MDC(LogKeys.EXECUTOR_IDS, execIdsWithReceivers)}")
 
       val removableExecIds = allExecIds.diff(execIdsWithReceivers)
       logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
@@ -142,7 +150,7 @@ private[streaming] class ExecutorAllocationManager(
         } else {
           client.killExecutor(execIdToRemove)
         }
-        logInfo(s"Requested to kill executor $execIdToRemove")
+        logInfo(log"Requested to kill executor ${MDC(LogKeys.EXECUTOR_ID, execIdToRemove)}")
       } else {
         logInfo(s"No non-receiver executors to kill")
       }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index 639ac6de4f5d..bd9ea7b5a268 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.streaming.{StreamingContext, Time}
 
 /**
@@ -82,7 +82,8 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
   /** Cleanup the tracked input information older than threshold batch time */
   def cleanup(batchThreshTime: Time): Unit = synchronized {
     val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
-    logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
+    logInfo(log"remove old batch metadata: " +
+      log"${MDC(LogKeys.DURATION, timesToCleanup.mkString(" "))}")
     batchTimeToInputInfos --= timesToCleanup
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index f2700737384b..7fb35a04be6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{RECEIVED_BLOCK_INFO, RECEIVED_BLOCK_TRACKER_LOG_EVENT}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.streaming.Time
@@ -127,7 +127,9 @@ private[streaming] class ReceivedBlockTracker(
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
-        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
+        logInfo(log"Possibly processed batch ${MDC(LogKeys.BATCH_TIMESTAMP,
+          batchTime)} needs to be " +
+          log"processed again in WAL recovery")
       }
     } else {
       // This situation occurs when:
@@ -137,7 +139,9 @@ private[streaming] class ReceivedBlockTracker(
       // 2. Slow checkpointing makes recovered batch time older than WAL recovered
       // lastAllocatedBatchTime.
       // This situation will only occurs in recovery time.
-      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
+      logInfo(log"Possibly processed batch ${MDC(LogKeys.BATCH_TIMESTAMP,
+        batchTime)} needs to be processed " +
+        log"again in WAL recovery")
     }
   }
 
@@ -175,7 +179,7 @@ private[streaming] class ReceivedBlockTracker(
   def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
     require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
-    logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
+    logInfo(log"Deleting batches: ${MDC(LogKeys.DURATION, timesToCleanup.mkString(" "))}")
     if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
       timeToAllocatedBlocks --= timesToCleanup
       writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
@@ -221,9 +225,10 @@ private[streaming] class ReceivedBlockTracker(
     }
 
     writeAheadLogOption.foreach { writeAheadLog =>
-      logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
+      logInfo(log"Recovering from write ahead logs in " +
+        log"${MDC(LogKeys.PATH, checkpointDirOption.get)}")
       writeAheadLog.readAll().asScala.foreach { byteBuffer =>
-        logInfo("Recovering record " + byteBuffer)
+        logInfo(log"Recovering record ${MDC(LogKeys.BYTE_BUFFER, byteBuffer)}")
         Utils.deserialize[ReceivedBlockTrackerLogEvent](
           JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
           case BlockAdditionEvent(receivedBlockInfo) =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 48273b3b593c..a37ba04c1012 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext
 import scala.util.{Failure, Success}
 
 import org.apache.spark._
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{ERROR, MESSAGE, RECEIVER_ID, RECEIVER_IDS, STREAM_ID}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc._
@@ -232,7 +232,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
     // Signal the receivers to delete old block data
     if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
-      logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
+      logInfo(log"Cleanup old received batch data: " +
+        log"${MDC(LogKeys.CLEANUP_LOCAL_DIRS, cleanupThreshTime)}")
       synchronized {
         if (isTrackerStarted) {
           endpoint.send(CleanupOldBlocks(cleanupThreshTime))
@@ -306,7 +307,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         endpoint = Some(receiverEndpoint))
       receiverTrackingInfos.put(streamId, receiverTrackingInfo)
       
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
-      logInfo("Registered receiver for stream " + streamId + " from " + 
senderAddress)
+      logInfo(log"Registered receiver for stream ${MDC(LogKeys.STREAM_ID, 
streamId)} " +
+        log"from ${MDC(LogKeys.RPC_ADDRESS, senderAddress)}")
       true
     }
   }
@@ -447,7 +449,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
     runDummySparkJob()
 
-    logInfo("Starting " + receivers.length + " receivers")
+    logInfo(log"Starting ${MDC(LogKeys.NUM_RECEIVERS, receivers.length)} receivers")
     endpoint.send(StartAllReceivers(receivers.toImmutableArraySeq))
   }
 
@@ -625,7 +627,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           if (!shouldStartReceiver) {
             onReceiverJobFinish(receiverId)
           } else {
-            logInfo(s"Restarting Receiver $receiverId")
+            logInfo(log"Restarting Receiver ${MDC(LogKeys.STREAM_ID, receiverId)}")
             self.send(RestartReceiver(receiver))
           }
         case Failure(e) =>
@@ -633,11 +635,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
             onReceiverJobFinish(receiverId)
           } else {
             logError("Receiver has been stopped. Try to restart it.", e)
-            logInfo(s"Restarting Receiver $receiverId")
+            logInfo(log"Restarting Receiver ${MDC(LogKeys.STREAM_ID, receiverId)}")
             self.send(RestartReceiver(receiver))
           }
       }(ThreadUtils.sameThread)
-      logInfo(s"Receiver ${receiver.streamId} started")
+      logInfo(log"Receiver ${MDC(LogKeys.STREAM_ID, receiver.streamId)} started")
     }
 
     override def onStop(): Unit = {
@@ -660,7 +662,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     /** Send stop signal to the receivers. */
     private def stopReceivers(): Unit = {
       receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
-      logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
+      logInfo(log"Sent stop signal to all " +
+        log"${MDC(LogKeys.NUM_RECEIVERS, receiverTrackingInfos.size)} receivers")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
index dc02062b9eb4..1b05a6ac30cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.scheduler.rate
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 
 /**
  * Implements a proportional-integral-derivative (PID) controller which acts on
@@ -74,8 +74,11 @@ private[streaming] class PIDRateEstimator(
     minRate > 0,
     s"Minimum rate in PIDRateEstimator should be > 0")
 
-  logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " +
-    s"derivative = $derivative, min rate = $minRate")
+  logInfo(log"Created PIDRateEstimator with proportional = " +
+    log"${MDC(LogKeys.PROPORTIONAL, proportional)}, integral = " +
+    log"${MDC(LogKeys.INTEGRAL, integral)}, derivative = " +
+    log"${MDC(LogKeys.DERIVATIVE, derivative)}, min rate = " +
+    log"${MDC(LogKeys.MIN_RATE, minRate)}")
 
   def compute(
       time: Long, // in milliseconds
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
index 5dcdd573c744..8befe53efffa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
@@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.RECORDS
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -122,7 +122,8 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
    * Stop the batched writer thread, fulfill promises with failures and close the wrapped WAL.
    */
   override def close(): Unit = {
-    logInfo(s"BatchedWriteAheadLog shutting down at time: ${System.currentTimeMillis()}.")
+    logInfo(log"BatchedWriteAheadLog shutting down at time: " +
+      log"${MDC(LogKeys.TIME, System.currentTimeMillis())}.")
     if (!active.getAndSet(false)) return
     batchedWriterThread.interrupt()
     batchedWriterThread.join()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 58a6b929a81f..d90095c73785 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{NUM_RETRY, WRITE_AHEAD_LOG_INFO}
 import org.apache.spark.util.{CompletionIterator, ThreadUtils}
 import org.apache.spark.util.ArrayImplicits._
@@ -137,7 +137,8 @@ private[streaming] class FileBasedWriteAheadLog(
    */
   def readAll(): JIterator[ByteBuffer] = synchronized {
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
-    logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+    logInfo(log"Reading from the logs:\n" +
+      log"${MDC(LogKeys.PATHS, logFilesToRead.mkString("\n"))}")
     def readFile(file: String): Iterator[ByteBuffer] = {
       logDebug(s"Creating log reader with $file")
       val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
@@ -170,8 +171,11 @@ private[streaming] class FileBasedWriteAheadLog(
       pastLogs --= expiredLogs
       expiredLogs
     }
-    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
-      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+    logInfo(log"Attempting to clear ${MDC(LogKeys.NUM_RECORDS_READ, oldLogFiles.size)} " +
+      log"old log files in " +
+      log"${MDC(LogKeys.PATH, logDirectory)} older than " +
+      log"${MDC(LogKeys.THRESHOLD, threshTime)}: " +
+      log"${MDC(LogKeys.FILES, oldLogFiles.map(_.path).mkString("\n"))}")
 
     def deleteFile(walInfo: LogInfo): Unit = {
       try {
@@ -184,7 +188,8 @@ private[streaming] class FileBasedWriteAheadLog(
           logWarning(log"Error clearing write ahead log file " +
             log"${MDC(WRITE_AHEAD_LOG_INFO, walInfo)}", ex)
       }
-      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+      logInfo(log"Cleared log files in ${MDC(LogKeys.PATH, logDirectory)} older than " +
+        log"${MDC(LogKeys.THRESH_TIME, threshTime)}")
     }
     oldLogFiles.foreach { logInfo =>
       if (!executionContext.isShutdown) {
@@ -252,7 +257,9 @@ private[streaming] class FileBasedWriteAheadLog(
           fileSystem.listStatus(logDirectoryPath).map { _.getPath }.toImmutableArraySeq)
         pastLogs.clear()
         pastLogs ++= logFileInfo
-        logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+        logInfo(log"Recovered ${MDC(LogKeys.NUM_FILES, logFileInfo.size)} " +
+          log"write ahead log files from " +
+          log"${MDC(LogKeys.PATH, logDirectory)}")
         logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
       }
     } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to