agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3042436976
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,183 +160,841 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
.required(true)
.build();
+ static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Max Batch Size")
+ .description("""
+ The maximum amount of data to send in a single
Elasticsearch _bulk API request. \
+ For NDJSON and JSON Array modes, FlowFiles are accumulated
until this threshold is reached, then flushed. \
+ For Single JSON mode, this acts as a size-based safety
limit: if the accumulated FlowFiles exceed this size \
+ before Max FlowFiles Per Batch is reached, the request is
flushed early. \
+ Elasticsearch recommends 5-15 MB per bulk request for
optimal performance. \
+ To disable the size-based limit, check "Set Empty String".\
+ """)
+ .defaultValue("10 MB")
+ .addValidator((subject, input, context) ->
StringUtils.isBlank(input)
+ ? new
ValidationResult.Builder().subject(subject).valid(true).build()
+ : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject,
input, context))
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor INPUT_FORMAT = new
PropertyDescriptor.Builder()
+ .name("Input Content Format")
+ .description("""
+ The format of the JSON content in each FlowFile. \
+ NDJSON: one JSON object per line (newline-delimited). \
+ JSON Array: a top-level JSON array of objects, streamed
element-by-element for memory efficiency. \
+ Single JSON: the entire FlowFile is a single JSON
document.\
+ """)
+ .allowableValues(InputFormat.class)
+ .defaultValue(InputFormat.SINGLE_JSON.getValue())
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+ .name("Max FlowFiles Per Batch")
+ .displayName("Max FlowFiles Per Batch")
+ .description("""
+ The maximum number of FlowFiles to include in a single
Elasticsearch _bulk API request. \
+ If the accumulated FlowFiles exceed Max Batch Size before
this count is reached, the request will be flushed early.\
+ """)
+ .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+ .build();
+
+ static final PropertyDescriptor SUPPRESS_NULLS = new
PropertyDescriptor.Builder()
+ .name("Suppress Nulls")
+ .description("""
+ When set to Always Suppress, null and empty values are
removed from documents before they are sent to Elasticsearch.
+ This setting applies to NDJSON and JSON Array formats for
Index and Create operations only. \
+ For Single JSON, configure null suppression on the
controller service instead.
+ Performance note: for JSON Array the impact is negligible
since documents are already being parsed. \
+ For NDJSON, each line must be parsed and re-serialized
when suppression is enabled, \
+ which adds overhead compared to the default behaviour of
passing lines through as raw bytes.\
+ """)
+ .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS,
ElasticSearchClientService.ALWAYS_SUPPRESS)
+ .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+ .required(true)
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON,
InputFormat.JSON_ARRAY)
+ .build();
+
+ static final PropertyDescriptor ID_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("Identifier Attribute")
+ .description("""
+ The name of the FlowFile attribute containing the
identifier for the document. \
+ If the Index Operation is "index", this property may be
left empty or evaluate to an empty value, \
+ in which case the document's identifier will be
auto-generated by Elasticsearch. \
+ For all other Index Operations, the attribute must
evaluate to a non-empty value.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+ .build();
+
+ static final PropertyDescriptor IDENTIFIER_FIELD = new
PropertyDescriptor.Builder()
+ .name("Identifier Field")
+ .description("""
+ The name of the field within each document to use as the
Elasticsearch document ID. \
+ If the field is not present in a document or this property
is left blank, no document ID is set \
+ and Elasticsearch will auto-generate one.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON,
InputFormat.JSON_ARRAY)
+ .build();
+
+ static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+ .name("bulk_request")
+ .description("When \"Output Bulk Request\" is enabled, the raw
Elasticsearch _bulk API request body is written " +
+ "to this relationship as a FlowFile for inspection or
debugging.")
+ .build();
+
+ static final PropertyDescriptor OUTPUT_BULK_REQUEST = new
PropertyDescriptor.Builder()
+ .name("Output Bulk Request")
+ .description("If enabled, each Elasticsearch _bulk request body is
written as a FlowFile to the \"" +
+ REL_BULK_REQUEST.getName() + "\" relationship. Useful for
debugging. " +
+ "Each FlowFile contains the full NDJSON body exactly as
sent to Elasticsearch.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
static final List<PropertyDescriptor> DESCRIPTORS = List.of(
- ID_ATTRIBUTE,
INDEX_OP,
INDEX,
TYPE,
SCRIPT,
SCRIPTED_UPSERT,
DYNAMIC_TEMPLATES,
+ INPUT_FORMAT,
BATCH_SIZE,
+ MAX_BATCH_SIZE,
+ SUPPRESS_NULLS,
+ ID_ATTRIBUTE,
+ IDENTIFIER_FIELD,
CHARSET,
MAX_JSON_FIELD_STRING_LENGTH,
CLIENT_SERVICE,
LOG_ERROR_RESPONSES,
OUTPUT_ERROR_RESPONSES,
+ OUTPUT_BULK_REQUEST,
NOT_FOUND_IS_SUCCESSFUL
);
+
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL,
REL_ERRORS);
+ private static final int READER_BUFFER_SIZE = 65536;
+
+ private final AtomicBoolean bulkRequestOutputEnabled = new
AtomicBoolean(false);
+ private boolean outputBulkRequest;
+ private ObjectReader mapReader;
+ private ObjectWriter suppressingWriter;
+
@Override
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+ if (bulkRequestOutputEnabled.get()) {
+ rels.add(REL_BULK_REQUEST);
+ }
+ return rels;
+ }
+
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+ bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+ }
+ }
+
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
+ // Migrate legacy property names from PutElasticsearchJson
config.removeProperty("put-es-json-error-documents");
config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
config.renameProperty("put-es-json-script", SCRIPT.getName());
config.renameProperty("put-es-json-scripted-upsert",
SCRIPTED_UPSERT.getName());
config.renameProperty("put-es-json-dynamic_templates",
DYNAMIC_TEMPLATES.getName());
config.renameProperty("put-es-json-charset", CHARSET.getName());
config.renameProperty("put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+ // If INPUT_FORMAT was not explicitly set, this flow was migrated from
PutElasticsearchJson — default to Single JSON.
+ // Set this before migrating BATCH_SIZE so its dependsOn condition is
satisfied when NiFi processes the property.
+ if (!config.hasProperty(INPUT_FORMAT.getName())) {
+ config.setProperty(INPUT_FORMAT.getName(),
InputFormat.SINGLE_JSON.getValue());
+ }
+
+ // Migrate "Batch Size" (from PutElasticsearchJson) to the new name
used in this processor.
+ // INPUT_FORMAT is set first above so its dependsOn condition is
satisfied when NiFi processes BATCH_SIZE.
+ config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(),
BATCH_SIZE.getName());
+
+ // MAX_BATCH_SIZE is a new property — existing configurations had no
byte-based limit.
+ // Preserve that behavior on upgrade by leaving the property blank
(unbounded).
+ // The custom validator on MAX_BATCH_SIZE accepts blank values, so
this will not be
+ // overwritten by the default of 10 MB. New processor instances get
the 10 MB default.
+ if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+ config.setProperty(MAX_BATCH_SIZE.getName(), "");
+ }
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
+ // PutElasticsearchJson used "success" before it was renamed to
"original"
config.renameRelationship("success",
AbstractPutElasticsearch.REL_ORIGINAL.getName());
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
-
this.notFoundIsSuccessful =
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+ this.outputBulkRequest =
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+ this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+ this.mapReader = mapper.readerFor(new TypeReference<Map<String,
Object>>() { });
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ if (inputFormat != InputFormat.SINGLE_JSON
+ &&
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
{
+ final ObjectMapper suppressingMapper = mapper.copy()
+ .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL)
+
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
Review Comment:
Removed the redundant NON_NULL call, since NON_EMPTY overwrites it anyway.
The client service had the same double-call pattern in
ElasticSearchClientServiceImpl.createObjectMapper(), so both were already
behaving identically; I cleaned that up too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]