agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3042436976


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,183 +160,841 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         .required(true)
         .build();
 
+    static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description("""
+                    The maximum amount of data to send in a single 
Elasticsearch _bulk API request. \
+                    For NDJSON and JSON Array modes, FlowFiles are accumulated 
until this threshold is reached, then flushed. \
+                    For Single JSON mode, this acts as a size-based safety 
limit: if the accumulated FlowFiles exceed this size \
+                    before Max FlowFiles Per Batch is reached, the request is 
flushed early. \
+                    Elasticsearch recommends 5-15 MB per bulk request for 
optimal performance. \
+                    To disable the size-based limit, check "Set Empty String".\
+                    """)
+            .defaultValue("10 MB")
+            .addValidator((subject, input, context) -> 
StringUtils.isBlank(input)
+                    ? new 
ValidationResult.Builder().subject(subject).valid(true).build()
+                    : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject, 
input, context))
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor INPUT_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Input Content Format")
+            .description("""
+                    The format of the JSON content in each FlowFile. \
+                    NDJSON: one JSON object per line (newline-delimited). \
+                    JSON Array: a top-level JSON array of objects, streamed 
element-by-element for memory efficiency. \
+                    Single JSON: the entire FlowFile is a single JSON 
document.\
+                    """)
+            .allowableValues(InputFormat.class)
+            .defaultValue(InputFormat.SINGLE_JSON.getValue())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+            .name("Max FlowFiles Per Batch")
+            .displayName("Max FlowFiles Per Batch")
+            .description("""
+                    The maximum number of FlowFiles to include in a single 
Elasticsearch _bulk API request. \
+                    If the accumulated FlowFiles exceed Max Batch Size before 
this count is reached, the request will be flushed early.\
+                    """)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    static final PropertyDescriptor SUPPRESS_NULLS = new 
PropertyDescriptor.Builder()
+            .name("Suppress Nulls")
+            .description("""
+                    When set to Always Suppress, null and empty values are 
removed from documents before they are sent to Elasticsearch.
+                    This setting applies to NDJSON and JSON Array formats for 
Index and Create operations only. \
+                    For Single JSON, configure null suppression on the 
controller service instead.
+                    Performance note: for JSON Array the impact is negligible 
since documents are already being parsed. \
+                    For NDJSON, each line must be parsed and re-serialized 
when suppression is enabled, \
+                    which adds overhead compared to the default behaviour of 
passing lines through as raw bytes.\
+                    """)
+            .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS, 
ElasticSearchClientService.ALWAYS_SUPPRESS)
+            .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+            .required(true)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Identifier Attribute")
+            .description("""
+                    The name of the FlowFile attribute containing the 
identifier for the document. \
+                    If the Index Operation is "index", this property may be 
left empty or evaluate to an empty value, \
+                    in which case the document's identifier will be 
auto-generated by Elasticsearch. \
+                    For all other Index Operations, the attribute must 
evaluate to a non-empty value.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field")
+            .description("""
+                    The name of the field within each document to use as the 
Elasticsearch document ID. \
+                    If the field is not present in a document or this property 
is left blank, no document ID is set \
+                    and Elasticsearch will auto-generate one.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+            .name("bulk_request")
+            .description("When \"Output Bulk Request\" is enabled, the raw 
Elasticsearch _bulk API request body is written " +
+                    "to this relationship as a FlowFile for inspection or 
debugging.")
+            .build();
+
+    static final PropertyDescriptor OUTPUT_BULK_REQUEST = new 
PropertyDescriptor.Builder()
+            .name("Output Bulk Request")
+            .description("If enabled, each Elasticsearch _bulk request body is 
written as a FlowFile to the \"" +
+                    REL_BULK_REQUEST.getName() + "\" relationship. Useful for 
debugging. " +
+                    "Each FlowFile contains the full NDJSON body exactly as 
sent to Elasticsearch.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static final List<PropertyDescriptor> DESCRIPTORS = List.of(
-            ID_ATTRIBUTE,
             INDEX_OP,
             INDEX,
             TYPE,
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
+            SUPPRESS_NULLS,
+            ID_ATTRIBUTE,
+            IDENTIFIER_FIELD,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, 
REL_ERRORS);
 
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    private final AtomicBoolean bulkRequestOutputEnabled = new 
AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    private ObjectReader mapReader;
+    private ObjectWriter suppressingWriter;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
         config.renameProperty("put-es-json-scripted-upsert", 
SCRIPTED_UPSERT.getName());
         config.renameProperty("put-es-json-dynamic_templates", 
DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from 
PutElasticsearchJson — default to Single JSON.
+        // Set this before migrating BATCH_SIZE so its dependsOn condition is 
satisfied when NiFi processes the property.
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), 
InputFormat.SINGLE_JSON.getValue());
+        }
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name 
used in this processor.
+        // INPUT_FORMAT is set first above so its dependsOn condition is 
satisfied when NiFi processes BATCH_SIZE.
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), 
BATCH_SIZE.getName());
+
+        // MAX_BATCH_SIZE is a new property — existing configurations had no 
byte-based limit.
+        // Preserve that behavior on upgrade by leaving the property blank 
(unbounded).
+        // The custom validator on MAX_BATCH_SIZE accepts blank values, so 
this will not be
+        // overwritten by the default of 10 MB. New processor instances get 
the 10 MB default.
+        if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+            config.setProperty(MAX_BATCH_SIZE.getName(), "");
+        }
     }
 
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to 
"original"
         config.renameRelationship("success", 
AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = 
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, 
Object>>() { });
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        if (inputFormat != InputFormat.SINGLE_JSON
+                && 
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
 {
+            final ObjectMapper suppressingMapper = mapper.copy()
+                    .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL)
+                    
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);

Review Comment:
   removed the redundant NON_NULL call since NON_EMPTY overwrites it anyway. 
The client service had the same double-call pattern in 
ElasticSearchClientServiceImpl.createObjectMapper() so both were already 
behaving identically; cleaned that up too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to