Repository: spark
Updated Branches:
refs/heads/master 446c45bd8 -> 8a333d2da
[SPARK-14243][CORE] update task metrics when removing blocks
## What changes were proposed in this pull request?
This PR try to use `incUpdatedBlockStatuses ` to update the
`updatedBlockStatuses ` when removing blocks, making sure `BlockManager`
correctly updates `updatedBlockStatuses`
## How was this patch tested?
test("updated block statuses") in BlockManagerSuite.scala
Author: jeanlyn <[email protected]>
Closes #12091 from jeanlyn/updateBlock.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a333d2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a333d2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a333d2d
Branch: refs/heads/master
Commit: 8a333d2da859fd593bda183413630bc3757529c9
Parents: 446c45b
Author: jeanlyn <[email protected]>
Authored: Thu Mar 31 12:04:42 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Mar 31 12:04:42 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/storage/BlockManager.scala | 7 +++++--
.../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8a333d2d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0c7763f..3014caf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1264,9 +1264,12 @@ private[spark] class BlockManager(
"the disk, memory, or external block store")
}
blockInfoManager.removeBlock(blockId)
+ val removeBlockStatus = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
- val status = getCurrentBlockStatus(blockId, info)
- reportBlockStatus(blockId, info, status)
+ reportBlockStatus(blockId, info, removeBlockStatus)
+ }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId,
removeBlockStatus)))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8a333d2d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6fc32cb..9f3a775 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -928,6 +928,16 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
assert(!store.diskStore.contains("list3"), "list3 was in disk store")
assert(!store.diskStore.contains("list4"), "list4 was in disk store")
assert(!store.diskStore.contains("list5"), "list5 was in disk store")
+
+ // remove block - list2 should be removed from disk
+ val updatedBlocks6 = getUpdatedBlocks {
+ store.removeBlock(
+ "list2", tellMaster = true)
+ }
+ assert(updatedBlocks6.size === 1)
+ assert(updatedBlocks6.head._1 === TestBlockId("list2"))
+ assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
+ assert(!store.diskStore.contains("list2"), "list2 was in disk store")
}
test("query block statuses") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]