pvillard31 commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3006003684
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,6 +165,55 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
.required(true)
.build();
+ static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Max Batch Size")
+ .description("The maximum amount of data to send in a single
Elasticsearch _bulk API request. " +
+ "For NDJSON and JSON Array modes, FlowFiles are
accumulated until this threshold is reached, then flushed. " +
+ "For Single JSON mode, this acts as a size-based safety
limit: if the accumulated FlowFiles exceed this size " +
+ "before Max FlowFiles Per Batch is reached, the request is
flushed early. " +
+ "Elasticsearch recommends 5-15 MB per bulk request for
optimal performance.")
+ .defaultValue("10 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .required(true)
+ .build();
Review Comment:
This is potentially a behavioral change, no? For example, a user with Batch
Size = 100 and 100 FlowFiles of 200KB each (20MB total) would previously send
one bulk request but will now get two or more. I'm not against the change — it
seems to be a reasonable default — but just mentioning it.
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -153,175 +222,705 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
SCRIPT,
SCRIPTED_UPSERT,
DYNAMIC_TEMPLATES,
+ INPUT_FORMAT,
BATCH_SIZE,
+ MAX_BATCH_SIZE,
CHARSET,
MAX_JSON_FIELD_STRING_LENGTH,
CLIENT_SERVICE,
LOG_ERROR_RESPONSES,
OUTPUT_ERROR_RESPONSES,
+ OUTPUT_BULK_REQUEST,
NOT_FOUND_IS_SUCCESSFUL
);
+
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL,
REL_ERRORS);
+ private static final int READER_BUFFER_SIZE = 65536;
+
+ private final AtomicBoolean bulkRequestOutputEnabled = new
AtomicBoolean(false);
+ private boolean outputBulkRequest;
+ private ObjectReader mapReader;
+
@Override
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+ if (bulkRequestOutputEnabled.get()) {
+ rels.add(REL_BULK_REQUEST);
+ }
+ return rels;
+ }
+
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+ bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+ }
+ }
+
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
+ // Migrate legacy property names from PutElasticsearchJson
config.removeProperty("put-es-json-error-documents");
config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
config.renameProperty("put-es-json-script", SCRIPT.getName());
config.renameProperty("put-es-json-scripted-upsert",
SCRIPTED_UPSERT.getName());
config.renameProperty("put-es-json-dynamic_templates",
DYNAMIC_TEMPLATES.getName());
config.renameProperty("put-es-json-charset", CHARSET.getName());
config.renameProperty("put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+ // Migrate "Batch Size" (from PutElasticsearchJson) to the new name
used in this processor
+ config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(),
BATCH_SIZE.getName());
+
+ // If INPUT_FORMAT was not explicitly set, this flow was migrated from
PutElasticsearchJson — default to Single JSON
+ if (!config.hasProperty(INPUT_FORMAT.getName())) {
+ config.setProperty(INPUT_FORMAT.getName(),
InputFormat.SINGLE_JSON.getValue());
+ }
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
+ // PutElasticsearchJson used "success" before it was renamed to
"original"
config.renameRelationship("success",
AbstractPutElasticsearch.REL_ORIGINAL.getName());
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
-
this.notFoundIsSuccessful =
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+ this.outputBulkRequest =
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+ this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+ this.mapReader = mapper.readerFor(new TypeReference<Map<String,
Object>>() { });
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final int batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
- final List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles.isEmpty()) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
return;
}
-
+ final long maxBatchBytes =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
final String idAttribute =
context.getProperty(ID_ATTRIBUTE).getValue();
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+ ?
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+ : Integer.MAX_VALUE;
+ int flowFilesProcessed = 0;
+
+ // Tracks all FlowFiles that were successfully parsed and submitted
(even partially)
+ final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+ // Tracks FlowFiles that had at least one Elasticsearch document error
+ final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+ // Deferred bulk-error attributes: applied after each FlowFile's
InputStream is closed
+ final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new
HashMap<>();
+ // Failed local record indices per FlowFile — O(error count) memory;
used at finalization
+ // to re-read FlowFiles once and reconstruct error/success content
without buffering all bytes
+ final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new
LinkedHashMap<>();
+
+ // Current chunk accumulation — operationFlowFiles and
operationRecordIndices are parallel to operations (same index)
+ final List<FlowFile> operationFlowFiles = new ArrayList<>();
+ final List<IndexOperationRequest> operations = new ArrayList<>();
+ final List<Integer> operationRecordIndices = new ArrayList<>();
+ long totalBytesAccumulated = 0;
+ long chunkBytes = 0;
- final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
- final List<IndexOperationRequest> operations = new
ArrayList<>(flowFiles.size());
+ while (flowFile != null) {
+ final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ final String flowFileIdAttribute =
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
- for (final FlowFile input : flowFiles) {
- addOperation(operations, originals, idAttribute, context, session,
input);
+ final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, flowFile);
+ final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+ final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+ final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, flowFile);
+ final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
+
+ boolean parseError = false;
+ try {
+ final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
+ if (InputFormat.NDJSON == inputFormat) {
+ // NDJSON: each non-blank line is one JSON document.
+ // Index/Create operations pass raw UTF-8 bytes directly
to avoid Map allocation.
+ // Update/Delete/Upsert parse into a Map so the bulk
serializer can wrap the payload.
+ int localRecordIndex = 0;
+ try (final BufferedReader reader = new BufferedReader(
+ new InputStreamReader(session.read(flowFile),
charset), READER_BUFFER_SIZE)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String trimmedLine = line.trim();
+ if (trimmedLine.isEmpty()) {
+ continue;
+ }
+ final IndexOperationRequest opRequest;
+ final long docBytes;
+ if (o == IndexOperationRequest.Operation.Index ||
o == IndexOperationRequest.Operation.Create) {
+ final String id = extractId(trimmedLine,
idAttribute, flowFileIdAttribute);
+ final byte[] rawJsonBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes = rawJsonBytes.length;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(trimmedLine);
+ final String id = resolveId(contentMap,
idAttribute, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } else if (InputFormat.JSON_ARRAY == inputFormat) {
+ // JSON Array: the FlowFile may contain one or more
top-level JSON arrays.
+ // Arrays are read sequentially; elements within each
array are streamed one at a
+ // time via JsonParser to avoid loading the entire content
into memory.
+ // Each element is re-serialized to compact bytes for
Index/Create,
+ // or parsed into a Map for Update/Delete/Upsert.
+ int localRecordIndex = 0;
+ try (final InputStreamReader isr = new
InputStreamReader(session.read(flowFile), charset);
+ final JsonParser parser =
mapper.getFactory().createParser(isr)) {
+ JsonToken outerToken;
+ while ((outerToken = parser.nextToken()) != null) {
+ if (outerToken != JsonToken.START_ARRAY) {
+ throw new IOException("Expected a JSON array
but found: " + outerToken);
+ }
+ JsonToken token;
+ while ((token = parser.nextToken()) !=
JsonToken.END_ARRAY) {
+ if (token == null) {
+ throw new IOException("Malformed JSON
Array: reached end of stream before the closing ']'. " +
+ "Verify the FlowFile content is a
complete, valid JSON array.");
+ }
+ final long startOffset =
parser.currentTokenLocation().getCharOffset();
+ final IndexOperationRequest opRequest;
+ if (o == IndexOperationRequest.Operation.Index
|| o == IndexOperationRequest.Operation.Create) {
+ final JsonNode node =
mapper.readTree(parser);
+ final long docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ final byte[] rawJsonBytes =
mapper.writeValueAsBytes(node);
+ final String id = extractId(node,
idAttribute, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(parser);
+ final long docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ final String id = resolveId(contentMap,
idAttribute, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ }
+ } else {
+ // Single JSON: the entire FlowFile is one document.
+ // For Index/Create, the input is parsed and re-serialized
to compact JSON to strip
+ // any embedded newlines from pretty-printed input. The
Elasticsearch bulk NDJSON
+ // protocol uses newlines as record delimiters, so
embedded newlines would corrupt
+ // the request. Update/Delete/Upsert parse directly into a
Map.
+ try (final InputStream in = session.read(flowFile)) {
+ final IndexOperationRequest opRequest;
+ if (o == IndexOperationRequest.Operation.Index || o ==
IndexOperationRequest.Operation.Create) {
+ // Re-serialize via Jackson to produce compact
single-line JSON.
+ // Pretty-printed input would contain embedded
newlines that break the
+ // Elasticsearch bulk NDJSON protocol (each
document must be one line).
+ final JsonNode node = mapper.readTree(new
InputStreamReader(in, charset));
+ final byte[] rawJsonBytes =
mapper.writeValueAsBytes(node);
Review Comment:
Using rawJsonBytes here for Single JSON Index/Create bypasses the null
suppression configuration on the ElasticSearchClientService. With this change,
the bytes are written directly in ElasticSearchClientServiceImpl.buildRequest,
skipping the client service's mapper entirely. This is a silent behavioral
change for existing users who have null suppression enabled.
For Single JSON mode, I'd recommend preserving the old Map-based path (parse
into Map, store in fields) so the client service's null suppression continues
to work. The rawJsonBytes optimization makes sense for NDJSON and JSON Array
where avoiding the Map allocation matters for throughput.
To be more specific:
- Old path (current main):
  - The processor parses the JSON into a Map<String, Object> (null JSON values
become Java null in the Map).
  - The Map is stored in IndexOperationRequest.fields.
  - The client service serializes the Map via
mapper.writeValueAsString(request.getFields()) — the client service's mapper
applies null suppression here.
- New path (PR, Single JSON, Index/Create):
  - The processor parses the JSON into a JsonNode and serializes it back to
byte[] via mapper.writeValueAsBytes(node) — this is the processor's mapper,
which does not have null suppression.
  - The bytes are stored in IndexOperationRequest.rawJsonBytes.
  - The client service writes the bytes directly via
out.write(request.getRawJsonBytes()) — the client service's mapper is never
used.
So a regular user with this configuration:
- ElasticSearchClientServiceImpl with Suppress Nulls = Always Suppress
- PutElasticsearchJson with Index operation
- Input JSON: {"name": "John", "age": null}
Before: Elasticsearch receives {"name":"John"} (null stripped by the client
service's mapper). After: Elasticsearch receives {"name":"John","age":null}
(null preserved because the client service's mapper is bypassed).
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -153,175 +222,705 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
SCRIPT,
SCRIPTED_UPSERT,
DYNAMIC_TEMPLATES,
+ INPUT_FORMAT,
BATCH_SIZE,
+ MAX_BATCH_SIZE,
CHARSET,
MAX_JSON_FIELD_STRING_LENGTH,
CLIENT_SERVICE,
LOG_ERROR_RESPONSES,
OUTPUT_ERROR_RESPONSES,
+ OUTPUT_BULK_REQUEST,
NOT_FOUND_IS_SUCCESSFUL
);
+
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL,
REL_ERRORS);
+ private static final int READER_BUFFER_SIZE = 65536;
+
+ private final AtomicBoolean bulkRequestOutputEnabled = new
AtomicBoolean(false);
+ private boolean outputBulkRequest;
+ private ObjectReader mapReader;
+
@Override
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+ if (bulkRequestOutputEnabled.get()) {
+ rels.add(REL_BULK_REQUEST);
+ }
+ return rels;
+ }
+
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+ bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+ }
+ }
+
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
+ // Migrate legacy property names from PutElasticsearchJson
config.removeProperty("put-es-json-error-documents");
config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
config.renameProperty("put-es-json-script", SCRIPT.getName());
config.renameProperty("put-es-json-scripted-upsert",
SCRIPTED_UPSERT.getName());
config.renameProperty("put-es-json-dynamic_templates",
DYNAMIC_TEMPLATES.getName());
config.renameProperty("put-es-json-charset", CHARSET.getName());
config.renameProperty("put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+ // Migrate "Batch Size" (from PutElasticsearchJson) to the new name
used in this processor
+ config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(),
BATCH_SIZE.getName());
+
+ // If INPUT_FORMAT was not explicitly set, this flow was migrated from
PutElasticsearchJson — default to Single JSON
+ if (!config.hasProperty(INPUT_FORMAT.getName())) {
+ config.setProperty(INPUT_FORMAT.getName(),
InputFormat.SINGLE_JSON.getValue());
+ }
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
+ // PutElasticsearchJson used "success" before it was renamed to
"original"
config.renameRelationship("success",
AbstractPutElasticsearch.REL_ORIGINAL.getName());
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
-
this.notFoundIsSuccessful =
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+ this.outputBulkRequest =
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+ this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+ this.mapReader = mapper.readerFor(new TypeReference<Map<String,
Object>>() { });
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final int batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
- final List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles.isEmpty()) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
return;
}
-
+ final long maxBatchBytes =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
final String idAttribute =
context.getProperty(ID_ATTRIBUTE).getValue();
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+ ?
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+ : Integer.MAX_VALUE;
+ int flowFilesProcessed = 0;
+
+ // Tracks all FlowFiles that were successfully parsed and submitted
(even partially)
+ final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+ // Tracks FlowFiles that had at least one Elasticsearch document error
+ final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+ // Deferred bulk-error attributes: applied after each FlowFile's
InputStream is closed
+ final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new
HashMap<>();
+ // Failed local record indices per FlowFile — O(error count) memory;
used at finalization
+ // to re-read FlowFiles once and reconstruct error/success content
without buffering all bytes
+ final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new
LinkedHashMap<>();
+
+ // Current chunk accumulation — operationFlowFiles and
operationRecordIndices are parallel to operations (same index)
+ final List<FlowFile> operationFlowFiles = new ArrayList<>();
+ final List<IndexOperationRequest> operations = new ArrayList<>();
+ final List<Integer> operationRecordIndices = new ArrayList<>();
+ long totalBytesAccumulated = 0;
+ long chunkBytes = 0;
- final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
- final List<IndexOperationRequest> operations = new
ArrayList<>(flowFiles.size());
+ while (flowFile != null) {
+ final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ final String flowFileIdAttribute =
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
- for (final FlowFile input : flowFiles) {
- addOperation(operations, originals, idAttribute, context, session,
input);
+ final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, flowFile);
+ final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+ final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+ final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, flowFile);
+ final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
+
+ boolean parseError = false;
+ try {
+ final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
+ if (InputFormat.NDJSON == inputFormat) {
+ // NDJSON: each non-blank line is one JSON document.
+ // Index/Create operations pass raw UTF-8 bytes directly
to avoid Map allocation.
+ // Update/Delete/Upsert parse into a Map so the bulk
serializer can wrap the payload.
+ int localRecordIndex = 0;
+ try (final BufferedReader reader = new BufferedReader(
+ new InputStreamReader(session.read(flowFile),
charset), READER_BUFFER_SIZE)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String trimmedLine = line.trim();
+ if (trimmedLine.isEmpty()) {
+ continue;
+ }
+ final IndexOperationRequest opRequest;
+ final long docBytes;
+ if (o == IndexOperationRequest.Operation.Index ||
o == IndexOperationRequest.Operation.Create) {
+ final String id = extractId(trimmedLine,
idAttribute, flowFileIdAttribute);
+ final byte[] rawJsonBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes = rawJsonBytes.length;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(trimmedLine);
+ final String id = resolveId(contentMap,
idAttribute, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } else if (InputFormat.JSON_ARRAY == inputFormat) {
+ // JSON Array: the FlowFile may contain one or more
top-level JSON arrays.
+ // Arrays are read sequentially; elements within each
array are streamed one at a
+ // time via JsonParser to avoid loading the entire content
into memory.
+ // Each element is re-serialized to compact bytes for
Index/Create,
+ // or parsed into a Map for Update/Delete/Upsert.
+ int localRecordIndex = 0;
+ try (final InputStreamReader isr = new
InputStreamReader(session.read(flowFile), charset);
+ final JsonParser parser =
mapper.getFactory().createParser(isr)) {
+ JsonToken outerToken;
+ while ((outerToken = parser.nextToken()) != null) {
+ if (outerToken != JsonToken.START_ARRAY) {
+ throw new IOException("Expected a JSON array
but found: " + outerToken);
+ }
+ JsonToken token;
+ while ((token = parser.nextToken()) !=
JsonToken.END_ARRAY) {
+ if (token == null) {
+ throw new IOException("Malformed JSON
Array: reached end of stream before the closing ']'. " +
+ "Verify the FlowFile content is a
complete, valid JSON array.");
+ }
+ final long startOffset =
parser.currentTokenLocation().getCharOffset();
+ final IndexOperationRequest opRequest;
+ if (o == IndexOperationRequest.Operation.Index
|| o == IndexOperationRequest.Operation.Create) {
+ final JsonNode node =
mapper.readTree(parser);
+ final long docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ final byte[] rawJsonBytes =
mapper.writeValueAsBytes(node);
+ final String id = extractId(node,
idAttribute, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(parser);
+ final long docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ final String id = resolveId(contentMap,
idAttribute, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ }
+ } else {
+ // Single JSON: the entire FlowFile is one document.
+ // For Index/Create, the input is parsed and re-serialized
to compact JSON to strip
+ // any embedded newlines from pretty-printed input. The
Elasticsearch bulk NDJSON
+ // protocol uses newlines as record delimiters, so
embedded newlines would corrupt
+ // the request. Update/Delete/Upsert parse directly into a
Map.
+ try (final InputStream in = session.read(flowFile)) {
+ final IndexOperationRequest opRequest;
+ if (o == IndexOperationRequest.Operation.Index || o ==
IndexOperationRequest.Operation.Create) {
+ // Re-serialize via Jackson to produce compact
single-line JSON.
+ // Pretty-printed input would contain embedded
newlines that break the
+ // Elasticsearch bulk NDJSON protocol (each
document must be one line).
+ final JsonNode node = mapper.readTree(new
InputStreamReader(in, charset));
+ final byte[] rawJsonBytes =
mapper.writeValueAsBytes(node);
+ final String id = extractId(node, idAttribute,
flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(in);
+ final String id =
StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(0);
+ final long docBytes = flowFile.getSize();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } catch (final ElasticsearchException ese) {
+ final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure.");
+ getLogger().error(msg, ese);
+ final Relationship rel = ese.isElastic() ? REL_RETRY :
REL_FAILURE;
+ // Route only the failing in-flight chunk to retry/failure.
FlowFiles already
+ // successfully indexed by prior _bulk requests are routed
normally to avoid
+ // duplicate indexing if those FlowFiles are re-processed on
retry.
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ese, rel, session, true,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
+ return;
+ } catch (final IOException ioe) {
+ getLogger().error("Could not read FlowFile content as valid
{}.", inputFormat, ioe);
+ removeFlowFileFromChunk(flowFile, operations,
operationFlowFiles, operationRecordIndices);
+ flowFile = session.putAttribute(flowFile,
"elasticsearch.put.error", ioe.getMessage());
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ parseError = true;
+ } catch (final Exception ex) {
+ getLogger().error("Failed processing records.", ex);
+ removeFlowFileFromChunk(flowFile, operations,
operationFlowFiles, operationRecordIndices);
+ flowFile = session.putAttribute(flowFile,
"elasticsearch.put.error", ex.getMessage());
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ parseError = true;
+ }
+
+ if (!parseError) {
+ allProcessedFlowFiles.add(flowFile);
+ // InputStream is now closed — safe to apply any deferred
bulk-error attributes
+ applyPendingBulkErrors(flowFile, pendingBulkErrors, session);
+ }
+
+ flowFilesProcessed++;
+
+ // For Single JSON, stop when the count-based batch limit is
reached; otherwise use size limit
+ if (InputFormat.SINGLE_JSON == inputFormat ? flowFilesProcessed >=
batchSize : totalBytesAccumulated >= maxBatchBytes) {
+ break;
+ }
+
+ flowFile = session.get();
}
- if (!originals.isEmpty()) {
+ // Flush any remaining operations (all InputStreams are closed at this
point)
+ if (!operations.isEmpty()) {
try {
- final List<FlowFile> errorDocuments =
indexDocuments(operations, originals, context, session);
- handleResponse(context, session, errorDocuments, originals);
- session.transfer(originals, REL_ORIGINAL);
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, null, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
- ese.isElastic() ? "Routing to retry." : "Routing to
failure");
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure.");
getLogger().error(msg, ese);
final Relationship rel = ese.isElastic() ? REL_RETRY :
REL_FAILURE;
- transferFlowFilesOnException(ese, rel, session, true,
originals.toArray(new FlowFile[0]));
- } catch (final JsonProcessingException jpe) {
- getLogger().warn("Could not log Elasticsearch operation errors
nor determine which documents errored.", jpe);
- transferFlowFilesOnException(jpe, REL_ERRORS, session, true,
originals.toArray(new FlowFile[0]));
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ese, rel, session, true,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
+ return;
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
- transferFlowFilesOnException(ex, REL_FAILURE, session, false,
originals.toArray(new FlowFile[0]));
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ex, REL_FAILURE, session, false,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
context.yield();
+ return;
}
+ }
+
+ if (allProcessedFlowFiles.isEmpty()) {
+ getLogger().warn("No records successfully parsed for sending to
Elasticsearch");
} else {
- getLogger().warn("No FlowFiles successfully parsed for sending to
Elasticsearch");
+ handleFinalResponse(context, session, errorFlowFiles,
allProcessedFlowFiles, pendingErrorRecordIndices, inputFormat);
}
}
- private void addOperation(final List<IndexOperationRequest> operations,
final List<FlowFile> originals, final String idAttribute,
- final ProcessContext context, final
ProcessSession session, FlowFile input) {
- final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
- final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
- final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
- final String id = StringUtils.isNotBlank(idAttribute) &&
StringUtils.isNotBlank(input.getAttribute(idAttribute)) ?
input.getAttribute(idAttribute) : null;
-
- final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, input);
- final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean();
- final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, input);
-
- final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, input);
- final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
-
- final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
-
- try (final InputStream inStream = session.read(input)) {
- final byte[] result = IOUtils.toByteArray(inStream);
- @SuppressWarnings("unchecked")
- final Map<String, Object> contentMap = mapper.readValue(new
String(result, charset), Map.class);
-
- final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
- operations.add(new IndexOperationRequest(index, type, id,
contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap,
bulkHeaderFields));
-
- originals.add(input);
- } catch (final IOException ioe) {
- getLogger().error("Could not read FlowFile content valid JSON.",
ioe);
- input = session.putAttribute(input, "elasticsearch.put.error",
ioe.getMessage());
- session.penalize(input);
- session.transfer(input, REL_FAILURE);
- } catch (final Exception ex) {
- getLogger().error("Could not index documents.", ex);
- input = session.putAttribute(input, "elasticsearch.put.error",
ex.getMessage());
- session.penalize(input);
- session.transfer(input, REL_FAILURE);
+ /**
+ * Removes all entries belonging to {@code target} from the three parallel
lists, so that a
+ * FlowFile that failed mid-read does not leave stale references that
would cause
+ * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}
on the next flush.
+ */
+ private void removeFlowFileFromChunk(final FlowFile target,
+ final List<IndexOperationRequest>
operations,
+ final List<FlowFile>
operationFlowFiles,
+ final List<Integer>
operationRecordIndices) {
+ for (int i = operations.size() - 1; i >= 0; i--) {
+ if (operationFlowFiles.get(i) == target) {
+ operations.remove(i);
+ operationFlowFiles.remove(i);
+ operationRecordIndices.remove(i);
+ }
}
}
- @SuppressWarnings("unchecked")
- private Map<String, Object> getMapFromAttribute(final PropertyDescriptor
propertyDescriptor, final ProcessContext context, final FlowFile input) {
- final String propertyValue =
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+ /**
+ * Applies any bulk-error attributes that were deferred for {@code
flowFile} while its
+ * InputStream was open. Must only be called after the InputStream has
been closed.
+ */
+ private void applyPendingBulkErrors(final FlowFile flowFile,
+ final Map<FlowFile, List<Map<String,
Object>>> pendingBulkErrors,
+ final ProcessSession session) {
+ final List<Map<String, Object>> errorList =
pendingBulkErrors.remove(flowFile);
+ if (errorList == null) {
+ return;
+ }
try {
- return StringUtils.isNotBlank(propertyValue) ?
mapper.readValue(propertyValue, Map.class) : Collections.emptyMap();
- } catch (final JsonProcessingException jpe) {
- throw new ProcessException(propertyDescriptor.getDisplayName() + "
must be a String parsable into a JSON Object", jpe);
+ final Object errorObj = errorList.size() == 1 ? errorList.get(0) :
errorList;
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
mapper.writeValueAsString(errorObj));
+ } catch (final JsonProcessingException e) {
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
+ String.format("{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+ e.getMessage().replace("\"", "\\\"")));
}
}
- private List<FlowFile> indexDocuments(final List<IndexOperationRequest>
operations, final List<FlowFile> originals, final ProcessContext context, final
ProcessSession session) throws IOException {
- final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, originals.getFirst());
    /**
     * Sends the accumulated batch of operations to Elasticsearch via the _bulk API and records
     * any per-document errors. If {@code openStreamFlowFile} is non-null, error attributes for
     * that FlowFile are deferred into {@code pendingBulkErrors} so they are applied after its
     * InputStream is closed (calling {@code putAttribute} on an open FlowFile throws
     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}).
     * <p>
     * Only the local record index of each failed operation is stored (O(error count) memory).
     * Error/success content is reconstructed by re-reading the FlowFile in
     * {@link #handleFinalResponse}.
     *
     * @param operations              pending bulk operations, index-aligned with the two lists below
     * @param operationFlowFiles      source FlowFile for each operation (same index)
     * @param operationRecordIndices  record index within its FlowFile for each operation (same index)
     * @param errorFlowFiles          accumulator of FlowFiles that had at least one failed operation
     * @param openStreamFlowFile      FlowFile whose InputStream is currently open, or null
     * @throws IOException if the bulk request itself fails
     */
    private void flushChunk(final List<IndexOperationRequest> operations, final List<FlowFile> operationFlowFiles,
                            final List<Integer> operationRecordIndices,
                            final Set<FlowFile> errorFlowFiles, final FlowFile openStreamFlowFile,
                            final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors,
                            final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices,
                            final ProcessContext context, final ProcessSession session) throws IOException {
        // Optionally emit the serialized _bulk body for inspection before sending.
        if (outputBulkRequest) {
            outputBulkRequestFlowFile(operations, operationFlowFiles, session);
        }

        // Dynamic request parameters/headers are evaluated against the first FlowFile of the chunk.
        // NOTE(review): assumes every FlowFile in the chunk yields the same parameter values — confirm.
        final FlowFile firstFlowFile = operationFlowFiles.get(0);
        final Map<String, String> dynamicProperties = getRequestParametersFromDynamicProperties(context, firstFlowFile);
        final IndexOperationResponse response = clientService.get().bulk(operations,
                new ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), getRequestHeadersFromDynamicProperties(context, firstFlowFile)));
        final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);

        // Attach per-FlowFile error attributes; defer putAttribute for the FlowFile whose
        // InputStream is currently open (putAttribute would throw FlowFileHandlingException).
        // Record only the local record index of each failure — content is reconstructed at finalization.
        final Map<FlowFile, List<Map<String, Object>>> flowFileErrors = new HashMap<>();
        errors.forEach((index, error) -> {
            // 'index' is the position within this chunk's operations list.
            final FlowFile flowFile = operationFlowFiles.get(index);
            errorFlowFiles.add(flowFile);
            flowFileErrors.computeIfAbsent(flowFile, k -> new ArrayList<>()).add(error);
            pendingErrorRecordIndices.computeIfAbsent(flowFile, k -> new HashSet<>()).add(operationRecordIndices.get(index));
        });
        flowFileErrors.forEach((flowFile, errorList) -> {
            if (flowFile == openStreamFlowFile) {
                // Defer: InputStream still open — apply after the try-with-resources closes
                pendingBulkErrors.computeIfAbsent(flowFile, k -> new ArrayList<>()).addAll(errorList);
            } else {
                try {
                    // Single error → bare JSON object; multiple errors → JSON array.
                    final Object errorObj = errorList.size() == 1 ? errorList.get(0) : errorList;
                    session.putAttribute(flowFile, "elasticsearch.bulk.error", mapper.writeValueAsString(errorObj));
                } catch (final JsonProcessingException e) {
                    // Fall back to a hand-built error object if the error itself is not serializable.
                    session.putAttribute(flowFile, "elasticsearch.bulk.error",
                            String.format("{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
                                    e.getMessage().replace("\"", "\\\"")));
                }
            }
        });
        if (!errors.isEmpty()) {
            handleElasticsearchDocumentErrors(errors, session, firstFlowFile);
        }
    }
+
    /**
     * Serializes the pending batch into an Elasticsearch _bulk NDJSON body and writes it to a
     * new FlowFile on the {@code bulk_request} relationship for inspection or debugging.
     * Each operation produces an action metadata line followed by a document line
     * (Delete operations have no document line). Failures here are logged and swallowed so
     * that a debug-output failure does not abort the actual indexing.
     */
    private void outputBulkRequestFlowFile(final List<IndexOperationRequest> operations,
                                           final List<FlowFile> operationFlowFiles,
                                           final ProcessSession session) {
        // Parent the debug FlowFile to the first FlowFile of the chunk for lineage.
        final FlowFile parent = operationFlowFiles.get(0);
        FlowFile bulkRequestFF = session.create(parent);
        try (final OutputStream out = session.write(bulkRequestFF)) {
            for (final IndexOperationRequest op : operations) {
                // Action metadata line: only non-blank/non-empty fields are emitted.
                final Map<String, Object> actionBody = new LinkedHashMap<>();
                if (StringUtils.isNotBlank(op.getIndex())) {
                    actionBody.put("_index", op.getIndex());
                }
                if (StringUtils.isNotBlank(op.getType())) {
                    actionBody.put("_type", op.getType());
                }
                if (StringUtils.isNotBlank(op.getId())) {
                    actionBody.put("_id", op.getId());
                }
                if (op.getDynamicTemplates() != null && !op.getDynamicTemplates().isEmpty()) {
                    actionBody.put("dynamic_templates", op.getDynamicTemplates());
                }
                if (op.getHeaderFields() != null) {
                    actionBody.putAll(op.getHeaderFields());
                }

                // Upsert maps to "update" in the ES bulk API
                final String actionName = op.getOperation() == IndexOperationRequest.Operation.Upsert
                        ? "update" : op.getOperation().getValue();
                final Map<String, Object> actionLine = Map.of(actionName, actionBody);
                out.write(mapper.writeValueAsBytes(actionLine));
                out.write('\n');

                // Document line (delete has no document)
                if (op.getOperation() != IndexOperationRequest.Operation.Delete) {
                    if (op.getRawJsonBytes() != null) {
                        // Index/Create with rawJson path — write directly, no Map round-trip.
                        // NOTE(review): assumes rawJsonBytes contains no embedded newlines,
                        // which would corrupt the NDJSON framing — confirm upstream guarantees this.
                        out.write(op.getRawJsonBytes());
                    } else {
                        final Map<String, Object> docLine = new LinkedHashMap<>();
                        if (op.getOperation() == IndexOperationRequest.Operation.Update
                                || op.getOperation() == IndexOperationRequest.Operation.Upsert) {
                            if (op.getScript() != null && !op.getScript().isEmpty()) {
                                // Scripted update: script body, optional scripted_upsert flag,
                                // and the document fields as the "upsert" fallback.
                                docLine.put("script", op.getScript());
                                if (op.isScriptedUpsert()) {
                                    docLine.put("scripted_upsert", true);
                                }
                                if (op.getFields() != null && !op.getFields().isEmpty()) {
                                    docLine.put("upsert", op.getFields());
                                }
                            } else {
                                // Plain partial-document update; Upsert additionally sets doc_as_upsert.
                                docLine.put("doc", op.getFields());
                                if (op.getOperation() == IndexOperationRequest.Operation.Upsert) {
                                    docLine.put("doc_as_upsert", true);
                                }
                            }
                        } else {
                            // Index/Create without raw bytes: the fields map IS the document.
                            docLine.putAll(op.getFields());
                        }
                        out.write(mapper.writeValueAsBytes(docLine));
                    }
                    out.write('\n');
                }
            }
        } catch (final IOException e) {
            // Debug output is best-effort: drop the partial FlowFile and carry on with indexing.
            getLogger().warn("Failed to write bulk request FlowFile for inspection.", e);
            session.remove(bulkRequestFF);
            return;
        }
        bulkRequestFF = session.putAttribute(bulkRequestFF, "elasticsearch.bulk.operation.count", String.valueOf(operations.size()));
        session.transfer(bulkRequestFF, REL_BULK_REQUEST);
    }
+
+ /**
+ * Extracts the document ID from a raw JSON string using a streaming
parser.
+ * Stops as soon as the target field is found, avoiding a full tree parse.
+ * Used for Index/Create operations to avoid the Map allocation overhead.
+ */
+ private String extractId(final String rawJson, final String idAttribute,
final String flowFileIdAttribute) throws IOException {
+ if (StringUtils.isBlank(idAttribute)) {
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+ try (final JsonParser p = mapper.getFactory().createParser(rawJson)) {
+ while (p.nextToken() != null) {
+ if (idAttribute.equals(p.currentName()) && p.nextToken() !=
null && !p.currentToken().isStructStart()) {
+ final String value = p.getText();
+ return StringUtils.isNotBlank(value) ? value
+ : (StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null);
+ }
+ }
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Extracts the document ID from a pre-parsed JsonNode.
+ * Used for JSON Array Index/Create operations where the node is already
available.
+ */
+ private String extractId(final JsonNode node, final String idAttribute,
final String flowFileIdAttribute) {
+ if (StringUtils.isBlank(idAttribute)) {
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+ final JsonNode idNode = node.get(idAttribute);
+ if (idNode != null && !idNode.isNull()) {
+ return idNode.asText();
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
}
Review Comment:
This changes the semantics of Identifier Attribute for existing Single JSON
users. Currently, the property is documented as "The name of the FlowFile
attribute containing the identifier for the document" and the code only reads
the FlowFile attribute:
```java
final String id = StringUtils.isNotBlank(idAttribute) &&
StringUtils.isNotBlank(input.getAttribute(idAttribute))
? input.getAttribute(idAttribute) : null;
```
With this change, for Index/Create operations, the code first searches
inside the JSON document content for a field with that name
(node.get(idAttribute)), then falls back to the FlowFile attribute. If a user
has Identifier Attribute = "id" and their JSON contains an "id" field, the
document's _id will now come from the JSON content instead of the FlowFile
attribute — a silent behavioral change.
For NDJSON/JSON Array this makes sense (a FlowFile attribute can't
distinguish multiple documents), but for Single JSON the old behavior should be
preserved. Consider using the old FlowFile-attribute-only logic in the Single
JSON path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]