featzhang created FLINK-39401:
---------------------------------

             Summary: [format] Extend Raw Format to Support Line Delimiter and 
Multi-Row Splitting
                 Key: FLINK-39401
                 URL: https://issues.apache.org/jira/browse/FLINK-39401
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
            Reporter: featzhang


Add a new optional configuration option {{raw.line-delimiter}} to the Raw 
format. When it is set, deserialization splits each message on the delimiter 
into multiple records, and serialization appends the delimiter to each record.

----

h2. Motivation

The Raw format currently treats each incoming message as a single record. This 
makes it difficult to handle common scenarios where messages contain multiple 
lines (e.g., newline-separated text) or use a custom delimiter to separate 
logical records within a single message payload.

A concrete use case: a Kafka topic where each message contains multiple records 
separated by {{\\n}}, and users want Flink to split each message into 
individual rows automatically, without writing custom UDFs or pre-processing 
logic.

This feature adds a {{raw.line-delimiter}} option that enables:
# *Deserialization*: Split incoming byte messages by the configured delimiter, 
emitting one {{RowData}} per segment.
# *Serialization*: Append the configured delimiter bytes after each serialized 
record.

----

h2. Public Interfaces

A new optional config option is added to {{RawFormatOptions}}:

{code:java}
public static final ConfigOption<String> LINE_DELIMITER =
    ConfigOptions.key("raw.line-delimiter")
        .stringType()
        .noDefaultValue()
        .withDescription(
            "Optional line delimiter for the raw format. "
                + "When set, deserialization splits the input by this delimiter "
                + "and emits one record per segment. "
                + "Serialization appends the delimiter after each record. "
                + "Supports Java escape sequences such as '\\n' and '\\r\\n'.");
{code}

*Example DDL usage:*

{code:sql}
CREATE TABLE source_table (
  data STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'raw',
  'raw.line-delimiter' = '\n'
);
{code}
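
The option description mentions Java escape sequences, but the snippets in this issue do not show where they are translated. Below is a minimal sketch of one way to do it, assuming the DDL value {{'\n'}} reaches the format as the two characters backslash and {{n}}. {{DelimiterUnescaper}} and {{unescape}} are hypothetical names, not part of the patch, and only four escapes are handled.

```java
/** Hypothetical helper, not part of the proposed patch. */
final class DelimiterUnescaper {

    /** Translates the escapes \n, \r, \t and \\ into the characters they denote. */
    static String unescape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length());
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '\\' && i + 1 < raw.length()) {
                char next = raw.charAt(i + 1);
                switch (next) {
                    case 'n': sb.append('\n'); i++; continue;
                    case 'r': sb.append('\r'); i++; continue;
                    case 't': sb.append('\t'); i++; continue;
                    case '\\': sb.append('\\'); i++; continue;
                    default: break; // unknown escape: keep the backslash as-is
                }
            }
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The Java literal "\\r\\n" is the four characters: backslash, r, backslash, n.
        String unescaped = unescape("\\r\\n");
        System.out.println(unescaped.length()); // 2
    }
}
```

A real implementation might instead delegate to a library routine such as Apache Commons Text's {{StringEscapeUtils.unescapeJava}}. The hand-written loop here only keeps the sketch dependency-free.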

----

h2. Proposed Changes

h3. RawFormatOptions.java

Add a new {{ConfigOption}}:

{code:java}
public static final ConfigOption<String> LINE_DELIMITER =
    ConfigOptions.key("raw.line-delimiter")
        .stringType()
        .noDefaultValue()
        .withDescription("...");
{code}

h3. RawFormatDeserializationSchema.java

* Add a new constructor parameter {{@Nullable String lineDelimiter}}.
* Override {{deserialize(byte[] message, Collector<RowData> out)}} to split by 
delimiter.
* When {{lineDelimiter == null}}: fall back to original single-record behavior.
* When {{lineDelimiter}} is set: decode message with the configured charset, 
split using {{Pattern.quote(lineDelimiter)}}, and collect one {{RowData}} per 
segment.

{code:java}
@Override
public void deserialize(byte[] message, Collector<RowData> out)
        throws IOException {
    if (message == null) {
        return;
    }
    // No delimiter configured: keep the original single-record behavior.
    if (lineDelimiter == null) {
        out.collect(deserialize(message));
        return;
    }
    Charset charset = Charset.forName(charsetName);
    String decoded = new String(message, charset);
    // Quote the delimiter so it is matched literally; the negative limit
    // keeps trailing empty segments.
    String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
    for (String part : parts) {
        out.collect(converter.convert(part.getBytes(charset)));
    }
}
{code}
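
{{String.split}} interprets its argument as a regular expression, which is why the delimiter is wrapped in {{Pattern.quote}}. The following standalone sketch illustrates the difference with the {{||}} delimiter from the test plan. {{QuotedSplitSketch}} and {{splitLiteral}} are illustrative names only.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

final class QuotedSplitSketch {

    /** Splits on the delimiter taken literally, keeping empty segments. */
    static String[] splitLiteral(String decoded, String delimiter) {
        return decoded.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        String payload = "a||b||c";
        // Literal split: prints [a, b, c].
        System.out.println(Arrays.toString(splitLiteral(payload, "||")));
        // Unquoted, "||" is a regex alternation of empty patterns that matches
        // between characters, so the result is not the three expected segments.
        System.out.println(Arrays.toString(payload.split("||", -1)));
    }
}
```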

h3. RawFormatSerializationSchema.java

* Add a new constructor parameter {{@Nullable String lineDelimiter}}.
* In {{serialize()}}, append the delimiter bytes after the serialized value 
when {{lineDelimiter != null}}.

{code:java}
@Override
public byte[] serialize(RowData row) {
    byte[] valueBytes = converter.convert(row);
    // No delimiter configured, or nothing to serialize: keep the original behavior.
    if (lineDelimiter == null || valueBytes == null) {
        return valueBytes;
    }
    byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
    // Copy the value into a larger array and place the delimiter after it.
    byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
    System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
    return result;
}
{code}
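
Because the delimiter is encoded with the configured charset, the number of appended bytes depends on {{raw.charset}}. The sketch below mirrors the append logic on plain byte arrays to show this. {{AppendDelimiterSketch}} and {{appendDelimiter}} are hypothetical names, and UTF-16BE is used only as an example of a charset with two-byte code units.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class AppendDelimiterSketch {

    /** Returns the value followed by the delimiter encoded in the given charset. */
    static byte[] appendDelimiter(byte[] value, String delimiter, Charset charset) {
        byte[] delimiterBytes = delimiter.getBytes(charset);
        byte[] result = Arrays.copyOf(value, value.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, result, value.length, delimiterBytes.length);
        return result;
    }

    public static void main(String[] args) {
        Charset utf8 = StandardCharsets.UTF_8;
        Charset utf16 = StandardCharsets.UTF_16BE;
        byte[] a = appendDelimiter("ab".getBytes(utf8), "\n", utf8);
        byte[] b = appendDelimiter("ab".getBytes(utf16), "\n", utf16);
        System.out.println(a.length); // 3: 'a', 'b', 0x0A
        System.out.println(b.length); // 6: two bytes per character
    }
}
```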

h3. RawFormatFactory.java

* Register {{LINE_DELIMITER}} in {{optionalOptions()}}.
* Read the option and pass it to both schema constructors.

----

h2. Compatibility, Deprecation, and Migration Plan

* *Fully backward compatible*: {{raw.line-delimiter}} is optional with no 
default value. Existing jobs without this option continue to behave identically.
* No deprecation or migration required.
* The change follows the existing pattern of optional format options in the Raw 
format (e.g., {{raw.charset}}, {{raw.endianness}}).

----

h2. Test Plan

A new test class {{RawFormatLineDelimiterTest}} covers the following scenarios:

|| Test Case || Description ||
| {{testDeserializeWithoutDelimiter_singleRow}} | Backward compatibility: no delimiter, single record emitted |
| {{testDeserializeWithNewlineDelimiter_multipleRows}} | Split on {{\\n}}, multiple records emitted |
| {{testDeserializeWithCustomMultiCharDelimiter}} | Split on custom multi-char delimiter {{||}} |
| {{testDeserializeWithNullMessage_noOutput}} | Null message is handled gracefully (no output) |
| {{testDeserializeWithGbkCharset}} | Multi-byte charset (GBK) support |
| {{testSerializeWithoutDelimiter_noAppend}} | Backward compatibility: no delimiter appended |
| {{testSerializeWithNewlineDelimiter_appendsDelimiter}} | Delimiter {{\\n}} appended after serialized value |
| {{testSerializeWithCustomDelimiter_appendsDelimiter}} | Custom delimiter {{||}} appended |
| {{testSerializeNullRow_returnsNull}} | Null row returns null (no delimiter appended) |

Factory wiring is validated in {{RawFormatFactoryTest#testLineDelimiterOption}}.

----

h2. Rejected Alternatives

* *Dropping trailing empty segments from split*: With a negative limit, Java's 
{{String.split(regex, -1)}} retains trailing empty strings, so an input that 
ends with the delimiter produces a final empty record. The current 
implementation keeps the {{-1}} limit so that no segment is silently discarded; 
filtering trailing empty segments could be considered as a follow-up improvement.
* *Defaulting to {{\\n}}*: Making {{\\n}} the default would break backward 
compatibility for existing users who rely on raw format treating the entire 
message as a single record.
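
The trailing-segment behavior from the first bullet can be seen in a standalone sketch. It also shows one possible shape for the follow-up filter, under the assumption that only a single empty segment caused by a terminating delimiter should be dropped. {{TrailingSegmentSketch}} and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TrailingSegmentSketch {

    /** Current proposal: the negative limit keeps every segment, including empty ones. */
    static List<String> splitKeepingAll(String decoded, String delimiter) {
        return Arrays.asList(decoded.split(Pattern.quote(delimiter), -1));
    }

    /** Possible follow-up: drop one empty segment caused by a terminating delimiter. */
    static List<String> splitDroppingTerminator(String decoded, String delimiter) {
        List<String> parts = new ArrayList<>(splitKeepingAll(decoded, delimiter));
        if (parts.size() > 1 && parts.get(parts.size() - 1).isEmpty()) {
            parts.remove(parts.size() - 1);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(splitKeepingAll("a\nb\n", "\n"));         // [a, b, ]
        System.out.println(splitDroppingTerminator("a\nb\n", "\n")); // [a, b]
    }
}
```

Empty segments in the middle of a message are kept in both variants, so a blank line between two records still yields an empty record.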



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
