featzhang created FLINK-39401:
---------------------------------
Summary: [format] Extend Raw Format to Support Line Delimiter and
Multi-Row Splitting
Key: FLINK-39401
URL: https://issues.apache.org/jira/browse/FLINK-39401
Project: Flink
Issue Type: Improvement
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: featzhang
Add a new optional configuration option {{raw.line-delimiter}} to the Raw
format, enabling deserialization of line-delimited messages into multiple
records and appending delimiters during serialization.
----
h2. Motivation
The Raw format currently treats each incoming message as a single record. This
makes it difficult to handle common scenarios where messages contain multiple
lines (e.g., newline-separated text) or use a custom delimiter to separate
logical records within a single message payload.
A concrete use case: a Kafka topic where each message contains multiple records
separated by {{\\n}}, and users want Flink to split each message into
individual rows automatically, without writing custom UDFs or pre-processing
logic.
This feature adds a {{raw.line-delimiter}} option that enables:
# *Deserialization*: Split incoming byte messages by the configured delimiter,
emitting one {{RowData}} per segment.
# *Serialization*: Append the configured delimiter bytes after each serialized
record.
----
h2. Public Interfaces
A new optional config option is added to {{RawFormatOptions}}:
{code:java}
public static final ConfigOption<String> LINE_DELIMITER =
        ConfigOptions.key("raw.line-delimiter")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Optional line delimiter for the raw format. "
                                + "When set, deserialization splits the input by this delimiter "
                                + "and emits one record per segment. "
                                + "Serialization appends the delimiter after each record. "
                                + "Supports Java escape sequences such as '\\n' and '\\r\\n'.");
{code}
*Example DDL usage:*
{code:sql}
CREATE TABLE source_table (
data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'raw',
'raw.line-delimiter' = '\n'
);
{code}
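The option description mentions Java escape sequences, but the snippets in this issue do not show where the two-character text {{\n}} from the DDL would be turned into an actual line feed. A minimal sketch of one way this could be done, assuming a hypothetical helper named {{unescape}} (the class and method names are illustrative and not part of the proposal):

```java
// Illustrative only: DelimiterEscapes and unescape(...) are hypothetical names,
// not existing Flink classes or methods.
final class DelimiterEscapes {

    /** Translates the escapes \n, \r, \t and \\ into the characters they denote. */
    static String unescape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length());
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            // Copy ordinary characters and a lone trailing backslash unchanged.
            if (c != '\\' || i + 1 == raw.length()) {
                sb.append(c);
                continue;
            }
            char next = raw.charAt(++i);
            switch (next) {
                case 'n': sb.append('\n'); break;
                case 'r': sb.append('\r'); break;
                case 't': sb.append('\t'); break;
                case '\\': sb.append('\\'); break;
                // Unknown escape: keep it exactly as written.
                default: sb.append('\\').append(next);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The four characters \r\n become the two characters CR and LF.
        System.out.println(unescape("\\r\\n").length());
        // A delimiter without backslashes passes through untouched.
        System.out.println(unescape("||"));
    }
}
```

Under this assumption the translation would happen once, when the option is read, so that both schemas receive the already-translated delimiter.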
----
h2. Proposed Changes
h3. RawFormatOptions.java
Add a new {{ConfigOption}}:
{code:java}
public static final ConfigOption<String> LINE_DELIMITER =
        ConfigOptions.key("raw.line-delimiter")
                .stringType()
                .noDefaultValue()
                .withDescription("...");
{code}
h3. RawFormatDeserializationSchema.java
* Add a new constructor parameter {{@Nullable String lineDelimiter}}.
* Override {{deserialize(byte[] message, Collector<RowData> out)}} to split by
delimiter.
* When {{lineDelimiter == null}}: fall back to the original single-record
behavior.
* When {{lineDelimiter}} is set: decode the message with the configured charset,
split using {{Pattern.quote(lineDelimiter)}}, and collect one {{RowData}} per
segment.
{code:java}
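@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
    if (message == null) {
        // Nothing to emit for a null payload.
        return;
    }
    if (lineDelimiter == null) {
        // No delimiter configured: keep the original single-record behavior.
        out.collect(deserialize(message));
        return;
    }
    // Decode first so that splitting operates on characters, not raw bytes.
    Charset charset = Charset.forName(charsetName);
    String decoded = new String(message, charset);
    // Pattern.quote treats the delimiter literally; limit -1 keeps trailing empty segments.
    String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
    for (String part : parts) {
        out.collect(converter.convert(part.getBytes(charset)));
    }
}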
{code}
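The splitting step can be tried without a Flink dependency. The following is a minimal sketch that mirrors the decode and split logic above on plain strings; {{RawSplitSketch}} and its {{split}} method are hypothetical names used only for illustration, and the conversion to {{RowData}} is left out.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative only: RawSplitSketch and split(...) are hypothetical names, not Flink classes.
final class RawSplitSketch {

    /** Returns the segments that would each become one record. */
    static List<String> split(byte[] message, String lineDelimiter, Charset charset) {
        List<String> segments = new ArrayList<>();
        if (message == null) {
            return segments; // null payload: emit nothing
        }
        String decoded = new String(message, charset);
        if (lineDelimiter == null) {
            segments.add(decoded); // no delimiter: one record per message
            return segments;
        }
        // Pattern.quote makes the delimiter a literal; -1 keeps trailing empty segments.
        for (String part : decoded.split(Pattern.quote(lineDelimiter), -1)) {
            segments.add(part);
        }
        return segments;
    }

    public static void main(String[] args) {
        Charset utf8 = StandardCharsets.UTF_8;
        System.out.println(split("a\nb\nc".getBytes(utf8), "\n", utf8)); // [a, b, c]
        System.out.println(split("x||y".getBytes(utf8), "||", utf8));    // [x, y]
    }
}
```

The second call shows why the delimiter is quoted: without {{Pattern.quote}}, the string {{||}} would be interpreted as a regular-expression alternation of empty patterns rather than as two literal pipe characters.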
h3. RawFormatSerializationSchema.java
* Add a new constructor parameter {{@Nullable String lineDelimiter}}.
* In {{serialize()}}, append the delimiter bytes after the serialized value
when {{lineDelimiter != null}}.
{code:java}
@Override
public byte[] serialize(RowData row) {
    byte[] valueBytes = converter.convert(row);
    if (lineDelimiter == null || valueBytes == null) {
        // No delimiter configured, or nothing to serialize: return the value unchanged.
        return valueBytes;
    }
    byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
    // Copy the value into a larger array, then write the delimiter into the tail.
    byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
    System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
    return result;
}
{code}
h3. RawFormatFactory.java
* Register {{LINE_DELIMITER}} in {{optionalOptions()}}.
* Read the option and pass it to both schema constructors.
----
h2. Compatibility, Deprecation, and Migration Plan
* *Fully backward compatible*: {{raw.line-delimiter}} is optional with no
default value. Existing jobs without this option continue to behave identically.
* No deprecation or migration required.
* The change follows the existing pattern of optional format options in the Raw
format (e.g., {{raw.charset}}, {{raw.endianness}}).
----
h2. Test Plan
A new test class {{RawFormatLineDelimiterTest}} covers the following scenarios:
|| Test Case || Description ||
| {{testDeserializeWithoutDelimiter_singleRow}} | Backward compatibility: no delimiter, single record emitted |
| {{testDeserializeWithNewlineDelimiter_multipleRows}} | Split on {{\\n}}, multiple records emitted |
| {{testDeserializeWithCustomMultiCharDelimiter}} | Split on custom multi-char delimiter {{||}} |
| {{testDeserializeWithNullMessage_noOutput}} | Null message is handled gracefully (no output) |
| {{testDeserializeWithGbkCharset}} | Multi-byte charset (GBK) support |
| {{testSerializeWithoutDelimiter_noAppend}} | Backward compatibility: no delimiter appended |
| {{testSerializeWithNewlineDelimiter_appendsDelimiter}} | Delimiter {{\\n}} appended after serialized value |
| {{testSerializeWithCustomDelimiter_appendsDelimiter}} | Custom delimiter {{||}} appended |
| {{testSerializeNullRow_returnsNull}} | Null row returns null (no delimiter appended) |
Factory wiring is validated in {{RawFormatFactoryTest#testLineDelimiterOption}}.
----
h2. Rejected Alternatives
* *Dropping trailing empty strings from split*: Java's {{String.split(regex,
-1)}} with a negative limit retains trailing empty strings, so an input that
ends with the delimiter yields a final empty record. The current implementation
keeps {{-1}} so that no segment is silently discarded; filtering trailing empty
segments could be considered as a follow-up improvement.
* *Defaulting to {{\\n}}*: Making {{\\n}} the default would break backward
compatibility for existing users who rely on the Raw format treating the entire
message as a single record.
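The effect of the split limit on trailing segments can be checked in isolation. A minimal sketch, assuming hypothetical helper names ({{TrailingSegmentSketch}}, {{keepTrailing}}, {{dropTrailing}}) that are not part of the proposal:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Illustrative only: the class and method names here are hypothetical.
final class TrailingSegmentSketch {

    /** Negative limit: trailing empty segments are kept (the behavior described above). */
    static String[] keepTrailing(String input, String delimiter) {
        return input.split(Pattern.quote(delimiter), -1);
    }

    /** Limit 0: all trailing empty segments are removed. */
    static String[] dropTrailing(String input, String delimiter) {
        return input.split(Pattern.quote(delimiter), 0);
    }

    public static void main(String[] args) {
        // Input ending with the delimiter: a final empty segment appears with -1.
        System.out.println(Arrays.toString(keepTrailing("a\nb\n", "\n"))); // [a, b, ]
        System.out.println(Arrays.toString(dropTrailing("a\nb\n", "\n"))); // [a, b]
        // A single record written with an appended delimiter and read back.
        System.out.println(keepTrailing("a" + "\n", "\n").length);         // 2
    }
}
```

Two points follow from this sketch. A record produced by the serializer ends with the delimiter, so reading it back with limit {{-1}} yields the record plus one empty segment. Switching to limit {{0}} would remove every trailing empty segment, not only the last one, so blank segments that are genuinely present at the end of a payload would be lost as well.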