This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 69185dce1e71 [SPARK-53258][CORE][SQL] Use `JavaUtils`'s `check(Argument|State)` 69185dce1e71 is described below commit 69185dce1e718f0fc5b4d012550c42c5f17dace7 Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Tue Aug 12 06:42:37 2025 -0700 [SPARK-53258][CORE][SQL] Use `JavaUtils`'s `check(Argument|State)` ### What changes were proposed in this pull request? This PR aims to use `checkArgument` and `checkState` of `JavaUtils` class instead of `com.google.common.base.Preconditions` ### Why are the changes needed? To use Apache Spark's existing code instead of 3rd party library. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51988 from dongjoon-hyun/SPARK-53258. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/util/kvstore/ArrayWrappers.java | 7 ++++--- .../apache/spark/util/kvstore/InMemoryStore.java | 5 ++--- .../org/apache/spark/util/kvstore/KVStoreView.java | 5 ++--- .../org/apache/spark/util/kvstore/KVTypeInfo.java | 21 ++++++++++----------- .../java/org/apache/spark/util/kvstore/LevelDB.java | 12 ++++++------ .../apache/spark/util/kvstore/LevelDBIterator.java | 4 ++-- .../apache/spark/util/kvstore/LevelDBTypeInfo.java | 11 ++++++----- .../java/org/apache/spark/util/kvstore/RocksDB.java | 12 ++++++------ .../apache/spark/util/kvstore/RocksDBIterator.java | 5 +++-- .../apache/spark/util/kvstore/RocksDBTypeInfo.java | 11 ++++++----- .../spark/network/client/TransportClient.java | 4 ++-- .../org/apache/spark/network/crypto/AuthEngine.java | 12 ++++++------ .../apache/spark/network/crypto/AuthRpcHandler.java | 4 ++-- .../spark/network/crypto/CtrTransportCipher.java | 6 +++--- .../spark/network/crypto/GcmTransportCipher.java | 13 ++++++++----- .../protocol/EncryptedMessageWithHeader.java | 4 ++-- .../spark/network/protocol/MessageWithHeader.java | 6 +++--- .../apache/spark/network/sasl/SaslEncryption.java | 6 +++--- .../network/server/BlockPushNonFatalFailure.java | 4 ++-- .../network/server/OneForOneStreamManager.java | 4 ++-- .../spark/network/util/LimitedInputStream.java | 4 +--- .../apache/spark/network/util/TransportConf.java | 3 +-- .../spark/network/util/TransportFrameDecoder.java | 7 +++---- .../org/apache/spark/network/TestManagedBuffer.java | 4 ++-- .../spark/network/shuffle/OneForOneBlockPusher.java | 5 ++--- .../network/shuffle/RemoteBlockPushResolver.java | 11 +++++------ .../network/shuffle/RetryingBlockTransferor.java | 6 +++--- .../org/apache/spark/io/ReadAheadInputStream.java | 4 ++-- dev/checkstyle.xml | 1 + .../datasources/parquet/ParquetColumnVector.java | 6 +++--- .../parquet/VectorizedDeltaBinaryPackedReader.java | 6 +++--- .../parquet/VectorizedRleValuesReader.java | 4 ++-- 32 files changed, 108 insertions(+), 109 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java index 5265881e990e..a9d6784805f6 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java @@ -19,7 +19,7 @@ package org.apache.spark.util.kvstore; import java.util.Arrays; -import com.google.common.base.Preconditions; +import org.apache.spark.network.util.JavaUtils; /** * A factory for array wrappers so that arrays can be used as keys in a map, sorted or not. @@ -38,7 +38,7 @@ class ArrayWrappers { @SuppressWarnings("unchecked") public static Comparable<Object> forArray(Object a) { - Preconditions.checkArgument(a.getClass().isArray()); + JavaUtils.checkArgument(a.getClass().isArray(), "Input should be an array"); Comparable<?> ret; if (a instanceof int[] ia) { ret = new ComparableIntArray(ia); @@ -47,7 +47,8 @@ class ArrayWrappers { } else if (a instanceof byte[] ba) { ret = new ComparableByteArray(ba); } else { - Preconditions.checkArgument(!a.getClass().getComponentType().isPrimitive()); + JavaUtils.checkArgument(!a.getClass().getComponentType().isPrimitive(), + "Array element is primitive"); ret = new ComparableObjectArray((Object[]) a); } return (Comparable<Object>) ret; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index 85008d7b3911..9a45a10532de 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -32,9 +32,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.google.common.base.Preconditions; - import org.apache.spark.annotation.Private; +import org.apache.spark.network.util.JavaUtils; /** * Implementation of KVStore that keeps data deserialized in memory. This store does not index @@ -419,7 +418,7 @@ public class InMemoryStore implements KVStore { // Go through all the values in `data` and collect all the objects has certain parent // value. This can be slow when there is a large number of entries in `data`. KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); - Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); + JavaUtils.checkArgument(parentGetter != null, "Parent filter for non-child index."); return data.values().stream() .filter(e -> compare(e, parentGetter, parentKey) == 0) .collect(Collectors.toList()); diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java index 130f7ab4b8ba..cfdcc1a2c878 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java @@ -19,9 +19,8 @@ package org.apache.spark.util.kvstore; import java.util.Objects; -import com.google.common.base.Preconditions; - import org.apache.spark.annotation.Private; +import org.apache.spark.network.util.JavaUtils; /** * A configurable view that allows iterating over values in a {@link KVStore}. @@ -98,7 +97,7 @@ public abstract class KVStoreView<T> implements Iterable<T> { * Stops iteration after a number of elements has been retrieved. */ public KVStoreView<T> max(long max) { - Preconditions.checkArgument(max > 0L, "max must be positive."); + JavaUtils.checkArgument(max > 0L, "max must be positive."); this.max = max; return this; } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index bf7c256fc94f..86f32abc9075 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -23,9 +23,8 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; -import com.google.common.base.Preconditions; - import org.apache.spark.annotation.Private; +import org.apache.spark.network.util.JavaUtils; /** * Wrapper around types managed in a KVStore, providing easy access to their indexed fields. @@ -56,7 +55,7 @@ public class KVTypeInfo { KVIndex idx = m.getAnnotation(KVIndex.class); if (idx != null) { checkIndex(idx, indices); - Preconditions.checkArgument(m.getParameterCount() == 0, + JavaUtils.checkArgument(m.getParameterCount() == 0, "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName()); m.setAccessible(true); indices.put(idx.value(), idx); @@ -64,29 +63,29 @@ public class KVTypeInfo { } } - Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME), + JavaUtils.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME), "No natural index defined for type %s.", type.getName()); for (KVIndex idx : indices.values()) { if (!idx.parent().isEmpty()) { KVIndex parent = indices.get(idx.parent()); - Preconditions.checkArgument(parent != null, + JavaUtils.checkArgument(parent != null, "Cannot find parent %s of index %s.", idx.parent(), idx.value()); - Preconditions.checkArgument(parent.parent().isEmpty(), + JavaUtils.checkArgument(parent.parent().isEmpty(), "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value()); } } } private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) { - Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(), + JavaUtils.checkArgument(idx.value() != null && !idx.value().isEmpty(), "No name provided for index in type %s.", type.getName()); - Preconditions.checkArgument( + JavaUtils.checkArgument( !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME), "Index name %s (in type %s) is not allowed.", idx.value(), type.getName()); - Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()), + JavaUtils.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()), "Index %s cannot be parent of itself.", idx.value()); - Preconditions.checkArgument(!indices.containsKey(idx.value()), + JavaUtils.checkArgument(!indices.containsKey(idx.value()), "Duplicate index %s for type %s.", idx.value(), type.getName()); } @@ -104,7 +103,7 @@ public class KVTypeInfo { Accessor getAccessor(String indexName) { Accessor a = accessors.get(indexName); - Preconditions.checkArgument(a != null, "No index %s.", indexName); + JavaUtils.checkArgument(a != null, "No index %s.", indexName); return a; } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 74843806b3ea..a15d08ee018d 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -30,7 +30,6 @@ import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; @@ -39,6 +38,7 @@ import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteBatch; import org.apache.spark.annotation.Private; +import org.apache.spark.network.util.JavaUtils; /** * Implementation of KVStore that uses LevelDB as the underlying data store. @@ -137,20 +137,20 @@ public class LevelDB implements KVStore { } private void put(byte[] key, Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); + JavaUtils.checkArgument(value != null, "Null values are not allowed."); db().put(key, serializer.serialize(value)); } @Override public <T> T read(Class<T> klass, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); + JavaUtils.checkArgument(naturalKey != null, "Null keys are not allowed."); byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey); return get(key, klass); } @Override public void write(Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); + JavaUtils.checkArgument(value != null, "Null values are not allowed."); LevelDBTypeInfo ti = getTypeInfo(value.getClass()); try (WriteBatch batch = db().createWriteBatch()) { @@ -163,7 +163,7 @@ public class LevelDB implements KVStore { } public void writeAll(List<?> values) throws Exception { - Preconditions.checkArgument(values != null && !values.isEmpty(), + JavaUtils.checkArgument(values != null && !values.isEmpty(), "Non-empty values required."); // Group by class, in case there are values from different classes in the values @@ -225,7 +225,7 @@ public class LevelDB implements KVStore { @Override public void delete(Class<?> type, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); + JavaUtils.checkArgument(naturalKey != null, "Null keys are not allowed."); try (WriteBatch batch = db().createWriteBatch()) { LevelDBTypeInfo ti = getTypeInfo(type); byte[] key = ti.naturalIndex().start(null, naturalKey); diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index 29ed37ffa44e..352d65270412 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -26,12 +26,12 @@ import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.iq80.leveldb.DBIterator; import org.apache.spark.internal.SparkLogger; import org.apache.spark.internal.SparkLoggerFactory; +import org.apache.spark.network.util.JavaUtils; class LevelDBIterator<T> implements KVStoreIterator<T> { @@ -66,7 +66,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> { this.resourceCleaner = new ResourceCleaner(it, db); this.cleanable = CLEANER.register(this, this.resourceCleaner); - Preconditions.checkArgument(!index.isChild() || params.parent != null, + JavaUtils.checkArgument(!index.isChild() || params.parent != null, "Cannot iterate over child index %s without parent value.", params.index); byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java index c03487f67d0c..341e34606a9b 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java @@ -25,9 +25,10 @@ import java.util.Map; import java.util.Objects; import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.common.base.Preconditions; import org.iq80.leveldb.WriteBatch; +import org.apache.spark.network.util.JavaUtils; + /** * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected * via reflection, to make it cheaper to access it multiple times. @@ -164,7 +165,7 @@ class LevelDBTypeInfo { Index index(String name) { Index i = indices.get(name); - Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name, + JavaUtils.checkArgument(i != null, "Index %s does not exist for type %s.", name, type.getName()); return i; } @@ -253,7 +254,7 @@ class LevelDBTypeInfo { * same parent index exist. */ byte[] childPrefix(Object value) { - Preconditions.checkState(parent == null, "Not a parent index."); + JavaUtils.checkState(parent == null, "Not a parent index."); return buildKey(name, toParentKey(value)); } @@ -268,9 +269,9 @@ class LevelDBTypeInfo { private void checkParent(byte[] prefix) { if (prefix != null) { - Preconditions.checkState(parent != null, "Parent prefix provided for parent index."); + JavaUtils.checkState(parent != null, "Parent prefix provided for parent index."); } else { - Preconditions.checkState(parent == null, "Parent prefix missing for child index."); + JavaUtils.checkState(parent == null, "Parent prefix missing for child index."); } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java index 8c9ac5a23200..8fcbc9cb3d1e 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java @@ -31,11 +31,11 @@ import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.rocksdb.*; import org.apache.spark.annotation.Private; +import org.apache.spark.network.util.JavaUtils; /** * Implementation of KVStore that uses RocksDB as the underlying data store. @@ -170,20 +170,20 @@ public class RocksDB implements KVStore { } private void put(byte[] key, Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); + JavaUtils.checkArgument(value != null, "Null values are not allowed."); db().put(key, serializer.serialize(value)); } @Override public <T> T read(Class<T> klass, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); + JavaUtils.checkArgument(naturalKey != null, "Null keys are not allowed."); byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey); return get(key, klass); } @Override public void write(Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); + JavaUtils.checkArgument(value != null, "Null values are not allowed."); RocksDBTypeInfo ti = getTypeInfo(value.getClass()); byte[] data = serializer.serialize(value); synchronized (ti) { @@ -195,7 +195,7 @@ public class RocksDB implements KVStore { } public void writeAll(List<?> values) throws Exception { - Preconditions.checkArgument(values != null && !values.isEmpty(), + JavaUtils.checkArgument(values != null && !values.isEmpty(), "Non-empty values required."); // Group by class, in case there are values from different classes in the values @@ -257,7 +257,7 @@ public class RocksDB implements KVStore { @Override public void delete(Class<?> type, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); + JavaUtils.checkArgument(naturalKey != null, "Null keys are not allowed."); try (WriteBatch writeBatch = new WriteBatch()) { RocksDBTypeInfo ti = getTypeInfo(type); byte[] key = ti.naturalIndex().start(null, naturalKey); diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java index e350ddc2d445..9ade85004ee0 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java @@ -23,10 +23,11 @@ import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.rocksdb.RocksIterator; +import org.apache.spark.network.util.JavaUtils; + class RocksDBIterator<T> implements KVStoreIterator<T> { private static final Cleaner CLEANER = Cleaner.create(); @@ -58,7 +59,7 @@ class RocksDBIterator<T> implements KVStoreIterator<T> { this.resourceCleaner = new RocksDBIterator.ResourceCleaner(it, db); this.cleanable = CLEANER.register(this, resourceCleaner); - Preconditions.checkArgument(!index.isChild() || params.parent != null, + JavaUtils.checkArgument(!index.isChild() || params.parent != null, "Cannot iterate over child index %s without parent value.", params.index); byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBTypeInfo.java index 3c208356e92a..3b325a56ff2c 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBTypeInfo.java @@ -24,10 +24,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import com.google.common.base.Preconditions; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; +import org.apache.spark.network.util.JavaUtils; + import static java.nio.charset.StandardCharsets.UTF_8; /** @@ -166,7 +167,7 @@ class RocksDBTypeInfo { Index index(String name) { Index i = indices.get(name); - Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name, + JavaUtils.checkArgument(i != null, "Index %s does not exist for type %s.", name, type.getName()); return i; } @@ -255,7 +256,7 @@ class RocksDBTypeInfo { * same parent index exist. */ byte[] childPrefix(Object value) { - Preconditions.checkState(parent == null, "Not a parent index."); + JavaUtils.checkState(parent == null, "Not a parent index."); return buildKey(name, toParentKey(value)); } @@ -270,9 +271,9 @@ class RocksDBTypeInfo { private void checkParent(byte[] prefix) { if (prefix != null) { - Preconditions.checkState(parent != null, "Parent prefix provided for parent index."); + JavaUtils.checkState(parent != null, "Parent prefix provided for parent index."); } else { - Preconditions.checkState(parent == null, "Parent prefix missing for child index."); + JavaUtils.checkState(parent == null, "Parent prefix missing for child index."); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 2670ff4732af..8c92fbfd9825 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; @@ -42,6 +41,7 @@ import org.apache.spark.internal.MDC; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.protocol.*; +import org.apache.spark.network.util.JavaUtils; import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; @@ -112,7 +112,7 @@ public class TransportClient implements Closeable { * Trying to set a different client ID after it's been set will result in an exception. */ public void setClientId(String id) { - Preconditions.checkState(clientId == null, "Client ID has already been set."); + JavaUtils.checkState(clientId == null, "Client ID has already been set."); this.clientId = id; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java index d35c7e920eef..f02fbc3aa26c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java @@ -25,7 +25,6 @@ import java.util.Objects; import java.util.Properties; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.primitives.Bytes; import com.google.crypto.tink.subtle.AesGcmJce; import com.google.crypto.tink.subtle.Hkdf; @@ -34,6 +33,8 @@ import com.google.crypto.tink.subtle.X25519; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; /** @@ -125,7 +126,7 @@ class AuthEngine implements Closeable { private byte[] decryptEphemeralPublicKey( AuthMessage encryptedPublicKey, byte[] transcript) throws GeneralSecurityException { - Preconditions.checkArgument(appId.equals(encryptedPublicKey.appId())); + JavaUtils.checkArgument(appId.equals(encryptedPublicKey.appId()), "appID is different."); // Mix in the app ID, salt, and transcript into HKDF and use it as AES-GCM AAD byte[] aadState = Bytes.concat(appId.getBytes(UTF_8), encryptedPublicKey.salt(), transcript); // Use HKDF to derive an AES_GCM key from the pre-shared key, non-secret salt, and AAD state @@ -161,7 +162,7 @@ class AuthEngine implements Closeable { * @return An encrypted server ephemeral public key to be sent to the client. */ AuthMessage response(AuthMessage encryptedClientPublicKey) throws GeneralSecurityException { - Preconditions.checkArgument(appId.equals(encryptedClientPublicKey.appId())); + JavaUtils.checkArgument(appId.equals(encryptedClientPublicKey.appId()), "appId is different."); // Compute a shared secret given the client public key and the server private key byte[] clientPublicKey = decryptEphemeralPublicKey(encryptedClientPublicKey, EMPTY_TRANSCRIPT); @@ -189,8 +190,7 @@ class AuthEngine implements Closeable { */ void deriveSessionCipher(AuthMessage encryptedClientPublicKey, AuthMessage encryptedServerPublicKey) throws GeneralSecurityException { - Preconditions.checkArgument(appId.equals(encryptedClientPublicKey.appId())); - Preconditions.checkArgument(appId.equals(encryptedServerPublicKey.appId())); + JavaUtils.checkArgument(appId.equals(encryptedClientPublicKey.appId()), "appId is different."); // Compute a shared secret given the server public key and the client private key, // mixing in the protocol transcript. byte[] serverPublicKey = decryptEphemeralPublicKey( @@ -251,7 +251,7 @@ class AuthEngine implements Closeable { } TransportCipher sessionCipher() { - Preconditions.checkState(sessionCipher != null); + JavaUtils.checkState(sessionCipher != null, "sessionCipher is null."); return sessionCipher; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 2a6e36ea45e3..8ea65498bc88 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -20,7 +20,6 @@ package org.apache.spark.network.crypto; import java.nio.ByteBuffer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -36,6 +35,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.sasl.SaslRpcHandler; import org.apache.spark.network.server.AbstractAuthRpcHandler; import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; /** @@ -111,7 +111,7 @@ class AuthRpcHandler extends AbstractAuthRpcHandler { AuthEngine engine = null; try { String secret = secretKeyHolder.getSecretKey(challenge.appId()); - Preconditions.checkState(secret != null, + JavaUtils.checkState(secret != null, "Trying to authenticate non-registered app %s.", challenge.appId()); LOG.debug("Authenticating challenge for app {}.", challenge.appId()); engine = new AuthEngine(challenge.appId(), secret, conf); diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/CtrTransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/CtrTransportCipher.java index 85b893751b39..de7d1ae5753d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/CtrTransportCipher.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/CtrTransportCipher.java @@ -27,7 +27,6 @@ import javax.crypto.spec.SecretKeySpec; import javax.crypto.spec.IvParameterSpec; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; @@ -37,6 +36,7 @@ import org.apache.commons.crypto.stream.CryptoOutputStream; import org.apache.spark.network.util.AbstractFileRegion; import org.apache.spark.network.util.ByteArrayReadableChannel; import org.apache.spark.network.util.ByteArrayWritableChannel; +import org.apache.spark.network.util.JavaUtils; /** * Cipher for encryption and decryption. @@ -239,7 +239,7 @@ public class CtrTransportCipher implements TransportCipher { Object msg, ByteArrayWritableChannel byteEncChannel, ByteArrayWritableChannel byteRawChannel) { - Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion, + JavaUtils.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion, "Unrecognized message type: %s", msg.getClass().getName()); this.handler = handler; this.isByteBuf = msg instanceof ByteBuf; @@ -304,7 +304,7 @@ public class CtrTransportCipher implements TransportCipher { @Override public long transferTo(WritableByteChannel target, long position) throws IOException { - Preconditions.checkArgument(position == transferred(), "Invalid position."); + JavaUtils.checkArgument(position == transferred(), "Invalid position."); if (transferred == count) { return 0; diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/GcmTransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/GcmTransportCipher.java index c3540838bef0..e1cf22a612ea 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/GcmTransportCipher.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/GcmTransportCipher.java @@ -18,15 +18,16 @@ package org.apache.spark.network.crypto; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import com.google.crypto.tink.subtle.*; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.util.ReferenceCounted; + import org.apache.spark.network.util.AbstractFileRegion; import org.apache.spark.network.util.ByteBufferWriteableChannel; +import org.apache.spark.network.util.JavaUtils; import javax.crypto.spec.SecretKeySpec; import java.io.IOException; @@ -118,7 +119,7 @@ public class GcmTransportCipher implements TransportCipher { Object plaintextMessage, ByteBuffer plaintextBuffer, ByteBuffer ciphertextBuffer) throws GeneralSecurityException { - Preconditions.checkArgument( + JavaUtils.checkArgument( plaintextMessage instanceof ByteBuf || plaintextMessage instanceof FileRegion, "Unrecognized message type: %s", plaintextMessage.getClass().getName()); this.plaintextMessage = plaintextMessage; @@ -221,10 +222,12 @@ public class GcmTransportCipher implements TransportCipher { int readLimit = (int) Math.min(readableBytes, plaintextBuffer.remaining()); if (plaintextMessage instanceof ByteBuf byteBuf) { - Preconditions.checkState(0 == plaintextBuffer.position()); + JavaUtils.checkState(0 == plaintextBuffer.position(), + "plaintextBuffer.position is not 0"); plaintextBuffer.limit(readLimit); byteBuf.readBytes(plaintextBuffer); - Preconditions.checkState(readLimit == plaintextBuffer.position()); + JavaUtils.checkState(readLimit == plaintextBuffer.position(), + "plaintextBuffer.position should be equal to readLimit."); } else if (plaintextMessage instanceof FileRegion fileRegion) { ByteBufferWriteableChannel plaintextChannel = new ByteBufferWriteableChannel(plaintextBuffer); @@ -347,7 +350,7 @@ public class GcmTransportCipher implements TransportCipher { @Override public void channelRead(ChannelHandlerContext ctx, Object ciphertextMessage) throws GeneralSecurityException { - Preconditions.checkArgument(ciphertextMessage instanceof ByteBuf, + JavaUtils.checkArgument(ciphertextMessage instanceof ByteBuf, "Unrecognized message type: %s", ciphertextMessage.getClass().getName()); ByteBuf ciphertextNettyBuf = (ByteBuf) ciphertextMessage; diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java index 321ac13881c2..84917eca1719 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/EncryptedMessageWithHeader.java @@ -21,7 +21,6 @@ import java.io.EOFException; import java.io.InputStream; import javax.annotation.Nullable; -import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; @@ -29,6 +28,7 @@ import io.netty.handler.stream.ChunkedStream; import io.netty.handler.stream.ChunkedInput; import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.util.JavaUtils; /** * A wrapper message that holds two separate pieces (a header and a body). @@ -60,7 +60,7 @@ public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> { public EncryptedMessageWithHeader( @Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) { - Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream, + JavaUtils.checkArgument(body instanceof InputStream || body instanceof ChunkedStream, "Body must be an InputStream or a ChunkedStream."); this.managedBuffer = managedBuffer; this.header = header; diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index e8eb83e7577b..993ce2381caa 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -22,13 +22,13 @@ import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import javax.annotation.Nullable; -import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.channel.FileRegion; import io.netty.util.ReferenceCountUtil; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.util.AbstractFileRegion; +import org.apache.spark.network.util.JavaUtils; /** * A wrapper message that holds two separate pieces (a header and a body). @@ -72,7 +72,7 @@ public class MessageWithHeader extends AbstractFileRegion { ByteBuf header, Object body, long bodyLength) { - Preconditions.checkArgument(body instanceof ByteBuf || body instanceof FileRegion, + JavaUtils.checkArgument(body instanceof ByteBuf || body instanceof FileRegion, "Body must be a ByteBuf or a FileRegion."); this.managedBuffer = managedBuffer; this.header = header; @@ -105,7 +105,7 @@ public class MessageWithHeader extends AbstractFileRegion { */ @Override public long transferTo(final WritableByteChannel target, final long position) throws IOException { - Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position."); + JavaUtils.checkArgument(position == totalBytesTransferred, "Invalid position."); // Bytes written for header in this call. long writtenHeader = 0; if (header.readableBytes() > 0) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java index e1275689ae6a..1cdb951d2d04 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java @@ -23,7 +23,6 @@ import java.nio.channels.WritableByteChannel; import java.util.List; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -35,6 +34,7 @@ import io.netty.handler.codec.MessageToMessageDecoder; import org.apache.spark.network.util.AbstractFileRegion; import org.apache.spark.network.util.ByteArrayWritableChannel; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.NettyUtils; /** @@ -152,7 +152,7 @@ class SaslEncryption { private long transferred; EncryptedMessage(SaslEncryptionBackend backend, Object msg, int maxOutboundBlockSize) { - Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion, + JavaUtils.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion, "Unrecognized message type: %s", msg.getClass().getName()); this.backend = backend; this.isByteBuf = msg instanceof ByteBuf; @@ -241,7 +241,7 @@ class SaslEncryption { public long transferTo(final WritableByteChannel target, final long position) throws IOException { - Preconditions.checkArgument(position == transferred(), "Invalid position."); + JavaUtils.checkArgument(position == transferred(), "Invalid position."); long reportedWritten = 0L; long actuallyWritten = 0L; diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java index 576cddc24a19..a0e930526538 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java @@ -20,7 +20,7 @@ package org.apache.spark.network.server; import java.nio.ByteBuffer; import java.util.Objects; -import com.google.common.base.Preconditions; +import org.apache.spark.network.util.JavaUtils; /** * A special RuntimeException thrown when shuffle service experiences a non-fatal failure @@ -170,7 +170,7 @@ public class BlockPushNonFatalFailure extends RuntimeException { } public static String getErrorMsg(String blockId, ReturnCode errorCode) { - Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS); + JavaUtils.checkArgument(errorCode != ReturnCode.SUCCESS, "errorCode should not be SUCCESS."); return "Block " + blockId + errorCode.errorMsgSuffix; } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index ee03c115345c..cb53d565e7e8 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -25,13 +25,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import io.netty.channel.Channel; import org.apache.spark.internal.SparkLogger; import org.apache.spark.internal.SparkLoggerFactory; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.util.Pair; /** @@ -167,7 +167,7 @@ public class OneForOneStreamManager extends StreamManager { public void checkAuthorization(TransportClient client, long streamId) { if (client.getClientId() != null) { StreamState state = streams.get(streamId); - Preconditions.checkArgument(state != null, "Unknown stream ID."); + JavaUtils.checkArgument(state != null, "Unknown stream ID."); if (!client.getClientId().equals(state.appId)) { throw new SecurityException(String.format( "Client %s not authorized to read stream %d (app %s).", diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java index 9bac3a2e6f77..79cf0eb7c615 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.Objects; -import com.google.common.base.Preconditions; - /** * Wraps a {@link InputStream}, limiting the number of bytes which can be read. * @@ -53,7 +51,7 @@ public final class LimitedInputStream extends FilterInputStream { public LimitedInputStream(InputStream in, long limit, boolean closeWrappedStream) { super(Objects.requireNonNull(in)); this.closeWrappedStream = closeWrappedStream; - Preconditions.checkArgument(limit >= 0, "limit must be non-negative"); + JavaUtils.checkArgument(limit >= 0, "limit must be non-negative"); left = limit; } @Override public int available() throws IOException { diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index b5e9618bd269..003e72edf29e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -21,7 +21,6 @@ import java.io.File; import java.util.Locale; import java.util.Properties; import java.util.concurrent.TimeUnit; -import com.google.common.base.Preconditions; import io.netty.util.NettyRuntime; /** @@ -503,7 +502,7 @@ public class TransportConf { if (!this.getModuleName().equalsIgnoreCase("shuffle")) { return 0; } - Preconditions.checkArgument(separateFinalizeShuffleMerge(), + JavaUtils.checkArgument(separateFinalizeShuffleMerge(), "Please set spark.shuffle.server.finalizeShuffleMergeThreadsPercent to a positive value"); int finalizeShuffleMergeThreadsPercent = Integer.parseInt(conf.get("spark.shuffle.server.finalizeShuffleMergeThreadsPercent")); diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index cef0e415aa40..6b490068507a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -20,7 +20,6 @@ package org.apache.spark.network.util; import java.util.LinkedList; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; @@ -145,9 +144,9 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { } if (frameBuf == null) { - Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, + JavaUtils.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize); - Preconditions.checkArgument(frameSize > 0, + JavaUtils.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize); frameRemainingBytes = (int) frameSize; @@ -252,7 +251,7 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { } public void setInterceptor(Interceptor interceptor) { - Preconditions.checkState(this.interceptor == null, "Already have an interceptor."); + JavaUtils.checkState(this.interceptor == null, "Already have an interceptor."); this.interceptor = interceptor; } diff --git a/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java b/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java index d1e93e3cb584..828d995ba444 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import com.google.common.base.Preconditions; import io.netty.buffer.Unpooled; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; +import org.apache.spark.network.util.JavaUtils; /** * A ManagedBuffer implementation that contains 0, 1, 2, 3, ..., (len-1). @@ -38,7 +38,7 @@ public class TestManagedBuffer extends ManagedBuffer { private NettyManagedBuffer underlying; public TestManagedBuffer(int len) { - Preconditions.checkArgument(len <= Byte.MAX_VALUE); + JavaUtils.checkArgument(len <= Byte.MAX_VALUE, "length exceeds limit " + Byte.MAX_VALUE); this.len = len; byte[] byteArray = new byte[len]; for (int i = 0; i < len; i ++) { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java index d90ca1a88a26..05158a6600d0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java @@ -21,8 +21,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; -import com.google.common.base.Preconditions; - import org.apache.spark.internal.SparkLogger; import org.apache.spark.internal.SparkLoggerFactory; import org.apache.spark.network.buffer.ManagedBuffer; @@ -34,6 +32,7 @@ import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode; import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode; import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.util.JavaUtils; /** * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to remote shuffle service to @@ -90,7 +89,7 @@ public class OneForOneBlockPusher { ReturnCode returnCode = BlockPushNonFatalFailure.getReturnCode(pushResponse.returnCode); if (returnCode != ReturnCode.SUCCESS) { String blockId = pushResponse.failureBlockId; - Preconditions.checkArgument(!blockId.isEmpty()); + JavaUtils.checkArgument(!blockId.isEmpty(), "BlockID should not be empty"); checkAndFailRemainingBlocks(index, new BlockPushNonFatalFailure(returnCode, BlockPushNonFatalFailure.getErrorMsg(blockId, returnCode))); } else { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 7968e03f580c..a48208bad5b8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -55,7 +55,6 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -211,7 +210,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) { // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(appId); - Preconditions.checkArgument(appShuffleInfo != null, + JavaUtils.checkArgument(appShuffleInfo != null, "application " + appId + " is not registered or NM was restarted."); return appShuffleInfo; } @@ -1267,12 +1266,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { String streamId, AppShufflePartitionInfo partitionInfo, int mapIndex) { - Preconditions.checkArgument(mergeManager != null); + JavaUtils.checkArgument(mergeManager != null, "mergeManager is null"); this.mergeManager = mergeManager; - Preconditions.checkArgument(appShuffleInfo != null); + JavaUtils.checkArgument(appShuffleInfo != null, "appShuffleInfo is null"); this.appShuffleInfo = appShuffleInfo; this.streamId = streamId; - Preconditions.checkArgument(partitionInfo != null); + JavaUtils.checkArgument(partitionInfo != null, "partitionInfo is null"); this.partitionInfo = partitionInfo; this.mapIndex = mapIndex; abortIfNecessary(); @@ -1719,7 +1718,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @JsonProperty("attemptId") int attemptId, @JsonProperty("shuffleId") int shuffleId, @JsonProperty("shuffleMergeId") int shuffleMergeId) { - Preconditions.checkArgument(appId != null, "app id is null"); + JavaUtils.checkArgument(appId != null, "app id is null"); this.appId = appId; this.attemptId = attemptId; this.shuffleId = shuffleId; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java index 7f1629560071..1dae2d54120c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java @@ -25,7 +25,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.internal.SparkLogger; @@ -34,6 +33,7 @@ import org.apache.spark.internal.LogKeys; import org.apache.spark.internal.MDC; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.sasl.SaslTimeoutException; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.NettyUtils; import org.apache.spark.network.util.TransportConf; @@ -246,7 +246,7 @@ public class RetryingBlockTransferor { // If this is a non SASL request failure, reduce earlier SASL failures from retryCount // since some subsequent SASL attempt was successful if (!isSaslTimeout && saslRetryCount > 0) { - Preconditions.checkState(retryCount >= saslRetryCount, + JavaUtils.checkState(retryCount >= saslRetryCount, "retryCount must be greater than or equal to saslRetryCount"); retryCount -= saslRetryCount; saslRetryCount = 0; @@ -281,7 +281,7 @@ public class RetryingBlockTransferor { // If there were SASL failures earlier, remove them from retryCount, as there was // a SASL success (and some other request post bootstrap was also successful). if (saslRetryCount > 0) { - Preconditions.checkState(retryCount >= saslRetryCount, + JavaUtils.checkState(retryCount >= saslRetryCount, "retryCount must be greater than or equal to saslRetryCount"); retryCount -= saslRetryCount; saslRetryCount = 0; diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 96e50955ae2c..61ecdd60b415 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -28,13 +28,13 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.apache.spark.internal.SparkLogger; import org.apache.spark.internal.SparkLoggerFactory; import org.apache.spark.internal.LogKeys; import org.apache.spark.internal.MDC; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.util.ThreadUtils; /** @@ -105,7 +105,7 @@ public class ReadAheadInputStream extends InputStream { */ public ReadAheadInputStream( InputStream inputStream, int bufferSizeInBytes) { - Preconditions.checkArgument(bufferSizeInBytes > 0, + JavaUtils.checkArgument(bufferSizeInBytes > 0, "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml index 1eb60764d3f0..8bad02a93394 100644 --- a/dev/checkstyle.xml +++ b/dev/checkstyle.xml @@ -193,6 +193,7 @@ <property name="illegalClasses" value="org.apache.hadoop.io.IOUtils" /> <property name="illegalClasses" value="org.apache.parquet.Preconditions" /> <property name="illegalClasses" value="com.google.common.base.Objects" /> + <property name="illegalClasses" value="com.google.common.base.Preconditions" /> <property name="illegalClasses" value="com.google.common.base.Strings" /> <property name="illegalClasses" value="com.google.common.collect.ArrayListMultimap" /> <property name="illegalClasses" value="com.google.common.collect.Multimap" /> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java index 7fb8be7caf28..3331c8dfd8f5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java @@ -21,9 +21,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import com.google.common.base.Preconditions; - import org.apache.spark.memory.MemoryMode; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -131,7 +130,8 @@ final class ParquetColumnVector { definitionLevels = allocateLevelsVector(capacity, memoryMode); } } else { - Preconditions.checkArgument(column.children().size() == vector.getNumChildren()); + JavaUtils.checkArgument(column.children().size() == vector.getNumChildren(), + "The number of column children is different from the number of vector children"); boolean allChildrenAreMissing = true; for (int i = 0; i < column.children().size(); i++) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java index 8a993a032638..7b56e1ebf239 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java @@ -26,8 +26,8 @@ import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePackerForLong; import org.apache.parquet.column.values.bitpacking.Packer; import org.apache.parquet.io.ParquetDecodingException; -import com.google.common.base.Preconditions; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -81,14 +81,14 @@ public class VectorizedDeltaBinaryPackedReader extends VectorizedReaderBase { @Override public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { - Preconditions.checkArgument(valueCount >= 1, + JavaUtils.checkArgument(valueCount >= 1, "Page must have at least one value, but it has " + valueCount); this.in = in; // Read the header this.blockSizeInValues = BytesUtils.readUnsignedVarInt(in); this.miniBlockNumInABlock = BytesUtils.readUnsignedVarInt(in); double miniSize = (double) blockSizeInValues / miniBlockNumInABlock; - Preconditions.checkArgument(miniSize % 8 == 0, + JavaUtils.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize); this.miniBlockSizeInValues = (int) miniSize; // True value count. May be less than valueCount because of nulls diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 97ec1f7b9eb8..60544665409d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -27,9 +27,9 @@ import org.apache.parquet.column.values.bitpacking.BytePacker; import org.apache.parquet.column.values.bitpacking.Packer; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; -import com.google.common.base.Preconditions; import org.apache.spark.SparkUnsupportedOperationException; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; /** @@ -122,7 +122,7 @@ public final class VectorizedRleValuesReader extends ValuesReader * Initializes the internal state for decoding ints of `bitWidth`. */ private void init(int bitWidth) { - Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); + JavaUtils.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth); this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org