[
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17141983#comment-17141983
]
Karsten Schnitter edited comment on KAFKA-10173 at 6/22/20, 12:21 PM:
----------------------------------------------------------------------
Hi [~vvcephei],
after trying for the whole weekend, I finally found a way to capture the
failing messages. My approach is to change
{{org.apache.kafka.streams.state.internals.BufferValue}} so that it logs during
deserialisation:
{code:java}
// existing imports omitted; the patch additionally needs:
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BufferValue {
    // omitted

    private static final Logger LOG = LoggerFactory.getLogger(BufferValue.class);

    static BufferValue deserialize(final ByteBuffer buffer) {
        final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
        LOG.debug("Deserialize with context <{}>", context);
        try {
            final byte[] priorValue = extractValue(buffer);
            final byte[] oldValue;
            final int oldValueLength = buffer.getInt();
            if (oldValueLength == NULL_VALUE_SENTINEL) {
                oldValue = null;
            } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
                oldValue = priorValue;
            } else {
                oldValue = new byte[oldValueLength];
                buffer.get(oldValue);
            }
            final byte[] newValue = extractValue(buffer);
            return new BufferValue(priorValue, oldValue, newValue, context);
        } catch (final BufferUnderflowException underflow) {
            // dump the whole backing array so the failing record can be inspected
            LOG.error("Error deserializing buffer <{}>", bytesToHex(buffer.array()));
            throw underflow;
        }
    }

    private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();

    private static String bytesToHex(final byte[] bytes) {
        final char[] hexChars = new char[bytes.length * 2];
        for (int j = 0; j < bytes.length; j++) {
            final int v = bytes[j] & 0xFF;
            hexChars[j * 2] = HEX_ARRAY[v >>> 4];
            hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
        }
        return new String(hexChars);
    }

    // omitted
}
{code}
The result of this is somewhat confusing to me:
{noformat}
DEBUG. Deserialize with context <ProcessorRecordContext{topic='logs-ingress',
partition=57, offset=0, timestamp=1592648746703, headers=RecordHeaders(headers
= [], isReadOnly = false)}>
ERROR. Error deserializing buffer
<00000172D14346CF00000000000000000000000C6C6F67732D696E6772657373000000390000000000000059FFFFFFFF00000051080110AC051A2436346139616437662D313838352D346234342D613163302D633134346565633162636665222436346139616437662D313838352D346234342D613163302D633134346565633162636665FFFFFFFF00000172D14346CF>
{noformat}
# The DEBUG log of the context tells me that a message from my main topic was
read. I would have expected a message from a store changelog. I guess the
context is misleading here?
# The ERROR log contains a rather short message that fits much better with a
store changelog. For background: this is most likely a protobuf message
containing two UUIDs encoded as strings, together with some integer or long
numbers.
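For what it's worth, the leading fields of the dumped buffer can be decoded by
hand. Below is a minimal sketch; the class name {{BufferHeaderDecoder}} is
hypothetical, and the field layout (8-byte timestamp, 8-byte offset, 4-byte
topic length, topic bytes, 4-byte partition) is my assumption about how the
record context is serialized at the front of the buffer:
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: decodes the assumed
// timestamp/offset/topic/partition header of a dumped buffer.
public class BufferHeaderDecoder {

    static byte[] hexToBytes(final String hex) {
        final byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    static String decodeHeader(final String hex) {
        final ByteBuffer buf = ByteBuffer.wrap(hexToBytes(hex));
        final long timestamp = buf.getLong();           // 8-byte timestamp
        final long offset = buf.getLong();              // 8-byte offset
        final byte[] topic = new byte[buf.getInt()];    // 4-byte length + topic
        buf.get(topic);
        final int partition = buf.getInt();             // 4-byte partition
        return timestamp + " " + offset + " "
                + new String(topic, StandardCharsets.UTF_8) + " " + partition;
    }

    public static void main(final String[] args) {
        // Prefix of the first failing buffer from the ERROR log above.
        System.out.println(decodeHeader(
                "00000172D14346CF00000000000000000000000C"
                        + "6C6F67732D696E677265737300000039"));
        // prints "1592648746703 0 logs-ingress 57"
    }
}
{code}
Under that assumption, the first dump decodes to exactly the context from the
DEBUG line (timestamp 1592648746703, offset 0, topic logs-ingress, partition
57), and the second dump to offset 111 and partition 24 on the same topic, so
the logged context really does come from the start of the failing buffer.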
The system I used was freshly created on Saturday, June 20th. It is running a
Kafka cluster on version 2.4.1 and the Kafka Streams application on version
2.3.1. The error was recorded with the same Kafka Streams application upgraded
to v2.5.0 with the modification above. Since there are two stream threads,
I can provide another buffer that fails:
{noformat}
ERROR. Error deserializing buffer
<00000172D1434655000000000000006F0000000C6C6F67732D696E6772657373000000180000000000000059FFFFFFFF00000051080110AC051A2436346139616437662D313838352D346234342D613163302D633134346565633162636665222436346139616437662D313838352D346234342D613163302D633134346565633162636665FFFFFFFF00000172D1434655>
{noformat}
Best Regards,
Karsten
> BufferUnderflowException during Kafka Streams Upgrade
> -----------------------------------------------------
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Karsten Schnitter
> Assignee: John Roesler
> Priority: Major
> Labels: suppress
> Fix For: 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I
> followed the steps described in the upgrade guide and set the property
> {{migrate.from=2.3}}. On my dev system with just one running instance I got
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during
> processing:
> java.nio.BufferUnderflowException: null
> at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
> at java.base/java.nio.ByteBuffer.get(Unknown Source)
> at
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
> at
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
> at
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
> at
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
> at
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the
> suppress feature. If I rename the changelog topics during the migration, the
> problem does not occur.