ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1439152043
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* (if there is one). It returns true iff the segment is
deletable.
* @return the segments ready to be deleted
*/
- private[log] def deletableSegments(predicate: (LogSegment,
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
- def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment],
upperBoundOffset: Long): Boolean = {
- val allowDeletionDueToLogStartOffsetIncremented =
nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+ private[log] def deletableSegments(predicate: (LogSegment,
Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {
Review Comment:
We should change this to return `java.util.Collection[LogSegment]` to avoid
unnecessary conversions.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
localLog.checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
- localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true,
reason)
+ localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava,
true, reason)
Review Comment:
This conversion can be avoided if we make the change to the
`deletableSegments` method signature suggested above.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1564,8 +1565,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteLogStartOffsetBreachedSegments(): Int = {
- def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]):
Boolean = {
- nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled())
localLogStartOffset() else logStartOffset))
+ def shouldDelete(segment: LogSegment, nextSegmentOpt:
Optional[LogSegment]): Boolean = {
+ if (nextSegmentOpt.isPresent)
+ nextSegmentOpt.get().baseOffset <= (if (remoteLogEnabled())
localLogStartOffset() else logStartOffset)
+ else false
Review Comment:
Nit: you can do `nextSegmentOpt.isPresent && nextSegmentOpt.get...`
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private[log] def collectAbortedTransactions(startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn] = {
Review Comment:
We should change this to return `java.util.Collection` or `java.util.List`
to avoid unnecessary conversions.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
def deleteProducerSnapshots(): Unit = {
LocalLog.maybeHandleIOException(logDirFailureChannel,
parentDir,
- s"Error while deleting producer state snapshots for $topicPartition in
dir $parentDir") {
+ s"Error while deleting producer state snapshots for $topicPartition in
dir $parentDir", {
snapshotsToDelete.foreach { snapshot =>
snapshot.deleteIfExists()
}
- }
+ return;
Review Comment:
Hmm, it is a bit odd that a `return` with no value is required in Scala
code. Is this right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]