agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3042481728


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,183 +160,841 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         .required(true)
         .build();
 
+    // Byte-based flush threshold for _bulk requests: the custom validator accepts a
+    // blank value, which disables the size limit entirely (see migrateProperties).
+    static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description("""
+                    The maximum amount of data to send in a single 
Elasticsearch _bulk API request. \
+                    For NDJSON and JSON Array modes, FlowFiles are accumulated 
until this threshold is reached, then flushed. \
+                    For Single JSON mode, this acts as a size-based safety 
limit: if the accumulated FlowFiles exceed this size \
+                    before Max FlowFiles Per Batch is reached, the request is 
flushed early. \
+                    Elasticsearch recommends 5-15 MB per bulk request for 
optimal performance. \
+                    To disable the size-based limit, check "Set Empty String".\
+                    """)
+            .defaultValue("10 MB")
+            // Blank input is explicitly valid (no limit); otherwise delegate to the
+            // standard data-size validator.
+            .addValidator((subject, input, context) -> 
StringUtils.isBlank(input)
+                    ? new 
ValidationResult.Builder().subject(subject).valid(true).build()
+                    : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject, 
input, context))
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(false)
+            .build();
+
+    // Determines how FlowFile content is parsed; several properties below are
+    // shown or hidden depending on this value via dependsOn.
+    static final PropertyDescriptor INPUT_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Input Content Format")
+            .description("""
+                    The format of the JSON content in each FlowFile. \
+                    NDJSON: one JSON object per line (newline-delimited). \
+                    JSON Array: a top-level JSON array of objects, streamed 
element-by-element for memory efficiency. \
+                    Single JSON: the entire FlowFile is a single JSON 
document.\
+                    """)
+            .allowableValues(InputFormat.class)
+            .defaultValue(InputFormat.SINGLE_JSON.getValue())
+            .required(true)
+            .build();
+
+    // Re-brands the inherited Batch Size property; only shown for Single JSON input
+    // (dependsOn), where each FlowFile is a single document.
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+            .name("Max FlowFiles Per Batch")
+            .displayName("Max FlowFiles Per Batch")
+            .description("""
+                    The maximum number of FlowFiles to include in a single 
Elasticsearch _bulk API request. \
+                    If the accumulated FlowFiles exceed Max Batch Size before 
this count is reached, the request will be flushed early.\
+                    """)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    // Only offered for NDJSON / JSON Array input (dependsOn); per the description,
+    // Single JSON relies on the controller service's own suppression setting.
+    static final PropertyDescriptor SUPPRESS_NULLS = new 
PropertyDescriptor.Builder()
+            .name("Suppress Nulls")
+            .description("""
+                    When set to Always Suppress, null and empty values are 
removed from documents before they are sent to Elasticsearch.
+                    This setting applies to NDJSON and JSON Array formats for 
Index and Create operations only. \
+                    For Single JSON, configure null suppression on the 
controller service instead.
+                    Performance note: for JSON Array the impact is negligible 
since documents are already being parsed. \
+                    For NDJSON, each line must be parsed and re-serialized 
when suppression is enabled, \
+                    which adds overhead compared to the default behaviour of 
passing lines through as raw bytes.\
+                    """)
+            .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS, 
ElasticSearchClientService.ALWAYS_SUPPRESS)
+            .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+            .required(true)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    // Single JSON only: names the FlowFile attribute supplying the document ID.
+    static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Identifier Attribute")
+            .description("""
+                    The name of the FlowFile attribute containing the 
identifier for the document. \
+                    If the Index Operation is "index", this property may be 
left empty or evaluate to an empty value, \
+                    in which case the document's identifier will be 
auto-generated by Elasticsearch. \
+                    For all other Index Operations, the attribute must 
evaluate to a non-empty value.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    // NDJSON / JSON Array only: names the field within each document whose value
+    // becomes the Elasticsearch document ID (auto-generated when absent or blank).
+    static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field")
+            .description("""
+                    The name of the field within each document to use as the 
Elasticsearch document ID. \
+                    If the field is not present in a document or this property 
is left blank, no document ID is set \
+                    and Elasticsearch will auto-generate one.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    // Debug relationship: exposed only while "Output Bulk Request" is enabled
+    // (see getRelationships()).
+    static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+            .name("bulk_request")
+            .description("When \"Output Bulk Request\" is enabled, the raw 
Elasticsearch _bulk API request body is written " +
+                    "to this relationship as a FlowFile for inspection or 
debugging.")
+            .build();
+
+    // Toggle for mirroring each _bulk request body to the bulk_request relationship.
+    static final PropertyDescriptor OUTPUT_BULK_REQUEST = new 
PropertyDescriptor.Builder()
+            .name("Output Bulk Request")
+            .description("If enabled, each Elasticsearch _bulk request body is 
written as a FlowFile to the \"" +
+                    REL_BULK_REQUEST.getName() + "\" relationship. Useful for 
debugging. " +
+                    "Each FlowFile contains the full NDJSON body exactly as 
sent to Elasticsearch.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     // Property display order for the processor.
     static final List<PropertyDescriptor> DESCRIPTORS = List.of(
-            ID_ATTRIBUTE,
             INDEX_OP,
             INDEX,
             TYPE,
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
+            SUPPRESS_NULLS,
+            ID_ATTRIBUTE,
+            IDENTIFIER_FIELD,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     // Always-present relationships; REL_BULK_REQUEST is appended dynamically.
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, 
REL_ERRORS);
 
+    // 64 KiB buffer used when reading NDJSON content line by line.
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    // Mirrors the Output Bulk Request property value so getRelationships() can
+    // reflect it immediately (updated in onPropertyModified and onScheduled).
+    private final AtomicBoolean bulkRequestOutputEnabled = new 
AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    // Initialized in onScheduled().
+    private ObjectReader mapReader;
+    // Non-null only when null/empty suppression is active for NDJSON / JSON Array.
+    private ObjectWriter suppressingWriter;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         // Static base set; bulk_request is added dynamically in getRelationships().
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         // Fixed, pre-built descriptor list (see DESCRIPTORS above).
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     // Migrates legacy PutElasticsearchJson configuration so existing flows keep
     // their behavior after upgrading to this processor. Statement order matters:
     // INPUT_FORMAT must exist before BATCH_SIZE's dependsOn is evaluated.
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
         config.renameProperty("put-es-json-scripted-upsert", 
SCRIPTED_UPSERT.getName());
         config.renameProperty("put-es-json-dynamic_templates", 
DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from 
PutElasticsearchJson — default to Single JSON.
+        // Set this before migrating BATCH_SIZE so its dependsOn condition is 
satisfied when NiFi processes the property.
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), 
InputFormat.SINGLE_JSON.getValue());
+        }
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name 
used in this processor.
+        // INPUT_FORMAT is set first above so its dependsOn condition is 
satisfied when NiFi processes BATCH_SIZE.
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), 
BATCH_SIZE.getName());
+
+        // MAX_BATCH_SIZE is a new property — existing configurations had no 
byte-based limit.
+        // Preserve that behavior on upgrade by leaving the property blank 
(unbounded).
+        // The custom validator on MAX_BATCH_SIZE accepts blank values, so 
this will not be
+        // overwritten by the default of 10 MB. New processor instances get 
the 10 MB default.
+        if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+            config.setProperty(MAX_BATCH_SIZE.getName(), "");
+        }
     }
 
     // Maps the pre-rename "success" relationship onto "original" for upgraded flows.
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to 
"original"
         config.renameRelationship("success", 
AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = 
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, 
Object>>() { });
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        if (inputFormat != InputFormat.SINGLE_JSON
+                && 
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
 {
+            final ObjectMapper suppressingMapper = mapper.copy()
+                    .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL)
+                    
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
+            this.suppressingWriter = suppressingMapper.writer();
+        } else {
+            this.suppressingWriter = null;
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
-        final List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
+        final String maxBatchSizeStr = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().getValue();
+        final long maxBatchBytes = StringUtils.isNotBlank(maxBatchSizeStr)
+                ? 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue()
+                : Long.MAX_VALUE;
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        final String idAttribute = inputFormat == InputFormat.SINGLE_JSON
+                ? context.getProperty(ID_ATTRIBUTE).getValue()
+                : null;
+        final String documentIdField = inputFormat != InputFormat.SINGLE_JSON
+                ? context.getProperty(IDENTIFIER_FIELD).getValue()
+                : null;
+        final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+                ? 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+                : Integer.MAX_VALUE;
+        int flowFilesProcessed = 0;
+
+        // Tracks all FlowFiles that were successfully parsed and submitted 
(even partially)
+        final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+        // Tracks FlowFiles that had at least one Elasticsearch document error
+        final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+        // Deferred bulk-error attributes: applied after each FlowFile's 
InputStream is closed
+        final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new 
HashMap<>();
+        // Failed local record indices per FlowFile — O(error count) memory; 
used at finalization
+        // to re-read FlowFiles once and reconstruct error/success content 
without buffering all bytes
+        final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new 
LinkedHashMap<>();
+
+        // Current chunk accumulation — operationFlowFiles and 
operationRecordIndices are parallel to operations (same index)
+        final List<FlowFile> operationFlowFiles = new ArrayList<>();
+        final List<IndexOperationRequest> operations = new ArrayList<>();
+        final List<Integer> operationRecordIndices = new ArrayList<>();
+        long totalBytesAccumulated = 0;
+        long chunkBytes = 0;
+
+        while (flowFile != null) {
+            final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+            final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+            final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+            final String flowFileIdAttribute = 
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
+
+            final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, flowFile);
+            final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+            final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+            final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, flowFile);
+            final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
+
+            boolean parseError = false;
+            try {
+                final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
+                if (InputFormat.NDJSON == inputFormat) {
+                    // NDJSON: each non-blank line is one JSON document.
+                    // Index/Create operations pass raw UTF-8 bytes directly 
to avoid Map allocation.
+                    // Update/Delete/Upsert parse into a Map so the bulk 
serializer can wrap the payload.
+                    int localRecordIndex = 0;
+                    try (final BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(session.read(flowFile), 
charset), READER_BUFFER_SIZE)) {
+                        String line;
+                        while ((line = reader.readLine()) != null) {
+                            final String trimmedLine = line.trim();
+                            if (trimmedLine.isEmpty()) {
+                                continue;
+                            }
+                            final IndexOperationRequest opRequest;
+                            final long docBytes;
+                            if (o == IndexOperationRequest.Operation.Index || 
o == IndexOperationRequest.Operation.Create) {
+                                final String id = extractId(trimmedLine, 
documentIdField, flowFileIdAttribute);
+                                final byte[] rawJsonBytes;
+                                if (suppressingWriter != null) {
+                                    // Parse to Map so NON_NULL/NON_EMPTY 
inclusion filters apply during serialization.
+                                    // JsonNode tree serialization bypasses 
JsonInclude filters.
+                                    rawJsonBytes = 
suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+                                } else {
+                                    rawJsonBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8);
+                                }
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = rawJsonBytes.length;
+                            } else {
+                                final Map<String, Object> contentMap = 
mapReader.readValue(trimmedLine);
+                                final String id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8).length;

Review Comment:
   agreed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to