agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3042443107


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,183 +160,841 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         .required(true)
         .build();
 
+    static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description("""
+                    The maximum amount of data to send in a single 
Elasticsearch _bulk API request. \
+                    For NDJSON and JSON Array modes, FlowFiles are accumulated 
until this threshold is reached, then flushed. \
+                    For Single JSON mode, this acts as a size-based safety 
limit: if the accumulated FlowFiles exceed this size \
+                    before Max FlowFiles Per Batch is reached, the request is 
flushed early. \
+                    Elasticsearch recommends 5-15 MB per bulk request for 
optimal performance. \
+                    To disable the size-based limit, check "Set Empty String".\
+                    """)
+            .defaultValue("10 MB")
+            .addValidator((subject, input, context) -> 
StringUtils.isBlank(input)
+                    ? new 
ValidationResult.Builder().subject(subject).valid(true).build()
+                    : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject, 
input, context))
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor INPUT_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Input Content Format")
+            .description("""
+                    The format of the JSON content in each FlowFile. \
+                    NDJSON: one JSON object per line (newline-delimited). \
+                    JSON Array: a top-level JSON array of objects, streamed 
element-by-element for memory efficiency. \
+                    Single JSON: the entire FlowFile is a single JSON 
document.\
+                    """)
+            .allowableValues(InputFormat.class)
+            .defaultValue(InputFormat.SINGLE_JSON.getValue())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+            .name("Max FlowFiles Per Batch")
+            .displayName("Max FlowFiles Per Batch")
+            .description("""
+                    The maximum number of FlowFiles to include in a single 
Elasticsearch _bulk API request. \
+                    If the accumulated FlowFiles exceed Max Batch Size before 
this count is reached, the request will be flushed early.\
+                    """)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    static final PropertyDescriptor SUPPRESS_NULLS = new 
PropertyDescriptor.Builder()
+            .name("Suppress Nulls")
+            .description("""
+                    When set to Always Suppress, null and empty values are 
removed from documents before they are sent to Elasticsearch.
+                    This setting applies to NDJSON and JSON Array formats for 
Index and Create operations only. \
+                    For Single JSON, configure null suppression on the 
controller service instead.
+                    Performance note: for JSON Array the impact is negligible 
since documents are already being parsed. \
+                    For NDJSON, each line must be parsed and re-serialized 
when suppression is enabled, \
+                    which adds overhead compared to the default behaviour of 
passing lines through as raw bytes.\
+                    """)
+            .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS, 
ElasticSearchClientService.ALWAYS_SUPPRESS)
+            .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+            .required(true)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Identifier Attribute")
+            .description("""
+                    The name of the FlowFile attribute containing the 
identifier for the document. \
+                    If the Index Operation is "index", this property may be 
left empty or evaluate to an empty value, \
+                    in which case the document's identifier will be 
auto-generated by Elasticsearch. \
+                    For all other Index Operations, the attribute must 
evaluate to a non-empty value.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field")
+            .description("""
+                    The name of the field within each document to use as the 
Elasticsearch document ID. \
+                    If the field is not present in a document or this property 
is left blank, no document ID is set \
+                    and Elasticsearch will auto-generate one.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+            .name("bulk_request")
+            .description("When \"Output Bulk Request\" is enabled, the raw 
Elasticsearch _bulk API request body is written " +
+                    "to this relationship as a FlowFile for inspection or 
debugging.")
+            .build();
+
+    static final PropertyDescriptor OUTPUT_BULK_REQUEST = new 
PropertyDescriptor.Builder()
+            .name("Output Bulk Request")
+            .description("If enabled, each Elasticsearch _bulk request body is 
written as a FlowFile to the \"" +
+                    REL_BULK_REQUEST.getName() + "\" relationship. Useful for 
debugging. " +
+                    "Each FlowFile contains the full NDJSON body exactly as 
sent to Elasticsearch.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static final List<PropertyDescriptor> DESCRIPTORS = List.of(
-            ID_ATTRIBUTE,
             INDEX_OP,
             INDEX,
             TYPE,
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
+            SUPPRESS_NULLS,
+            ID_ATTRIBUTE,
+            IDENTIFIER_FIELD,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, 
REL_ERRORS);
 
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    private final AtomicBoolean bulkRequestOutputEnabled = new 
AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    private ObjectReader mapReader;
+    private ObjectWriter suppressingWriter;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
         config.renameProperty("put-es-json-scripted-upsert", 
SCRIPTED_UPSERT.getName());
         config.renameProperty("put-es-json-dynamic_templates", 
DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from 
PutElasticsearchJson — default to Single JSON.
+        // Set this before migrating BATCH_SIZE so its dependsOn condition is 
satisfied when NiFi processes the property.
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), 
InputFormat.SINGLE_JSON.getValue());
+        }
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name 
used in this processor.
+        // INPUT_FORMAT is set first above so its dependsOn condition is 
satisfied when NiFi processes BATCH_SIZE.
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), 
BATCH_SIZE.getName());
+
+        // MAX_BATCH_SIZE is a new property — existing configurations had no 
byte-based limit.
+        // Preserve that behavior on upgrade by leaving the property blank 
(unbounded).
+        // The custom validator on MAX_BATCH_SIZE accepts blank values, so 
this will not be
+        // overwritten by the default of 10 MB. New processor instances get 
the 10 MB default.
+        if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+            config.setProperty(MAX_BATCH_SIZE.getName(), "");
+        }
     }
 
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to 
"original"
         config.renameRelationship("success", 
AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = 
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, 
Object>>() { });
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        if (inputFormat != InputFormat.SINGLE_JSON
+                && 
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
 {
+            final ObjectMapper suppressingMapper = mapper.copy()
+                    .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL)
+                    
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
+            this.suppressingWriter = suppressingMapper.writer();
+        } else {
+            this.suppressingWriter = null;
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
-        final List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
+        final String maxBatchSizeStr = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().getValue();
+        final long maxBatchBytes = StringUtils.isNotBlank(maxBatchSizeStr)
+                ? 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue()
+                : Long.MAX_VALUE;
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        final String idAttribute = inputFormat == InputFormat.SINGLE_JSON
+                ? context.getProperty(ID_ATTRIBUTE).getValue()
+                : null;
+        final String documentIdField = inputFormat != InputFormat.SINGLE_JSON
+                ? context.getProperty(IDENTIFIER_FIELD).getValue()
+                : null;
+        final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+                ? 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+                : Integer.MAX_VALUE;
+        int flowFilesProcessed = 0;
+
+        // Tracks all FlowFiles that were successfully parsed and submitted 
(even partially)
+        final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+        // Tracks FlowFiles that had at least one Elasticsearch document error
+        final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+        // Deferred bulk-error attributes: applied after each FlowFile's 
InputStream is closed
+        final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new 
HashMap<>();
+        // Failed local record indices per FlowFile — O(error count) memory; 
used at finalization
+        // to re-read FlowFiles once and reconstruct error/success content 
without buffering all bytes
+        final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new 
LinkedHashMap<>();
+
+        // Current chunk accumulation — operationFlowFiles and 
operationRecordIndices are parallel to operations (same index)
+        final List<FlowFile> operationFlowFiles = new ArrayList<>();
+        final List<IndexOperationRequest> operations = new ArrayList<>();
+        final List<Integer> operationRecordIndices = new ArrayList<>();
+        long totalBytesAccumulated = 0;
+        long chunkBytes = 0;
+
+        while (flowFile != null) {
+            final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+            final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+            final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+            final String flowFileIdAttribute = 
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
+
+            final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, flowFile);
+            final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+            final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+            final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, flowFile);
+            final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
+
+            boolean parseError = false;
+            try {
+                final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
+                if (InputFormat.NDJSON == inputFormat) {
+                    // NDJSON: each non-blank line is one JSON document.
+                    // Index/Create operations pass raw UTF-8 bytes directly 
to avoid Map allocation.
+                    // Update/Delete/Upsert parse into a Map so the bulk 
serializer can wrap the payload.
+                    int localRecordIndex = 0;
+                    try (final BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(session.read(flowFile), 
charset), READER_BUFFER_SIZE)) {
+                        String line;
+                        while ((line = reader.readLine()) != null) {
+                            final String trimmedLine = line.trim();
+                            if (trimmedLine.isEmpty()) {
+                                continue;
+                            }
+                            final IndexOperationRequest opRequest;
+                            final long docBytes;
+                            if (o == IndexOperationRequest.Operation.Index || 
o == IndexOperationRequest.Operation.Create) {
+                                final String id = extractId(trimmedLine, 
documentIdField, flowFileIdAttribute);
+                                final byte[] rawJsonBytes;
+                                if (suppressingWriter != null) {
+                                    // Parse to Map so NON_NULL/NON_EMPTY 
inclusion filters apply during serialization.
+                                    // JsonNode tree serialization bypasses 
JsonInclude filters.
+                                    rawJsonBytes = 
suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+                                } else {
+                                    rawJsonBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8);
+                                }
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = rawJsonBytes.length;
+                            } else {
+                                final Map<String, Object> contentMap = 
mapReader.readValue(trimmedLine);
+                                final String id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+                            }
+                            operations.add(opRequest);
+                            operationFlowFiles.add(flowFile);
+                            operationRecordIndices.add(localRecordIndex++);
+                            chunkBytes += docBytes;
+                            totalBytesAccumulated += docBytes;
+                            if (chunkBytes >= maxBatchBytes) {
+                                flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                                operations.clear();
+                                operationFlowFiles.clear();
+                                operationRecordIndices.clear();
+                                chunkBytes = 0;
+                            }
+                        }
+                    }
+                } else if (InputFormat.JSON_ARRAY == inputFormat) {
+                    // JSON Array: the FlowFile may contain one or more 
top-level JSON arrays.
+                    // Arrays are read sequentially; elements within each 
array are streamed one at a
+                    // time via JsonParser to avoid loading the entire content 
into memory.
+                    // Each element is re-serialized to compact bytes for 
Index/Create,
+                    // or parsed into a Map for Update/Delete/Upsert.
+                    int localRecordIndex = 0;
+                    try (final InputStreamReader isr = new 
InputStreamReader(session.read(flowFile), charset);
+                         final JsonParser parser = 
mapper.getFactory().createParser(isr)) {
+                        JsonToken outerToken;
+                        while ((outerToken = parser.nextToken()) != null) {
+                            if (outerToken != JsonToken.START_ARRAY) {
+                                throw new IOException("Expected a JSON array 
but found: " + outerToken);
+                            }
+                            JsonToken token;
+                            while ((token = parser.nextToken()) != 
JsonToken.END_ARRAY) {
+                                if (token == null) {
+                                    throw new IOException("Malformed JSON 
Array: reached end of stream before the closing ']'. " +
+                                            "Verify the FlowFile content is a 
complete, valid JSON array.");
+                                }
+                                final long startOffset = 
parser.currentTokenLocation().getCharOffset();
+                                final IndexOperationRequest opRequest;
+                                if (o == IndexOperationRequest.Operation.Index 
|| o == IndexOperationRequest.Operation.Create) {
+                                    final long docBytes;
+                                    final byte[] rawJsonBytes;
+                                    final String id;
+                                    if (suppressingWriter != null) {
+                                        // Parse directly to Map so 
NON_NULL/NON_EMPTY inclusion filters apply during
+                                        // serialization. JsonNode tree 
serialization bypasses JsonInclude filters,
+                                        // and convertValue(node, Map) adds an 
extra serialization cycle.
+                                        final Map<String, Object> contentMap = 
mapReader.readValue(parser);
+                                        docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                        rawJsonBytes = 
suppressingWriter.writeValueAsBytes(contentMap);
+                                        id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                    } else {
+                                        final JsonNode node = 
mapper.readTree(parser);
+                                        docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                        rawJsonBytes = 
mapper.writeValueAsBytes(node);
+                                        id = extractId(node, documentIdField, 
flowFileIdAttribute);
+                                    }
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                } else {
+                                    final Map<String, Object> contentMap = 
mapReader.readValue(parser);
+                                    final long docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                    final String id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                }
+                                operations.add(opRequest);
+                                operationFlowFiles.add(flowFile);
+                                operationRecordIndices.add(localRecordIndex++);
+                                if (chunkBytes >= maxBatchBytes) {
+                                    flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                                    operations.clear();
+                                    operationFlowFiles.clear();
+                                    operationRecordIndices.clear();
+                                    chunkBytes = 0;
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    // Single JSON: the entire FlowFile is one document parsed 
into a Map.
+                    // The client service serializes the Map, preserving 
null-suppression settings.
+                    try (final InputStream in = session.read(flowFile)) {
+                        final Map<String, Object> contentMap = 
mapReader.readValue(in);
+                        final String id = 
StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
+                        final IndexOperationRequest opRequest = 
IndexOperationRequest.builder()
+                                .index(index)
+                                .type(type)
+                                .id(id)
+                                .fields(contentMap)
+                                .operation(o)
+                                .script(scriptMap)
+                                .scriptedUpsert(scriptedUpsert)
+                                .dynamicTemplates(dynamicTemplatesMap)
+                                .headerFields(bulkHeaderFields)
+                                .build();
+                        operations.add(opRequest);
+                        operationFlowFiles.add(flowFile);
+                        operationRecordIndices.add(0);
+                        final long docBytes = flowFile.getSize();
+                        chunkBytes += docBytes;
+                        totalBytesAccumulated += docBytes;
+                        if (chunkBytes >= maxBatchBytes) {
+                            flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                            operations.clear();
+                            operationFlowFiles.clear();
+                            operationRecordIndices.clear();
+                            chunkBytes = 0;
+                        }
+                    }
+                }
+            } catch (final ElasticsearchException ese) {
+                final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure.");
+                getLogger().error(msg, ese);
+                final Relationship rel = ese.isElastic() ? REL_RETRY : 
REL_FAILURE;
+                // Route only the failing in-flight chunk to retry/failure. 
FlowFiles already
+                // successfully indexed by prior _bulk requests are routed 
normally to avoid
+                // duplicate indexing if those FlowFiles are re-processed on 
retry.
+                final Set<FlowFile> inFlight = new 
LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ese, rel, session, true, 
inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new 
LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, 
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
+                return;
+            } catch (final IOException ioe) {
+                getLogger().error("Could not read FlowFile content as valid 
{}.", inputFormat, ioe);
+                removeFlowFileFromChunk(flowFile, operations, 
operationFlowFiles, operationRecordIndices);
+                flowFile = session.putAttribute(flowFile, 
"elasticsearch.put.error", ioe.getMessage());
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                parseError = true;
+            } catch (final Exception ex) {
+                getLogger().error("Failed processing records.", ex);
+                removeFlowFileFromChunk(flowFile, operations, 
operationFlowFiles, operationRecordIndices);
+                flowFile = session.putAttribute(flowFile, 
"elasticsearch.put.error", ex.getMessage());
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                parseError = true;
+            }
+
+            if (!parseError) {
+                allProcessedFlowFiles.add(flowFile);
+                // InputStream is now closed — safe to apply any deferred 
bulk-error attributes
+                applyPendingBulkErrors(flowFile, pendingBulkErrors, session);
+            }
 
-        final String idAttribute = 
context.getProperty(ID_ATTRIBUTE).getValue();
+            flowFilesProcessed++;
 
-        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
-        final List<IndexOperationRequest> operations = new 
ArrayList<>(flowFiles.size());
+            // For Single JSON, stop when the count-based batch limit is 
reached; otherwise use size limit
+            if (InputFormat.SINGLE_JSON == inputFormat ? flowFilesProcessed >= 
batchSize : totalBytesAccumulated >= maxBatchBytes) {
+                break;
+            }
 
-        for (final FlowFile input : flowFiles) {
-            addOperation(operations, originals, idAttribute, context, session, 
input);
+            flowFile = session.get();
         }
 
-        if (!originals.isEmpty()) {
+        // Flush any remaining operations (all InputStreams are closed at this 
point)
+        if (!operations.isEmpty()) {
             try {
-                final List<FlowFile> errorDocuments = 
indexDocuments(operations, originals, context, session);
-                handleResponse(context, session, errorDocuments, originals);
-                session.transfer(originals, REL_ORIGINAL);
+                flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, null, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
             } catch (final ElasticsearchException ese) {
                 final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
-                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
+                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure.");
                 getLogger().error(msg, ese);
                 final Relationship rel = ese.isElastic() ? REL_RETRY : 
REL_FAILURE;
-                transferFlowFilesOnException(ese, rel, session, true, 
originals.toArray(new FlowFile[0]));
-            } catch (final JsonProcessingException jpe) {
-                getLogger().warn("Could not log Elasticsearch operation errors 
nor determine which documents errored.", jpe);
-                transferFlowFilesOnException(jpe, REL_ERRORS, session, true, 
originals.toArray(new FlowFile[0]));
+                final Set<FlowFile> inFlight = new 
LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ese, rel, session, true, 
inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new 
LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, 
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
+                return;
             } catch (final Exception ex) {
                 getLogger().error("Could not index documents.", ex);
-                transferFlowFilesOnException(ex, REL_FAILURE, session, false, 
originals.toArray(new FlowFile[0]));
+                final Set<FlowFile> inFlight = new 
LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ex, REL_FAILURE, session, false, 
inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new 
LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, 
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
                 context.yield();
+                return;
             }
+        }
+
+        if (allProcessedFlowFiles.isEmpty()) {
+            getLogger().warn("No records successfully parsed for sending to 
Elasticsearch");
         } else {
-            getLogger().warn("No FlowFiles successfully parsed for sending to 
Elasticsearch");
+            handleFinalResponse(context, session, errorFlowFiles, 
allProcessedFlowFiles, pendingErrorRecordIndices, inputFormat);
         }
     }
 
-    private void addOperation(final List<IndexOperationRequest> operations, 
final List<FlowFile> originals, final String idAttribute,
-                              final ProcessContext context, final 
ProcessSession session, FlowFile input) {
-        final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
-        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
-        final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
-        final String id = StringUtils.isNotBlank(idAttribute) && 
StringUtils.isNotBlank(input.getAttribute(idAttribute)) ? 
input.getAttribute(idAttribute) : null;
-
-        final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, input);
-        final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean();
-        final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, input);
-
-        final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, input);
-        final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
-
-        final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
-
-        try (final InputStream inStream = session.read(input)) {
-            final byte[] result = IOUtils.toByteArray(inStream);
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> contentMap = mapper.readValue(new 
String(result, charset), Map.class);
-
-            final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
-            operations.add(new IndexOperationRequest(index, type, id, 
contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap, 
bulkHeaderFields));
-
-            originals.add(input);
-        } catch (final IOException ioe) {
-            getLogger().error("Could not read FlowFile content valid JSON.", 
ioe);
-            input = session.putAttribute(input, "elasticsearch.put.error", 
ioe.getMessage());
-            session.penalize(input);
-            session.transfer(input, REL_FAILURE);
-        } catch (final Exception ex) {
-            getLogger().error("Could not index documents.", ex);
-            input = session.putAttribute(input, "elasticsearch.put.error", 
ex.getMessage());
-            session.penalize(input);
-            session.transfer(input, REL_FAILURE);
+    /**
+     * Removes all entries belonging to {@code target} from the three parallel 
lists, so that a
+     * FlowFile that failed mid-read does not leave stale references that 
would cause
+     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException} 
on the next flush.
+     */
+    private void removeFlowFileFromChunk(final FlowFile target,
+                                          final List<IndexOperationRequest> 
operations,
+                                          final List<FlowFile> 
operationFlowFiles,
+                                          final List<Integer> 
operationRecordIndices) {
+        for (int i = operations.size() - 1; i >= 0; i--) {
+            if (operationFlowFiles.get(i) == target) {
+                operations.remove(i);
+                operationFlowFiles.remove(i);
+                operationRecordIndices.remove(i);
+            }
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getMapFromAttribute(final PropertyDescriptor 
propertyDescriptor, final ProcessContext context, final FlowFile input) {
-        final String propertyValue = 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+    /**
+     * Applies any bulk-error attributes that were deferred for {@code 
flowFile} while its
+     * InputStream was open.  Must only be called after the InputStream has 
been closed.
+     */
+    private void applyPendingBulkErrors(final FlowFile flowFile,
+                                         final Map<FlowFile, List<Map<String, 
Object>>> pendingBulkErrors,
+                                         final ProcessSession session) {
+        final List<Map<String, Object>> errorList = 
pendingBulkErrors.remove(flowFile);
+        if (errorList == null) {
+            return;
+        }
         try {
-            return StringUtils.isNotBlank(propertyValue) ? 
mapper.readValue(propertyValue, Map.class) : Collections.emptyMap();
-        } catch (final JsonProcessingException jpe) {
-            throw new ProcessException(propertyDescriptor.getDisplayName() + " 
must be a String parsable into a JSON Object", jpe);
+            final Object errorObj = errorList.size() == 1 ? errorList.get(0) : 
errorList;
+            session.putAttribute(flowFile, "elasticsearch.bulk.error", 
mapper.writeValueAsString(errorObj));
+        } catch (final JsonProcessingException e) {
+            session.putAttribute(flowFile, "elasticsearch.bulk.error",
+                    String.format("{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                            e.getMessage().replace("\"", "\\\"")));
         }
     }
 
-    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> 
operations, final List<FlowFile> originals, final ProcessContext context, final 
ProcessSession session) throws IOException {
-        final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, originals.getFirst());
+    /**
+     * Sends the accumulated batch of operations to Elasticsearch via the 
_bulk API and records
+     * any per-document errors. If {@code openStreamFlowFile} is non-null, 
error attributes for
+     * that FlowFile are deferred into {@code pendingBulkErrors} so they are 
applied after its
+     * InputStream is closed (calling {@code putAttribute} on an open FlowFile 
throws
+     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}).
+     * <p>
+     * Only the local record index of each failed operation is stored (O(error 
count) memory).
+     * Error/success content is reconstructed by re-reading the FlowFile in
+     * {@link #handleFinalResponse}.
+     */
+    private void flushChunk(final List<IndexOperationRequest> operations, 
final List<FlowFile> operationFlowFiles,
+                             final List<Integer> operationRecordIndices,
+                             final Set<FlowFile> errorFlowFiles, final 
FlowFile openStreamFlowFile,
+                             final Map<FlowFile, List<Map<String, Object>>> 
pendingBulkErrors,
+                             final Map<FlowFile, Set<Integer>> 
pendingErrorRecordIndices,
+                             final ProcessContext context, final 
ProcessSession session) throws IOException {
+        if (outputBulkRequest) {
+            outputBulkRequestFlowFile(operations, operationFlowFiles, session);
+        }
+
+        final FlowFile firstFlowFile = operationFlowFiles.get(0);
+        final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, firstFlowFile);
         final IndexOperationResponse response = 
clientService.get().bulk(operations,
-                new 
ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), 
getRequestHeadersFromDynamicProperties(context, originals.getFirst())));
+                new 
ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), 
getRequestHeadersFromDynamicProperties(context, firstFlowFile)));
 
         final Map<Integer, Map<String, Object>> errors = 
findElasticsearchResponseErrors(response);
-        final List<FlowFile> errorDocuments = new ArrayList<>(errors.size());
+
+        // Attach per-FlowFile error attributes; defer putAttribute for the 
FlowFile whose
+        // InputStream is currently open (putAttribute would throw 
FlowFileHandlingException).
+        // Record only the local record index of each failure — content is 
reconstructed at finalization.
+        final Map<FlowFile, List<Map<String, Object>>> flowFileErrors = new 
HashMap<>();
         errors.forEach((index, error) -> {
-            String errorMessage;
-            try {
-                errorMessage = mapper.writeValueAsString(error);
-            } catch (JsonProcessingException e) {
-                errorMessage = String.format(
-                        "{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
-                        e.getMessage().replace("\"", "\\\"")
-                );
+            final FlowFile flowFile = operationFlowFiles.get(index);
+            errorFlowFiles.add(flowFile);
+            flowFileErrors.computeIfAbsent(flowFile, k -> new 
ArrayList<>()).add(error);
+            pendingErrorRecordIndices.computeIfAbsent(flowFile, k -> new 
HashSet<>()).add(operationRecordIndices.get(index));
+        });
+        flowFileErrors.forEach((flowFile, errorList) -> {
+            if (flowFile == openStreamFlowFile) {
+                // Defer: InputStream still open — apply after the 
try-with-resources closes
+                pendingBulkErrors.computeIfAbsent(flowFile, k -> new 
ArrayList<>()).addAll(errorList);
+            } else {
+                try {
+                    final Object errorObj = errorList.size() == 1 ? 
errorList.get(0) : errorList;
+                    session.putAttribute(flowFile, "elasticsearch.bulk.error", 
mapper.writeValueAsString(errorObj));
+                } catch (final JsonProcessingException e) {
+                    session.putAttribute(flowFile, "elasticsearch.bulk.error",
+                            String.format("{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                                    e.getMessage().replace("\"", "\\\"")));
+                }
             }
-            errorDocuments.add(session.putAttribute(originals.get(index), 
"elasticsearch.bulk.error", errorMessage));
         });
 
         if (!errors.isEmpty()) {
-            handleElasticsearchDocumentErrors(errors, session, null);
+            handleElasticsearchDocumentErrors(errors, session, firstFlowFile);
         }
+    }
+
+    /**
+     * Serializes the pending batch into an Elasticsearch _bulk NDJSON body 
and writes it to a
+     * new FlowFile on the {@code bulk_request} relationship for inspection or 
debugging.
+     * Each operation produces an action metadata line followed by a document 
line
+     * (Delete operations have no document line). Failures here are logged and 
swallowed so
+     * that a debug-output failure does not abort the actual indexing.
+     */
+    private void outputBulkRequestFlowFile(final List<IndexOperationRequest> 
operations,
+                                            final List<FlowFile> 
operationFlowFiles,
+                                            final ProcessSession session) {
+        final FlowFile parent = operationFlowFiles.get(0);
+        FlowFile bulkRequestFF = session.create(parent);
+        try (final OutputStream out = session.write(bulkRequestFF)) {
+            for (final IndexOperationRequest op : operations) {
+                // Action metadata line
+                final Map<String, Object> actionBody = new LinkedHashMap<>();
+                if (StringUtils.isNotBlank(op.getIndex())) {
+                    actionBody.put("_index", op.getIndex());
+                }
+                if (StringUtils.isNotBlank(op.getType())) {
+                    actionBody.put("_type", op.getType());
+                }
+                if (StringUtils.isNotBlank(op.getId())) {
+                    actionBody.put("_id", op.getId());
+                }
+                if (op.getDynamicTemplates() != null && 
!op.getDynamicTemplates().isEmpty()) {
+                    actionBody.put("dynamic_templates", 
op.getDynamicTemplates());
+                }
+                if (op.getHeaderFields() != null) {
+                    actionBody.putAll(op.getHeaderFields());
+                }
 
-        return errorDocuments;
+                // Upsert maps to "update" in the ES bulk API
+                final String actionName = op.getOperation() == 
IndexOperationRequest.Operation.Upsert
+                        ? "update" : op.getOperation().getValue();
+                final Map<String, Object> actionLine = Map.of(actionName, 
actionBody);
+                out.write(mapper.writeValueAsBytes(actionLine));
+                out.write('\n');
+
+                // Document line (delete has no document)
+                if (op.getOperation() != 
IndexOperationRequest.Operation.Delete) {
+                    if (op.getRawJsonBytes() != null) {
+                        // Index/Create with rawJson path — write directly, no 
Map round-trip
+                        out.write(op.getRawJsonBytes());
+                    } else {
+                        final Map<String, Object> docLine = new 
LinkedHashMap<>();
+                        if (op.getOperation() == 
IndexOperationRequest.Operation.Update
+                                || op.getOperation() == 
IndexOperationRequest.Operation.Upsert) {
+                            if (op.getScript() != null && 
!op.getScript().isEmpty()) {
+                                docLine.put("script", op.getScript());
+                                if (op.isScriptedUpsert()) {
+                                    docLine.put("scripted_upsert", true);
+                                }
+                                if (op.getFields() != null && 
!op.getFields().isEmpty()) {
+                                    docLine.put("upsert", op.getFields());
+                                }
+                            } else {
+                                docLine.put("doc", op.getFields());
+                                if (op.getOperation() == 
IndexOperationRequest.Operation.Upsert) {
+                                    docLine.put("doc_as_upsert", true);
+                                }
+                            }
+                        } else {
+                            docLine.putAll(op.getFields());
+                        }
+                        out.write(mapper.writeValueAsBytes(docLine));
+                    }
+                    out.write('\n');
+                }
+            }
+        } catch (final IOException e) {
+            getLogger().warn("Failed to write bulk request FlowFile for 
inspection.", e);
+            session.remove(bulkRequestFF);
+            return;
+        }
+        bulkRequestFF = session.putAttribute(bulkRequestFF, 
"elasticsearch.bulk.operation.count", String.valueOf(operations.size()));
+        session.transfer(bulkRequestFF, REL_BULK_REQUEST);
     }
 
-    private void handleResponse(final ProcessContext context, final 
ProcessSession session, final List<FlowFile> errorDocuments, final 
List<FlowFile> originals) {
-        // clone FlowFiles to be transferred to errors/successful as the 
originals are pass through to REL_ORIGINAL
-        final List<FlowFile> copiedErrors = 
errorDocuments.stream().map(session::clone).toList();
+    /**
+     * Extracts the document ID from a raw JSON string using a streaming 
parser.
+     * Stops as soon as the target field is found, avoiding a full tree parse.
+     * Used for Index/Create operations to avoid the Map allocation overhead.
+     */
+    private String extractId(final String rawJson, final String idAttribute, 
final String flowFileIdAttribute) throws IOException {
+        if (StringUtils.isBlank(idAttribute)) {
+            return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+        }
+        try (final JsonParser p = mapper.getFactory().createParser(rawJson)) {
+            while (p.nextToken() != null) {
+                if (idAttribute.equals(p.currentName()) && p.nextToken() != 
null && !p.currentToken().isStructStart()) {
+                    final String value = p.getText();
+                    return StringUtils.isNotBlank(value) ? value
+                            : (StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null);
+                }
+            }
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Extracts the document ID from a pre-parsed JsonNode.
+     * Used for JSON Array Index/Create operations where the node is already 
available.
+     */
+    private String extractId(final JsonNode node, final String idAttribute, 
final String flowFileIdAttribute) {
+        if (StringUtils.isBlank(idAttribute)) {
+            return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+        }
+        final JsonNode idNode = node.get(idAttribute);
+        if (idNode != null && !idNode.isNull()) {
+            return idNode.asText();
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Resolves the document ID for Update/Delete/Upsert operations from the 
already-parsed content Map.
+     * Falls back to the FlowFile attribute value when the field is absent 
from the document.
+     */
+    private String resolveId(final Map<String, Object> contentMap, final 
String idAttribute, final String flowFileIdAttribute) {
+        if (StringUtils.isBlank(idAttribute)) {
+            return null;
+        }
+        final Object idObj = contentMap.get(idAttribute);
+        if (idObj != null) {
+            return idObj.toString();
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Reads a processor property as a JSON Object string and deserializes it 
into a Map.
+     * Returns an empty Map when the property is blank. Throws {@link 
ProcessException} if the
+     * value is not valid JSON or not a JSON Object.
+     */
+    private Map<String, Object> getMapFromAttribute(final PropertyDescriptor 
propertyDescriptor, final ProcessContext context, final FlowFile input) {
+        final String propertyValue = 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+        try {
+            return StringUtils.isNotBlank(propertyValue) ? 
mapper.readValue(propertyValue, new TypeReference<Map<String, Object>>() { }) : 
Collections.emptyMap();
+        } catch (final JsonProcessingException jpe) {
+            throw new ProcessException(propertyDescriptor.getDisplayName() + " 
must be a String parsable into a JSON Object", jpe);
+        }
+    }
+
+    /**
+     * Routes all successfully processed FlowFiles to their final 
relationships after all
+     * _bulk requests for the trigger have completed.
+     * <ul>
+     *   <li>FlowFiles with no errors are cloned to {@code REL_SUCCESSFUL} 
without re-reading.</li>
+     *   <li>FlowFiles with errors are re-read exactly once to split records 
by failed indices:
+     *       failed records go to a clone on {@code REL_ERRORS}; successful 
records (if any) go
+     *       to a clone on {@code REL_SUCCESSFUL}. For SINGLE_JSON the 
FlowFile is always a single
+     *       document so a direct clone is used instead of re-reading.</li>
+     *   <li>All FlowFiles go to {@code REL_ORIGINAL}.</li>
+     * </ul>
+     */
+    private void handleFinalResponse(final ProcessContext context, final 
ProcessSession session,
+                                     final Set<FlowFile> errorFlowFiles, final 
Set<FlowFile> allFlowFiles,
+                                     final Map<FlowFile, Set<Integer>> 
pendingErrorRecordIndices,
+                                     final InputFormat inputFormat) {
+        final List<FlowFile> copiedErrors = new ArrayList<>();
+        final List<FlowFile> successfulDocuments = new ArrayList<>();
+
+        for (final FlowFile ff : allFlowFiles) {
+            if (!errorFlowFiles.contains(ff)) {
+                // All records succeeded: clone the original without re-reading
+                successfulDocuments.add(session.clone(ff));
+            } else if (inputFormat == InputFormat.SINGLE_JSON) {
+                // One document per FlowFile — it failed entirely; clone 
directly
+                copiedErrors.add(session.clone(ff));
+            } else {
+                // NDJSON or JSON Array: re-read once and split records into 
error/success by index
+                final Set<Integer> failedIndices = 
pendingErrorRecordIndices.getOrDefault(ff, Collections.emptySet());
+                final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(ff).getValue();
+                final ByteArrayOutputStream errorBaos = new 
ByteArrayOutputStream();
+                final ByteArrayOutputStream successBaos = new 
ByteArrayOutputStream();
+                session.read(ff, in -> splitRecords(in, failedIndices, 
charset, inputFormat, errorBaos, successBaos));
+
+                if (errorBaos.size() > 0) {
+                    final byte[] errorBytes = errorBaos.toByteArray();
+                    FlowFile errorFf = session.clone(ff);
+                    errorFf = session.write(errorFf, out -> 
out.write(errorBytes));
+                    copiedErrors.add(errorFf);
+                }
+                if (successBaos.size() > 0) {
+                    final byte[] successBytes = successBaos.toByteArray();
+                    FlowFile successFf = session.clone(ff);

Review Comment:
   fixed by removing elasticsearch.bulk.error from the success clone after 
splitting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to