pvillard31 commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3006003684


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,6 +165,55 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
         .required(true)
         .build();
 
+    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description("The maximum amount of data to send in a single Elasticsearch _bulk API request. " +
+                    "For NDJSON and JSON Array modes, FlowFiles are accumulated until this threshold is reached, then flushed. " +
+                    "For Single JSON mode, this acts as a size-based safety limit: if the accumulated FlowFiles exceed this size " +
+                    "before Max FlowFiles Per Batch is reached, the request is flushed early. " +
+                    "Elasticsearch recommends 5-15 MB per bulk request for optimal performance.")
+            .defaultValue("10 MB")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(true)
+            .build();

Review Comment:
   This is potentially a behavioral change, no? For example, a user with Batch Size = 100 and 100 FlowFiles of 200 KB each (20 MB total) would previously send one bulk request but will now get 2+. Not against the change, as it seems to be a reasonable default, but just mentioning it.
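To make the arithmetic concrete, here is a minimal sketch of the greedy size-threshold chunking that `MAX_BATCH_SIZE` introduces in the new `onTrigger` loop (accumulate `chunkBytes`, flush once it reaches the threshold, then a final flush of any partial chunk). `BatchSplitSketch` and `countBulkRequests` are hypothetical names for illustration only, not part of the PR:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    // Greedy chunking as in the new onTrigger loop: accumulate document sizes
    // and flush once the running byte count reaches the threshold.
    // Returns the number of _bulk requests that would be issued.
    static int countBulkRequests(List<Long> docSizes, long maxBatchBytes) {
        int requests = 0;
        long chunkBytes = 0;
        for (long docBytes : docSizes) {
            chunkBytes += docBytes;
            if (chunkBytes >= maxBatchBytes) {
                requests++;          // corresponds to flushChunk(...)
                chunkBytes = 0;
            }
        }
        if (chunkBytes > 0) {
            requests++;              // final flush of the partial chunk
        }
        return requests;
    }

    public static void main(String[] args) {
        // 100 FlowFiles of 200 KB each = 20 MB total, against the 10 MB default
        List<Long> docs = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            docs.add(200L * 1024);
        }
        System.out.println(countBulkRequests(docs, 10L * 1024 * 1024)); // prints 2
    }
}
```

With these numbers the threshold is crossed at the 52nd document (52 × 200 KB = 10.15 MB), so the example from the comment above would indeed be split into two bulk requests instead of one.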



##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -153,175 +222,705 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, REL_ERRORS);
 
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    private final AtomicBoolean bulkRequestOutputEnabled = new AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    private ObjectReader mapReader;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
         config.renameProperty("put-es-json-scripted-upsert", SCRIPTED_UPSERT.getName());
         config.renameProperty("put-es-json-dynamic_templates", DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name used in this processor
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), BATCH_SIZE.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from PutElasticsearchJson — default to Single JSON
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), InputFormat.SINGLE_JSON.getValue());
+        }
     }
 
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to "original"
         config.renameRelationship("success", AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, Object>>() { });
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
-        final List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
-
+        final long maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
         final String idAttribute = context.getProperty(ID_ATTRIBUTE).getValue();
+        final InputFormat inputFormat = InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+                ? context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+                : Integer.MAX_VALUE;
+        int flowFilesProcessed = 0;
+
+        // Tracks all FlowFiles that were successfully parsed and submitted (even partially)
+        final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+        // Tracks FlowFiles that had at least one Elasticsearch document error
+        final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+        // Deferred bulk-error attributes: applied after each FlowFile's InputStream is closed
+        final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new HashMap<>();
+        // Failed local record indices per FlowFile — O(error count) memory; used at finalization
+        // to re-read FlowFiles once and reconstruct error/success content without buffering all bytes
+        final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new LinkedHashMap<>();
+
+        // Current chunk accumulation — operationFlowFiles and operationRecordIndices are parallel to operations (same index)
+        final List<FlowFile> operationFlowFiles = new ArrayList<>();
+        final List<IndexOperationRequest> operations = new ArrayList<>();
+        final List<Integer> operationRecordIndices = new ArrayList<>();
+        long totalBytesAccumulated = 0;
+        long chunkBytes = 0;
 
-        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
-        final List<IndexOperationRequest> operations = new ArrayList<>(flowFiles.size());
+        while (flowFile != null) {
+            final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+            final String type = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+            final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+            final String flowFileIdAttribute = StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
 
-        for (final FlowFile input : flowFiles) {
-            addOperation(operations, originals, idAttribute, context, session, input);
+            final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, context, flowFile);
+            final boolean scriptedUpsert = context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+            final Map<String, Object> dynamicTemplatesMap = getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+            final Map<String, String> dynamicProperties = getRequestParametersFromDynamicProperties(context, flowFile);
+            final Map<String, String> bulkHeaderFields = getBulkHeaderParameters(dynamicProperties);
+
+            boolean parseError = false;
+            try {
+                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(indexOp);
+                if (InputFormat.NDJSON == inputFormat) {
+                    // NDJSON: each non-blank line is one JSON document.
+                    // Index/Create operations pass raw UTF-8 bytes directly to avoid Map allocation.
+                    // Update/Delete/Upsert parse into a Map so the bulk serializer can wrap the payload.
+                    int localRecordIndex = 0;
+                    try (final BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(session.read(flowFile), charset), READER_BUFFER_SIZE)) {
+                        String line;
+                        while ((line = reader.readLine()) != null) {
+                            final String trimmedLine = line.trim();
+                            if (trimmedLine.isEmpty()) {
+                                continue;
+                            }
+                            final IndexOperationRequest opRequest;
+                            final long docBytes;
+                            if (o == IndexOperationRequest.Operation.Index || o == IndexOperationRequest.Operation.Create) {
+                                final String id = extractId(trimmedLine, idAttribute, flowFileIdAttribute);
+                                final byte[] rawJsonBytes = trimmedLine.getBytes(StandardCharsets.UTF_8);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = rawJsonBytes.length;
+                            } else {
+                                final Map<String, Object> contentMap = mapReader.readValue(trimmedLine);
+                                final String id = resolveId(contentMap, idAttribute, flowFileIdAttribute);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+                            }
+                            operations.add(opRequest);
+                            operationFlowFiles.add(flowFile);
+                            operationRecordIndices.add(localRecordIndex++);
+                            chunkBytes += docBytes;
+                            totalBytesAccumulated += docBytes;
+                            if (chunkBytes >= maxBatchBytes) {
+                                flushChunk(operations, operationFlowFiles, operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, pendingErrorRecordIndices, context, session);
+                                operations.clear();
+                                operationFlowFiles.clear();
+                                operationRecordIndices.clear();
+                                chunkBytes = 0;
+                            }
+                        }
+                    }
+                } else if (InputFormat.JSON_ARRAY == inputFormat) {
+                    // JSON Array: the FlowFile may contain one or more top-level JSON arrays.
+                    // Arrays are read sequentially; elements within each array are streamed one at a
+                    // time via JsonParser to avoid loading the entire content into memory.
+                    // Each element is re-serialized to compact bytes for Index/Create,
+                    // or parsed into a Map for Update/Delete/Upsert.
+                    int localRecordIndex = 0;
+                    try (final InputStreamReader isr = new InputStreamReader(session.read(flowFile), charset);
+                         final JsonParser parser = mapper.getFactory().createParser(isr)) {
+                        JsonToken outerToken;
+                        while ((outerToken = parser.nextToken()) != null) {
+                            if (outerToken != JsonToken.START_ARRAY) {
+                                throw new IOException("Expected a JSON array but found: " + outerToken);
+                            }
+                            JsonToken token;
+                            while ((token = parser.nextToken()) != JsonToken.END_ARRAY) {
+                                if (token == null) {
+                                    throw new IOException("Malformed JSON Array: reached end of stream before the closing ']'. " +
+                                            "Verify the FlowFile content is a complete, valid JSON array.");
+                                }
+                                final long startOffset = parser.currentTokenLocation().getCharOffset();
+                                final IndexOperationRequest opRequest;
+                                if (o == IndexOperationRequest.Operation.Index || o == IndexOperationRequest.Operation.Create) {
+                                    final JsonNode node = mapper.readTree(parser);
+                                    final long docBytes = Math.max(1, parser.currentLocation().getCharOffset() - startOffset);
+                                    final byte[] rawJsonBytes = mapper.writeValueAsBytes(node);
+                                    final String id = extractId(node, idAttribute, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                } else {
+                                    final Map<String, Object> contentMap = mapReader.readValue(parser);
+                                    final long docBytes = Math.max(1, parser.currentLocation().getCharOffset() - startOffset);
+                                    final String id = resolveId(contentMap, idAttribute, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                }
+                                operations.add(opRequest);
+                                operationFlowFiles.add(flowFile);
+                                operationRecordIndices.add(localRecordIndex++);
+                                if (chunkBytes >= maxBatchBytes) {
+                                    flushChunk(operations, operationFlowFiles, operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, pendingErrorRecordIndices, context, session);
+                                    operations.clear();
+                                    operationFlowFiles.clear();
+                                    operationRecordIndices.clear();
+                                    chunkBytes = 0;
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    // Single JSON: the entire FlowFile is one document.
+                    // For Index/Create, the input is parsed and re-serialized to compact JSON to strip
+                    // any embedded newlines from pretty-printed input. The Elasticsearch bulk NDJSON
+                    // protocol uses newlines as record delimiters, so embedded newlines would corrupt
+                    // the request. Update/Delete/Upsert parse directly into a Map.
+                    try (final InputStream in = session.read(flowFile)) {
+                        final IndexOperationRequest opRequest;
+                        if (o == IndexOperationRequest.Operation.Index || o == IndexOperationRequest.Operation.Create) {
+                            // Re-serialize via Jackson to produce compact single-line JSON.
+                            // Pretty-printed input would contain embedded newlines that break the
+                            // Elasticsearch bulk NDJSON protocol (each document must be one line).
+                            final JsonNode node = mapper.readTree(new InputStreamReader(in, charset));
+                            final byte[] rawJsonBytes = mapper.writeValueAsBytes(node);
Review Comment:
   Using rawJsonBytes here for Single JSON Index/Create bypasses the null suppression configuration on the ElasticSearchClientService. With this change, the bytes are written directly in ElasticSearchClientServiceImpl.buildRequest, skipping the client service's mapper entirely. This is a silent behavioral change for existing users who have null suppression enabled.
   
   For Single JSON mode, I'd recommend preserving the old Map-based path (parse into a Map, store it in fields) so the client service's null suppression continues to work. The rawJsonBytes optimization makes sense for NDJSON and JSON Array, where avoiding the Map allocation matters for throughput.
   
   To be more specific:
   - Old path (current main):
     1. Processor parses JSON into a Map<String, Object> (null JSON values become Java null in the Map)
     2. Map is stored in IndexOperationRequest.fields
     3. Client service serializes the Map via mapper.writeValueAsString(request.getFields()) — the client service's mapper applies null suppression here
   - New path (PR, Single JSON, Index/Create):
     1. Processor parses JSON into a JsonNode and serializes it back to byte[] via mapper.writeValueAsBytes(node) — this is the processor's mapper, which does not have null suppression
     2. Bytes are stored in IndexOperationRequest.rawJsonBytes
     3. Client service writes the bytes directly via out.write(request.getRawJsonBytes()) — the client service's mapper is never used
   
   So a regular user with this configuration:
   - ElasticSearchClientServiceImpl with Suppress Nulls = Always Suppress
   - PutElasticsearchJson with Index operation
   - Input JSON: {"name": "John", "age": null}
   
   Before: Elasticsearch receives {"name":"John"} (the null is stripped by the client service's mapper).
   After: Elasticsearch receives {"name":"John","age":null} (the null is preserved because the client service's mapper is bypassed).
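A stdlib-only sketch of the difference described above, using a minimal hand-rolled serializer as a stand-in for the two mappers (`NullSuppressionSketch` and `toJson` are hypothetical names for illustration; in the real client service this is Jackson's `ObjectMapper` configured with `JsonInclude.Include.NON_NULL`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullSuppressionSketch {
    // Minimal flat-JSON-object serializer standing in for the two mappers.
    // suppressNulls=true mimics the client service's mapper with
    // "Suppress Nulls = Always Suppress"; false mimics the processor's
    // default mapper, which keeps null-valued fields.
    static String toJson(Map<String, Object> doc, boolean suppressNulls) {
        final StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (suppressNulls && e.getValue() == null) {
                continue; // the old Map-based path drops this entry
            }
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            final Object v = e.getValue();
            if (v instanceof String) {
                sb.append('"').append(v).append('"');
            } else {
                sb.append(v); // null prints as the JSON literal null
            }
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        final Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", "John");
        doc.put("age", null);
        System.out.println(toJson(doc, true));   // old path:  {"name":"John"}
        System.out.println(toJson(doc, false));  // new path:  {"name":"John","age":null}
    }
}
```

The same contrast with real Jackson: a mapper after `setSerializationInclusion(JsonInclude.Include.NON_NULL)` drops null map values on write, while a default mapper keeps them; once the processor hands over pre-serialized rawJsonBytes, the client service's configured mapper never gets a chance to apply that setting.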



##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -153,175 +222,705 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, 
REL_ERRORS);
 
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    private final AtomicBoolean bulkRequestOutputEnabled = new 
AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    private ObjectReader mapReader;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
         config.renameProperty("put-es-json-scripted-upsert", 
SCRIPTED_UPSERT.getName());
         config.renameProperty("put-es-json-dynamic_templates", 
DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name 
used in this processor
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), 
BATCH_SIZE.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from 
PutElasticsearchJson — default to Single JSON
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), 
InputFormat.SINGLE_JSON.getValue());
+        }
     }
 
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to 
"original"
         config.renameRelationship("success", 
AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = 
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, 
Object>>() { });
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
-        final List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
-
+        final long maxBatchBytes = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
         final String idAttribute = 
context.getProperty(ID_ATTRIBUTE).getValue();
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+                ? 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+                : Integer.MAX_VALUE;
+        int flowFilesProcessed = 0;
+
+        // Tracks all FlowFiles that were successfully parsed and submitted 
(even partially)
+        final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+        // Tracks FlowFiles that had at least one Elasticsearch document error
+        final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+        // Deferred bulk-error attributes: applied after each FlowFile's 
InputStream is closed
+        final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new 
HashMap<>();
+        // Failed local record indices per FlowFile — O(error count) memory; 
used at finalization
+        // to re-read FlowFiles once and reconstruct error/success content 
without buffering all bytes
+        final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new 
LinkedHashMap<>();
+
+        // Current chunk accumulation — operationFlowFiles and 
operationRecordIndices are parallel to operations (same index)
+        final List<FlowFile> operationFlowFiles = new ArrayList<>();
+        final List<IndexOperationRequest> operations = new ArrayList<>();
+        final List<Integer> operationRecordIndices = new ArrayList<>();
+        long totalBytesAccumulated = 0;
+        long chunkBytes = 0;
 
-        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
-        final List<IndexOperationRequest> operations = new 
ArrayList<>(flowFiles.size());
+        while (flowFile != null) {
+            final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+            final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+            final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+            final String flowFileIdAttribute = 
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
 
-        for (final FlowFile input : flowFiles) {
-            addOperation(operations, originals, idAttribute, context, session, 
input);
+            final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, flowFile);
+            final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+            final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+            final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, flowFile);
+            final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
+
+            boolean parseError = false;
+            try {
+                final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
+                if (InputFormat.NDJSON == inputFormat) {
+                    // NDJSON: each non-blank line is one JSON document.
+                    // Index/Create operations pass raw UTF-8 bytes directly to avoid Map allocation.
+                    // Update/Delete/Upsert parse into a Map so the bulk serializer can wrap the payload.
+                    int localRecordIndex = 0;
+                    try (final BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(session.read(flowFile), charset), READER_BUFFER_SIZE)) {
+                        String line;
+                        while ((line = reader.readLine()) != null) {
+                            final String trimmedLine = line.trim();
+                            if (trimmedLine.isEmpty()) {
+                                continue;
+                            }
+                            final IndexOperationRequest opRequest;
+                            final long docBytes;
+                            if (o == IndexOperationRequest.Operation.Index || o == IndexOperationRequest.Operation.Create) {
+                                final String id = extractId(trimmedLine, idAttribute, flowFileIdAttribute);
+                                final byte[] rawJsonBytes = trimmedLine.getBytes(StandardCharsets.UTF_8);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = rawJsonBytes.length;
+                            } else {
+                                final Map<String, Object> contentMap = mapReader.readValue(trimmedLine);
+                                final String id = resolveId(contentMap, idAttribute, flowFileIdAttribute);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+                            }
+                            operations.add(opRequest);
+                            operationFlowFiles.add(flowFile);
+                            operationRecordIndices.add(localRecordIndex++);
+                            chunkBytes += docBytes;
+                            totalBytesAccumulated += docBytes;
+                            if (chunkBytes >= maxBatchBytes) {
+                                flushChunk(operations, operationFlowFiles, operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, pendingErrorRecordIndices, context, session);
+                                operations.clear();
+                                operationFlowFiles.clear();
+                                operationRecordIndices.clear();
+                                chunkBytes = 0;
+                            }
+                        }
+                    }
+                } else if (InputFormat.JSON_ARRAY == inputFormat) {
+                    // JSON Array: the FlowFile may contain one or more top-level JSON arrays.
+                    // Arrays are read sequentially; elements within each array are streamed one at a
+                    // time via JsonParser to avoid loading the entire content into memory.
+                    // Each element is re-serialized to compact bytes for Index/Create,
+                    // or parsed into a Map for Update/Delete/Upsert.
+                    int localRecordIndex = 0;
+                    try (final InputStreamReader isr = new InputStreamReader(session.read(flowFile), charset);
+                         final JsonParser parser = mapper.getFactory().createParser(isr)) {
+                        JsonToken outerToken;
+                        while ((outerToken = parser.nextToken()) != null) {
+                            if (outerToken != JsonToken.START_ARRAY) {
+                                throw new IOException("Expected a JSON array but found: " + outerToken);
+                            }
+                            JsonToken token;
+                            while ((token = parser.nextToken()) != JsonToken.END_ARRAY) {
+                                if (token == null) {
+                                    throw new IOException("Malformed JSON Array: reached end of stream before the closing ']'. " +
+                                            "Verify the FlowFile content is a complete, valid JSON array.");
+                                }
+                                final long startOffset = parser.currentTokenLocation().getCharOffset();
+                                final IndexOperationRequest opRequest;
+                                if (o == IndexOperationRequest.Operation.Index || o == IndexOperationRequest.Operation.Create) {
+                                    final JsonNode node = mapper.readTree(parser);
+                                    final long docBytes = Math.max(1, parser.currentLocation().getCharOffset() - startOffset);
+                                    final byte[] rawJsonBytes = mapper.writeValueAsBytes(node);
+                                    final String id = extractId(node, idAttribute, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                } else {
+                                    final Map<String, Object> contentMap = mapReader.readValue(parser);
+                                    final long docBytes = Math.max(1, parser.currentLocation().getCharOffset() - startOffset);
+                                    final String id = resolveId(contentMap, idAttribute, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                }
+                                operations.add(opRequest);
+                                operationFlowFiles.add(flowFile);
+                                operationRecordIndices.add(localRecordIndex++);
+                                if (chunkBytes >= maxBatchBytes) {
+                                    flushChunk(operations, operationFlowFiles, operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, pendingErrorRecordIndices, context, session);
+                                    operations.clear();
+                                    operationFlowFiles.clear();
+                                    operationRecordIndices.clear();
+                                    chunkBytes = 0;
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    // Single JSON: the entire FlowFile is one document.
+                    // For Index/Create, the input is parsed and re-serialized to compact JSON to strip
+                    // any embedded newlines from pretty-printed input. The Elasticsearch bulk NDJSON
+                    // protocol uses newlines as record delimiters, so embedded newlines would corrupt
+                    // the request. Update/Delete/Upsert parse directly into a Map.
+                    try (final InputStream in = session.read(flowFile)) {
+                        final IndexOperationRequest opRequest;
+                        if (o == IndexOperationRequest.Operation.Index || o == IndexOperationRequest.Operation.Create) {
+                            // Re-serialize via Jackson to produce compact single-line JSON.
+                            // Pretty-printed input would contain embedded newlines that break the
+                            // Elasticsearch bulk NDJSON protocol (each document must be one line).
+                            final JsonNode node = mapper.readTree(new InputStreamReader(in, charset));
+                            final byte[] rawJsonBytes = mapper.writeValueAsBytes(node);
+                            final String id = extractId(node, idAttribute, flowFileIdAttribute);
+                            opRequest = IndexOperationRequest.builder()
+                                    .index(index)
+                                    .type(type)
+                                    .id(id)
+                                    .rawJsonBytes(rawJsonBytes)
+                                    .operation(o)
+                                    .script(scriptMap)
+                                    .scriptedUpsert(scriptedUpsert)
+                                    .dynamicTemplates(dynamicTemplatesMap)
+                                    .headerFields(bulkHeaderFields)
+                                    .build();
+                        } else {
+                            final Map<String, Object> contentMap = mapReader.readValue(in);
+                            final String id = StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
+                            opRequest = IndexOperationRequest.builder()
+                                    .index(index)
+                                    .type(type)
+                                    .id(id)
+                                    .fields(contentMap)
+                                    .operation(o)
+                                    .script(scriptMap)
+                                    .scriptedUpsert(scriptedUpsert)
+                                    .dynamicTemplates(dynamicTemplatesMap)
+                                    .headerFields(bulkHeaderFields)
+                                    .build();
+                        }
+                        operations.add(opRequest);
+                        operationFlowFiles.add(flowFile);
+                        operationRecordIndices.add(0);
+                        final long docBytes = flowFile.getSize();
+                        chunkBytes += docBytes;
+                        totalBytesAccumulated += docBytes;
+                        if (chunkBytes >= maxBatchBytes) {
+                            flushChunk(operations, operationFlowFiles, operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, pendingErrorRecordIndices, context, session);
+                            operations.clear();
+                            operationFlowFiles.clear();
+                            operationRecordIndices.clear();
+                            chunkBytes = 0;
+                        }
+                    }
+                }
+            } catch (final ElasticsearchException ese) {
+                final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to failure.");
+                getLogger().error(msg, ese);
+                final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
+                // Route only the failing in-flight chunk to retry/failure. FlowFiles already
+                // successfully indexed by prior _bulk requests are routed normally to avoid
+                // duplicate indexing if those FlowFiles are re-processed on retry.
+                final Set<FlowFile> inFlight = new LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ese, rel, session, true, inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
+                return;
+            } catch (final IOException ioe) {
+                getLogger().error("Could not read FlowFile content as valid {}.", inputFormat, ioe);
+                removeFlowFileFromChunk(flowFile, operations, operationFlowFiles, operationRecordIndices);
+                flowFile = session.putAttribute(flowFile, "elasticsearch.put.error", ioe.getMessage());
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                parseError = true;
+            } catch (final Exception ex) {
+                getLogger().error("Failed processing records.", ex);
+                removeFlowFileFromChunk(flowFile, operations, operationFlowFiles, operationRecordIndices);
+                flowFile = session.putAttribute(flowFile, "elasticsearch.put.error", ex.getMessage());
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                parseError = true;
+            }
+
+            if (!parseError) {
+                allProcessedFlowFiles.add(flowFile);
+                // InputStream is now closed, so it is safe to apply any deferred bulk-error attributes
+                applyPendingBulkErrors(flowFile, pendingBulkErrors, session);
+            }
+
+            flowFilesProcessed++;
+
+            // For Single JSON, stop when the count-based batch limit is reached; otherwise use size limit
+            if (InputFormat.SINGLE_JSON == inputFormat ? flowFilesProcessed >= batchSize : totalBytesAccumulated >= maxBatchBytes) {
+                break;
+            }
+
+            flowFile = session.get();
         }
 
-        if (!originals.isEmpty()) {
+        // Flush any remaining operations (all InputStreams are closed at this point)
+        if (!operations.isEmpty()) {
             try {
-                final List<FlowFile> errorDocuments = indexDocuments(operations, originals, context, session);
-                handleResponse(context, session, errorDocuments, originals);
-                session.transfer(originals, REL_ORIGINAL);
+                flushChunk(operations, operationFlowFiles, operationRecordIndices, errorFlowFiles, null, pendingBulkErrors, pendingErrorRecordIndices, context, session);
             } catch (final ElasticsearchException ese) {
                 final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
-                        ese.isElastic() ? "Routing to retry." : "Routing to failure");
+                        ese.isElastic() ? "Routing to retry." : "Routing to failure.");
                 getLogger().error(msg, ese);
                 final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
-                transferFlowFilesOnException(ese, rel, session, true, originals.toArray(new FlowFile[0]));
-            } catch (final JsonProcessingException jpe) {
-                getLogger().warn("Could not log Elasticsearch operation errors nor determine which documents errored.", jpe);
-                transferFlowFilesOnException(jpe, REL_ERRORS, session, true, originals.toArray(new FlowFile[0]));
+                final Set<FlowFile> inFlight = new LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ese, rel, session, true, inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
+                return;
             } catch (final Exception ex) {
                 getLogger().error("Could not index documents.", ex);
-                transferFlowFilesOnException(ex, REL_FAILURE, session, false, originals.toArray(new FlowFile[0]));
+                final Set<FlowFile> inFlight = new LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ex, REL_FAILURE, session, false, inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
                 context.yield();
+                return;
             }
+        }
+
+        if (allProcessedFlowFiles.isEmpty()) {
+            getLogger().warn("No records successfully parsed for sending to Elasticsearch");
         } else {
-            getLogger().warn("No FlowFiles successfully parsed for sending to Elasticsearch");
+            handleFinalResponse(context, session, errorFlowFiles, allProcessedFlowFiles, pendingErrorRecordIndices, inputFormat);
         }
     }
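The size-based flush changes how many _bulk requests a given workload produces; the review comment at the top of this thread raises exactly this. A stand-alone sketch of the flush arithmetic, assuming the same "flush when accumulated bytes reach the limit" rule as the loop above (the class and method names here are illustrative, not NiFi API):

```java
final class BulkRequestEstimate {
    /** Counts _bulk requests under size-based flushing: flush whenever accumulated bytes reach the limit. */
    static int bulkRequests(final int docCount, final long bytesPerDoc, final long maxBatchBytes) {
        int requests = 0;
        long chunkBytes = 0;
        for (int i = 0; i < docCount; i++) {
            chunkBytes += bytesPerDoc;
            if (chunkBytes >= maxBatchBytes) {
                requests++;      // corresponds to a flushChunk(...) call mid-loop
                chunkBytes = 0;
            }
        }
        if (chunkBytes > 0) {
            requests++;          // trailing partial batch flushed after the loop
        }
        return requests;
    }
}
```

With the reviewer's example (100 FlowFiles of 200 KB against the 10 MB default), this yields two bulk requests where the old count-only batching produced one.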
 
-    private void addOperation(final List<IndexOperationRequest> operations, final List<FlowFile> originals, final String idAttribute,
-                              final ProcessContext context, final ProcessSession session, FlowFile input) {
-        final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
-        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
-        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
-        final String id = StringUtils.isNotBlank(idAttribute) && StringUtils.isNotBlank(input.getAttribute(idAttribute)) ? input.getAttribute(idAttribute) : null;
-
-        final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, context, input);
-        final boolean scriptedUpsert = context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean();
-        final Map<String, Object> dynamicTemplatesMap = getMapFromAttribute(DYNAMIC_TEMPLATES, context, input);
-
-        final Map<String, String> dynamicProperties = getRequestParametersFromDynamicProperties(context, input);
-        final Map<String, String> bulkHeaderFields = getBulkHeaderParameters(dynamicProperties);
-
-        final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
-
-        try (final InputStream inStream = session.read(input)) {
-            final byte[] result = IOUtils.toByteArray(inStream);
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> contentMap = mapper.readValue(new String(result, charset), Map.class);
-
-            final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(indexOp);
-            operations.add(new IndexOperationRequest(index, type, id, contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap, bulkHeaderFields));
-
-            originals.add(input);
-        } catch (final IOException ioe) {
-            getLogger().error("Could not read FlowFile content valid JSON.", ioe);
-            input = session.putAttribute(input, "elasticsearch.put.error", ioe.getMessage());
-            session.penalize(input);
-            session.transfer(input, REL_FAILURE);
-        } catch (final Exception ex) {
-            getLogger().error("Could not index documents.", ex);
-            input = session.putAttribute(input, "elasticsearch.put.error", ex.getMessage());
-            session.penalize(input);
-            session.transfer(input, REL_FAILURE);
+    /**
+     * Removes all entries belonging to {@code target} from the three parallel lists, so that a
+     * FlowFile that failed mid-read does not leave stale references that would cause
+     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException} on the next flush.
+     */
+    private void removeFlowFileFromChunk(final FlowFile target,
+                                          final List<IndexOperationRequest> operations,
+                                          final List<FlowFile> operationFlowFiles,
+                                          final List<Integer> operationRecordIndices) {
+        for (int i = operations.size() - 1; i >= 0; i--) {
+            if (operationFlowFiles.get(i) == target) {
+                operations.remove(i);
+                operationFlowFiles.remove(i);
+                operationRecordIndices.remove(i);
+            }
         }
     }
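The backwards iteration in removeFlowFileFromChunk is what keeps the parallel lists consistent: removing forward would shift indices under the loop. A minimal stand-alone sketch of the same technique (plain Java; String stands in for FlowFile, an assumption for illustration only):

```java
import java.util.List;

final class ParallelListRemoval {
    /** Removes every entry owned by {@code target} from two parallel lists.
     *  Iterating backwards keeps earlier indices valid as later entries are removed. */
    static void removeOwned(final String target, final List<String> owners, final List<Integer> values) {
        for (int i = owners.size() - 1; i >= 0; i--) {
            if (owners.get(i).equals(target)) {
                owners.remove(i);
                values.remove(i);   // remove(int) removes by index, not by boxed Integer value
            }
        }
    }
}
```

Note the `values.remove(i)` call: on a `List<Integer>`, the primitive-int overload removes by index, which is the intended behavior here and in the processor code.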
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getMapFromAttribute(final PropertyDescriptor propertyDescriptor, final ProcessContext context, final FlowFile input) {
-        final String propertyValue = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+    /**
+     * Applies any bulk-error attributes that were deferred for {@code flowFile} while its
+     * InputStream was open. Must only be called after the InputStream has been closed.
+     */
+    private void applyPendingBulkErrors(final FlowFile flowFile,
+                                         final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors,
+                                         final ProcessSession session) {
+        final List<Map<String, Object>> errorList = pendingBulkErrors.remove(flowFile);
+        if (errorList == null) {
+            return;
+        }
         try {
-            return StringUtils.isNotBlank(propertyValue) ? mapper.readValue(propertyValue, Map.class) : Collections.emptyMap();
-        } catch (final JsonProcessingException jpe) {
-            throw new ProcessException(propertyDescriptor.getDisplayName() + " must be a String parsable into a JSON Object", jpe);
+            final Object errorObj = errorList.size() == 1 ? errorList.get(0) : errorList;
+            session.putAttribute(flowFile, "elasticsearch.bulk.error", mapper.writeValueAsString(errorObj));
+        } catch (final JsonProcessingException e) {
+            session.putAttribute(flowFile, "elasticsearch.bulk.error",
+                    String.format("{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                            e.getMessage().replace("\"", "\\\"")));
         }
     }
 
-    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> operations, final List<FlowFile> originals, final ProcessContext context, final ProcessSession session) throws IOException {
-        final Map<String, String> dynamicProperties = getRequestParametersFromDynamicProperties(context, originals.getFirst());
+    /**
+     * Sends the accumulated batch of operations to Elasticsearch via the _bulk API and records
+     * any per-document errors. If {@code openStreamFlowFile} is non-null, error attributes for
+     * that FlowFile are deferred into {@code pendingBulkErrors} so they are applied after its
+     * InputStream is closed (calling {@code putAttribute} on an open FlowFile throws
+     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}).
+     * <p>
+     * Only the local record index of each failed operation is stored (O(error count) memory).
+     * Error/success content is reconstructed by re-reading the FlowFile in
+     * {@link #handleFinalResponse}.
+     */
+    private void flushChunk(final List<IndexOperationRequest> operations, final List<FlowFile> operationFlowFiles,
+                             final List<Integer> operationRecordIndices,
+                             final Set<FlowFile> errorFlowFiles, final FlowFile openStreamFlowFile,
+                             final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors,
+                             final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices,
+                             final ProcessContext context, final ProcessSession session) throws IOException {
+        if (outputBulkRequest) {
+            outputBulkRequestFlowFile(operations, operationFlowFiles, session);
+        }
+
+        final FlowFile firstFlowFile = operationFlowFiles.get(0);
+        final Map<String, String> dynamicProperties = getRequestParametersFromDynamicProperties(context, firstFlowFile);
         final IndexOperationResponse response = clientService.get().bulk(operations,
-                new ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), getRequestHeadersFromDynamicProperties(context, originals.getFirst())));
+                new ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), getRequestHeadersFromDynamicProperties(context, firstFlowFile)));
 
         final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
-        final List<FlowFile> errorDocuments = new ArrayList<>(errors.size());
+
+        // Attach per-FlowFile error attributes; defer putAttribute for the FlowFile whose
+        // InputStream is currently open (putAttribute would throw FlowFileHandlingException).
+        // Record only the local record index of each failure; content is reconstructed at finalization.
+        final Map<FlowFile, List<Map<String, Object>>> flowFileErrors = new HashMap<>();
         errors.forEach((index, error) -> {
-            String errorMessage;
-            try {
-                errorMessage = mapper.writeValueAsString(error);
-            } catch (JsonProcessingException e) {
-                errorMessage = String.format(
-                        "{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
-                        e.getMessage().replace("\"", "\\\"")
-                );
+            final FlowFile flowFile = operationFlowFiles.get(index);
+            errorFlowFiles.add(flowFile);
+            flowFileErrors.computeIfAbsent(flowFile, k -> new ArrayList<>()).add(error);
+            pendingErrorRecordIndices.computeIfAbsent(flowFile, k -> new HashSet<>()).add(operationRecordIndices.get(index));
+        });
+        flowFileErrors.forEach((flowFile, errorList) -> {
+            if (flowFile == openStreamFlowFile) {
+                // Defer: InputStream still open; apply after the try-with-resources closes
+                pendingBulkErrors.computeIfAbsent(flowFile, k -> new ArrayList<>()).addAll(errorList);
+            } else {
+                try {
+                    final Object errorObj = errorList.size() == 1 ? errorList.get(0) : errorList;
+                    session.putAttribute(flowFile, "elasticsearch.bulk.error", mapper.writeValueAsString(errorObj));
+                } catch (final JsonProcessingException e) {
+                    session.putAttribute(flowFile, "elasticsearch.bulk.error",
+                            String.format("{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                                    e.getMessage().replace("\"", "\\\"")));
+                }
             }
-            errorDocuments.add(session.putAttribute(originals.get(index), "elasticsearch.bulk.error", errorMessage));
         });
 
         if (!errors.isEmpty()) {
-            handleElasticsearchDocumentErrors(errors, session, null);
+            handleElasticsearchDocumentErrors(errors, session, firstFlowFile);
         }
+    }
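flushChunk resolves each failed bulk-response index back to its owning FlowFile through the parallel operation-to-FlowFile list, then groups the errors per owner with `computeIfAbsent`. The grouping step in isolation (plain Java; String stands in for FlowFile, an assumption for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BulkErrorGrouping {
    /** Groups per-operation bulk errors by owning document source via the parallel owners list. */
    static Map<String, List<String>> groupByOwner(final Map<Integer, String> errorsByOpIndex,
                                                  final List<String> opOwners) {
        final Map<String, List<String>> grouped = new HashMap<>();
        errorsByOpIndex.forEach((opIndex, error) ->
                // opIndex is the position in the bulk request; opOwners maps it back to its source
                grouped.computeIfAbsent(opOwners.get(opIndex), k -> new ArrayList<>()).add(error));
        return grouped;
    }
}
```

This is why the three lists must stay strictly parallel: the bulk response only reports positional indices, and the lists are the sole mapping back to FlowFiles and record indices.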
+
+    /**
+     * Serializes the pending batch into an Elasticsearch _bulk NDJSON body and writes it to a
+     * new FlowFile on the {@code bulk_request} relationship for inspection or debugging.
+     * Each operation produces an action metadata line followed by a document line
+     * (Delete operations have no document line). Failures here are logged and swallowed so
+     * that a debug-output failure does not abort the actual indexing.
+     */
+    private void outputBulkRequestFlowFile(final List<IndexOperationRequest> operations,
+                                            final List<FlowFile> operationFlowFiles,
+                                            final ProcessSession session) {
+        final FlowFile parent = operationFlowFiles.get(0);
+        FlowFile bulkRequestFF = session.create(parent);
+        try (final OutputStream out = session.write(bulkRequestFF)) {
+            for (final IndexOperationRequest op : operations) {
+                // Action metadata line
+                final Map<String, Object> actionBody = new LinkedHashMap<>();
+                if (StringUtils.isNotBlank(op.getIndex())) {
+                    actionBody.put("_index", op.getIndex());
+                }
+                if (StringUtils.isNotBlank(op.getType())) {
+                    actionBody.put("_type", op.getType());
+                }
+                if (StringUtils.isNotBlank(op.getId())) {
+                    actionBody.put("_id", op.getId());
+                }
+                if (op.getDynamicTemplates() != null && !op.getDynamicTemplates().isEmpty()) {
+                    actionBody.put("dynamic_templates", op.getDynamicTemplates());
+                }
+                if (op.getHeaderFields() != null) {
+                    actionBody.putAll(op.getHeaderFields());
+                }
+
+                // Upsert maps to "update" in the ES bulk API
+                final String actionName = op.getOperation() == IndexOperationRequest.Operation.Upsert
+                        ? "update" : op.getOperation().getValue();
+                final Map<String, Object> actionLine = Map.of(actionName, actionBody);
+                out.write(mapper.writeValueAsBytes(actionLine));
+                out.write('\n');
 
-        return errorDocuments;
+                // Document line (delete has no document)
+                if (op.getOperation() != IndexOperationRequest.Operation.Delete) {
+                    if (op.getRawJsonBytes() != null) {
+                        // Index/Create with rawJson path: write directly, no Map round-trip
+                        out.write(op.getRawJsonBytes());
+                    } else {
+                        final Map<String, Object> docLine = new LinkedHashMap<>();
+                        if (op.getOperation() == IndexOperationRequest.Operation.Update
+                                || op.getOperation() == IndexOperationRequest.Operation.Upsert) {
+                            if (op.getScript() != null && !op.getScript().isEmpty()) {
+                                docLine.put("script", op.getScript());
+                                if (op.isScriptedUpsert()) {
+                                    docLine.put("scripted_upsert", true);
+                                }
+                                if (op.getFields() != null && !op.getFields().isEmpty()) {
+                                    docLine.put("upsert", op.getFields());
+                                }
+                            } else {
+                                docLine.put("doc", op.getFields());
+                                if (op.getOperation() == IndexOperationRequest.Operation.Upsert) {
+                                    docLine.put("doc_as_upsert", true);
+                                }
+                            }
+                        } else {
+                            docLine.putAll(op.getFields());
+                        }
+                        out.write(mapper.writeValueAsBytes(docLine));
+                    }
+                    out.write('\n');
+                }
+            }
+        } catch (final IOException e) {
+            getLogger().warn("Failed to write bulk request FlowFile for inspection.", e);
+            session.remove(bulkRequestFF);
+            return;
+        }
+        bulkRequestFF = session.putAttribute(bulkRequestFF, "elasticsearch.bulk.operation.count", String.valueOf(operations.size()));
+        session.transfer(bulkRequestFF, REL_BULK_REQUEST);
+    }
+
+    /**
+     * Extracts the document ID from a raw JSON string using a streaming 
parser.
+     * Stops as soon as the target field is found, avoiding a full tree parse.
+     * Used for Index/Create operations to avoid the Map allocation overhead.
+     */
+    private String extractId(final String rawJson, final String idAttribute, 
final String flowFileIdAttribute) throws IOException {
+        if (StringUtils.isBlank(idAttribute)) {
+            return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+        }
+        try (final JsonParser p = mapper.getFactory().createParser(rawJson)) {
+            while (p.nextToken() != null) {
+                if (idAttribute.equals(p.currentName()) && p.nextToken() != 
null && !p.currentToken().isStructStart()) {
+                    final String value = p.getText();
+                    return StringUtils.isNotBlank(value) ? value
+                            : (StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null);
+                }
+            }
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Extracts the document ID from a pre-parsed JsonNode.
+     * Used for JSON Array Index/Create operations where the node is already 
available.
+     */
+    private String extractId(final JsonNode node, final String idAttribute, 
final String flowFileIdAttribute) {
+        if (StringUtils.isBlank(idAttribute)) {
+            return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+        }
+        final JsonNode idNode = node.get(idAttribute);
+        if (idNode != null && !idNode.isNull()) {
+            return idNode.asText();
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
     }

Review Comment:
   This changes the semantics of Identifier Attribute for existing Single JSON users. Currently, the property is documented as "The name of the FlowFile attribute containing the identifier for the document" and the code only reads the FlowFile attribute:
   
   ```java
   final String id = StringUtils.isNotBlank(idAttribute) && StringUtils.isNotBlank(input.getAttribute(idAttribute))
       ? input.getAttribute(idAttribute) : null;
   ```
   
   With this change, for Index/Create operations, the code first searches inside the JSON document content for a field with that name (`node.get(idAttribute)`), and only then falls back to the FlowFile attribute. If a user has Identifier Attribute = "id" and their JSON contains an "id" field, the document's `_id` will now come from the JSON content instead of the FlowFile attribute, which is a silent behavioral change.
   
   For NDJSON/JSON Array this makes sense (a single FlowFile attribute cannot distinguish multiple documents), but for Single JSON the old behavior should be preserved. Consider using the old FlowFile-attribute-only logic in the Single JSON path.
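   To illustrate the suggestion, here is a minimal sketch of attribute-only ID resolution for the Single JSON path. The class and method names are illustrative, not from the PR; a `Map` stands in for the FlowFile's attribute map, and `StringUtils.isNotBlank` is approximated with `String.isBlank()`:

   ```java
   import java.util.Map;

   // Hypothetical sketch: keep the pre-change semantics for Single JSON,
   // where the document _id comes only from the FlowFile attribute named by
   // Identifier Attribute and the JSON body is never consulted.
   public class SingleJsonIdResolution {

       static String resolveId(final String idAttribute, final Map<String, String> flowFileAttributes) {
           if (idAttribute == null || idAttribute.isBlank()) {
               return null;
           }
           final String value = flowFileAttributes.get(idAttribute);
           // Blank attribute values fall through to null, matching the old check
           return (value == null || value.isBlank()) ? null : value;
       }

       public static void main(String[] args) {
           final Map<String, String> attrs = Map.of("id", "doc-123");
           System.out.println(resolveId("id", attrs));      // doc-123
           System.out.println(resolveId("missing", attrs)); // null
       }
   }
   ```

   The new `extractId` overloads could then stay in place for the NDJSON and JSON Array paths, where per-document IDs genuinely have to come from the content.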



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to