This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 04bf24f Upgrade Thrift to 0.15.0 (#2273) 04bf24f is described below commit 04bf24f777a3422d968240c66adf9167ffc4b2e0 Author: Mike Miller <mmil...@apache.org> AuthorDate: Wed Sep 29 07:27:37 2021 -0400 Upgrade Thrift to 0.15.0 (#2273) --- .../core/clientImpl/ThriftTransportPool.java | 26 +++++++++++++++ .../iteratorsImpl/system/SystemIteratorUtil.java | 5 ++- .../apache/accumulo/core/rpc/FilterTransport.java | 16 +++++++++ .../apache/accumulo/core/rpc/TBufferedSocket.java | 3 +- .../accumulo/core/rpc/TTimeoutTransport.java | 38 ++++++++++++---------- .../org/apache/accumulo/core/rpc/ThriftUtil.java | 12 +++---- .../core/rpc/UGIAssumingTransportFactory.java | 9 ++++- .../accumulo/core/util/ThriftMessageUtil.java | 21 ++++++------ core/src/main/scripts/generate-thrift.sh | 2 +- .../accumulo/core/rpc/TTimeoutTransportTest.java | 9 ++--- .../accumulo/core/util/ThriftMessageUtilTest.java | 2 +- pom.xml | 2 +- .../server/rpc/CustomNonBlockingServer.java | 6 ++-- .../org/apache/accumulo/tracer/TraceFormatter.java | 8 ++--- .../org/apache/accumulo/tracer/TraceServer.java | 16 +++++++++ .../java/org/apache/accumulo/test/rpc/Mocket.java | 24 +++++++++++--- 16 files changed, 143 insertions(+), 56 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index df43f51..4be3e67 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.singletons.SingletonService; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.thrift.TConfiguration; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; @@ -654,6 +655,31 @@ public class ThriftTransportPool { } } + @Override + public TConfiguration getConfiguration() { + return wrappedTransport.getConfiguration(); + } + + @Override + public void updateKnownMessageSize(long size) throws TTransportException { + try { + ioCount++; + wrappedTransport.updateKnownMessageSize(size); + } finally { + ioCount++; + } + } + + @Override + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + try { + ioCount++; + wrappedTransport.checkReadBytesAvailable(numBytes); + } finally { + ioCount++; + } + } + public ThriftTransportKey getCacheKey() { return cacheKey; } diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SystemIteratorUtil.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SystemIteratorUtil.java index 9e241c2..41a7acc 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SystemIteratorUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SystemIteratorUtil.java @@ -72,9 +72,8 @@ public class SystemIteratorUtil { } public static byte[] encodeIteratorSettings(IteratorConfig iterators) { - TSerializer tser = new TSerializer(new TBinaryProtocol.Factory()); - try { + TSerializer tser = new TSerializer(new TBinaryProtocol.Factory()); return tser.serialize(iterators); } catch (TException e) { throw new RuntimeException(e); @@ -86,9 +85,9 @@ public class SystemIteratorUtil { } public static List<IteratorSetting> decodeIteratorSettings(byte[] enc) { - TDeserializer tdser = new TDeserializer(new TBinaryProtocol.Factory()); IteratorConfig ic = new IteratorConfig(); try { + TDeserializer tdser = new TDeserializer(new TBinaryProtocol.Factory()); tdser.deserialize(ic, enc); } catch (TException e) { throw new RuntimeException(e); diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java index f80dc94..3aaba4a 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.rpc; import static java.util.Objects.requireNonNull; +import org.apache.thrift.TConfiguration; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -103,4 +104,19 @@ public class FilterTransport extends TTransport { public void consumeBuffer(int len) { wrapped.consumeBuffer(len); } + + @Override + public TConfiguration getConfiguration() { + return wrapped.getConfiguration(); + } + + @Override + public void updateKnownMessageSize(long size) throws TTransportException { + wrapped.updateKnownMessageSize(size); + } + + @Override + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + wrapped.checkReadBytesAvailable(numBytes); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java b/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java index c3eb01b..aa68b96 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java @@ -24,12 +24,13 @@ import java.io.IOException; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; public class TBufferedSocket extends TIOStreamTransport { String client; - public TBufferedSocket(TSocket sock, int bufferSize) throws IOException { + public TBufferedSocket(TSocket sock, int bufferSize) throws IOException, TTransportException { super(new BufferedInputStream(sock.getSocket().getInputStream(), bufferSize), new BufferedOutputStream(sock.getSocket().getOutputStream(), bufferSize)); client = sock.getSocket().getInetAddress().getHostAddress() + ":" + sock.getSocket().getPort(); diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java index 10bff34..0bc8635 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.util.HostAndPort; import org.apache.hadoop.net.NetUtils; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +61,10 @@ public class TTimeoutTransport { * @param timeoutMillis * The timeout in milliseconds for the connection * @return A TTransport connected to the given <code>addr</code> - * @throws IOException + * @throws TTransportException * If the transport fails to be created/connected */ - public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException { + public static TTransport create(HostAndPort addr, long timeoutMillis) throws TTransportException { return INSTANCE.createInternal(new InetSocketAddress(addr.getHost(), addr.getPort()), timeoutMillis); } @@ -77,16 +78,16 @@ public class TTimeoutTransport { * @param timeoutMillis * The socket timeout in milliseconds * @return A TTransport instance to the given <code>addr</code> - * @throws IOException + * @throws TTransportException * If the Thrift client is failed to be connected/created */ - TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException { + TTransport createInternal(SocketAddress addr, long timeoutMillis) throws TTransportException { Socket socket = null; try { socket = openSocket(addr); } catch (IOException e) { // openSocket handles closing the Socket on error - throw e; + throw new TTransportException(e); } // Should be non-null @@ -98,16 +99,23 @@ public class TTimeoutTransport { OutputStream output = wrapOutputStream(socket, timeoutMillis); return new TIOStreamTransport(input, output); } catch (IOException e) { - try { - socket.close(); - } catch (IOException ioe) { - log.error("Failed to close socket after unsuccessful I/O stream setup", e); - } - + closeSocket(socket, e); + throw new TTransportException(e); + } catch (TTransportException e) { + closeSocket(socket, e); throw e; } } + private void closeSocket(Socket socket, Exception e) { + try { + if (socket != null) + socket.close(); + } catch (IOException ioe) { + log.error("Failed to close socket after unsuccessful I/O stream setup", e); + } + } + // Visible for testing InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException { return new BufferedInputStream(NetUtils.getInputStream(socket, timeoutMillis), 1024 * 10); @@ -134,13 +142,7 @@ public class TTimeoutTransport { socket.connect(addr); return socket; } catch (IOException e) { - try { - if (socket != null) - socket.close(); - } catch (IOException ioe) { - log.error("Failed to close socket after unsuccessful open.", e); - } - + closeSocket(socket, e); throw e; } } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 6cb6ef6..6836863 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -44,13 +44,13 @@ import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; import org.apache.thrift.TServiceClientFactory; import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.layered.TFramedTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +82,7 @@ public class ThriftUtil { } /** - * An instance of {@link org.apache.thrift.transport.TFramedTransport.Factory} + * An instance of {@link org.apache.thrift.transport.layered.TFramedTransport.Factory} * * @return The default Thrift TTransportFactory for RPC */ @@ -298,9 +298,9 @@ public class ThriftUtil { // Make sure a timeout is set try { transport = TTimeoutTransport.create(address, timeout); - } catch (IOException e) { + } catch (TTransportException e) { log.warn("Failed to open transport to {}", address); - throw new TTransportException(e); + throw e; } try { @@ -371,9 +371,9 @@ public class ThriftUtil { } else { try { transport = TTimeoutTransport.create(address, timeout); - } catch (IOException ex) { + } catch (TTransportException ex) { log.warn("Failed to open transport to {}", address); - throw new TTransportException(ex); + throw ex; } // Open the transport diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java index 5c3327b..93ae977 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java @@ -24,6 +24,7 @@ import java.security.PrivilegedAction; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; /** @@ -47,6 +48,12 @@ public class UGIAssumingTransportFactory extends TTransportFactory { @Override public TTransport getTransport(final TTransport trans) { - return ugi.doAs((PrivilegedAction<TTransport>) () -> wrapped.getTransport(trans)); + return ugi.doAs((PrivilegedAction<TTransport>) () -> { + try { + return wrapped.getTransport(trans); + } catch (TTransportException e) { + throw new RuntimeException(e); + } + }); } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java index e993695..7b7765e 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java @@ -29,6 +29,7 @@ import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransportException; /** * Serializes and deserializes Thrift messages to and from byte arrays. This class is not @@ -41,15 +42,15 @@ public class ThriftMessageUtil { private final TMemoryInputTransport inputTransport; private final TCompactProtocol inputProtocol; - public ThriftMessageUtil() { - this(64); - } - - public ThriftMessageUtil(int initialCapacity) { + public ThriftMessageUtil() throws IOException { // TODO does this make sense? better to push this down to the serialize method (accept the // transport as an argument)? - this.initialCapacity = initialCapacity; - this.inputTransport = new TMemoryInputTransport(); + this.initialCapacity = 64; + try { + this.inputTransport = new TMemoryInputTransport(); + } catch (TTransportException e) { + throw new IOException(e); + } this.inputProtocol = new TCompactProtocol(inputTransport); } @@ -64,14 +65,14 @@ public class ThriftMessageUtil { */ public ByteBuffer serialize(TBase<?,?> msg) throws IOException { requireNonNull(msg); - TMemoryBuffer transport = new TMemoryBuffer(initialCapacity); - TProtocol protocol = new TCompactProtocol(transport); try { + TMemoryBuffer transport = new TMemoryBuffer(initialCapacity); + TProtocol protocol = new TCompactProtocol(transport); msg.write(protocol); + return ByteBuffer.wrap(transport.getArray(), 0, transport.length()); } catch (TException e) { throw new IOException(e); } - return ByteBuffer.wrap(transport.getArray(), 0, transport.length()); } /** diff --git a/core/src/main/scripts/generate-thrift.sh b/core/src/main/scripts/generate-thrift.sh index b973481..8d13420 100755 --- a/core/src/main/scripts/generate-thrift.sh +++ b/core/src/main/scripts/generate-thrift.sh @@ -29,7 +29,7 @@ # INCLUDED_MODULES should be an array that includes other Maven modules with src/main/thrift directories # Use INCLUDED_MODULES=(-) in calling scripts that require no other modules # ======================================================================================================================== -[[ -z $REQUIRED_THRIFT_VERSION ]] && REQUIRED_THRIFT_VERSION='0.12.0' +[[ -z $REQUIRED_THRIFT_VERSION ]] && REQUIRED_THRIFT_VERSION='0.15.0' [[ -z $INCLUDED_MODULES ]] && INCLUDED_MODULES=(../server/tracer) [[ -z $BASE_OUTPUT_PACKAGE ]] && BASE_OUTPUT_PACKAGE='org.apache.accumulo.core' [[ -z $PACKAGES_TO_GENERATE ]] && PACKAGES_TO_GENERATE=(gc master manager tabletserver securityImpl clientImpl dataImpl replication trace compaction) diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java index 2c50ad1..e5af852 100644 --- a/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java +++ b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java @@ -31,6 +31,7 @@ import java.io.InputStream; import java.net.Socket; import java.net.SocketAddress; +import org.apache.thrift.transport.TTransportException; import org.junit.Test; /** @@ -104,8 +105,8 @@ public class TTimeoutTransportTest { try { timeoutTransport.createInternal(addr, timeout); - fail("Expected to catch IOException but got none"); - } catch (IOException e) { + fail("Expected to catch TTransportException but got none"); + } catch (TTransportException e) { // Expected } @@ -144,8 +145,8 @@ public class TTimeoutTransportTest { try { timeoutTransport.createInternal(addr, timeout); - fail("Expected to catch IOException but got none"); - } catch (IOException e) { + fail("Expected to catch TTransportException but got none"); + } catch (TTransportException e) { // Expected } diff --git a/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java b/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java index 727233d..dbd5d0d 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java @@ -33,7 +33,7 @@ public class ThriftMessageUtilTest { private ThriftMessageUtil util; @Before - public void setup() { + public void setup() throws IOException { msg = new TAuthenticationTokenIdentifier("principal"); util = new ThriftMessageUtil(); } diff --git a/pom.xml b/pom.xml index d602b26..cd71aef 100644 --- a/pom.xml +++ b/pom.xml @@ -154,7 +154,7 @@ <!-- 3.0.0-M5 causes RowHashIT.test and ShellServerIT.scansWithClassLoaderContext to fail --> <surefire.version>3.0.0-M4</surefire.version> <!-- Thrift version --> - <thrift.version>0.13.0</thrift.version> + <thrift.version>0.15.0</thrift.version> <unitTestMemSize>-Xmx1G</unitTestMemSize> <!-- ZooKeeper version --> <zookeeper.version>3.5.9</zookeeper.version> diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java index d31cdac..57f7824 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java @@ -28,6 +28,7 @@ import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingSocket; import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; /** * This class implements a custom non-blocking thrift server that stores the client address in @@ -83,7 +84,8 @@ public class CustomNonBlockingServer extends THsHaServer { @Override protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, - final SelectionKey selectionKey, final AbstractSelectThread selectThread) { + final SelectionKey selectionKey, final AbstractSelectThread selectThread) + throws TTransportException { if (processorFactory_.isAsyncProcessor()) { throw new IllegalStateException("This implementation does not support AsyncProcessors"); } @@ -99,7 +101,7 @@ public class CustomNonBlockingServer extends THsHaServer { private class CustomFrameBuffer extends FrameBuffer { public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, - AbstractSelectThread selectThread) { + AbstractSelectThread selectThread) throws TTransportException { super(trans, selectionKey, selectThread); } diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceFormatter.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceFormatter.java index 992b95f..cc8d44f 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceFormatter.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceFormatter.java @@ -57,15 +57,15 @@ public class TraceFormatter implements Formatter { private FormatterConfig config; public static RemoteSpan getRemoteSpan(Entry<Key,Value> entry) { - TMemoryInputTransport transport = new TMemoryInputTransport(entry.getValue().get()); - TCompactProtocol protocol = new TCompactProtocol(transport); - RemoteSpan span = new RemoteSpan(); try { + TMemoryInputTransport transport = new TMemoryInputTransport(entry.getValue().get()); + TCompactProtocol protocol = new TCompactProtocol(transport); + RemoteSpan span = new RemoteSpan(); span.read(protocol); + return span; } catch (TException ex) { throw new RuntimeException(ex); } - return span; } @Override diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 439de34..251447e 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@ -63,6 +63,7 @@ import org.apache.accumulo.tracer.thrift.SpanReceiver.Iface; import org.apache.accumulo.tracer.thrift.SpanReceiver.Processor; import org.apache.hadoop.io.Text; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TServer; @@ -124,6 +125,21 @@ public class TraceServer implements Watcher, AutoCloseable { out.write(buf, off, len); } + @Override + public TConfiguration getConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateKnownMessageSize(long size) { + throw new UnsupportedOperationException(); + } + + @Override + public void checkReadBytesAvailable(long numBytes) { + throw new UnsupportedOperationException(); + } + public byte[] get() { return out.get(); } diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java b/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java index 4a79160..9476100 100644 --- a/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java +++ b/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.thrift.TConfiguration; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; @@ -141,13 +142,13 @@ public class Mocket { public void listen() {} @Override - public void close() { - acceptImpl().close(); + public TTransport accept() { + return servTrans; } @Override - protected TTransport acceptImpl() { - return servTrans; + public void close() { + servTrans.close(); } @Override @@ -173,6 +174,21 @@ public class Mocket { } @Override + public TConfiguration getConfiguration() { + return null; + } + + @Override + public void updateKnownMessageSize(long size) { + + } + + @Override + public void checkReadBytesAvailable(long numBytes) { + + } + + @Override public int read(byte[] buf, int off, int len) { return input.read(buf, off, len); }