This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 1193b2a9ae Use TExternalCompaction instead of RunningCompaction (#6221)
1193b2a9ae is described below
commit 1193b2a9ae44a830f4e1607856862738cc59ca0b
Author: Dave Marion <[email protected]>
AuthorDate: Mon Mar 16 16:45:13 2026 -0400
Use TExternalCompaction instead of RunningCompaction (#6221)
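This change removes the RunningCompaction class and replaces
it with the Thrift type TExternalCompaction. The two were nearly
identical; the only thing RunningCompaction tracked that
TExternalCompaction did not was the start time, so a startTime
field has been added to TExternalCompaction.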
This change will make it easier to remove the collections from
the coordinator in subsequent changes, without losing any of
the information that is currently tracked.
---
.../util/compaction/ExternalCompactionUtil.java | 54 +++------
.../core/util/compaction/RunningCompaction.java | 103 ----------------
.../core/compaction/thrift/CompactorService.java | 44 +++----
.../compaction/thrift/TExternalCompaction.java | 108 ++++++++++++++++-
core/src/main/thrift/compaction-coordinator.thrift | 7 +-
.../accumulo/server/util/ListCompactions.java | 14 +--
.../accumulo/compactor/CompactionJobHolder.java | 22 ++--
.../org/apache/accumulo/compactor/Compactor.java | 94 +++++++-------
.../org/apache/accumulo/compactor/ExtCEnv.java | 4 +-
.../coordinator/CompactionCoordinator.java | 135 ++++++++++-----------
.../coordinator/CoordinatorSummaryLogger.java | 10 +-
.../compaction/CompactionCoordinatorTest.java | 52 ++++----
.../test/functional/MemoryConsumingCompactor.java | 6 +-
13 files changed, 315 insertions(+), 338 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index aaf615cd05..e3aa25003e 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
@@ -48,7 +49,6 @@ import
org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -62,30 +62,6 @@ import com.google.common.net.HostAndPort;
public class ExternalCompactionUtil {
- private static class RunningCompactionFuture {
- private final ResourceGroupId group;
- private final HostAndPort compactor;
- private final Future<TExternalCompactionJob> future;
-
- public RunningCompactionFuture(ServiceLockPath slp,
Future<TExternalCompactionJob> future) {
- this.group = slp.getResourceGroup();
- this.compactor = HostAndPort.fromString(slp.getServer());
- this.future = future;
- }
-
- public ResourceGroupId getGroup() {
- return group;
- }
-
- public HostAndPort getCompactor() {
- return compactor;
- }
-
- public Future<TExternalCompactionJob> getFuture() {
- return future;
- }
- }
-
private static final Logger LOG =
LoggerFactory.getLogger(ExternalCompactionUtil.class);
/**
@@ -178,17 +154,18 @@ public class ExternalCompactionUtil {
* @param context context
* @return external compaction job or null if none running
*/
- public static TExternalCompactionJob getRunningCompaction(HostAndPort
compactorAddr,
+ public static TExternalCompaction getRunningCompaction(HostAndPort
compactorAddr,
ClientContext context) {
CompactorService.Client client = null;
try {
client = ThriftUtil.getClient(ThriftClientTypes.COMPACTOR,
compactorAddr, context);
- TExternalCompactionJob job =
+ TExternalCompaction current =
client.getRunningCompaction(TraceUtil.traceInfo(),
context.rpcCreds());
- if (job.getExternalCompactionId() != null) {
- LOG.debug("Compactor {} is running {}", compactorAddr,
job.getExternalCompactionId());
- return job;
+ if (current.getJob() != null &&
current.getJob().getExternalCompactionId() != null) {
+ LOG.debug("Compactor {} is running {}", compactorAddr,
+ current.getJob().getExternalCompactionId());
+ return current;
}
} catch (TException e) {
LOG.debug("Failed to contact compactor {}", compactorAddr, e);
@@ -223,27 +200,26 @@ public class ExternalCompactionUtil {
* @param context server context
* @return list of compactor and external compaction jobs
*/
- public static List<RunningCompaction>
getCompactionsRunningOnCompactors(ClientContext context) {
- final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
+ public static List<TExternalCompaction>
getCompactionsRunningOnCompactors(ClientContext context) {
+ final List<Future<TExternalCompaction>> rcFutures = new ArrayList<>();
final ExecutorService executor = ThreadPools.getServerThreadPools()
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build();
context.getServerPaths().getCompactor(ResourceGroupPredicate.ANY,
AddressSelector.all(), true)
.forEach(slp -> {
final HostAndPort hp = HostAndPort.fromString(slp.getServer());
- rcFutures.add(new RunningCompactionFuture(slp,
- executor.submit(() -> getRunningCompaction(hp, context))));
+ rcFutures.add(executor.submit(() -> getRunningCompaction(hp,
context)));
});
executor.shutdown();
- final List<RunningCompaction> results = new ArrayList<>();
+ final List<TExternalCompaction> results = new ArrayList<>();
rcFutures.forEach(rcf -> {
try {
- TExternalCompactionJob job = rcf.getFuture().get();
- if (null != job && null != job.getExternalCompactionId()) {
- var compactorAddress = getHostPortString(rcf.getCompactor());
- results.add(new RunningCompaction(job, compactorAddress,
rcf.getGroup()));
+ TExternalCompaction job = rcf.get();
+ if (job == null || job.getJob() == null ||
job.getJob().getExternalCompactionId() == null) {
+ return;
}
+ results.add(job);
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
deleted file mode 100644
index 809422a5a2..0000000000
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util.compaction;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
-
-import org.apache.accumulo.core.compaction.thrift.TCompactionState;
-import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
-import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
-import org.apache.accumulo.core.data.ResourceGroupId;
-import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RunningCompaction {
-
- private final static Logger LOG =
LoggerFactory.getLogger(RunningCompaction.class);
-
- private final TExternalCompactionJob job;
- private final String compactorAddress;
- private final ResourceGroupId groupName;
- private final Map<Long,TCompactionStatusUpdate> updates = new TreeMap<>();
-
- // If this object were to be added to a time sorted list before the start
time
- // is set, then it will end up at the end of the list.
- private Long startTime = Long.MAX_VALUE;
-
- public RunningCompaction(TExternalCompactionJob job, String compactorAddress,
- ResourceGroupId groupName) {
- this.job = Objects.requireNonNull(job, "job cannot be null");
- this.compactorAddress =
- Objects.requireNonNull(compactorAddress, "compactor address cannot be
null");
- this.groupName = Objects.requireNonNull(groupName, "groupName cannot be
null");
- }
-
- public RunningCompaction(TExternalCompaction tEC) {
- this(tEC.getJob(), tEC.getCompactor(),
ResourceGroupId.of(tEC.getGroupName()));
- }
-
- public Map<Long,TCompactionStatusUpdate> getUpdates() {
- synchronized (updates) {
- return new TreeMap<>(updates);
- }
- }
-
- public void addUpdate(Long timestamp, TCompactionStatusUpdate update) {
- synchronized (updates) {
- this.updates.put(timestamp, update);
- if (update.getState() == TCompactionState.STARTED) {
- startTime = timestamp;
- }
- }
- }
-
- public TExternalCompactionJob getJob() {
- return job;
- }
-
- public String getCompactorAddress() {
- return compactorAddress;
- }
-
- public ResourceGroupId getGroup() {
- return groupName;
- }
-
- public boolean isStartTimeSet() {
- return startTime != Long.MAX_VALUE;
- }
-
- public Long getStartTime() {
- if (startTime == Long.MAX_VALUE) {
- LOG.warn("Programming error, RunningCompaction::startTime not set before
being compared.");
- }
- return startTime;
- }
-
- public void setStartTime(Long time) {
- // Ignore update if startTime has already been set
- if (startTime == Long.MAX_VALUE) {
- startTime = time;
- }
- }
-
-}
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactorService.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactorService.java
index 296157617b..c1a34dc42c 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactorService.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactorService.java
@@ -29,7 +29,7 @@ public class CompactorService {
public interface Iface {
- public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException;
+ public TExternalCompaction
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException;
public java.lang.String
getRunningCompactionId(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException;
@@ -41,7 +41,7 @@ public class CompactorService {
public interface AsyncIface {
- public void
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>
resultHandler) throws org.apache.thrift.TException;
+ public void
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<TExternalCompaction> resultHandler)
throws org.apache.thrift.TException;
public void
getRunningCompactionId(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler)
throws org.apache.thrift.TException;
@@ -74,7 +74,7 @@ public class CompactorService {
}
@Override
- public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException
+ public TExternalCompaction
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException
{
send_getRunningCompaction(tinfo, credentials);
return recv_getRunningCompaction();
@@ -88,7 +88,7 @@ public class CompactorService {
sendBase("getRunningCompaction", args);
}
- public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob
recv_getRunningCompaction() throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException
+ public TExternalCompaction recv_getRunningCompaction() throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException
{
getRunningCompaction_result result = new getRunningCompaction_result();
receiveBase(result, "getRunningCompaction");
@@ -200,17 +200,17 @@ public class CompactorService {
}
@Override
- public void
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>
resultHandler) throws org.apache.thrift.TException {
+ public void
getRunningCompaction(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<TExternalCompaction> resultHandler)
throws org.apache.thrift.TException {
checkReady();
getRunningCompaction_call method_call = new
getRunningCompaction_call(tinfo, credentials, resultHandler, this,
___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
- public static class getRunningCompaction_call extends
org.apache.thrift.async.TAsyncMethodCall<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>
{
+ public static class getRunningCompaction_call extends
org.apache.thrift.async.TAsyncMethodCall<TExternalCompaction> {
private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo;
private org.apache.accumulo.core.securityImpl.thrift.TCredentials
credentials;
- public
getRunningCompaction_call(org.apache.accumulo.core.clientImpl.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>
resultHandler, org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory,
org.apache.thrift.transport.TNonblockingTransport transport) throws
org.apache.thrift.TException {
+ public
getRunningCompaction_call(org.apache.accumulo.core.clientImpl.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<TExternalCompaction> resultHandler,
org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory,
org.apache.thrift.transport.TNonblockingTransport transport) throws
org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.tinfo = tinfo;
this.credentials = credentials;
@@ -227,7 +227,7 @@ public class CompactorService {
}
@Override
- public
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob getResult()
throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException {
+ public TExternalCompaction getResult() throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.thrift.TException {
if (getState() !=
org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new java.lang.IllegalStateException("Method call not
finished!");
}
@@ -519,7 +519,7 @@ public class CompactorService {
return processMap;
}
- public static class getRunningCompaction<I extends AsyncIface> extends
org.apache.thrift.AsyncProcessFunction<I, getRunningCompaction_args,
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> {
+ public static class getRunningCompaction<I extends AsyncIface> extends
org.apache.thrift.AsyncProcessFunction<I, getRunningCompaction_args,
TExternalCompaction> {
public getRunningCompaction() {
super("getRunningCompaction");
}
@@ -530,11 +530,11 @@ public class CompactorService {
}
@Override
- public
org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>
getResultHandler(final
org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final
int seqid) {
+ public org.apache.thrift.async.AsyncMethodCallback<TExternalCompaction>
getResultHandler(final
org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final
int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
- return new
org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>()
{
+ return new
org.apache.thrift.async.AsyncMethodCallback<TExternalCompaction>() {
@Override
- public void
onComplete(org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob
o) {
+ public void onComplete(TExternalCompaction o) {
getRunningCompaction_result result = new
getRunningCompaction_result();
result.success = o;
try {
@@ -585,7 +585,7 @@ public class CompactorService {
}
@Override
- public void start(I iface, getRunningCompaction_args args,
org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob>
resultHandler) throws org.apache.thrift.TException {
+ public void start(I iface, getRunningCompaction_args args,
org.apache.thrift.async.AsyncMethodCallback<TExternalCompaction> resultHandler)
throws org.apache.thrift.TException {
iface.getRunningCompaction(args.tinfo, args.credentials,resultHandler);
}
}
@@ -1304,7 +1304,7 @@ public class CompactorService {
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new
getRunningCompaction_resultStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new getRunningCompaction_resultTupleSchemeFactory();
- public @org.apache.thrift.annotation.Nullable
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob success; //
required
+ public @org.apache.thrift.annotation.Nullable TExternalCompaction success;
// required
public @org.apache.thrift.annotation.Nullable
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException sec; //
required
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
@@ -1377,7 +1377,7 @@ public class CompactorService {
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap
= new java.util.EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.SUCCESS, new
org.apache.thrift.meta_data.FieldMetaData("success",
org.apache.thrift.TFieldRequirementType.DEFAULT,
- new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob.class)));
+ new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
TExternalCompaction.class)));
tmpMap.put(_Fields.SEC, new
org.apache.thrift.meta_data.FieldMetaData("sec",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException.class)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
@@ -1388,7 +1388,7 @@ public class CompactorService {
}
public getRunningCompaction_result(
- org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob
success,
+ TExternalCompaction success,
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException sec)
{
this();
@@ -1401,7 +1401,7 @@ public class CompactorService {
*/
public getRunningCompaction_result(getRunningCompaction_result other) {
if (other.isSetSuccess()) {
- this.success = new
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob(other.success);
+ this.success = new TExternalCompaction(other.success);
}
if (other.isSetSec()) {
this.sec = new
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException(other.sec);
@@ -1420,11 +1420,11 @@ public class CompactorService {
}
@org.apache.thrift.annotation.Nullable
- public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob
getSuccess() {
+ public TExternalCompaction getSuccess() {
return this.success;
}
- public getRunningCompaction_result
setSuccess(@org.apache.thrift.annotation.Nullable
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob success) {
+ public getRunningCompaction_result
setSuccess(@org.apache.thrift.annotation.Nullable TExternalCompaction success) {
this.success = success;
return this;
}
@@ -1476,7 +1476,7 @@ public class CompactorService {
if (value == null) {
unsetSuccess();
} else {
-
setSuccess((org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob)value);
+ setSuccess((TExternalCompaction)value);
}
break;
@@ -1686,7 +1686,7 @@ public class CompactorService {
switch (schemeField.id) {
case 0: // SUCCESS
if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT)
{
- struct.success = new
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob();
+ struct.success = new TExternalCompaction();
struct.success.read(iprot);
struct.setSuccessIsSet(true);
} else {
@@ -1767,7 +1767,7 @@ public class CompactorService {
org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.thrift.protocol.TTupleProtocol) prot;
java.util.BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
- struct.success = new
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob();
+ struct.success = new TExternalCompaction();
struct.success.read(iprot);
struct.setSuccessIsSet(true);
}
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TExternalCompaction.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TExternalCompaction.java
index daaf22fe1f..01bbfe6542 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TExternalCompaction.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TExternalCompaction.java
@@ -32,6 +32,7 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
private static final org.apache.thrift.protocol.TField COMPACTOR_FIELD_DESC
= new org.apache.thrift.protocol.TField("compactor",
org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField UPDATES_FIELD_DESC =
new org.apache.thrift.protocol.TField("updates",
org.apache.thrift.protocol.TType.MAP, (short)3);
private static final org.apache.thrift.protocol.TField JOB_FIELD_DESC = new
org.apache.thrift.protocol.TField("job",
org.apache.thrift.protocol.TType.STRUCT, (short)4);
+ private static final org.apache.thrift.protocol.TField START_TIME_FIELD_DESC
= new org.apache.thrift.protocol.TField("startTime",
org.apache.thrift.protocol.TType.I64, (short)5);
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new TExternalCompactionStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new TExternalCompactionTupleSchemeFactory();
@@ -40,13 +41,15 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
public @org.apache.thrift.annotation.Nullable java.lang.String compactor; //
required
public @org.apache.thrift.annotation.Nullable
java.util.Map<java.lang.Long,TCompactionStatusUpdate> updates; // required
public @org.apache.thrift.annotation.Nullable
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob job; //
required
+ public long startTime; // required
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
GROUP_NAME((short)1, "groupName"),
COMPACTOR((short)2, "compactor"),
UPDATES((short)3, "updates"),
- JOB((short)4, "job");
+ JOB((short)4, "job"),
+ START_TIME((short)5, "startTime");
private static final java.util.Map<java.lang.String, _Fields> byName = new
java.util.HashMap<java.lang.String, _Fields>();
@@ -70,6 +73,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
return UPDATES;
case 4: // JOB
return JOB;
+ case 5: // START_TIME
+ return START_TIME;
default:
return null;
}
@@ -113,6 +118,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
}
// isset id assignments
+ private static final int __STARTTIME_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
public static final java.util.Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
new java.util.EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -126,6 +133,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
TCompactionStatusUpdate.class))));
tmpMap.put(_Fields.JOB, new
org.apache.thrift.meta_data.FieldMetaData("job",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob.class)));
+ tmpMap.put(_Fields.START_TIME, new
org.apache.thrift.meta_data.FieldMetaData("startTime",
org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExternalCompaction.class,
metaDataMap);
}
@@ -137,19 +146,23 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
java.lang.String groupName,
java.lang.String compactor,
java.util.Map<java.lang.Long,TCompactionStatusUpdate> updates,
- org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob job)
+ org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob job,
+ long startTime)
{
this();
this.groupName = groupName;
this.compactor = compactor;
this.updates = updates;
this.job = job;
+ this.startTime = startTime;
+ setStartTimeIsSet(true);
}
/**
* Performs a deep copy on <i>other</i>.
*/
public TExternalCompaction(TExternalCompaction other) {
+ __isset_bitfield = other.__isset_bitfield;
if (other.isSetGroupName()) {
this.groupName = other.groupName;
}
@@ -174,6 +187,7 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
if (other.isSetJob()) {
this.job = new
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob(other.job);
}
+ this.startTime = other.startTime;
}
@Override
@@ -187,6 +201,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
this.compactor = null;
this.updates = null;
this.job = null;
+ setStartTimeIsSet(false);
+ this.startTime = 0;
}
@org.apache.thrift.annotation.Nullable
@@ -300,6 +316,29 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
}
}
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public TExternalCompaction setStartTime(long startTime) {
+ this.startTime = startTime;
+ setStartTimeIsSet(true);
+ return this;
+ }
+
+ public void unsetStartTime() {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield,
__STARTTIME_ISSET_ID);
+ }
+
+ /** Returns true if field startTime is set (has been assigned a value) and
false otherwise */
+ public boolean isSetStartTime() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield,
__STARTTIME_ISSET_ID);
+ }
+
+ public void setStartTimeIsSet(boolean value) {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __STARTTIME_ISSET_ID,
value);
+ }
+
@Override
public void setFieldValue(_Fields field,
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
@@ -335,6 +374,14 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
}
break;
+ case START_TIME:
+ if (value == null) {
+ unsetStartTime();
+ } else {
+ setStartTime((java.lang.Long)value);
+ }
+ break;
+
}
}
@@ -354,6 +401,9 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
case JOB:
return getJob();
+ case START_TIME:
+ return getStartTime();
+
}
throw new java.lang.IllegalStateException();
}
@@ -374,6 +424,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
return isSetUpdates();
case JOB:
return isSetJob();
+ case START_TIME:
+ return isSetStartTime();
}
throw new java.lang.IllegalStateException();
}
@@ -427,6 +479,15 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
return false;
}
+ boolean this_present_startTime = true;
+ boolean that_present_startTime = true;
+ if (this_present_startTime || that_present_startTime) {
+ if (!(this_present_startTime && that_present_startTime))
+ return false;
+ if (this.startTime != that.startTime)
+ return false;
+ }
+
return true;
}
@@ -450,6 +511,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
if (isSetJob())
hashCode = hashCode * 8191 + job.hashCode();
+ hashCode = hashCode * 8191 +
org.apache.thrift.TBaseHelper.hashCode(startTime);
+
return hashCode;
}
@@ -501,6 +564,16 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetStartTime(),
other.isSetStartTime());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetStartTime()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.startTime,
other.startTime);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -556,6 +629,10 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
sb.append(this.job);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("startTime:");
+ sb.append(this.startTime);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -578,6 +655,8 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
private void readObject(java.io.ObjectInputStream in) throws
java.io.IOException, java.lang.ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java
serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new
org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -650,6 +729,14 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
break;
+ case 5: // START_TIME
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.startTime = iprot.readI64();
+ struct.setStartTimeIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
@@ -694,6 +781,9 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
struct.job.write(oprot);
oprot.writeFieldEnd();
}
+ oprot.writeFieldBegin(START_TIME_FIELD_DESC);
+ oprot.writeI64(struct.startTime);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -725,7 +815,10 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
if (struct.isSetJob()) {
optionals.set(3);
}
- oprot.writeBitSet(optionals, 4);
+ if (struct.isSetStartTime()) {
+ optionals.set(4);
+ }
+ oprot.writeBitSet(optionals, 5);
if (struct.isSetGroupName()) {
oprot.writeString(struct.groupName);
}
@@ -745,12 +838,15 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
if (struct.isSetJob()) {
struct.job.write(oprot);
}
+ if (struct.isSetStartTime()) {
+ oprot.writeI64(struct.startTime);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot,
TExternalCompaction struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(4);
+ java.util.BitSet incoming = iprot.readBitSet(5);
if (incoming.get(0)) {
struct.groupName = iprot.readString();
struct.setGroupNameIsSet(true);
@@ -780,6 +876,10 @@ public class TExternalCompaction implements
org.apache.thrift.TBase<TExternalCom
struct.job.read(iprot);
struct.setJobIsSet(true);
}
+ if (incoming.get(4)) {
+ struct.startTime = iprot.readI64();
+ struct.setStartTimeIsSet(true);
+ }
}
}
diff --git a/core/src/main/thrift/compaction-coordinator.thrift
b/core/src/main/thrift/compaction-coordinator.thrift
index 99d5df10ee..4b506ebf51 100644
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@ -53,6 +53,7 @@ struct TExternalCompaction {
2:string compactor
3:map<i64,TCompactionStatusUpdate> updates
4:tabletserver.TExternalCompactionJob job
+ 5:i64 startTime
}
struct TExternalCompactionList {
@@ -183,7 +184,7 @@ service CompactionCoordinatorService {
service CompactorService {
- tabletserver.TExternalCompactionJob getRunningCompaction(
+ TExternalCompaction getRunningCompaction(
1:client.TInfo tinfo
2:security.TCredentials credentials
) throws (
@@ -197,6 +198,10 @@ service CompactorService {
1:client.ThriftSecurityException sec
)
+ /*
+ * Called by the Shell listcompactions command that is
+ * used to return minc and majc information
+ */
list<tabletserver.ActiveCompaction> getActiveCompactions(
2:client.TInfo tinfo
1:security.TCredentials credentials
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ListCompactions.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ListCompactions.java
index df57b9e07b..5d9547f860 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ListCompactions.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ListCompactions.java
@@ -33,7 +33,6 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
-import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.ListCompactions.RunningCommandOpts;
@@ -64,17 +63,17 @@ public class ListCompactions extends
ServerKeywordExecutable<RunningCommandOpts>
private int numFiles = 0;
private double progress = 0.0;
- public RunningCompactionSummary(RunningCompaction runningCompaction,
- RunningCompactionInfo runningCompactionInfo) {
+ public RunningCompactionSummary(TExternalCompaction runningCompaction,
boolean addDetail) {
super();
ecid = runningCompaction.getJob().getExternalCompactionId();
- addr = runningCompaction.getCompactorAddress();
+ addr = runningCompaction.getCompactor();
kind = runningCompaction.getJob().kind;
- groupName = runningCompaction.getGroup();
+ groupName = ResourceGroupId.of(runningCompaction.getGroupName());
KeyExtent extent =
KeyExtent.fromThrift(runningCompaction.getJob().extent);
ke = extent.obscured();
tableId = extent.tableId().canonical();
- if (runningCompactionInfo != null) {
+ if (addDetail) {
+ RunningCompactionInfo runningCompactionInfo = new
RunningCompactionInfo(runningCompaction);
status = runningCompactionInfo.status;
lastUpdate = runningCompactionInfo.lastUpdate;
duration = runningCompactionInfo.duration;
@@ -207,8 +206,7 @@ public class ListCompactions extends
ServerKeywordExecutable<RunningCommandOpts>
if (ec == null) {
continue;
}
- var summary = new RunningCompactionSummary(new RunningCompaction(ec),
- details ? new RunningCompactionInfo(ec) : null);
+ var summary = new RunningCompactionSummary(ec, details);
results.add(summary);
}
return results;
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
index 38d03faaf2..a768ab8f88 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
@@ -21,15 +21,15 @@ package org.apache.accumulo.compactor;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
-import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.server.compaction.FileCompactor;
public class CompactionJobHolder {
- private TExternalCompactionJob job;
+ private TExternalCompaction currentCompaction;
private Thread compactionThread;
private AtomicReference<FileCompactor> compactor;
private volatile boolean cancelled = false;
@@ -38,19 +38,19 @@ public class CompactionJobHolder {
CompactionJobHolder() {}
public synchronized void reset() {
- job = null;
+ currentCompaction = null;
compactionThread = null;
compactor = null;
cancelled = false;
stats = null;
}
- public synchronized TExternalCompactionJob getJob() {
- return job;
+ public synchronized TExternalCompaction getCurrentCompaction() {
+ return currentCompaction;
}
public TableId getTableId() {
- var tKeyExtent = getJob().getExtent();
+ var tKeyExtent = getCurrentCompaction().getJob().getExtent();
return KeyExtent.fromThrift(tKeyExtent).tableId();
}
@@ -63,7 +63,7 @@ public class CompactionJobHolder {
}
public synchronized boolean cancel(String extCompId) {
- if (isSet() && getJob().getExternalCompactionId().equals(extCompId)) {
+ if (isSet() &&
getCurrentCompaction().getJob().getExternalCompactionId().equals(extCompId)) {
cancelled = true;
if (compactor.get() != null) {
compactor.get().interrupt();
@@ -79,15 +79,15 @@ public class CompactionJobHolder {
}
public synchronized boolean isSet() {
- return (null != this.job);
+ return (null != this.currentCompaction);
}
- public synchronized void set(TExternalCompactionJob job, Thread
compactionThread,
+ public synchronized void set(TExternalCompaction job, Thread
compactionThread,
AtomicReference<FileCompactor> compactor) {
- Objects.requireNonNull(job, "CompactionJob is null");
+ Objects.requireNonNull(job, "Compaction Job is null");
Objects.requireNonNull(compactionThread, "Compaction thread is null");
Objects.requireNonNull(compactor, "Compactor object is null");
- this.job = job;
+ this.currentCompaction = job;
this.compactionThread = compactionThread;
this.compactor = compactor;
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 66f11b7eb2..92239742ab 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -67,6 +67,7 @@ import
org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.C
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -313,48 +314,53 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
protected void checkIfCanceled() {
- TExternalCompactionJob job = JOB_HOLDER.getJob();
- if (job != null) {
- try {
- var extent = KeyExtent.fromThrift(job.getExtent());
- var ecid = ExternalCompactionId.of(job.getExternalCompactionId());
-
- TabletMetadata tabletMeta =
- getContext().getAmple().readTablet(extent, ColumnType.ECOMP,
ColumnType.PREV_ROW);
- if (tabletMeta == null ||
!tabletMeta.getExternalCompactions().containsKey(ecid)) {
- // table was deleted OR tablet was split or merged OR tablet no
longer thinks compaction
- // is running for some reason
- LOG.info("Cancelling compaction {} that no longer has a metadata
entry at {}", ecid,
- extent);
- JOB_HOLDER.cancel(job.getExternalCompactionId());
- return;
- }
+ TExternalCompaction tec = JOB_HOLDER.getCurrentCompaction();
+ if (tec == null) {
+ return;
+ }
+ TExternalCompactionJob job = tec.getJob();
+ if (job == null) {
+ return;
+ }
+ try {
+ var extent = KeyExtent.fromThrift(job.getExtent());
+ var ecid = ExternalCompactionId.of(job.getExternalCompactionId());
+
+ TabletMetadata tabletMeta =
+ getContext().getAmple().readTablet(extent, ColumnType.ECOMP,
ColumnType.PREV_ROW);
+ if (tabletMeta == null ||
!tabletMeta.getExternalCompactions().containsKey(ecid)) {
+ // table was deleted OR tablet was split or merged OR tablet no longer
thinks compaction
+ // is running for some reason
+ LOG.info("Cancelling compaction {} that no longer has a metadata entry
at {}", ecid,
+ extent);
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
+ return;
+ }
- var tableState = getContext().getTableState(extent.tableId());
- if (tableState != TableState.ONLINE) {
- LOG.info("Cancelling compaction {} because table state is {}", ecid,
tableState);
- JOB_HOLDER.cancel(job.getExternalCompactionId());
- return;
- }
+ var tableState = getContext().getTableState(extent.tableId());
+ if (tableState != TableState.ONLINE) {
+ LOG.info("Cancelling compaction {} because table state is {}", ecid,
tableState);
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
+ return;
+ }
- if (job.getKind() == TCompactionKind.USER) {
+ if (job.getKind() == TCompactionKind.USER) {
- var cconf =
- CompactionConfigStorage.getConfig(getContext(),
FateId.fromThrift(job.getFateId()));
+ var cconf =
+ CompactionConfigStorage.getConfig(getContext(),
FateId.fromThrift(job.getFateId()));
- if (cconf == null) {
- LOG.info("Cancelling compaction {} for user compaction that no
longer exists {} {}",
- ecid, FateId.fromThrift(job.getFateId()), extent);
- JOB_HOLDER.cancel(job.getExternalCompactionId());
- }
+ if (cconf == null) {
+ LOG.info("Cancelling compaction {} for user compaction that no
longer exists {} {}", ecid,
+ FateId.fromThrift(job.getFateId()), extent);
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
}
- } catch (RuntimeException | KeeperException e) {
- LOG.warn("Failed to check if compaction {} for {} was canceled.",
- job.getExternalCompactionId(),
KeyExtent.fromThrift(job.getExtent()), e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
}
+ } catch (RuntimeException | KeeperException e) {
+ LOG.warn("Failed to check if compaction {} for {} was canceled.",
+ job.getExternalCompactionId(),
KeyExtent.fromThrift(job.getExtent()), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
}
@@ -467,6 +473,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
*/
protected void updateCompactionState(TExternalCompactionJob job,
TCompactionStatusUpdate update)
throws RetriesExceededException {
+ JOB_HOLDER.getCurrentCompaction().putToUpdates(System.currentTimeMillis(),
update);
RetryableThriftCall<String> thriftCall =
new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25,
() -> {
Client coordinatorClient = getCoordinatorClient();
@@ -891,7 +898,11 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
final Thread compactionThread = Threads.createNonCriticalThread(
"Compaction job for tablet " + job.getExtent().toString(), fcr);
- JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
+ final TExternalCompaction current = new TExternalCompaction();
+ current.setCompactor(clientAddress.toString());
+ current.setGroupName(getResourceGroup().canonical());
+ current.setJob(job);
+ JOB_HOLDER.set(current, compactionThread, fcr.getFileCompactor());
try {
// mark compactor as busy while compacting
@@ -913,6 +924,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
compactionThread.start(); // start the compactionThread
+ current.setStartTime(System.currentTimeMillis());
started.await(); // wait until the compactor is started
final long inputEntries = totalInputEntries.sum();
final long waitTime =
calculateProgressCheckTime(totalInputBytes.sum());
@@ -1093,7 +1105,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
* @return current compaction job or empty compaction job if none is running
*/
@Override
- public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials
credentials)
+ public TExternalCompaction getRunningCompaction(TInfo tinfo, TCredentials
credentials)
throws ThriftSecurityException, TException {
// do not expect users to call this directly, expect other tservers to
call this method
if
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
@@ -1105,13 +1117,13 @@ public class Compactor extends AbstractServer
implements MetricsProducer, Compac
// method is called by a coordinator starting up to determine what is
currently running on all
// compactors.
- TExternalCompactionJob job = null;
+ TExternalCompaction job = null;
synchronized (JOB_HOLDER) {
- job = JOB_HOLDER.getJob();
+ job = JOB_HOLDER.getCurrentCompaction();
}
if (null == job) {
- return new TExternalCompactionJob();
+ return new TExternalCompaction();
} else {
return job;
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
index 7a596250e9..795db2ea3e 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
@@ -75,7 +75,7 @@ public class ExtCEnv implements CompactionEnv {
ExtCEnv(CompactionJobHolder jobHolder, ResourceGroupId groupName) {
this.jobHolder = jobHolder;
- this.job = jobHolder.getJob();
+ this.job = jobHolder.getCurrentCompaction().getJob();
this.groupName = groupName;
}
@@ -100,7 +100,7 @@ public class ExtCEnv implements CompactionEnv {
builder.isUserCompaction();
}
- if (!jobHolder.getJob().isPropagateDeletes()) {
+ if (!jobHolder.getCurrentCompaction().getJob().isPropagateDeletes()) {
builder.isFullMajorCompaction();
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 1939728622..0729f09169 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -127,7 +127,6 @@ import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
-import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.volume.Volume;
@@ -174,10 +173,10 @@ public class CompactionCoordinator
private static final int UPPER_LIMIT = 50;
- Comparator<RunningCompaction> oldestFirstComparator =
- Comparator.comparingLong(RunningCompaction::getStartTime)
+ Comparator<TExternalCompaction> oldestFirstComparator =
+ Comparator.comparingLong(TExternalCompaction::getStartTime)
.thenComparing(rc -> rc.getJob().getExternalCompactionId());
- private final ConcurrentSkipListSet<RunningCompaction> compactions =
+ private final ConcurrentSkipListSet<TExternalCompaction> compactions =
new ConcurrentSkipListSet<>(oldestFirstComparator);
// Tracking size here as ConcurrentSkipListSet.size() is not constant time
@@ -187,7 +186,7 @@ public class CompactionCoordinator
return size.get();
}
- public boolean add(RunningCompaction e) {
+ public boolean add(TExternalCompaction e) {
boolean added = compactions.add(e);
if (added) {
if (size.incrementAndGet() > UPPER_LIMIT) {
@@ -205,11 +204,11 @@ public class CompactionCoordinator
return removed;
}
- public Iterator<RunningCompaction> iterator() {
+ public Iterator<TExternalCompaction> iterator() {
return compactions.iterator();
}
- public Stream<RunningCompaction> stream() {
+ public Stream<TExternalCompaction> stream() {
return compactions.stream();
}
@@ -253,13 +252,13 @@ public class CompactionCoordinator
"Coordinator restarted, compaction found in progress";
/*
- * Map of compactionId to RunningCompactions. This is an informational cache
of what external
+ * Map of compactionId to TExternalCompaction. This is an informational
cache of what external
* compactions may be running. It's possible it may contain external
compactions that are not
* actually running. It may not contain compactions that are actually
running. The metadata table
* is the most authoritative source of what external compactions are
currently running, but it
* does not have the stats that this map has.
*/
- protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
+ protected final Map<ExternalCompactionId,TExternalCompaction> RUNNING_CACHE =
new ConcurrentHashMap<>();
protected final Map<String,TimeOrderedRunningCompactionSet>
LONG_RUNNING_COMPACTIONS_BY_RG =
@@ -275,7 +274,7 @@ public class CompactionCoordinator
// Exposed for tests
protected final CountDownLatch shutdown = new CountDownLatch(1);
- private final Cache<ExternalCompactionId,RunningCompaction> completed;
+ private final Cache<ExternalCompactionId,TExternalCompaction> completed;
private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
private final Cache<Path,Integer> tabletDirCache;
private final DeadCompactionDetector deadCompactionDetector;
@@ -407,21 +406,20 @@ public class CompactionCoordinator
// the external compaction came from to re-populate the RUNNING collection.
LOG.info("Checking for running external compactions");
// On re-start contact the running Compactors to try and seed the list of
running compactions
- List<RunningCompaction> running = getCompactionsRunningOnCompactors();
+ List<TExternalCompaction> running = getCompactionsRunningOnCompactors();
if (running.isEmpty()) {
LOG.info("No running external compactions found");
} else {
LOG.info("Found {} running external compactions", running.size());
- running.forEach(rc -> {
+ running.forEach(tec -> {
TCompactionStatusUpdate update = new TCompactionStatusUpdate();
update.setState(TCompactionState.IN_PROGRESS);
update.setMessage(RESTART_UPDATE_MSG);
- rc.addUpdate(System.currentTimeMillis(), update);
- rc.setStartTime(this.coordinatorStartTime);
-
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()),
rc);
+ tec.putToUpdates(coordinatorStartTime, update);
+
RUNNING_CACHE.put(ExternalCompactionId.of(tec.getJob().getExternalCompactionId()),
tec);
LONG_RUNNING_COMPACTIONS_BY_RG
- .computeIfAbsent(rc.getGroup().canonical(), k -> new
TimeOrderedRunningCompactionSet())
- .add(rc);
+ .computeIfAbsent(tec.getGroupName(), k -> new
TimeOrderedRunningCompactionSet())
+ .add(tec);
});
}
@@ -449,12 +447,12 @@ public class CompactionCoordinator
final Set<String> emptyQueues = new HashSet<>();
// Remove all of the compactors that are running a compaction
- RUNNING_CACHE.values().forEach(rc -> {
- Set<HostAndPort> busyCompactors =
allCompactors.get(rc.getGroup().canonical());
+ RUNNING_CACHE.values().forEach(tec -> {
+ Set<HostAndPort> busyCompactors = allCompactors.get(tec.getGroupName());
if (busyCompactors != null
- &&
busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
+ &&
busyCompactors.remove(HostAndPort.fromString(tec.getCompactor()))) {
if (busyCompactors.isEmpty()) {
- emptyQueues.add(rc.getGroup().canonical());
+ emptyQueues.add(tec.getGroupName());
}
}
});
@@ -523,8 +521,11 @@ public class CompactionCoordinator
result = createThriftJob(externalCompactionId, ecm, rcJob,
compactionConfig);
// It is possible that by the time this is added, the compactor
that made this request
// is dead. In this case the compaction is not actually running.
-
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
- new RunningCompaction(result, compactorAddress, groupId));
+ TExternalCompaction tec = new TExternalCompaction();
+ tec.setCompactor(compactorAddress);
+ tec.setGroupName(groupName);
+ tec.setJob(result);
+
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
tec);
TabletLogger.compacting(rcJob.getExtent(), rcJob.getSelectedFateId(),
cid, compactorAddress,
rcJob, ecm.getCompactTmpName());
break;
@@ -849,11 +850,11 @@ public class CompactionCoordinator
}
private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
- var rc = RUNNING_CACHE.get(ecid);
- if (rc != null) {
- failingQueues.compute(rc.getGroup(), FailureCounts::incrementFailure);
- final String compactor = rc.getCompactorAddress();
- failingCompactors.compute(compactor, FailureCounts::incrementFailure);
+ var tec = RUNNING_CACHE.get(ecid);
+ if (tec != null) {
+ failingQueues.compute(ResourceGroupId.of(tec.getGroupName()),
+ FailureCounts::incrementFailure);
+ failingCompactors.compute(tec.getCompactor(),
FailureCounts::incrementFailure);
}
failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
}
@@ -908,11 +909,11 @@ public class CompactionCoordinator
}
private void captureSuccess(ExternalCompactionId ecid, KeyExtent extent) {
- var rc = RUNNING_CACHE.get(ecid);
- if (rc != null) {
- failingQueues.compute(rc.getGroup(), FailureCounts::incrementSuccess);
- final String compactor = rc.getCompactorAddress();
- failingCompactors.compute(compactor, FailureCounts::incrementSuccess);
+ var tec = RUNNING_CACHE.get(ecid);
+ if (tec != null) {
+ failingQueues.compute(ResourceGroupId.of(tec.getGroupName()),
+ FailureCounts::incrementSuccess);
+ failingCompactors.compute(tec.getCompactor(),
FailureCounts::incrementSuccess);
}
failingTables.compute(extent.tableId(), FailureCounts::incrementSuccess);
}
@@ -1048,20 +1049,22 @@ public class CompactionCoordinator
}
LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}",
externalCompactionId,
timestamp, update);
- final RunningCompaction rc =
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
- if (null != rc) {
- rc.addUpdate(timestamp, update);
+ final TExternalCompaction tec =
+ RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
+ if (null != tec) {
+ tec.putToUpdates(timestamp, update);
switch (update.state) {
case STARTED:
-
LONG_RUNNING_COMPACTIONS_BY_RG.computeIfAbsent(rc.getGroup().canonical(),
- k -> new TimeOrderedRunningCompactionSet()).add(rc);
+ LONG_RUNNING_COMPACTIONS_BY_RG
+ .computeIfAbsent(tec.getGroupName(), k -> new
TimeOrderedRunningCompactionSet())
+ .add(tec);
break;
case CANCELLED:
case FAILED:
case SUCCEEDED:
- var compactionSet =
LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroup().canonical());
+ var compactionSet =
LONG_RUNNING_COMPACTIONS_BY_RG.get(tec.getGroupName());
if (compactionSet != null) {
- compactionSet.remove(rc);
+ compactionSet.remove(tec);
}
break;
case ASSIGNED:
@@ -1083,12 +1086,12 @@ public class CompactionCoordinator
}
public void recordCompletion(ExternalCompactionId ecid) {
- var rc = RUNNING_CACHE.remove(ecid);
- if (rc != null) {
- completed.put(ecid, rc);
- var compactionSet =
LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroup().canonical());
+ var tec = RUNNING_CACHE.remove(ecid);
+ if (tec != null) {
+ completed.put(ecid, tec);
+ var compactionSet =
LONG_RUNNING_COMPACTIONS_BY_RG.get(tec.getGroupName());
if (compactionSet != null) {
- compactionSet.remove(rc);
+ compactionSet.remove(tec);
}
}
}
@@ -1120,13 +1123,8 @@ public class CompactionCoordinator
}
final TExternalCompactionMap result = new TExternalCompactionMap();
- RUNNING_CACHE.forEach((ecid, rc) -> {
- TExternalCompaction trc = new TExternalCompaction();
- trc.setGroupName(rc.getGroup().canonical());
- trc.setCompactor(rc.getCompactorAddress());
- trc.setUpdates(rc.getUpdates());
- trc.setJob(rc.getJob());
- result.putToCompactions(ecid.canonical(), trc);
+ RUNNING_CACHE.forEach((ecid, tec) -> {
+ result.putToCompactions(ecid.canonical(), tec);
});
return result;
}
@@ -1154,15 +1152,10 @@ public class CompactionCoordinator
for (Entry<String,TimeOrderedRunningCompactionSet> e :
LONG_RUNNING_COMPACTIONS_BY_RG
.entrySet()) {
final TExternalCompactionList compactions = new
TExternalCompactionList();
- Iterator<RunningCompaction> iter = e.getValue().iterator();
+ Iterator<TExternalCompaction> iter = e.getValue().iterator();
while (iter.hasNext()) {
- RunningCompaction rc = iter.next();
- TExternalCompaction trc = new TExternalCompaction();
- trc.setGroupName(rc.getGroup().canonical());
- trc.setCompactor(rc.getCompactorAddress());
- trc.setUpdates(rc.getUpdates());
- trc.setJob(rc.getJob());
- compactions.addToCompactions(trc);
+ TExternalCompaction tec = iter.next();
+ compactions.addToCompactions(tec);
}
result.put(e.getKey(), compactions);
}
@@ -1186,13 +1179,8 @@ public class CompactionCoordinator
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
final TExternalCompactionMap result = new TExternalCompactionMap();
- completed.asMap().forEach((ecid, rc) -> {
- TExternalCompaction trc = new TExternalCompaction();
- trc.setGroupName(rc.getGroup().canonical());
- trc.setCompactor(rc.getCompactorAddress());
- trc.setJob(rc.getJob());
- trc.setUpdates(rc.getUpdates());
- result.putToCompactions(ecid.canonical(), trc);
+ completed.asMap().forEach((ecid, tec) -> {
+ result.putToCompactions(ecid.canonical(), tec);
});
return result;
}
@@ -1213,7 +1201,7 @@ public class CompactionCoordinator
TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND,
e.getMessage());
}
- cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(),
externalCompactionId);
+ cancelCompactionOnCompactor(runningCompaction.getCompactor(),
externalCompactionId);
}
/* Method exists to be called from test */
@@ -1222,7 +1210,7 @@ public class CompactionCoordinator
}
/* Method exists to be overridden in test to hide static method */
- protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+ protected List<TExternalCompaction> getCompactionsRunningOnCompactors() {
return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
}
@@ -1373,14 +1361,13 @@ public class CompactionCoordinator
Sets.difference(groupsWithJobs, groupsInConfiguration);
if (jobGroupsNotInConfiguration != null &&
!jobGroupsNotInConfiguration.isEmpty()) {
- RUNNING_CACHE.values().forEach(rc -> {
- if
(jobGroupsNotInConfiguration.contains(ResourceGroupId.of(rc.getGroup().canonical())))
{
+ RUNNING_CACHE.values().forEach(tec -> {
+ if
(jobGroupsNotInConfiguration.contains(ResourceGroupId.of(tec.getGroupName()))) {
LOG.warn(
"External compaction {} running in group {} on compactor {},"
+ " but group not found in current configuration. Failing
compaction...",
- rc.getJob().getExternalCompactionId(), rc.getGroup(),
rc.getCompactorAddress());
- cancelCompactionOnCompactor(rc.getCompactorAddress(),
- rc.getJob().getExternalCompactionId());
+ tec.getJob().getExternalCompactionId(), tec.getGroupName(),
tec.getCompactor());
+ cancelCompactionOnCompactor(tec.getCompactor(),
tec.getJob().getExternalCompactionId());
}
});
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
index 58f4d57f6a..2d71c2d796 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
@@ -23,11 +23,11 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
@@ -40,11 +40,11 @@ public class CoordinatorSummaryLogger {
private final ServerContext ctx;
private final CompactionJobQueues jobQueues;
- private final Map<ExternalCompactionId,RunningCompaction> running;
+ private final Map<ExternalCompactionId,TExternalCompaction> running;
private final Cache<ResourceGroupId,Integer> compactorCounts;
public CoordinatorSummaryLogger(ServerContext ctx, CompactionJobQueues
jobQueues,
- Map<ExternalCompactionId,RunningCompaction> running,
+ Map<ExternalCompactionId,TExternalCompaction> running,
Cache<ResourceGroupId,Integer> compactorCounts) {
this.ctx = ctx;
this.jobQueues = jobQueues;
@@ -65,7 +65,9 @@ public class CoordinatorSummaryLogger {
} catch (TableNotFoundException e) {
tableName = "Unmapped table id: " + tid.canonical();
}
- perQueueRunningCount.computeIfAbsent(rc.getGroup(), q -> new
AtomicLong(0)).incrementAndGet();
+ perQueueRunningCount
+ .computeIfAbsent(ResourceGroupId.of(rc.getGroupName()), q -> new
AtomicLong(0))
+ .incrementAndGet();
perTableRunningCount.computeIfAbsent(tableName, t -> new
AtomicLong(0)).incrementAndGet();
});
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 0800c42d2a..29181c1241 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -71,7 +71,6 @@ import
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
-import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.Manager;
import
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
@@ -106,11 +105,11 @@ public class CompactionCoordinatorTest {
public class TestCoordinator extends CompactionCoordinator {
- private final List<RunningCompaction> runningCompactions;
+ private final List<TExternalCompaction> runningCompactions;
private Set<ExternalCompactionId> metadataCompactionIds = null;
- public TestCoordinator(Manager manager, List<RunningCompaction>
runningCompactions) {
+ public TestCoordinator(Manager manager, List<TExternalCompaction>
runningCompactions) {
super(manager, t -> null);
this.runningCompactions = runningCompactions;
}
@@ -165,7 +164,7 @@ public class CompactionCoordinatorTest {
}
}
- public Map<ExternalCompactionId,RunningCompaction> getRunning() {
+ public Map<ExternalCompactionId,TExternalCompaction> getRunning() {
return RUNNING_CACHE;
}
@@ -179,7 +178,7 @@ public class CompactionCoordinatorTest {
}
@Override
- protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+ protected List<TExternalCompaction> getCompactionsRunningOnCompactors() {
return runningCompactions;
}
@@ -281,14 +280,21 @@ public class CompactionCoordinatorTest {
@Test
public void testCoordinatorRestartOneRunningCompaction() throws Exception {
- List<RunningCompaction> runningCompactions = new ArrayList<>();
+ List<TExternalCompaction> runningCompactions = new ArrayList<>();
ExternalCompactionId eci =
ExternalCompactionId.generate(UUID.randomUUID());
TExternalCompactionJob job = createMock(TExternalCompactionJob.class);
expect(job.getExternalCompactionId()).andReturn(eci.toString()).atLeastOnce();
TKeyExtent extent = new TKeyExtent();
extent.setTable("1".getBytes(UTF_8));
- runningCompactions.add(new RunningCompaction(job, tserverAddr.toString(),
GROUP_ID));
+
+ TExternalCompaction current = new TExternalCompaction();
+ current.setCompactor(tserverAddr.toString());
+ current.setGroupName(GROUP_ID.canonical());
+ current.setJob(job);
+
+ runningCompactions.add(current);
+
replay(job);
var coordinator = new TestCoordinator(manager, runningCompactions);
@@ -302,18 +308,18 @@ public class CompactionCoordinatorTest {
assertEquals(1, coordinator.getRunning().size());
assertEquals(1, coordinator.getLongRunningByGroup().size());
- Map<ExternalCompactionId,RunningCompaction> running =
coordinator.getRunning();
- Entry<ExternalCompactionId,RunningCompaction> ecomp =
running.entrySet().iterator().next();
+ Map<ExternalCompactionId,TExternalCompaction> running =
coordinator.getRunning();
+ Entry<ExternalCompactionId,TExternalCompaction> ecomp =
running.entrySet().iterator().next();
assertEquals(eci, ecomp.getKey());
- RunningCompaction rc = ecomp.getValue();
- assertEquals(GROUP_ID, rc.getGroup());
- assertEquals(tserverAddr.toString(), rc.getCompactorAddress());
+ TExternalCompaction tec = ecomp.getValue();
+ assertEquals(GROUP_ID, ResourceGroupId.of(tec.getGroupName()));
+ assertEquals(tserverAddr.toString(), tec.getCompactor());
assertTrue(coordinator.getLongRunningByGroup().containsKey(GROUP_ID.toString()));
assertTrue(coordinator.getLongRunningByGroup().get(GROUP_ID.toString()).size()
== 1);
- rc =
coordinator.getLongRunningByGroup().get(GROUP_ID.toString()).iterator().next();
- assertEquals(GROUP_ID, rc.getGroup());
- assertEquals(tserverAddr.toString(), rc.getCompactorAddress());
+ tec =
coordinator.getLongRunningByGroup().get(GROUP_ID.toString()).iterator().next();
+ assertEquals(GROUP_ID, ResourceGroupId.of(tec.getGroupName()));
+ assertEquals(tserverAddr.toString(), tec.getCompactor());
verify(job);
}
@@ -358,10 +364,10 @@ public class CompactionCoordinatorTest {
assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
assertEquals(1, coordinator.getRunning().size());
- Entry<ExternalCompactionId,RunningCompaction> entry =
+ Entry<ExternalCompactionId,TExternalCompaction> entry =
coordinator.getRunning().entrySet().iterator().next();
assertEquals(eci.toString(), entry.getKey().toString());
- assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+ assertEquals("localhost:10241", entry.getValue().getCompactor());
assertEquals(eci.toString(),
entry.getValue().getJob().getExternalCompactionId());
verify(tm);
@@ -379,16 +385,10 @@ public class CompactionCoordinatorTest {
@Test
public void testCleanUpRunning() throws Exception {
TExternalCompaction ext1 = createMock(TExternalCompaction.class);
- expect(ext1.getJob()).andReturn(new
TExternalCompactionJob()).atLeastOnce();
- expect(ext1.getCompactor()).andReturn("localhost:9133").atLeastOnce();
expect(ext1.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce();
TExternalCompaction ext2 = createMock(TExternalCompaction.class);
- expect(ext2.getJob()).andReturn(new
TExternalCompactionJob()).atLeastOnce();
- expect(ext2.getCompactor()).andReturn("localhost:9133").atLeastOnce();
expect(ext2.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce();
TExternalCompaction ext3 = createMock(TExternalCompaction.class);
- expect(ext3.getJob()).andReturn(new
TExternalCompactionJob()).atLeastOnce();
- expect(ext3.getCompactor()).andReturn("localhost:9133").atLeastOnce();
expect(ext3.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce();
replay(ext1, ext2, ext3);
@@ -398,9 +398,9 @@ public class CompactionCoordinatorTest {
var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
- coordinator.getRunning().put(ecid1, new RunningCompaction(ext1));
- coordinator.getRunning().put(ecid2, new RunningCompaction(ext2));
- coordinator.getRunning().put(ecid3, new RunningCompaction(ext3));
+ coordinator.getRunning().put(ecid1, ext1);
+ coordinator.getRunning().put(ecid2, ext2);
+ coordinator.getRunning().put(ecid3, ext3);
coordinator.cleanUpInternalState();
assertEquals(Set.of(ecid1, ecid2, ecid3),
coordinator.getRunning().keySet());
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
index 7929f33f8e..54e9176ade 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
@@ -26,13 +26,13 @@ import org.apache.accumulo.core.cli.ServerOpts;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
-import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl;
import org.apache.thrift.TException;
@@ -61,7 +61,7 @@ public class MemoryConsumingCompactor extends Compactor {
}
@Override
- public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials
credentials)
+ public TExternalCompaction getRunningCompaction(TInfo tinfo, TCredentials
credentials)
throws ThriftSecurityException, TException {
// Use the getRunningCompaction Thrift RPC to consume the memory
LOG.warn("getRunningCompaction called, consuming memory");
@@ -76,7 +76,7 @@ public class MemoryConsumingCompactor extends Compactor {
} catch (IOException e) {
throw new TException("Error consuming memory", e);
}
- return new TExternalCompactionJob();
+ return new TExternalCompaction();
}
public static void main(String[] args) throws Exception {