This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 35d286b [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka
35d286b is described below
commit 35d286bafb248a47a1125908c0208cd759dd0416
Author: beliefer <[email protected]>
AuthorDate: Thu Mar 26 20:11:15 2020 +0900
[SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka
### What changes were proposed in this pull request?
Add version information to the configuration of Kafka.
I sorted out the relevant information, shown in the table below; a minimal sketch of the pattern this patch applies follows the table.
Item name | Since version | JIRA ID | Commit ID | Note
-- | -- | -- | -- | --
spark.streaming.kafka.consumer.cache.enabled | 2.2.1 | SPARK-19185 | 02cf178bb2a7dc8b4c06eb040c44b6453e41ed15#diff-c465bbcc83b2ecc7530d1c0128e4432b |
spark.streaming.kafka.consumer.poll.ms | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.consumer.cache.initialCapacity | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.consumer.cache.maxCapacity | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.consumer.cache.loadFactor | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.maxRatePerPartition | 1.3.0 | SPARK-4964 | a119cae48030520da9f26ee9a1270bed7f33031e#diff-26cb4369f86050dc2e75cd16291b2844 |
spark.streaming.kafka.minRatePerPartition | 2.4.0 | SPARK-25233 | 135ff16a3510a4dfb3470904004dae9848005019#diff-815f6ec5caf9e4beb355f5f981171f1f |
spark.streaming.kafka.allowNonConsecutiveOffsets | 2.3.1 | SPARK-24067 | 1d598b771de3b588a2f377ae7ccf8193156641f2#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.kafka.producer.cache.timeout | 2.2.1 | SPARK-19968 | f6730a70cb47ebb3df7f42209df7b076aece1093#diff-ac8844e8d791a75aaee3d0d10bfc1f2a |
spark.kafka.producer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-21869 | 7bff2db9ed803e05a43c2d875c1dea819d81248a#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.capacity | 3.0.0 | SPARK-27687 | efa303581ac61d6f517aacd08883da2d01530bd2#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.jmx.enable | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.fetchedData.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.clusters.${cluster}.auth.bootstrap.servers | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.security.protocol | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.truststore.location | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.truststore.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.keystore.location | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.keystore.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.key.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.sasl.token.mechanism | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
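The change is mechanical: each `ConfigBuilder` entry gains a single `.version(...)` call carrying the value from the table above. A minimal sketch of the pattern, mirroring the `spark.kafka.producer.cache.timeout` entry in the diff below (the surrounding package object and imports are added here only to make the excerpt self-contained):

```scala
package org.apache.spark.sql

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

package object kafka010 {
  // The new .version(...) call records the Spark release that introduced this entry.
  private[kafka010] val PRODUCER_CACHE_TIMEOUT =
    ConfigBuilder("spark.kafka.producer.cache.timeout")
      .doc("The expire time to remove the unused producers.")
      .version("2.2.1") // since version, as listed in the table above
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")
}
```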
### Why are the changes needed?
Supplements the Kafka configuration entries with the version in which each was introduced.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Closes #27989 from beliefer/add-version-to-kafka-config.
Authored-by: beliefer <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
docs/configuration.md | 2 ++
docs/structured-streaming-kafka-integration.md | 26 +++++++++++++---
.../org/apache/spark/sql/kafka010/package.scala | 8 +++++
.../apache/spark/streaming/kafka010/package.scala | 36 +++++++++++++---------
4 files changed, 54 insertions(+), 18 deletions(-)
diff --git a/docs/configuration.md b/docs/configuration.md
index 9e5f5b6..a7a1477 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2661,6 +2661,7 @@ Spark subsystems.
<a href="streaming-kafka-0-10-integration.html">Kafka Integration guide</a>
for more details.
</td>
+ <td>1.3.0</td>
</tr>
<tr>
<td><code>spark.streaming.kafka.minRatePerPartition</code></td>
@@ -2669,6 +2670,7 @@ Spark subsystems.
Minimum rate (number of records per second) at which data will be read
from each Kafka
partition when using the new Kafka direct stream API.
</td>
+ <td>2.4.0</td>
</tr>
<tr>
<td><code>spark.streaming.ui.retainedBatches</code></td>
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index a1eeee5..016faa7 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -525,21 +525,24 @@ The caching key is built up from the following
information:
The following properties are available to configure the consumer pool:
<table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td>spark.kafka.consumer.cache.capacity</td>
<td>The maximum number of consumers cached. Please note that it's a soft
limit.</td>
<td>64</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.timeout</td>
<td>The minimum amount of time a consumer may sit idle in the pool before it
is eligible for eviction by the evictor.</td>
<td>5m (5 minutes)</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
<td>The interval of time between runs of the idle evictor thread for
consumer pool. When non-positive, no idle evictor thread will be run.</td>
<td>1m (1 minute)</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.jmx.enable</td>
@@ -547,6 +550,7 @@ The following properties are available to configure the
consumer pool:
The prefix of JMX name is set to
"kafka010-cached-simple-kafka-consumer-pool".
</td>
<td>false</td>
+ <td>3.0.0</td>
</tr>
</table>
@@ -571,16 +575,18 @@ Note that it doesn't leverage Apache Commons Pool due to
the difference of chara
The following properties are available to configure the fetched data pool:
<table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td>spark.kafka.consumer.fetchedData.cache.timeout</td>
<td>The minimum amount of time a fetched data may sit idle in the pool
before it is eligible for eviction by the evictor.</td>
<td>5m (5 minutes)</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
<td>The interval of time between runs of the idle evictor thread for fetched
data pool. When non-positive, no idle evictor thread will be run.</td>
<td>1m (1 minute)</td>
+ <td>3.0.0</td>
</tr>
</table>
@@ -816,16 +822,18 @@ It will use different Kafka producer when delegation
token is renewed; Kafka pro
The following properties are available to configure the producer pool:
<table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td>spark.kafka.producer.cache.timeout</td>
<td>The minimum amount of time a producer may sit idle in the pool before it
is eligible for eviction by the evictor.</td>
<td>10m (10 minutes)</td>
+ <td>2.2.1</td>
</tr>
<tr>
<td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
<td>The interval of time between runs of the idle evictor thread for
producer pool. When non-positive, no idle evictor thread will be run.</td>
<td>1m (1 minute)</td>
+ <td>3.0.0</td>
</tr>
</table>
@@ -935,7 +943,7 @@ When none of the above applies then unsecure connection
assumed.
Delegation tokens can be obtained from multiple clusters and
<code>${cluster}</code> is an arbitrary unique identifier which helps to group
different configurations.
<table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
<td>None</td>
@@ -943,6 +951,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
A list of coma separated host/port pairs to use for establishing the
initial connection
to the Kafka cluster. For further details please see Kafka
documentation. Only used to obtain delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
@@ -953,6 +962,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
If multiple clusters match the address, an exception will be thrown and
the query won't be started.
Kafka's secure and unsecure listeners are bound to different ports. When
both used the secure listener port has to be part of the regular expression.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
@@ -962,6 +972,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
<code>bootstrap.servers</code> config matches (for further details
please see
<code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
and can be overridden by setting <code>kafka.security.protocol</code> on
the source or sink.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
@@ -970,6 +981,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
The Kerberos principal name that Kafka runs as. This can be defined
either in Kafka's JAAS config or in Kafka's config.
For further details please see Kafka documentation. Only used to obtain
delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
@@ -977,6 +989,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
<td>
The location of the trust store file. For further details please see
Kafka documentation. Only used to obtain delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
@@ -985,6 +998,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
The store password for the trust store file. This is optional and only
needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code>
is configured.
For further details please see Kafka documentation. Only used to obtain
delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
@@ -993,6 +1007,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
The location of the key store file. This is optional for client and can
be used for two-way authentication for client.
For further details please see Kafka documentation. Only used to obtain
delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
@@ -1001,6 +1016,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
The store password for the key store file. This is optional and only
needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is
configured.
For further details please see Kafka documentation. Only used to obtain
delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
@@ -1009,6 +1025,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
The password of the private key in the key store file. This is optional
for client.
For further details please see Kafka documentation. Only used to obtain
delegation token.
</td>
+ <td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
@@ -1017,6 +1034,7 @@ Delegation tokens can be obtained from multiple clusters
and <code>${cluster}</c
SASL mechanism used for client connections with delegation token.
Because SCRAM login module used for authentication a compatible mechanism has
to be set here.
For further details please see Kafka documentation
(<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker
with delegation token.
</td>
+ <td>3.0.0</td>
</tr>
</table>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index 460bb8b..b7e42c0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -29,6 +29,7 @@ package object kafka010 { // scalastyle:ignore
private[kafka010] val PRODUCER_CACHE_TIMEOUT =
ConfigBuilder("spark.kafka.producer.cache.timeout")
.doc("The expire time to remove the unused producers.")
+ .version("2.2.1")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10m")
@@ -36,6 +37,7 @@ package object kafka010 { // scalastyle:ignore
ConfigBuilder("spark.kafka.producer.cache.evictorThreadRunInterval")
.doc("The interval of time between runs of the idle evictor thread for
producer pool. " +
"When non-positive, no idle evictor thread will be run.")
+ .version("3.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1m")
@@ -43,12 +45,14 @@ package object kafka010 { // scalastyle:ignore
ConfigBuilder("spark.kafka.consumer.cache.capacity")
.doc("The maximum number of consumers cached. Please note it's a soft
limit" +
" (check Structured Streaming Kafka integration guide for further
details).")
+ .version("3.0.0")
.intConf
.createWithDefault(64)
private[kafka010] val CONSUMER_CACHE_JMX_ENABLED =
ConfigBuilder("spark.kafka.consumer.cache.jmx.enable")
.doc("Enable or disable JMX for pools created with this configuration
instance.")
+ .version("3.0.0")
.booleanConf
.createWithDefault(false)
@@ -57,6 +61,7 @@ package object kafka010 { // scalastyle:ignore
.doc("The minimum amount of time a consumer may sit idle in the pool
before " +
"it is eligible for eviction by the evictor. " +
"When non-positive, no consumers will be evicted from the pool due to
idle time alone.")
+ .version("3.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
@@ -64,6 +69,7 @@ package object kafka010 { // scalastyle:ignore
ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval")
.doc("The interval of time between runs of the idle evictor thread for
consumer pool. " +
"When non-positive, no idle evictor thread will be run.")
+ .version("3.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1m")
@@ -72,6 +78,7 @@ package object kafka010 { // scalastyle:ignore
.doc("The minimum amount of time a fetched data may sit idle in the pool
before " +
"it is eligible for eviction by the evictor. " +
"When non-positive, no fetched data will be evicted from the pool due
to idle time alone.")
+ .version("3.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
@@ -79,6 +86,7 @@ package object kafka010 { // scalastyle:ignore
ConfigBuilder("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval")
.doc("The interval of time between runs of the idle evictor thread for
fetched data pool. " +
"When non-positive, no idle evictor thread will be run.")
+ .version("3.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1m")
}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
index 3d2921f..0679a49 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -26,43 +26,51 @@ package object kafka010 { //scalastyle:ignore
private[spark] val CONSUMER_CACHE_ENABLED =
ConfigBuilder("spark.streaming.kafka.consumer.cache.enabled")
+ .version("2.2.1")
.booleanConf
.createWithDefault(true)
private[spark] val CONSUMER_POLL_MS =
ConfigBuilder("spark.streaming.kafka.consumer.poll.ms")
- .longConf
- .createOptional
+ .version("2.0.1")
+ .longConf
+ .createOptional
private[spark] val CONSUMER_CACHE_INITIAL_CAPACITY =
ConfigBuilder("spark.streaming.kafka.consumer.cache.initialCapacity")
- .intConf
- .createWithDefault(16)
+ .version("2.0.1")
+ .intConf
+ .createWithDefault(16)
private[spark] val CONSUMER_CACHE_MAX_CAPACITY =
ConfigBuilder("spark.streaming.kafka.consumer.cache.maxCapacity")
- .intConf
- .createWithDefault(64)
+ .version("2.0.1")
+ .intConf
+ .createWithDefault(64)
private[spark] val CONSUMER_CACHE_LOAD_FACTOR =
ConfigBuilder("spark.streaming.kafka.consumer.cache.loadFactor")
- .doubleConf
- .createWithDefault(0.75)
+ .version("2.0.1")
+ .doubleConf
+ .createWithDefault(0.75)
private[spark] val MAX_RATE_PER_PARTITION =
ConfigBuilder("spark.streaming.kafka.maxRatePerPartition")
- .longConf
- .createWithDefault(0)
+ .version("1.3.0")
+ .longConf
+ .createWithDefault(0)
private[spark] val MIN_RATE_PER_PARTITION =
ConfigBuilder("spark.streaming.kafka.minRatePerPartition")
- .longConf
- .createWithDefault(1)
+ .version("2.4.0")
+ .longConf
+ .createWithDefault(1)
private[spark] val ALLOW_NON_CONSECUTIVE_OFFSETS =
ConfigBuilder("spark.streaming.kafka.allowNonConsecutiveOffsets")
- .booleanConf
- .createWithDefault(false)
+ .version("2.3.1")
+ .booleanConf
+ .createWithDefault(false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]