kevinrr888 commented on code in PR #5484:
URL: https://github.com/apache/accumulo/pull/5484#discussion_r2049090607
##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -202,104 +212,127 @@ private Client(HostAndPort server,
TabletClientService.Client service) {
}
}
- private void sendQueued(int threshhold) {
- if (queuedDataSize > threshhold || threshhold == 0) {
- var sendTimer = Timer.startNew();
+ private void sendBackground() {
+ var queue = this.backgroundQueue.get();
+ List<Client> clients = new ArrayList<>();
- List<Client> clients = new ArrayList<>();
- try {
-
- // Send load messages to tablet servers spinning up work, but do not
wait on results.
- loadQueue.forEach((server, tabletFiles) -> {
+ try {
+ var sendTimer = Timer.startNew();
- if (log.isTraceEnabled()) {
- log.trace("{} asking {} to bulk import {} files for {} tablets",
fmtTid, server,
- tabletFiles.values().stream().mapToInt(Map::size).sum(),
tabletFiles.size());
- }
+ // Send load messages to tablet servers spinning up work, but do not
wait on results.
+ queue.forEach((server, tabletFiles) -> {
- // Tablet servers process tablets serially and perform a single
metadata table write for
- // each tablet. Break the work into per-tablet chunks so it can be
sent over multiple
- // connections to the tserver, allowing each chunk to be run in
parallel on the server
- // side. This allows multiple threads on a single tserver to do
metadata writes for this
- // bulk import.
- int neededConnections = Math.min(maxConnections,
tabletFiles.size());
- List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
- new ArrayList<>(neededConnections);
- for (int i = 0; i < neededConnections; i++) {
- chunks.add(new HashMap<>());
- }
-
- int nextConnection = 0;
- for (var entry : tabletFiles.entrySet()) {
- chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(),
entry.getValue());
- }
+ if (log.isTraceEnabled()) {
+ log.trace("{} asking {} to bulk import {} files for {} tablets",
fmtTid, server,
+ tabletFiles.values().stream().mapToInt(Map::size).sum(),
tabletFiles.size());
+ }
- for (var chunk : chunks) {
- try {
- var client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
- manager.getContext(), timeInMillis);
- // add client to list before calling send in case there is an
exception, this makes
- // sure its returned in the finally
- clients.add(new Client(server, client));
- client.send_loadFilesV2(TraceUtil.traceInfo(),
manager.getContext().rpcCreds(), tid,
- bulkDir.toString(), chunk, setTime);
- } catch (TException ex) {
- log.debug("rpc send failed server: {}, {}", server, fmtTid,
ex);
- }
+ // Tablet servers process tablets serially and perform a single
metadata table write for
+ // each tablet. Break the work into per-tablet chunks so it can be
sent over multiple
+ // connections to the tserver, allowing each chunk to be run in
parallel on the server
+ // side. This allows multiple threads on a single tserver to do
metadata writes for this
+ // bulk import.
+ int neededConnections = Math.min(maxConnections, tabletFiles.size());
+ if (log.isTraceEnabled()) {
+ if (neededConnections == maxConnections) {
Review Comment:
Should this be `>=`?
##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -202,104 +212,127 @@ private Client(HostAndPort server,
TabletClientService.Client service) {
}
}
- private void sendQueued(int threshhold) {
- if (queuedDataSize > threshhold || threshhold == 0) {
- var sendTimer = Timer.startNew();
+ private void sendBackground() {
+ var queue = this.backgroundQueue.get();
+ List<Client> clients = new ArrayList<>();
- List<Client> clients = new ArrayList<>();
- try {
-
- // Send load messages to tablet servers spinning up work, but do not
wait on results.
- loadQueue.forEach((server, tabletFiles) -> {
+ try {
+ var sendTimer = Timer.startNew();
- if (log.isTraceEnabled()) {
- log.trace("{} asking {} to bulk import {} files for {} tablets",
fmtTid, server,
- tabletFiles.values().stream().mapToInt(Map::size).sum(),
tabletFiles.size());
- }
+ // Send load messages to tablet servers spinning up work, but do not
wait on results.
+ queue.forEach((server, tabletFiles) -> {
- // Tablet servers process tablets serially and perform a single
metadata table write for
- // each tablet. Break the work into per-tablet chunks so it can be
sent over multiple
- // connections to the tserver, allowing each chunk to be run in
parallel on the server
- // side. This allows multiple threads on a single tserver to do
metadata writes for this
- // bulk import.
- int neededConnections = Math.min(maxConnections,
tabletFiles.size());
- List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
- new ArrayList<>(neededConnections);
- for (int i = 0; i < neededConnections; i++) {
- chunks.add(new HashMap<>());
- }
-
- int nextConnection = 0;
- for (var entry : tabletFiles.entrySet()) {
- chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(),
entry.getValue());
- }
+ if (log.isTraceEnabled()) {
+ log.trace("{} asking {} to bulk import {} files for {} tablets",
fmtTid, server,
+ tabletFiles.values().stream().mapToInt(Map::size).sum(),
tabletFiles.size());
+ }
- for (var chunk : chunks) {
- try {
- var client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
- manager.getContext(), timeInMillis);
- // add client to list before calling send in case there is an
exception, this makes
- // sure its returned in the finally
- clients.add(new Client(server, client));
- client.send_loadFilesV2(TraceUtil.traceInfo(),
manager.getContext().rpcCreds(), tid,
- bulkDir.toString(), chunk, setTime);
- } catch (TException ex) {
- log.debug("rpc send failed server: {}, {}", server, fmtTid,
ex);
- }
+ // Tablet servers process tablets serially and perform a single
metadata table write for
+ // each tablet. Break the work into per-tablet chunks so it can be
sent over multiple
+ // connections to the tserver, allowing each chunk to be run in
parallel on the server
+ // side. This allows multiple threads on a single tserver to do
metadata writes for this
+ // bulk import.
+ int neededConnections = Math.min(maxConnections, tabletFiles.size());
+ if (log.isTraceEnabled()) {
+ if (neededConnections == maxConnections) {
+ log.trace(
+ "{} Hitting max connection limit set by property {}. Desired
connection count {}",
+ fmtTid, Property.MANAGER_BULK_MAX_CONNECTIONS.getKey(),
tabletFiles.size());
}
- });
-
- long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
- sendTimer.restart();
+ }
+ List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks = new
ArrayList<>(neededConnections);
+ for (int i = 0; i < neededConnections; i++) {
+ chunks.add(new HashMap<>());
+ }
- int outdatedTservers = 0;
+ int nextConnection = 0;
+ for (var entry : tabletFiles.entrySet()) {
+ chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(),
entry.getValue());
+ }
- // wait for all the tservers to complete processing
- for (var client : clients) {
+ for (var chunk : chunks) {
try {
- client.service.recv_loadFilesV2();
+ var client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
+ manager.getContext(), timeInMillis);
+ // add client to list before calling send in case there is an
exception, this makes
+ // sure its returned in the finally
+ clients.add(new Client(server, client));
+ client.send_loadFilesV2(TraceUtil.traceInfo(),
manager.getContext().rpcCreds(), tid,
+ bulkDir.toString(), chunk, setTime);
} catch (TException ex) {
- String additionalInfo = "";
- if (ex instanceof TApplicationException &&
((TApplicationException) ex).getType()
- == TApplicationException.UNKNOWN_METHOD) {
- // A new RPC method was added in 2.1.4, a tserver running
2.1.3 or earlier will
- // not have this RPC. This should not kill the fate operation,
it can wait until
- // all tablet servers are upgraded.
- outdatedTservers++;
- additionalInfo = " (tserver may be running older version)";
- }
- log.debug("rpc recv failed server{}: {}, {}", additionalInfo,
client.server, fmtTid,
- ex);
+ log.debug("rpc send failed server: {}, {}", server, fmtTid, ex);
}
}
-
- if (outdatedTservers > 0) {
- log.warn(
- "{} can not proceed with bulk import because {} tablet servers
are likely running "
- + "an older version. Please update tablet servers to same
patch level as manager.",
- fmtTid, outdatedTservers);
+ });
+
+ long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+ sendTimer.restart();
+
+ int outdatedTservers = 0;
+
+ // wait for all the tservers to complete processing
+ for (var client : clients) {
+ try {
+ client.service.recv_loadFilesV2();
+ } catch (TException ex) {
+ String additionalInfo = "";
+ if (ex instanceof TApplicationException
+ && ((TApplicationException) ex).getType() ==
TApplicationException.UNKNOWN_METHOD) {
+ // A new RPC method was added in 2.1.4, a tserver running 2.1.3
or earlier will
+ // not have this RPC. This should not kill the fate operation,
it can wait until
+ // all tablet servers are upgraded.
+ outdatedTservers++;
+ additionalInfo = " (tserver may be running older version)";
+ }
+ log.debug("rpc recv failed server{}: {}, {}", additionalInfo,
client.server, fmtTid,
+ ex);
}
+ }
- if (log.isDebugEnabled()) {
- var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
- var tabletStats =
loadQueue.values().stream().mapToInt(Map::size).summaryStatistics();
- log.debug(
- "{} sent {} messages to {} tablet servers for {} tablets
(min:{} max:{} avg:{} "
- + "tablets per tserver), send time:{}ms recv time:{}ms
{}:{}",
- fmtTid, clients.size(), loadQueue.size(), tabletStats.getSum(),
- tabletStats.getMin(), tabletStats.getMax(),
tabletStats.getAverage(), sendTime,
- recvTime, Property.MANAGER_BULK_MAX_CONNECTIONS.getKey(),
maxConnections);
- }
+ if (outdatedTservers > 0) {
+ log.warn(
+ "{} can not proceed with bulk import because {} tablet servers
are likely running "
+ + "an older version. Please update tablet servers to same
patch level as manager.",
+ fmtTid, outdatedTservers);
+ }
- loadQueue.clear();
- queuedDataSize = 0;
+ if (log.isDebugEnabled()) {
+ var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+ var tabletStats =
queue.values().stream().mapToInt(Map::size).summaryStatistics();
Review Comment:
Same synchronization question here as for the `queue.forEach` call: does reading `queue` need to be synchronized?
##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -202,104 +212,127 @@ private Client(HostAndPort server,
TabletClientService.Client service) {
}
}
- private void sendQueued(int threshhold) {
- if (queuedDataSize > threshhold || threshhold == 0) {
- var sendTimer = Timer.startNew();
+ private void sendBackground() {
+ var queue = this.backgroundQueue.get();
+ List<Client> clients = new ArrayList<>();
- List<Client> clients = new ArrayList<>();
- try {
-
- // Send load messages to tablet servers spinning up work, but do not
wait on results.
- loadQueue.forEach((server, tabletFiles) -> {
+ try {
+ var sendTimer = Timer.startNew();
- if (log.isTraceEnabled()) {
- log.trace("{} asking {} to bulk import {} files for {} tablets",
fmtTid, server,
- tabletFiles.values().stream().mapToInt(Map::size).sum(),
tabletFiles.size());
- }
+ // Send load messages to tablet servers spinning up work, but do not
wait on results.
+ queue.forEach((server, tabletFiles) -> {
Review Comment:
Does this need to be synchronized? I don't think so, since it seems the queue
under the atomic ref is only ever updated via `compareAndSet`, but I want to
confirm.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]