This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 89ca8b6065e [SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`
89ca8b6065e is described below
commit 89ca8b6065e9f690a492c778262080741d50d94d
Author: yangjie01 <[email protected]>
AuthorDate: Sun Oct 29 09:19:30 2023 -0500
[SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`
### What changes were proposed in this pull request?
This PR replaces `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`,
because `s.c.MapOps.mapValues` has been marked as deprecated since Scala 2.13.0:
https://github.com/scala/scala/blob/bf45e199e96383b96a6955520d7d2524c78e6e12/src/library/scala/collection/Map.scala#L256-L262
```scala
@deprecated("Use .view.mapValues(f). A future version will include a strict version of this method (for now, .view.mapValues(f).toMap).", "2.13.0")
def mapValues[W](f: V => W): MapView[K, W] = new MapView.MapValues(this, f)
```
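For context, here is a minimal, self-contained sketch of the migration pattern applied throughout this PR (illustrative only; the object and value names below are made up, not taken from the Spark sources). `.view.mapValues` returns a lazy `MapView`, so call sites that need strict `Map` semantics append `.toMap`:
```scala
import scala.collection.MapView

object MapValuesMigrationSketch {
  def main(args: Array[String]): Unit = {
    val counts = Map("a" -> 1, "b" -> 2)

    // Before (deprecated since Scala 2.13.0):
    //   val doubled = counts.mapValues(_ * 2)

    // After: go through .view explicitly; the result is a lazy MapView.
    val doubledView: MapView[String, Int] = counts.view.mapValues(_ * 2)

    // Where a strict Map is required (serialization, equality, Java interop, ...),
    // materialize it with .toMap, as most call sites in this PR do.
    val doubledMap: Map[String, Int] = doubledView.toMap

    println(doubledMap) // Map(a -> 2, b -> 4)
  }
}
```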
### Why are the changes needed?
Cleanup deprecated API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Packaged the client and manually tested
`DFSReadWriteTest`, `MiniReadWriteTest`, and `PowerIterationClusteringExample`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43448 from LuciferYang/SPARK-45605.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/util/sketch/CountMinSketchSuite.scala | 2 +-
.../org/apache/spark/sql/avro/AvroUtils.scala | 1 +
.../scala/org/apache/spark/sql/SparkSession.scala | 2 +-
.../spark/sql/ClientDataFrameStatSuite.scala | 2 +-
.../org/apache/spark/sql/connect/dsl/package.scala | 2 +-
.../sql/connect/planner/SparkConnectPlanner.scala | 13 ++++++----
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 3 ++-
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 2 +-
.../streaming/kafka010/ConsumerStrategy.scala | 6 ++---
.../kafka010/DirectKafkaInputDStream.scala | 2 +-
.../kafka010/DirectKafkaStreamSuite.scala | 2 +-
.../spark/streaming/kafka010/KafkaTestUtils.scala | 2 +-
.../spark/streaming/kinesis/KinesisTestUtils.scala | 2 +-
.../kinesis/KPLBasedKinesisTestUtils.scala | 2 +-
.../kinesis/KinesisBackedBlockRDDSuite.scala | 4 +--
.../spark/sql/protobuf/utils/ProtobufUtils.scala | 1 +
.../org/apache/spark/api/java/JavaPairRDD.scala | 4 +--
.../apache/spark/api/java/JavaSparkContext.scala | 2 +-
.../spark/api/python/PythonWorkerFactory.scala | 2 +-
.../apache/spark/scheduler/InputFormatInfo.scala | 2 +-
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
...plicationEnvironmentInfoWrapperSerializer.scala | 5 ++--
.../ExecutorSummaryWrapperSerializer.scala | 3 ++-
.../status/protobuf/JobDataWrapperSerializer.scala | 2 +-
.../protobuf/StageDataWrapperSerializer.scala | 6 ++---
.../org/apache/spark/SparkThrowableSuite.scala | 2 +-
.../apache/spark/rdd/PairRDDFunctionsSuite.scala | 2 +-
.../test/scala/org/apache/spark/rdd/RDDSuite.scala | 1 +
.../scheduler/ExecutorResourceInfoSuite.scala | 1 +
.../BlockManagerDecommissionIntegrationSuite.scala | 2 +-
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 2 +-
.../util/collection/ExternalSorterSuite.scala | 2 +-
.../apache/spark/examples/DFSReadWriteTest.scala | 1 +
.../apache/spark/examples/MiniReadWriteTest.scala | 1 +
.../mllib/PowerIterationClusteringExample.scala | 2 +-
.../spark/graphx/lib/ShortestPathsSuite.scala | 2 +-
.../spark/ml/evaluation/ClusteringMetrics.scala | 1 +
.../apache/spark/ml/feature/VectorIndexer.scala | 2 +-
.../org/apache/spark/ml/feature/Word2Vec.scala | 2 +-
.../apache/spark/ml/tree/impl/RandomForest.scala | 4 +--
.../spark/mllib/clustering/BisectingKMeans.scala | 2 +-
.../mllib/linalg/distributed/BlockMatrix.scala | 4 +--
.../apache/spark/mllib/stat/test/ChiSqTest.scala | 1 +
.../apache/spark/ml/recommendation/ALSSuite.scala | 8 +++---
.../apache/spark/mllib/feature/Word2VecSuite.scala | 12 ++++-----
.../org/apache/spark/sql/types/Metadata.scala | 2 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++-
.../catalyst/catalog/ExternalCatalogUtils.scala | 2 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 2 +-
.../spark/sql/catalyst/expressions/package.scala | 2 +-
.../catalyst/optimizer/NestedColumnAliasing.scala | 2 +-
.../spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
.../spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 4 +--
.../spark/sql/catalyst/util/ToNumberParser.scala | 2 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 1 +
.../scala/org/apache/spark/sql/SparkSession.scala | 2 +-
.../command/AnalyzePartitionCommand.scala | 2 +-
.../execution/datasources/PartitioningUtils.scala | 2 +-
.../execution/datasources/orc/OrcFiltersBase.scala | 1 +
.../datasources/parquet/ParquetFilters.scala | 1 +
.../execution/exchange/EnsureRequirements.scala | 1 +
.../sql/execution/streaming/ProgressReporter.scala | 12 ++++-----
.../sql/execution/streaming/state/RocksDB.scala | 4 +--
.../execution/streaming/statefulOperators.scala | 2 +-
.../spark/sql/execution/ui/AllExecutionsPage.scala | 6 ++---
.../org/apache/spark/sql/DataFrameStatSuite.scala | 2 +-
.../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +-
.../test/scala/org/apache/spark/sql/UDFSuite.scala | 2 +-
.../spark/sql/execution/SQLViewTestSuite.scala | 2 +-
.../command/AlterTableDropPartitionSuiteBase.scala | 2 +-
.../datasources/parquet/ParquetIOSuite.scala | 2 +-
.../sql/execution/metric/SQLMetricsTestUtils.scala | 4 +--
.../execution/ui/SQLAppStatusListenerSuite.scala | 30 +++++++++++-----------
.../StreamingQueryStatusAndProgressSuite.scala | 6 ++---
.../sql/streaming/continuous/ContinuousSuite.scala | 2 +-
.../spark/sql/hive/HiveExternalCatalog.scala | 2 +-
.../api/java/JavaStreamingListenerWrapper.scala | 4 +--
.../scheduler/ReceiverSchedulingPolicy.scala | 2 +-
.../streaming/scheduler/ReceiverTracker.scala | 2 +-
.../apache/spark/streaming/ui/BatchUIData.scala | 2 +-
.../ui/StreamingJobProgressListener.scala | 5 ++--
83 files changed, 140 insertions(+), 119 deletions(-)
diff --git
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
index 087dae26047..689452caa32 100644
---
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
+++
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
@@ -56,7 +56,7 @@ class CountMinSketchSuite extends AnyFunSuite { //
scalastyle:ignore funsuite
val exactFreq = {
val sampledItems = sampledItemIndices.map(allItems)
- sampledItems.groupBy(identity).mapValues(_.length.toLong)
+ sampledItems.groupBy(identity).view.mapValues(_.length.toLong)
}
val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 6a1655a91c9..e738f541ca7 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -242,6 +242,7 @@ private[sql] object AvroUtils extends Logging {
private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray
private[this] val fieldMap = avroSchema.getFields.asScala
.groupBy(_.name.toLowerCase(Locale.ROOT))
+ .view
.mapValues(_.toSeq) // toSeq needed for scala 2.13
/** The fields which have matching equivalents in both Avro and Catalyst
schemas. */
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5e640bea570..78daaa5d3f5 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -306,7 +306,7 @@ class SparkSession private[sql] (
proto.SqlCommand
.newBuilder()
.setSql(sqlText)
- .putAllNamedArguments(args.asScala.mapValues(lit(_).expr).toMap.asJava)))
+ .putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
// .toBuffer forces that the iterator is consumed and closed
val responseSeq = client.execute(plan.build()).toBuffer.toSeq
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
index 069d8ec502f..747ca45d10d 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
@@ -120,7 +120,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession {
val columnNames = crosstab.schema.fieldNames
assert(columnNames(0) === "a_b")
// reduce by key
- val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length)
+ val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length)
val rows = crosstab.collect()
rows.foreach { row =>
val i = row.getString(0).toInt
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 7c41491ba06..24fa2324f66 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -361,7 +361,7 @@ package object dsl {
}
def fillValueMap(valueMap: Map[String, Any]): Relation = {
- val (cols, values) = valueMap.mapValues(toLiteralProto).toSeq.unzip
+ val (cols, values) = valueMap.view.mapValues(toLiteralProto).toSeq.unzip
Relation
.newBuilder()
.setFillNa(
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index f5d83b81999..17c10e63301 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -287,11 +287,11 @@ class SparkConnectPlanner(
if (!namedArguments.isEmpty) {
NameParameterizedQuery(
parsedPlan,
- namedArguments.asScala.mapValues(transformExpression).toMap)
+ namedArguments.asScala.view.mapValues(transformExpression).toMap)
} else if (!posArguments.isEmpty) {
PosParameterizedQuery(parsedPlan, posArguments.asScala.map(transformExpression).toSeq)
} else if (!args.isEmpty) {
- NameParameterizedQuery(parsedPlan, args.asScala.mapValues(transformLiteral).toMap)
+ NameParameterizedQuery(parsedPlan, args.asScala.view.mapValues(transformLiteral).toMap)
} else if (!posArgs.isEmpty) {
PosParameterizedQuery(parsedPlan, posArgs.asScala.map(transformLiteral).toSeq)
} else {
@@ -2518,7 +2518,7 @@ class SparkConnectPlanner(
val df = if (!namedArguments.isEmpty) {
session.sql(
getSqlCommand.getSql,
- namedArguments.asScala.mapValues(e => Column(transformExpression(e))).toMap,
+ namedArguments.asScala.view.mapValues(e => Column(transformExpression(e))).toMap,
tracker)
} else if (!posArguments.isEmpty) {
session.sql(
@@ -2526,7 +2526,10 @@ class SparkConnectPlanner(
posArguments.asScala.map(e => Column(transformExpression(e))).toArray,
tracker)
} else if (!args.isEmpty) {
- session.sql(getSqlCommand.getSql, args.asScala.mapValues(transformLiteral).toMap, tracker)
+ session.sql(
+ getSqlCommand.getSql,
+ args.asScala.view.mapValues(transformLiteral).toMap,
+ tracker)
} else if (!posArgs.isEmpty) {
session.sql(getSqlCommand.getSql, posArgs.asScala.map(transformLiteral).toArray, tracker)
} else {
@@ -3262,7 +3265,7 @@ class SparkConnectPlanner(
proto.GetResourcesCommandResult
.newBuilder()
.putAllResources(
- session.sparkContext.resources
+ session.sparkContext.resources.view
.mapValues(resource =>
proto.ResourceInformation
.newBuilder()
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 4e8da137a47..54e05ff95c9 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -2294,7 +2294,8 @@ abstract class KafkaSourceSuiteBase extends
KafkaSourceTest {
Execute { q =>
// wait to reach the last offset in every partition
q.awaitOffset(0,
- KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis)
+ KafkaSourceOffset(partitionOffsets.view.mapValues(_ => 3L).toMap),
+ streamingTimeout.toMillis)
},
CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
StopStream,
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c54afc6290b..1624f7320bb 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -377,7 +377,7 @@ class KafkaTestUtils(
}
def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
- zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).mapValues(_.size).toSeq
+ zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).view.mapValues(_.size).toSeq
}
/** Create a Kafka topic and wait until it is propagated to the whole
cluster */
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index b96a2597f5d..1dd66675b91 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -241,7 +241,7 @@ object ConsumerStrategies {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
}
/**
@@ -320,7 +320,7 @@ object ConsumerStrategies {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
}
/**
@@ -404,7 +404,7 @@ object ConsumerStrategies {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
}
/**
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index d8037269961..f5967a74ad3 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
kc = consumerStrategy.onStart(
- currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
+ currentOffsets.view.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
}
kc
}
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 7b2cac4a68b..faf114108fa 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -726,7 +726,7 @@ class DirectKafkaStreamSuite
/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time,
Array[OffsetRange])] = {
- kafkaStream.generatedRDDs.mapValues { rdd =>
+ kafkaStream.generatedRDDs.view.mapValues { rdd =>
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}.toSeq.sortBy { _._1 }
}
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 6a9ef52e990..66c253172e7 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -206,7 +206,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
/** Java-friendly function for sending messages to the Kafka broker */
def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
- sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ sendMessages(topic, Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*))
}
/** Send the messages to the Kafka broker */
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 50dfd50aa23..10d24df7b61 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
sentSeqNumbers += ((num, seqNumber))
}
- shardIdToSeqNumbers.mapValues(_.toSeq).toMap
+ shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
}
}
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index f95bee1d98f..2799965a8fc 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String)
extends KinesisDataG
Futures.addCallback(future, kinesisCallBack,
ThreadUtils.sameThreadExecutorService())
}
producer.flushSync()
- shardIdToSeqNumbers.mapValues(_.toSeq).toMap
+ shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
}
}
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 4b3b7454b86..c9d1498a5a4 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData:
Boolean)
require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to
multiple shards")
shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
- shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap
- shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap
+ shardIdToData = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._1)).toMap
+ shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._2)).toMap
shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
val seqNumRange = SequenceNumberRange(
testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last,
seqNumbers.size)
diff --git
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index 2388668f66a..6c18a8863af 100644
---
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -75,6 +75,7 @@ private[sql] object ProtobufUtils extends Logging {
private[this] val protoFieldArray = descriptor.getFields.asScala.toArray
private[this] val fieldMap = descriptor.getFields.asScala
.groupBy(_.getName.toLowerCase(Locale.ROOT))
+ .view
.mapValues(_.toSeq) // toSeq needed for scala 2.13
/** The fields which have matching equivalents in both Protobuf and
Catalyst schemas. */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 7d11e4b157a..aa8e1b1520e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -147,7 +147,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(
withReplacement,
- fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
+ fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
seed))
/**
@@ -179,7 +179,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKeyExact(
withReplacement,
- fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
+ fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
seed))
/**
diff --git
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 6ead36971a9..c016910ed76 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -782,7 +782,7 @@ class JavaSparkContext(val sc: SparkContext) extends
Closeable {
* @note This does not necessarily mean the caching or computation was
successful.
*/
def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = {
- sc.getPersistentRDDs.mapValues(s => JavaRDD.fromRDD(s)).toMap
+ sc.getPersistentRDDs.view.mapValues(s => JavaRDD.fromRDD(s)).toMap
.asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]]
}
diff --git
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 2c8f4f2ca2a..14265f03795 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -369,7 +369,7 @@ private[spark] class PythonWorkerFactory(
daemon = null
daemonPort = 0
} else {
- simpleWorkers.mapValues(_.destroy())
+ simpleWorkers.view.mapValues(_.destroy())
}
}
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 4b3fd580341..e04c5b2de7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -178,6 +178,6 @@ object InputFormatInfo {
}
}
- nodeToSplit.mapValues(_.toSet).toMap
+ nodeToSplit.view.mapValues(_.toSet).toMap
}
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d00578e73e9..41f6b3ad64b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -155,7 +155,7 @@ private[spark] class TaskSchedulerImpl(
.build[String, ExecutorDecommissionState]()
def runningTasksByExecutors: Map[String, Int] = synchronized {
- executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
+ executorIdToRunningTaskIds.toMap.view.mapValues(_.size).toMap
}
// The set of executors we have on each host; this is used to compute
hostsAlive, which
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dd53757fe85..c49b2411e76 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -736,7 +736,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
- executorDataMap.mapValues(v => v.registrationTs).toMap
+ executorDataMap.view.mapValues(v => v.registrationTs).toMap
}
override def isExecutorActive(id: String): Boolean = synchronized {
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index baedccd9b92..4ee864f00e8 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -130,9 +130,10 @@ private[protobuf] class
ApplicationEnvironmentInfoWrapperSerializer
new ResourceProfileInfo(
id = info.getId,
executorResources =
- info.getExecutorResourcesMap.asScala.mapValues(deserializeExecutorResourceRequest).toMap,
+ info.getExecutorResourcesMap.asScala.view.mapValues(deserializeExecutorResourceRequest)
+ .toMap,
taskResources =
- info.getTaskResourcesMap.asScala.mapValues(deserializeTaskResourceRequest).toMap)
+ info.getTaskResourcesMap.asScala.view.mapValues(deserializeTaskResourceRequest).toMap)
}
private def deserializeExecutorResourceRequest(info:
StoreTypes.ExecutorResourceRequest):
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 97daf37995d..188187c9b75 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -137,7 +137,8 @@ private[protobuf] class ExecutorSummaryWrapperSerializer
blacklistedInStages =
binary.getBlacklistedInStagesList.asScala.map(_.toInt).toSet,
peakMemoryMetrics = peakMemoryMetrics,
attributes = binary.getAttributesMap.asScala.toMap,
- resources = binary.getResourcesMap.asScala.mapValues(deserializeResourceInformation).toMap,
+ resources =
+ binary.getResourcesMap.asScala.view.mapValues(deserializeResourceInformation).toMap,
resourceProfileId = binary.getResourceProfileId,
isExcluded = binary.getIsExcluded,
excludedInStages =
binary.getExcludedInStagesList.asScala.map(_.toInt).toSet)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 41d1fee1608..5252b8b8c01 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -107,6 +107,6 @@ private[protobuf] class JobDataWrapperSerializer extends
ProtobufSerDe[JobDataWr
numCompletedStages = info.getNumCompletedStages,
numSkippedStages = info.getNumSkippedStages,
numFailedStages = info.getNumFailedStages,
- killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
+ killedTasksSummary = info.getKillTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap)
}
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index 8793ca7a12c..4fbaff0327d 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -382,7 +382,7 @@ private[protobuf] class StageDataWrapperSerializer extends
ProtobufSerDe[StageDa
new StageDataWrapper(
info = info,
jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet,
- locality = binary.getLocalityMap.asScala.mapValues(_.toLong).toMap
+ locality = binary.getLocalityMap.asScala.view.mapValues(_.toLong).toMap
)
}
@@ -402,7 +402,7 @@ private[protobuf] class StageDataWrapperSerializer extends
ProtobufSerDe[StageDa
entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
} else None
val executorSummary = if
(MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) {
- Some(binary.getExecutorSummaryMap.asScala.mapValues(
+ Some(binary.getExecutorSummaryMap.asScala.view.mapValues(
ExecutorStageSummarySerializer.deserialize).toMap)
} else None
val speculationSummary =
@@ -475,7 +475,7 @@ private[protobuf] class StageDataWrapperSerializer extends
ProtobufSerDe[StageDa
tasks = tasks,
executorSummary = executorSummary,
speculationSummary = speculationSummary,
- killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.mapValues(_.toInt).toMap,
+ killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap,
resourceProfileId = binary.getResourceProfileId,
peakExecutorMetrics = peakExecutorMetrics,
taskMetricsDistributions = taskMetricsDistributions,
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index fb82714949b..9f32d81f1ae 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -65,7 +65,7 @@ class SparkThrowableSuite extends SparkFunSuite {
}
def checkIfUnique(ss: Seq[Any]): Unit = {
- val dups = ss.groupBy(identity).mapValues(_.size).filter(_._2 > 1).keys.toSeq
+ val dups = ss.groupBy(identity).view.mapValues(_.size).filter(_._2 > 1).keys.toSeq
assert(dups.isEmpty, s"Duplicate error classes: ${dups.mkString(", ")}")
}
diff --git
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index a669993352f..c30b4ca4dae 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -805,7 +805,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with
SharedSparkContext {
seed: Long,
n: Long): Unit = {
val trials = stratifiedData.countByKey()
- val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
+ val expectedSampleSize = stratifiedData.countByKey().view.mapValues(count =>
math.ceil(count * samplingRate).toInt)
val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
val sample = if (exact) {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 046017af3a3..f925a8b8b71 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1254,6 +1254,7 @@ class RDDSuite extends SparkFunSuite with
SharedSparkContext with Eventually {
.getPartitions
.map(coalescedRDD.getPreferredLocations(_).head)
.groupBy(identity)
+ .view
.mapValues(_.size)
// Make sure the coalesced partitions are distributed fairly evenly
between the two locations.
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
index e392ff53e02..3f99e2b4598 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
@@ -101,6 +101,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
// assert that each address was assigned `slots` times
info.assignedAddrs
.groupBy(identity)
+ .view
.mapValues(_.size)
.foreach(x => assert(x._2 == slots))
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 5ab9f644be6..59ebc750af9 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -299,7 +299,7 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
val blockLocs = rddUpdates.map { update =>
(update.blockUpdatedInfo.blockId.name,
update.blockUpdatedInfo.blockManagerId)}
- val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
+ val blocksToManagers = blockLocs.groupBy(_._1).view.mapValues(_.size)
assert(blocksToManagers.exists(_._2 > 1),
s"We should have a block that has been on multiple BMs in rdds:\n
${rddUpdates} from:\n" +
s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
diff --git
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index c16dae77b83..769939a0a22 100644
---
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -988,7 +988,7 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()
)
- configureMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)).toMap)
+ configureMockTransfer(blocks.view.mapValues(_ => createMockManagedBuffer(0)).toMap)
val iterator = createShuffleBlockIteratorWithDefaults(
Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0))
diff --git
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 7bec9612187..02cc2bb35af 100644
---
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -544,7 +544,7 @@ class ExternalSorterSuite extends SparkFunSuite with
LocalSparkContext {
val expected = (0 until 3).map { p =>
var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k
% 3 == p }.toSet
if (withPartialAgg) {
- v = v.groupBy(_._1).mapValues { s => s.map(_._2).sum }.toSet
+ v = v.groupBy(_._1).view.mapValues { s => s.map(_._2).sum }.toSet
}
(p, v.toSet)
}.toSet
diff --git
a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 5dfb8ec898c..4cad0f16426 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -89,6 +89,7 @@ object DFSReadWriteTest {
.flatMap(_.split("\t"))
.filter(_.nonEmpty)
.groupBy(w => w)
+ .view
.mapValues(_.size)
.values
.sum
diff --git
a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
index c003dc8ba38..9095c9b75af 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
@@ -81,6 +81,7 @@ object MiniReadWriteTest {
.flatMap(_.split("\t"))
.filter(_.nonEmpty)
.groupBy(w => w)
+ .view
.mapValues(_.size)
.values
.sum
diff --git
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index 2a77702e223..d26c9a3a056 100644
---
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -101,7 +101,7 @@ object PowerIterationClusteringExample {
.setInitializationMode("degree")
.run(circlesRdd)
- val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
+ val clusters = model.assignments.collect().groupBy(_.cluster).view.mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
diff --git
a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
index 527187d51ae..1f158808215 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -34,7 +34,7 @@ class ShortestPathsSuite extends SparkFunSuite with
LocalSparkContext {
val graph = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 4).map(_.toLong)
val results = ShortestPaths.run(graph, landmarks).vertices.collect().map
{
- case (v, spMap) => (v, spMap.mapValues(i => i).toMap)
+ case (v, spMap) => (v, spMap.view.mapValues(i => i).toMap)
}
assert(results.toSet === shortestPaths)
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
index b8563bed601..ee87f49806a 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
@@ -331,6 +331,7 @@ private[evaluation] object SquaredEuclideanSilhouette
extends Silhouette {
clustersStatsRDD
.collectAsMap()
+ .view
.mapValues {
case (featureSum: DenseVector, squaredNormSum: Double, weightSum:
Double) =>
SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum,
weightSum)
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index cf1751f86f9..ff997194b42 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -300,7 +300,7 @@ class VectorIndexerModel private[ml] (
/** Java-friendly version of [[categoryMaps]] */
@Since("1.4.0")
def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = {
- categoryMaps.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]]
+ categoryMaps.view.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]]
}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 6a22c3580e3..053aaac742b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -217,7 +217,7 @@ class Word2VecModel private[ml] (
@Since("1.5.0")
@transient lazy val getVectors: DataFrame = {
val spark = SparkSession.builder().getOrCreate()
- val wordVec = wordVectors.getVectors.mapValues(vec => Vectors.dense(vec.map(_.toDouble)))
+ val wordVec = wordVectors.getVectors.view.mapValues(vec => Vectors.dense(vec.map(_.toDouble)))
spark.createDataFrame(wordVec.toSeq).toDF("word", "vector")
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 012e942a60a..4e5b8cd5efe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -1292,8 +1292,8 @@ private[spark] object RandomForest extends Logging with
Serializable {
}
// Convert mutable maps to immutable ones.
val nodesForGroup: Map[Int, Array[LearningNode]] =
- mutableNodesForGroup.mapValues(_.toArray).toMap
- val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.mapValues(_.toMap).toMap
+ mutableNodesForGroup.view.mapValues(_.toArray).toMap
+ val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.view.mapValues(_.toMap).toMap
(nodesForGroup, treeToNodeToIndexInfo)
}
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index a53581e8a60..fce2537b761 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -220,7 +220,7 @@ class BisectingKMeans private (
divisibleIndices.contains(parentIndex(index))
}
newClusters = summarize(d, newAssignments, dMeasure)
- newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap
+ newClusterCenters = newClusters.view.mapValues(_.center).map(identity).toMap
}
if (preIndices != null) {
preIndices.unpersist()
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 8ab6aba641a..af0d9e48a3b 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -443,7 +443,7 @@ class BlockMatrix @Since("1.3.0") (
val leftMatrix = blockInfo.keys.collect()
val rightMatrix = other.blockInfo.keys.collect()
- val rightCounterpartsHelper = rightMatrix.groupBy(_._1).mapValues(_.map(_._2))
+ val rightCounterpartsHelper = rightMatrix.groupBy(_._1).view.mapValues(_.map(_._2))
val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
val rightCounterparts = rightCounterpartsHelper.getOrElse(colIndex,
Array.emptyIntArray)
val partitions = rightCounterparts.map(b =>
partitioner.getPartition((rowIndex, b)))
@@ -452,7 +452,7 @@ class BlockMatrix @Since("1.3.0") (
partitions.toSet.map((pid: Int) => pid * midDimSplitNum +
midDimSplitIndex))
}.toMap
- val leftCounterpartsHelper = leftMatrix.groupBy(_._2).mapValues(_.map(_._1))
+ val leftCounterpartsHelper = leftMatrix.groupBy(_._2).view.mapValues(_.map(_._1))
val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) =>
val leftCounterparts = leftCounterpartsHelper.getOrElse(rowIndex,
Array.emptyIntArray)
val partitions = leftCounterparts.map(b => partitioner.getPartition((b,
colIndex)))
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index 9c761824134..ead9f887fe8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -162,6 +162,7 @@ private[spark] object ChiSqTest extends Logging {
.map { case ((label, _), c) => (label, c) }
.toArray
.groupBy(_._1)
+ .view
.mapValues(_.map(_._2).sum)
labelCounts.foreach { case (label, countByLabel) =>
val nnzByLabel = labelNNZ.getOrElse(label, 0L)
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index d206b5fd280..08202c7f1f3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -855,7 +855,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest
with Logging {
Seq(2, 4, 6).foreach { k =>
val n = math.min(k, numItems).toInt
- val expectedUpToN = expected.mapValues(_.slice(0, n))
+ val expectedUpToN = expected.view.mapValues(_.slice(0, n))
val topItems = model.recommendForAllUsers(k)
assert(topItems.count() == numUsers)
assert(topItems.columns.contains("user"))
@@ -876,7 +876,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest
with Logging {
Seq(2, 3, 4).foreach { k =>
val n = math.min(k, numUsers).toInt
- val expectedUpToN = expected.mapValues(_.slice(0, n))
+ val expectedUpToN = expected.view.mapValues(_.slice(0, n))
val topUsers = getALSModel.recommendForAllItems(k)
assert(topUsers.count() == numItems)
assert(topUsers.columns.contains("item"))
@@ -898,7 +898,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest
with Logging {
Seq(2, 4, 6).foreach { k =>
val n = math.min(k, numItems).toInt
- val expectedUpToN = expected.mapValues(_.slice(0, n))
+ val expectedUpToN = expected.view.mapValues(_.slice(0, n))
val topItems = model.recommendForUserSubset(userSubset, k)
assert(topItems.count() == numUsersSubset)
assert(topItems.columns.contains("user"))
@@ -920,7 +920,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest
with Logging {
Seq(2, 3, 4).foreach { k =>
val n = math.min(k, numUsers).toInt
- val expectedUpToN = expected.mapValues(_.slice(0, n))
+ val expectedUpToN = expected.view.mapValues(_.slice(0, n))
val topUsers = model.recommendForItemSubset(itemSubset, k)
assert(topUsers.count() == numItemsSubset)
assert(topUsers.columns.contains("item"))
diff --git
a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index e4cd492be3d..1973f306441 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -43,8 +43,8 @@ class Word2VecSuite extends SparkFunSuite with
MLlibTestSparkContext {
// and a Word2VecMap give the same values.
val word2VecMap = model.getVectors
val newModel = new Word2VecModel(word2VecMap)
- assert(newModel.getVectors.mapValues(_.toSeq).toMap ===
- word2VecMap.mapValues(_.toSeq).toMap)
+ assert(newModel.getVectors.view.mapValues(_.toSeq).toMap ===
+ word2VecMap.view.mapValues(_.toSeq).toMap)
}
test("Word2Vec throws exception when vocabulary is empty") {
@@ -103,8 +103,8 @@ class Word2VecSuite extends SparkFunSuite with
MLlibTestSparkContext {
try {
model.save(sc, path)
val sameModel = Word2VecModel.load(sc, path)
- assert(sameModel.getVectors.mapValues(_.toSeq).toMap ===
- model.getVectors.mapValues(_.toSeq).toMap)
+ assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap ===
+ model.getVectors.view.mapValues(_.toSeq).toMap)
} finally {
Utils.deleteRecursively(tempDir)
}
@@ -138,8 +138,8 @@ class Word2VecSuite extends SparkFunSuite with
MLlibTestSparkContext {
try {
model.save(sc, path)
val sameModel = Word2VecModel.load(sc, path)
- assert(sameModel.getVectors.mapValues(_.toSeq).toMap ===
- model.getVectors.mapValues(_.toSeq).toMap)
+ assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap ===
+ model.getVectors.view.mapValues(_.toSeq).toMap)
}
catch {
case t: Throwable => fail("exception thrown persisting a model " +
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 3677927b9a5..dc5e49fc35b 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -210,7 +210,7 @@ object Metadata {
// `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in
Scala 2.13, call
// `toMap` for Scala version compatibility.
case map: Map[_, _] =>
- map.mapValues(hash).toMap.##
+ map.view.mapValues(hash).toMap.##
case arr: Array[_] =>
// Seq.empty[T] has the same hashCode regardless of T.
arr.toSeq.map(hash).##
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 65338f9917b..3a5f60eb376 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1314,7 +1314,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
val partCols = partitionColumnNames(r.table)
validatePartitionSpec(partCols, i.partitionSpec)
- val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get).toMap
+ val staticPartitions = i.partitionSpec.filter(_._2.isDefined).view.mapValues(_.get).toMap
val query = addStaticPartitionColumns(r,
projectByName.getOrElse(i.query), staticPartitions,
isByName)
@@ -3616,6 +3616,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
// `GetStructField` without the name property.
.collect { case g: GetStructField if g.name.isEmpty => g }
.groupBy(_.child)
+ .view
.mapValues(_.map(_.ordinal).distinct.sorted)
structChildToOrdinals.foreach { case (expr, ordinals) =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 67c57ec2787..0f235e0977b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -207,7 +207,7 @@ object ExternalCatalogUtils {
}
def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
- spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap
+ spec.view.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f48ff23f4ad..e71865df94d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -968,7 +968,7 @@ class SessionCatalog(
} else {
_.toLowerCase(Locale.ROOT)
}
- val nameToCounts = viewColumnNames.groupBy(normalizeColName).mapValues(_.length)
+ val nameToCounts = viewColumnNames.groupBy(normalizeColName).view.mapValues(_.length)
val nameToCurrentOrdinal =
scala.collection.mutable.HashMap.empty[String, Int]
val viewDDL = buildViewDDL(metadata, isTempView)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index b32ef3d95aa..ea985ae5f30 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -126,7 +126,7 @@ package object expressions {
}
private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] =
{
- m.mapValues(_.distinct).toMap
+ m.view.mapValues(_.distinct).toMap
}
/** Map to use for direct case insensitive attribute lookups. */
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 5d4fcf772b8..710c07fab7e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -150,7 +150,7 @@ object NestedColumnAliasing {
// A reference attribute can have multiple aliases for nested fields.
val attrToAliases =
- AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2)))
+ AttributeMap(attributeToExtractValuesAndAliases.view.mapValues(_.map(_._2)))
plan match {
case Project(projectList, child) =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6f65afada17..48ecb9aee21 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1055,6 +1055,7 @@ object CollapseProject extends Rule[LogicalPlan] with
AliasHelper {
.filter(_.references.exists(producerMap.contains))
.flatMap(collectReferences)
.groupBy(identity)
+ .view
.mapValues(_.size)
.forall {
case (reference, count) =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2b5f542f22b..d9d04db9ab0 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1083,7 +1083,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
// window w1 as (partition by p_mfgr order by p_name
// range between 2 preceding and 2 following),
// w2 as w1
- val windowMapView = baseWindowMap.mapValues {
+ val windowMapView = baseWindowMap.view.mapValues {
case WindowSpecReference(name) =>
baseWindowMap.get(name) match {
case Some(spec: WindowSpecDefinition) =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index e7be8a7e29b..0e5dd301953 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -365,7 +365,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
// `map.mapValues().view.force` return `Map` in Scala 2.12 but return
`IndexedSeq` in Scala
// 2.13, call `toMap` method manually to compatible with Scala 2.12
and Scala 2.13
// `mapValues` is lazy and we need to force it to materialize
- m.mapValues(mapChild).view.force.toMap
+ m.view.mapValues(mapChild).view.force.toMap
case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg)
case Some(child) => Some(mapChild(child))
case nonChild: AnyRef => nonChild
@@ -786,7 +786,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
Some(arg.asInstanceOf[BaseType].clone())
// `map.mapValues().view.force` return `Map` in Scala 2.12 but return
`IndexedSeq` in Scala
// 2.13, call `toMap` method manually to compatible with Scala 2.12 and
Scala 2.13
- case m: Map[_, _] => m.mapValues {
+ case m: Map[_, _] => m.view.mapValues {
case arg: TreeNode[_] if containsChild(arg) =>
arg.asInstanceOf[BaseType].clone()
case other => other
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
index d56bca30a05..1274751ac94 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
@@ -334,7 +334,7 @@ class ToNumberParser(numberFormat: String, errorOnFail:
Boolean) extends Seriali
)
}
// Make sure that the format string does not contain any prohibited
duplicate tokens.
- val inputTokenCounts = formatTokens.groupBy(identity).mapValues(_.size)
+ val inputTokenCounts = formatTokens.groupBy(identity).view.mapValues(_.size)
Seq(DecimalPoint(),
OptionalPlusOrMinusSign(),
OptionalMinusSign(),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index ae5ebd6a974..5fcd71d8bf9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -228,6 +228,7 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
case seq => Some(CreateStruct(seq)).map(e => Alias(e, e.sql)()).get
}
.groupBy(_.dataType)
+ .view
.mapValues(values => values.map(value => toSQLId(value.name)).sorted)
.mapValues(values => if (values.length > 3) values.take(3) :+ "..." else
values)
.toList.sortBy(_._1.sql)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index aa3fe8dfb7c..59aa17baa7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -683,7 +683,7 @@ class SparkSession private(
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
if (args.nonEmpty) {
- NameParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
+ NameParameterizedQuery(parsedPlan,
args.view.mapValues(lit(_).expr).toMap)
} else {
parsedPlan
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index dc0857383e7..c2b227d6cad 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -64,7 +64,7 @@ case class AnalyzePartitionCommand(
tableId.table, tableId.database.get, schemaColumns, specColumns)
}
- val filteredSpec =
normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
+ val filteredSpec =
normalizedPartitionSpec.filter(_._2.isDefined).view.mapValues(_.get)
if (filteredSpec.isEmpty) {
None
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 8011cc00743..9b38155851e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -406,7 +406,7 @@ object PartitioningUtils extends SQLConfHelper {
val distinctPartColNames =
pathWithPartitionValues.map(_._2.columnNames).distinct
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
- seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value)
=> value }).toMap
+ seq.groupBy { case (key, _) => key }.view.mapValues(_.map { case (_,
value) => value }).toMap
val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
case (path, partValues) => partValues.columnNames -> path
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index b7de20ae293..e5444094673 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -81,6 +81,7 @@ trait OrcFiltersBase {
val dedupPrimitiveFields = primitiveFields
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
+ .view
.mapValues(_.head._2)
CaseInsensitiveMap(dedupPrimitiveFields.toMap)
}
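A minimal standalone sketch (plain Scala 2.13, not Spark code; names and sample data are illustrative) of the case-insensitive dedup pattern in the OrcFiltersBase hunk above and the ParquetFilters hunk below; the Spark code additionally wraps the result in its internal CaseInsensitiveMap, which this sketch skips.
```scala
import java.util.Locale

object DedupFieldsSketch {
  def main(args: Array[String]): Unit = {
    val primitiveFields = Seq("id" -> 1, "ID" -> 2, "name" -> 3)

    val dedupPrimitiveFields = primitiveFields
      .groupBy(_._1.toLowerCase(Locale.ROOT))  // group names case-insensitively
      .filter(_._2.size == 1)                  // keep only names without collisions
      .view
      .mapValues(_.head._2)                    // unwrap the single surviving entry
      .toMap

    println(dedupPrimitiveFields)              // Map(name -> 3)
  }
}
```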
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c5360c9a04f..c1d02ba5a22 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -110,6 +110,7 @@ class ParquetFilters(
primitiveFields
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
+ .view
.mapValues(_.head._2)
CaseInsensitiveMap(dedupPrimitiveFields.toMap)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 8552c950f67..bddf6d02f1f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -491,6 +491,7 @@ case class EnsureRequirements(
val numExpectedPartitions = partValues
.map(InternalRowComparableWrapper(_, partitionExprs))
.groupBy(identity)
+ .view
.mapValues(_.size)
mergedPartValues = mergedPartValues.map { case (partVal,
numParts) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 702d3ea09b6..e70e94001ee 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -136,9 +136,9 @@ trait ProgressReporter extends Logging {
from: StreamProgress,
to: StreamProgress,
latest: StreamProgress): Unit = {
- currentTriggerStartOffsets = from.mapValues(_.json).toMap
- currentTriggerEndOffsets = to.mapValues(_.json).toMap
- currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
+ currentTriggerStartOffsets = from.view.mapValues(_.json).toMap
+ currentTriggerEndOffsets = to.view.mapValues(_.json).toMap
+ currentTriggerLatestOffsets = latest.view.mapValues(_.json).toMap
latestStreamProgress = to
}
@@ -243,7 +243,7 @@ trait ProgressReporter extends Logging {
batchId = currentBatchId,
batchDuration = processingTimeMills,
durationMs =
- new
java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
+ new
java.util.HashMap(currentDurationsMs.toMap.view.mapValues(long2Long).toMap.asJava),
eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
@@ -297,7 +297,7 @@ trait ProgressReporter extends Logging {
Map(
"max" -> stats.max,
"min" -> stats.min,
- "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+ "avg" -> stats.avg.toLong).view.mapValues(formatTimestamp)
}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
ExecutionStats(numInputRows, stateOperators, eventTimeStats.toMap)
@@ -307,7 +307,7 @@ trait ProgressReporter extends Logging {
private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = {
def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream,
Long] = {
- tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap // sum up rows for
each source
+ tuples.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // sum up
rows for each source
}
def unrollCTE(plan: LogicalPlan): LogicalPlan = {
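A minimal standalone sketch (plain Scala 2.13, not Spark code; names and values are illustrative) of the Java-interop pattern in the ProgressReporter hunks above: the view has to be materialized with `toMap` before `asJava`, since the converter for `java.util.Map` works on a `scala.collection.Map`, not a `MapView`.
```scala
import scala.jdk.CollectionConverters._

object DurationsToJavaSketch {
  def main(args: Array[String]): Unit = {
    val durationsMs = Map("addBatch" -> 12L, "getBatch" -> 3L)

    // long2Long (from Predef) boxes scala.Long into java.lang.Long for the Java map.
    val javaDurations: java.util.HashMap[String, java.lang.Long] =
      new java.util.HashMap(durationsMs.view.mapValues(long2Long).toMap.asJava)

    println(javaDurations)  // e.g. {addBatch=12, getBatch=3}
  }
}
```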
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 40f63c86a6a..3dfc27b14c2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -507,7 +507,7 @@ class RocksDB(
"put" -> DB_WRITE,
"compaction" -> COMPACTION_TIME
).toMap
- val nativeOpsLatencyMicros = nativeOpsHistograms.mapValues { typ =>
+ val nativeOpsLatencyMicros = nativeOpsHistograms.view.mapValues { typ =>
RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
}
val nativeOpsMetricTickers = Seq(
@@ -530,7 +530,7 @@ class RocksDB(
/** Number of bytes written during flush */
"totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES
).toMap
- val nativeOpsMetrics = nativeOpsMetricTickers.mapValues { typ =>
+ val nativeOpsMetrics = nativeOpsMetricTickers.view.mapValues { typ =>
nativeStats.getTickerCount(typ)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index def4a4104f7..b36aa264e28 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -156,7 +156,7 @@ trait StateStoreWriter extends StatefulOperator with
PythonSQLMetrics { self: Sp
.map(entry => entry._1 -> longMetric(entry._1).value)
val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
- new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava)
+ new
java.util.HashMap(customMetrics.view.mapValues(long2Long).toMap.asJava)
// We now don't report number of shuffle partitions inside the state
operator. Instead,
// it will be filled when the stream query progress is reported
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 2e088ec8e4b..f66b99a154e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -79,7 +79,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends
WebUIPage("") with L
request,
"running",
running.toSeq,
- executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+ executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
currentTime,
showErrorMessage = false,
showRunningJobs = true,
@@ -105,7 +105,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends
WebUIPage("") with L
request,
"completed",
completed.toSeq,
- executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+ executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
currentTime,
showErrorMessage = false,
showRunningJobs = false,
@@ -132,7 +132,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends
WebUIPage("") with L
request,
"failed",
failed.toSeq,
- executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+ executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
currentTime,
showErrorMessage = true,
showRunningJobs = false,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 47ff942e5ca..1dece5c8285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -343,7 +343,7 @@ class DataFrameStatSuite extends QueryTest with
SharedSparkSession {
val columnNames = crosstab.schema.fieldNames
assert(columnNames(0) === "a_b")
// reduce by key
- val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length)
+ val expected = data.map(t => (t,
1)).groupBy(_._1).view.mapValues(_.length)
val rows = crosstab.collect()
rows.foreach { row =>
val i = row.getString(0).toInt
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 226d5098d42..d6dc5165fbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -425,7 +425,7 @@ class SQLQueryTestSuite extends QueryTest with
SharedSparkSession with SQLHelper
// We need to do cartesian product for all the config dimensions, to get
a list of
// config sets, and run the query once for each config set.
val configDimLines =
comments.filter(_.startsWith("--CONFIG_DIM")).map(_.substring(12))
- val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).mapValues
{ lines =>
+ val configDims = configDimLines.groupBy(_.takeWhile(_ != '
')).view.mapValues { lines =>
lines.map(_.dropWhile(_ != ' ').substring(1)).map(_.split(",").map {
kv =>
val (conf, value) = kv.span(_ != '=')
conf.trim -> value.substring(1).trim
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 814cf2f33ff..60d82fd1ac3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -527,7 +527,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
sparkContext.parallelize(Seq(Row(Map("a" -> new
BigDecimal("2011000000000002456556"))))),
StructType(Seq(StructField("col1", MapType(StringType, DecimalType(30,
0))))))
val udf2 = org.apache.spark.sql.functions.udf((map: Map[String,
BigDecimal]) => {
- map.mapValues(value => if (value == null) null else
value.toBigInteger.toString).toMap
+ map.view.mapValues(value => if (value == null) null else
value.toBigInteger.toString).toMap
})
checkAnswer(df2.select(udf2($"col1")), Seq(Row(Map("a" ->
"2011000000000002456556"))))
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 6563a7698e2..057cb527cf0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -629,7 +629,7 @@ class PersistedViewTestSuite extends SQLViewTestSuite with
SharedSparkSession {
val meta = catalog.getTableRawMetadata(TableIdentifier("test_view",
Some("default")))
// simulate a view meta with incompatible schema change
val newProp = meta.properties
- .mapValues(_.replace("col_i", "col_j")).toMap
+ .view.mapValues(_.replace("col_i", "col_j")).toMap
val newSchema = StructType(Seq(StructField("col_j", IntegerType)))
catalog.alterTable(meta.copy(properties = newProp, schema = newSchema))
val e = intercept[AnalysisException] {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index eaf305414f1..1e786c8e578 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -47,7 +47,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with
DDLCommandTestUtil
t: String,
ifExists: String,
specs: Map[String, Any]*): Unit = {
- checkPartitions(t, specs.map(_.mapValues(_.toString).toMap): _*)
+ checkPartitions(t, specs.map(_.view.mapValues(_.toString).toMap): _*)
val specStr = specs.map(partSpecToString).mkString(", ")
sql(s"ALTER TABLE $t DROP $ifExists $specStr")
checkPartitions(t)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a5d5f8ce30f..726ae87b5ce 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -392,7 +392,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(m) =>
- Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+ Row(m.view.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
})
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index cae008af6f0..30315c12b58 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -216,8 +216,8 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
expectedNumOfJobs: Int,
expectedMetrics: Map[Long, (String, Map[String, Any])],
enableWholeStage: Boolean = false): Unit = {
- val expectedMetricsPredicates = expectedMetrics.mapValues { case
(nodeName, nodeMetrics) =>
- (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
+ val expectedMetricsPredicates = expectedMetrics.view.mapValues { case
(nodeName, nodeMetrics) =>
+ (nodeName, nodeMetrics.view.mapValues(expectedMetricValue =>
(actualMetricValue: Any) => {
actualMetricValue.toString.matches(expectedMetricValue.toString)
}).toMap)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9a3313b8069..5affc9ef3b2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -220,23 +220,23 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
)))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 2).toMap)
+ accumulatorUpdates.view.mapValues(_ * 2).toMap)
// Driver accumulator updates don't belong to this execution should be
filtered and no
// exception will be thrown.
listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 2).toMap)
+ accumulatorUpdates.view.mapValues(_ * 2).toMap)
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("",
Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
- (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ *
2).toMap))
+ (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.view.mapValues(_ *
2).toMap))
)))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 3).toMap)
+ accumulatorUpdates.view.mapValues(_ * 3).toMap)
// Retrying a stage should reset the metrics
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0,
1)))
@@ -250,7 +250,7 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
)))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 2).toMap)
+ accumulatorUpdates.view.mapValues(_ * 2).toMap)
// Ignore the task end for the first attempt
listener.onTaskEnd(SparkListenerTaskEnd(
@@ -258,12 +258,12 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ *
100).toMap),
+ createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ *
100).toMap),
new ExecutorMetrics,
null))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 2).toMap)
+ accumulatorUpdates.view.mapValues(_ * 2).toMap)
// Finish two tasks
listener.onTaskEnd(SparkListenerTaskEnd(
@@ -271,7 +271,7 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
stageAttemptId = 1,
taskType = "",
reason = null,
- createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2).toMap),
+ createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ *
2).toMap),
new ExecutorMetrics,
null))
listener.onTaskEnd(SparkListenerTaskEnd(
@@ -279,12 +279,12 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
stageAttemptId = 1,
taskType = "",
reason = null,
- createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+ createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ *
3).toMap),
new ExecutorMetrics,
null))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 5).toMap)
+ accumulatorUpdates.view.mapValues(_ * 5).toMap)
// Summit a new stage
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1,
0)))
@@ -298,7 +298,7 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
)))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 7).toMap)
+ accumulatorUpdates.view.mapValues(_ * 7).toMap)
// Finish two tasks
listener.onTaskEnd(SparkListenerTaskEnd(
@@ -306,7 +306,7 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+ createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ *
3).toMap),
new ExecutorMetrics,
null))
listener.onTaskEnd(SparkListenerTaskEnd(
@@ -314,12 +314,12 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+ createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ *
3).toMap),
new ExecutorMetrics,
null))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 11).toMap)
+ accumulatorUpdates.view.mapValues(_ * 11).toMap)
assertJobs(statusStore.execution(executionId), running = Seq(0))
@@ -334,7 +334,7 @@ abstract class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTes
assertJobs(statusStore.execution(executionId), completed = Seq(0))
checkAnswer(statusStore.executionMetrics(executionId),
- accumulatorUpdates.mapValues(_ * 11).toMap)
+ accumulatorUpdates.view.mapValues(_ * 11).toMap)
}
test("control a plan explain mode in listeners via SQLConf") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 4fa49064faa..4c478486c6b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -391,7 +391,7 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
batchDuration = 0L,
- durationMs = new java.util.HashMap(Map("total" ->
0L).mapValues(long2Long).toMap.asJava),
+ durationMs = new java.util.HashMap(Map("total" ->
0L).view.mapValues(long2Long).toMap.asJava),
eventTime = new java.util.HashMap(Map(
"max" -> "2016-12-05T20:54:20.827Z",
"min" -> "2016-12-05T20:54:20.827Z",
@@ -403,7 +403,7 @@ object StreamingQueryStatusAndProgressSuite {
numShufflePartitions = 2, numStateStoreInstances = 2,
customMetrics = new
java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
"loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
- .mapValues(long2Long).toMap.asJava)
+ .view.mapValues(long2Long).toMap.asJava)
)),
sources = Array(
new SourceProgress(
@@ -429,7 +429,7 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
batchDuration = 0L,
- durationMs = new java.util.HashMap(Map("total" ->
0L).mapValues(long2Long).toMap.asJava),
+ durationMs = new java.util.HashMap(Map("total" ->
0L).view.mapValues(long2Long).toMap.asJava),
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(operatorName = "op2",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 44de7af0b4a..e5e873cca12 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest {
c.committedOffsets.lastOption.map { case (_, offset) =>
offset match {
case o: RateStreamOffset =>
- o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap
+ o.partitionToValueAndRunTimeMs.view.mapValues(_.value).toMap
}
}
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c340522f91..f52ff8cf9a1 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1261,7 +1261,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf,
hadoopConf: Configurat
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[String] =
withClient {
val catalogTable = getTable(db, table)
- val partColNameMap =
buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
+ val partColNameMap =
buildLowerCasePartColNameMap(catalogTable).view.mapValues(escapePathName)
val clientPartitionNames =
client.getPartitionNames(catalogTable,
partialSpec.map(toMetaStorePartitionSpec))
clientPartitionNames.map { partitionPath =>
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index af884914ad8..5a124be5679 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -65,7 +65,7 @@ private[streaming] class
JavaStreamingListenerWrapper(javaStreamingListener: Jav
private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
JavaBatchInfo(
batchInfo.batchTime,
-
batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo).toMap.asJava,
+
batchInfo.streamIdToInputInfo.view.mapValues(toJavaStreamInputInfo).toMap.asJava,
batchInfo.submissionTime,
batchInfo.processingStartTime.getOrElse(-1),
batchInfo.processingEndTime.getOrElse(-1),
@@ -73,7 +73,7 @@ private[streaming] class
JavaStreamingListenerWrapper(javaStreamingListener: Jav
batchInfo.processingDelay.getOrElse(-1),
batchInfo.totalDelay.getOrElse(-1),
batchInfo.numRecords,
-
batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo).toMap.asJava
+
batchInfo.outputOperationInfos.view.mapValues(toJavaOutputOperationInfo).toMap.asJava
)
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 02eee36d4be..5c47cde0459 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -184,7 +184,7 @@ private[streaming] class ReceiverSchedulingPolicy {
val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
- .groupBy(_._1).mapValues(_.map(_._2).sum).toMap // Sum weights for
each executor
+ .groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // Sum weights
for each executor
}
val idleExecutors = executors.toSet -- executorWeights.keys
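A minimal standalone sketch (plain Scala 2.13, not Spark code; names and sample weights are illustrative) of the sum-by-key pattern in the ReceiverSchedulingPolicy hunk above, which also appears in ProgressReporter's extractSourceToNumInputRows.
```scala
object SumWeightsByKeySketch {
  def main(args: Array[String]): Unit = {
    val executorWeights = Seq("exec-1" -> 0.5, "exec-2" -> 1.0, "exec-1" -> 0.25)

    // Group the (executor, weight) pairs by executor and sum the weights,
    // going through a lazy view and back to a strict Map.
    val summed: Map[String, Double] =
      executorWeights.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap

    println(summed)  // e.g. Map(exec-1 -> 0.75, exec-2 -> 1.0)
  }
}
```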
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 342a0a43b50..685ddf67237 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -244,7 +244,7 @@ class ReceiverTracker(ssc: StreamingContext,
skipReceiverLaunch: Boolean = false
*/
def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
if (isTrackerStarted) {
- endpoint.askSync[Map[Int,
ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+ endpoint.askSync[Map[Int,
ReceiverTrackingInfo]](GetAllReceiverInfo).view.mapValues {
_.runningExecutor.map {
_.executorId
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 1af60857bc7..08de8d46c31 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -98,7 +98,7 @@ private[ui] object BatchUIData {
def apply(batchInfo: BatchInfo): BatchUIData = {
val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]()
- outputOperations ++=
batchInfo.outputOperationInfos.mapValues(OutputOperationUIData.apply)
+ outputOperations ++=
batchInfo.outputOperationInfos.view.mapValues(OutputOperationUIData.apply)
new BatchUIData(
batchInfo.batchTime,
batchInfo.streamIdToInputInfo,
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 9abf018584c..140eb866f6f 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -216,7 +216,8 @@ private[spark] class StreamingJobProgressListener(ssc:
StreamingContext)
def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] =
synchronized {
val _retainedBatches = retainedBatches
val latestBatches = _retainedBatches.map { batchUIData =>
- (batchUIData.batchTime.milliseconds,
batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
+ (batchUIData.batchTime.milliseconds,
+ batchUIData.streamIdToInputInfo.view.mapValues(_.numRecords))
}
streamIds.map { streamId =>
val recordRates = latestBatches.map {
@@ -230,7 +231,7 @@ private[spark] class StreamingJobProgressListener(ssc:
StreamingContext)
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption =
- lastReceivedBatch.map(_.streamIdToInputInfo.mapValues(_.numRecords))
+ lastReceivedBatch.map(_.streamIdToInputInfo.view.mapValues(_.numRecords))
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
streamIds.map { streamId =>
(streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))