agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3042443107
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -145,183 +160,841 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
.required(true)
.build();
+ static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Max Batch Size")
+ .description("""
+ The maximum amount of data to send in a single
Elasticsearch _bulk API request. \
+ For NDJSON and JSON Array modes, FlowFiles are accumulated
until this threshold is reached, then flushed. \
+ For Single JSON mode, this acts as a size-based safety
limit: if the accumulated FlowFiles exceed this size \
+ before Max FlowFiles Per Batch is reached, the request is
flushed early. \
+ Elasticsearch recommends 5-15 MB per bulk request for
optimal performance. \
+ To disable the size-based limit, check "Set Empty String".\
+ """)
+ .defaultValue("10 MB")
+ .addValidator((subject, input, context) ->
StringUtils.isBlank(input)
+ ? new
ValidationResult.Builder().subject(subject).valid(true).build()
+ : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject,
input, context))
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor INPUT_FORMAT = new
PropertyDescriptor.Builder()
+ .name("Input Content Format")
+ .description("""
+ The format of the JSON content in each FlowFile. \
+ NDJSON: one JSON object per line (newline-delimited). \
+ JSON Array: a top-level JSON array of objects, streamed
element-by-element for memory efficiency. \
+ Single JSON: the entire FlowFile is a single JSON
document.\
+ """)
+ .allowableValues(InputFormat.class)
+ .defaultValue(InputFormat.SINGLE_JSON.getValue())
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+ .name("Max FlowFiles Per Batch")
+ .displayName("Max FlowFiles Per Batch")
+ .description("""
+ The maximum number of FlowFiles to include in a single
Elasticsearch _bulk API request. \
+ If the accumulated FlowFiles exceed Max Batch Size before
this count is reached, the request will be flushed early.\
+ """)
+ .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+ .build();
+
+ static final PropertyDescriptor SUPPRESS_NULLS = new
PropertyDescriptor.Builder()
+ .name("Suppress Nulls")
+ .description("""
+ When set to Always Suppress, null and empty values are
removed from documents before they are sent to Elasticsearch.
+ This setting applies to NDJSON and JSON Array formats for
Index and Create operations only. \
+ For Single JSON, configure null suppression on the
controller service instead.
+ Performance note: for JSON Array the impact is negligible
since documents are already being parsed. \
+ For NDJSON, each line must be parsed and re-serialized
when suppression is enabled, \
+ which adds overhead compared to the default behaviour of
passing lines through as raw bytes.\
+ """)
+ .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS,
ElasticSearchClientService.ALWAYS_SUPPRESS)
+ .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+ .required(true)
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON,
InputFormat.JSON_ARRAY)
+ .build();
+
+ static final PropertyDescriptor ID_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("Identifier Attribute")
+ .description("""
+ The name of the FlowFile attribute containing the
identifier for the document. \
+ If the Index Operation is "index", this property may be
left empty or evaluate to an empty value, \
+ in which case the document's identifier will be
auto-generated by Elasticsearch. \
+ For all other Index Operations, the attribute must
evaluate to a non-empty value.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+ .build();
+
+ static final PropertyDescriptor IDENTIFIER_FIELD = new
PropertyDescriptor.Builder()
+ .name("Identifier Field")
+ .description("""
+ The name of the field within each document to use as the
Elasticsearch document ID. \
+ If the field is not present in a document or this property
is left blank, no document ID is set \
+ and Elasticsearch will auto-generate one.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON,
InputFormat.JSON_ARRAY)
+ .build();
+
+ static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+ .name("bulk_request")
+ .description("When \"Output Bulk Request\" is enabled, the raw
Elasticsearch _bulk API request body is written " +
+ "to this relationship as a FlowFile for inspection or
debugging.")
+ .build();
+
+ static final PropertyDescriptor OUTPUT_BULK_REQUEST = new
PropertyDescriptor.Builder()
+ .name("Output Bulk Request")
+ .description("If enabled, each Elasticsearch _bulk request body is
written as a FlowFile to the \"" +
+ REL_BULK_REQUEST.getName() + "\" relationship. Useful for
debugging. " +
+ "Each FlowFile contains the full NDJSON body exactly as
sent to Elasticsearch.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
static final List<PropertyDescriptor> DESCRIPTORS = List.of(
- ID_ATTRIBUTE,
INDEX_OP,
INDEX,
TYPE,
SCRIPT,
SCRIPTED_UPSERT,
DYNAMIC_TEMPLATES,
+ INPUT_FORMAT,
BATCH_SIZE,
+ MAX_BATCH_SIZE,
+ SUPPRESS_NULLS,
+ ID_ATTRIBUTE,
+ IDENTIFIER_FIELD,
CHARSET,
MAX_JSON_FIELD_STRING_LENGTH,
CLIENT_SERVICE,
LOG_ERROR_RESPONSES,
OUTPUT_ERROR_RESPONSES,
+ OUTPUT_BULK_REQUEST,
NOT_FOUND_IS_SUCCESSFUL
);
+
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL,
REL_ERRORS);
+ private static final int READER_BUFFER_SIZE = 65536;
+
+ private final AtomicBoolean bulkRequestOutputEnabled = new
AtomicBoolean(false);
+ private boolean outputBulkRequest;
+ private ObjectReader mapReader;
+ private ObjectWriter suppressingWriter;
+
@Override
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+ if (bulkRequestOutputEnabled.get()) {
+ rels.add(REL_BULK_REQUEST);
+ }
+ return rels;
+ }
+
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+ bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+ }
+ }
+
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
+ // Migrate legacy property names from PutElasticsearchJson
config.removeProperty("put-es-json-error-documents");
config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
config.renameProperty("put-es-json-script", SCRIPT.getName());
config.renameProperty("put-es-json-scripted-upsert",
SCRIPTED_UPSERT.getName());
config.renameProperty("put-es-json-dynamic_templates",
DYNAMIC_TEMPLATES.getName());
config.renameProperty("put-es-json-charset", CHARSET.getName());
config.renameProperty("put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+ // If INPUT_FORMAT was not explicitly set, this flow was migrated from
PutElasticsearchJson — default to Single JSON.
+ // Set this before migrating BATCH_SIZE so its dependsOn condition is
satisfied when NiFi processes the property.
+ if (!config.hasProperty(INPUT_FORMAT.getName())) {
+ config.setProperty(INPUT_FORMAT.getName(),
InputFormat.SINGLE_JSON.getValue());
+ }
+
+ // Migrate "Batch Size" (from PutElasticsearchJson) to the new name
used in this processor.
+ // INPUT_FORMAT is set first above so its dependsOn condition is
satisfied when NiFi processes BATCH_SIZE.
+ config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(),
BATCH_SIZE.getName());
+
+ // MAX_BATCH_SIZE is a new property — existing configurations had no
byte-based limit.
+ // Preserve that behavior on upgrade by leaving the property blank
(unbounded).
+ // The custom validator on MAX_BATCH_SIZE accepts blank values, so
this will not be
+ // overwritten by the default of 10 MB. New processor instances get
the 10 MB default.
+ if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+ config.setProperty(MAX_BATCH_SIZE.getName(), "");
+ }
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
+ // PutElasticsearchJson used "success" before it was renamed to
"original"
config.renameRelationship("success",
AbstractPutElasticsearch.REL_ORIGINAL.getName());
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
-
this.notFoundIsSuccessful =
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+ this.outputBulkRequest =
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+ this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+ this.mapReader = mapper.readerFor(new TypeReference<Map<String,
Object>>() { });
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ if (inputFormat != InputFormat.SINGLE_JSON
+ &&
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
{
+ final ObjectMapper suppressingMapper = mapper.copy()
+ .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL)
+
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
+ this.suppressingWriter = suppressingMapper.writer();
+ } else {
+ this.suppressingWriter = null;
+ }
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final int batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
- final List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles.isEmpty()) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
return;
}
+ final String maxBatchSizeStr =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().getValue();
+ final long maxBatchBytes = StringUtils.isNotBlank(maxBatchSizeStr)
+ ?
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue()
+ : Long.MAX_VALUE;
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ final String idAttribute = inputFormat == InputFormat.SINGLE_JSON
+ ? context.getProperty(ID_ATTRIBUTE).getValue()
+ : null;
+ final String documentIdField = inputFormat != InputFormat.SINGLE_JSON
+ ? context.getProperty(IDENTIFIER_FIELD).getValue()
+ : null;
+ final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+ ?
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+ : Integer.MAX_VALUE;
+ int flowFilesProcessed = 0;
+
+ // Tracks all FlowFiles that were successfully parsed and submitted
(even partially)
+ final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+ // Tracks FlowFiles that had at least one Elasticsearch document error
+ final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+ // Deferred bulk-error attributes: applied after each FlowFile's
InputStream is closed
+ final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new
HashMap<>();
+ // Failed local record indices per FlowFile — O(error count) memory;
used at finalization
+ // to re-read FlowFiles once and reconstruct error/success content
without buffering all bytes
+ final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new
LinkedHashMap<>();
+
+ // Current chunk accumulation — operationFlowFiles and
operationRecordIndices are parallel to operations (same index)
+ final List<FlowFile> operationFlowFiles = new ArrayList<>();
+ final List<IndexOperationRequest> operations = new ArrayList<>();
+ final List<Integer> operationRecordIndices = new ArrayList<>();
+ long totalBytesAccumulated = 0;
+ long chunkBytes = 0;
+
+ while (flowFile != null) {
+ final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ final String flowFileIdAttribute =
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
+
+ final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, flowFile);
+ final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+ final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+ final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, flowFile);
+ final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
+
+ boolean parseError = false;
+ try {
+ final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
+ if (InputFormat.NDJSON == inputFormat) {
+ // NDJSON: each non-blank line is one JSON document.
+ // Index/Create operations pass raw UTF-8 bytes directly
to avoid Map allocation.
+ // Update/Delete/Upsert parse into a Map so the bulk
serializer can wrap the payload.
+ int localRecordIndex = 0;
+ try (final BufferedReader reader = new BufferedReader(
+ new InputStreamReader(session.read(flowFile),
charset), READER_BUFFER_SIZE)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String trimmedLine = line.trim();
+ if (trimmedLine.isEmpty()) {
+ continue;
+ }
+ final IndexOperationRequest opRequest;
+ final long docBytes;
+ if (o == IndexOperationRequest.Operation.Index ||
o == IndexOperationRequest.Operation.Create) {
+ final String id = extractId(trimmedLine,
documentIdField, flowFileIdAttribute);
+ final byte[] rawJsonBytes;
+ if (suppressingWriter != null) {
+ // Parse to Map so NON_NULL/NON_EMPTY
inclusion filters apply during serialization.
+ // JsonNode tree serialization bypasses
JsonInclude filters.
+ rawJsonBytes =
suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+ } else {
+ rawJsonBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8);
+ }
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes = rawJsonBytes.length;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(trimmedLine);
+ final String id = resolveId(contentMap,
documentIdField, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } else if (InputFormat.JSON_ARRAY == inputFormat) {
+ // JSON Array: the FlowFile may contain one or more
top-level JSON arrays.
+ // Arrays are read sequentially; elements within each
array are streamed one at a
+ // time via JsonParser to avoid loading the entire content
into memory.
+ // Each element is re-serialized to compact bytes for
Index/Create,
+ // or parsed into a Map for Update/Delete/Upsert.
+ int localRecordIndex = 0;
+ try (final InputStreamReader isr = new
InputStreamReader(session.read(flowFile), charset);
+ final JsonParser parser =
mapper.getFactory().createParser(isr)) {
+ JsonToken outerToken;
+ while ((outerToken = parser.nextToken()) != null) {
+ if (outerToken != JsonToken.START_ARRAY) {
+ throw new IOException("Expected a JSON array
but found: " + outerToken);
+ }
+ JsonToken token;
+ while ((token = parser.nextToken()) !=
JsonToken.END_ARRAY) {
+ if (token == null) {
+ throw new IOException("Malformed JSON
Array: reached end of stream before the closing ']'. " +
+ "Verify the FlowFile content is a
complete, valid JSON array.");
+ }
+ final long startOffset =
parser.currentTokenLocation().getCharOffset();
+ final IndexOperationRequest opRequest;
+ if (o == IndexOperationRequest.Operation.Index
|| o == IndexOperationRequest.Operation.Create) {
+ final long docBytes;
+ final byte[] rawJsonBytes;
+ final String id;
+ if (suppressingWriter != null) {
+ // Parse directly to Map so
NON_NULL/NON_EMPTY inclusion filters apply during
+ // serialization. JsonNode tree
serialization bypasses JsonInclude filters,
+ // and convertValue(node, Map) adds an
extra serialization cycle.
+ final Map<String, Object> contentMap =
mapReader.readValue(parser);
+ docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ rawJsonBytes =
suppressingWriter.writeValueAsBytes(contentMap);
+ id = resolveId(contentMap,
documentIdField, flowFileIdAttribute);
+ } else {
+ final JsonNode node =
mapper.readTree(parser);
+ docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ rawJsonBytes =
mapper.writeValueAsBytes(node);
+ id = extractId(node, documentIdField,
flowFileIdAttribute);
+ }
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(parser);
+ final long docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ final String id = resolveId(contentMap,
documentIdField, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ }
+ } else {
+ // Single JSON: the entire FlowFile is one document parsed
into a Map.
+ // The client service serializes the Map, preserving
null-suppression settings.
+ try (final InputStream in = session.read(flowFile)) {
+ final Map<String, Object> contentMap =
mapReader.readValue(in);
+ final String id =
StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
+ final IndexOperationRequest opRequest =
IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(0);
+ final long docBytes = flowFile.getSize();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } catch (final ElasticsearchException ese) {
+ final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure.");
+ getLogger().error(msg, ese);
+ final Relationship rel = ese.isElastic() ? REL_RETRY :
REL_FAILURE;
+ // Route only the failing in-flight chunk to retry/failure.
FlowFiles already
+ // successfully indexed by prior _bulk requests are routed
normally to avoid
+ // duplicate indexing if those FlowFiles are re-processed on
retry.
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ese, rel, session, true,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
+ return;
+ } catch (final IOException ioe) {
+ getLogger().error("Could not read FlowFile content as valid
{}.", inputFormat, ioe);
+ removeFlowFileFromChunk(flowFile, operations,
operationFlowFiles, operationRecordIndices);
+ flowFile = session.putAttribute(flowFile,
"elasticsearch.put.error", ioe.getMessage());
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ parseError = true;
+ } catch (final Exception ex) {
+ getLogger().error("Failed processing records.", ex);
+ removeFlowFileFromChunk(flowFile, operations,
operationFlowFiles, operationRecordIndices);
+ flowFile = session.putAttribute(flowFile,
"elasticsearch.put.error", ex.getMessage());
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ parseError = true;
+ }
+
+ if (!parseError) {
+ allProcessedFlowFiles.add(flowFile);
+ // InputStream is now closed — safe to apply any deferred
bulk-error attributes
+ applyPendingBulkErrors(flowFile, pendingBulkErrors, session);
+ }
- final String idAttribute =
context.getProperty(ID_ATTRIBUTE).getValue();
+ flowFilesProcessed++;
- final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
- final List<IndexOperationRequest> operations = new
ArrayList<>(flowFiles.size());
+ // For Single JSON, stop when the count-based batch limit is
reached; otherwise use size limit
+ if (InputFormat.SINGLE_JSON == inputFormat ? flowFilesProcessed >=
batchSize : totalBytesAccumulated >= maxBatchBytes) {
+ break;
+ }
- for (final FlowFile input : flowFiles) {
- addOperation(operations, originals, idAttribute, context, session,
input);
+ flowFile = session.get();
}
- if (!originals.isEmpty()) {
+ // Flush any remaining operations (all InputStreams are closed at this
point)
+ if (!operations.isEmpty()) {
try {
- final List<FlowFile> errorDocuments =
indexDocuments(operations, originals, context, session);
- handleResponse(context, session, errorDocuments, originals);
- session.transfer(originals, REL_ORIGINAL);
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, null, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
- ese.isElastic() ? "Routing to retry." : "Routing to
failure");
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure.");
getLogger().error(msg, ese);
final Relationship rel = ese.isElastic() ? REL_RETRY :
REL_FAILURE;
- transferFlowFilesOnException(ese, rel, session, true,
originals.toArray(new FlowFile[0]));
- } catch (final JsonProcessingException jpe) {
- getLogger().warn("Could not log Elasticsearch operation errors
nor determine which documents errored.", jpe);
- transferFlowFilesOnException(jpe, REL_ERRORS, session, true,
originals.toArray(new FlowFile[0]));
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ese, rel, session, true,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
+ return;
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
- transferFlowFilesOnException(ex, REL_FAILURE, session, false,
originals.toArray(new FlowFile[0]));
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ex, REL_FAILURE, session, false,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
context.yield();
+ return;
}
+ }
+
+ if (allProcessedFlowFiles.isEmpty()) {
+ getLogger().warn("No records successfully parsed for sending to
Elasticsearch");
} else {
- getLogger().warn("No FlowFiles successfully parsed for sending to
Elasticsearch");
+ handleFinalResponse(context, session, errorFlowFiles,
allProcessedFlowFiles, pendingErrorRecordIndices, inputFormat);
}
}
- private void addOperation(final List<IndexOperationRequest> operations,
final List<FlowFile> originals, final String idAttribute,
- final ProcessContext context, final
ProcessSession session, FlowFile input) {
- final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
- final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
- final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
- final String id = StringUtils.isNotBlank(idAttribute) &&
StringUtils.isNotBlank(input.getAttribute(idAttribute)) ?
input.getAttribute(idAttribute) : null;
-
- final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, input);
- final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean();
- final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, input);
-
- final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, input);
- final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
-
- final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
-
- try (final InputStream inStream = session.read(input)) {
- final byte[] result = IOUtils.toByteArray(inStream);
- @SuppressWarnings("unchecked")
- final Map<String, Object> contentMap = mapper.readValue(new
String(result, charset), Map.class);
-
- final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
- operations.add(new IndexOperationRequest(index, type, id,
contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap,
bulkHeaderFields));
-
- originals.add(input);
- } catch (final IOException ioe) {
- getLogger().error("Could not read FlowFile content valid JSON.",
ioe);
- input = session.putAttribute(input, "elasticsearch.put.error",
ioe.getMessage());
- session.penalize(input);
- session.transfer(input, REL_FAILURE);
- } catch (final Exception ex) {
- getLogger().error("Could not index documents.", ex);
- input = session.putAttribute(input, "elasticsearch.put.error",
ex.getMessage());
- session.penalize(input);
- session.transfer(input, REL_FAILURE);
+ /**
+ * Removes all entries belonging to {@code target} from the three parallel
lists, so that a
+ * FlowFile that failed mid-read does not leave stale references that
would cause
+ * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}
on the next flush.
+ */
+ private void removeFlowFileFromChunk(final FlowFile target,
+ final List<IndexOperationRequest>
operations,
+ final List<FlowFile>
operationFlowFiles,
+ final List<Integer>
operationRecordIndices) {
+ for (int i = operations.size() - 1; i >= 0; i--) {
+ if (operationFlowFiles.get(i) == target) {
+ operations.remove(i);
+ operationFlowFiles.remove(i);
+ operationRecordIndices.remove(i);
+ }
}
}
- @SuppressWarnings("unchecked")
- private Map<String, Object> getMapFromAttribute(final PropertyDescriptor
propertyDescriptor, final ProcessContext context, final FlowFile input) {
- final String propertyValue =
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+ /**
+ * Applies any bulk-error attributes that were deferred for {@code
flowFile} while its
+ * InputStream was open. Must only be called after the InputStream has
been closed.
+ */
+ private void applyPendingBulkErrors(final FlowFile flowFile,
+ final Map<FlowFile, List<Map<String,
Object>>> pendingBulkErrors,
+ final ProcessSession session) {
+ final List<Map<String, Object>> errorList =
pendingBulkErrors.remove(flowFile);
+ if (errorList == null) {
+ return;
+ }
try {
- return StringUtils.isNotBlank(propertyValue) ?
mapper.readValue(propertyValue, Map.class) : Collections.emptyMap();
- } catch (final JsonProcessingException jpe) {
- throw new ProcessException(propertyDescriptor.getDisplayName() + "
must be a String parsable into a JSON Object", jpe);
+ final Object errorObj = errorList.size() == 1 ? errorList.get(0) :
errorList;
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
mapper.writeValueAsString(errorObj));
+ } catch (final JsonProcessingException e) {
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
+ String.format("{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+ e.getMessage().replace("\"", "\\\"")));
}
}
- private List<FlowFile> indexDocuments(final List<IndexOperationRequest>
operations, final List<FlowFile> originals, final ProcessContext context, final
ProcessSession session) throws IOException {
- final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, originals.getFirst());
+ /**
+ * Sends the accumulated batch of operations to Elasticsearch via the
_bulk API and records
+ * any per-document errors. If {@code openStreamFlowFile} is non-null,
error attributes for
+ * that FlowFile are deferred into {@code pendingBulkErrors} so they are
applied after its
+ * InputStream is closed (calling {@code putAttribute} on an open FlowFile
throws
+ * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}).
+ * <p>
+ * Only the local record index of each failed operation is stored (O(error
count) memory).
+ * Error/success content is reconstructed by re-reading the FlowFile in
+ * {@link #handleFinalResponse}.
+ */
+ private void flushChunk(final List<IndexOperationRequest> operations,
final List<FlowFile> operationFlowFiles,
+ final List<Integer> operationRecordIndices,
+ final Set<FlowFile> errorFlowFiles, final
FlowFile openStreamFlowFile,
+ final Map<FlowFile, List<Map<String, Object>>>
pendingBulkErrors,
+ final Map<FlowFile, Set<Integer>>
pendingErrorRecordIndices,
+ final ProcessContext context, final
ProcessSession session) throws IOException {
+ if (outputBulkRequest) {
+ outputBulkRequestFlowFile(operations, operationFlowFiles, session);
+ }
+
+ final FlowFile firstFlowFile = operationFlowFiles.get(0);
+ final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, firstFlowFile);
final IndexOperationResponse response =
clientService.get().bulk(operations,
- new
ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties),
getRequestHeadersFromDynamicProperties(context, originals.getFirst())));
+ new
ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties),
getRequestHeadersFromDynamicProperties(context, firstFlowFile)));
final Map<Integer, Map<String, Object>> errors =
findElasticsearchResponseErrors(response);
- final List<FlowFile> errorDocuments = new ArrayList<>(errors.size());
+
+ // Attach per-FlowFile error attributes; defer putAttribute for the
FlowFile whose
+ // InputStream is currently open (putAttribute would throw
FlowFileHandlingException).
+ // Record only the local record index of each failure — content is
reconstructed at finalization.
+ final Map<FlowFile, List<Map<String, Object>>> flowFileErrors = new
HashMap<>();
errors.forEach((index, error) -> {
- String errorMessage;
- try {
- errorMessage = mapper.writeValueAsString(error);
- } catch (JsonProcessingException e) {
- errorMessage = String.format(
- "{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
- e.getMessage().replace("\"", "\\\"")
- );
+ final FlowFile flowFile = operationFlowFiles.get(index);
+ errorFlowFiles.add(flowFile);
+ flowFileErrors.computeIfAbsent(flowFile, k -> new
ArrayList<>()).add(error);
+ pendingErrorRecordIndices.computeIfAbsent(flowFile, k -> new
HashSet<>()).add(operationRecordIndices.get(index));
+ });
+ flowFileErrors.forEach((flowFile, errorList) -> {
+ if (flowFile == openStreamFlowFile) {
+ // Defer: InputStream still open — apply after the
try-with-resources closes
+ pendingBulkErrors.computeIfAbsent(flowFile, k -> new
ArrayList<>()).addAll(errorList);
+ } else {
+ try {
+ final Object errorObj = errorList.size() == 1 ?
errorList.get(0) : errorList;
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
mapper.writeValueAsString(errorObj));
+ } catch (final JsonProcessingException e) {
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
+ String.format("{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+ e.getMessage().replace("\"", "\\\"")));
+ }
}
- errorDocuments.add(session.putAttribute(originals.get(index),
"elasticsearch.bulk.error", errorMessage));
});
if (!errors.isEmpty()) {
- handleElasticsearchDocumentErrors(errors, session, null);
+ handleElasticsearchDocumentErrors(errors, session, firstFlowFile);
}
+ }
+
+ /**
+ * Serializes the pending batch into an Elasticsearch _bulk NDJSON body
and writes it to a
+ * new FlowFile on the {@code bulk_request} relationship for inspection or
debugging.
+ * Each operation produces an action metadata line followed by a document
line
+ * (Delete operations have no document line). Failures here are logged and
swallowed so
+ * that a debug-output failure does not abort the actual indexing.
+ */
+ private void outputBulkRequestFlowFile(final List<IndexOperationRequest>
operations,
+ final List<FlowFile>
operationFlowFiles,
+ final ProcessSession session) {
+ final FlowFile parent = operationFlowFiles.get(0);
+ FlowFile bulkRequestFF = session.create(parent);
+ try (final OutputStream out = session.write(bulkRequestFF)) {
+ for (final IndexOperationRequest op : operations) {
+ // Action metadata line
+ final Map<String, Object> actionBody = new LinkedHashMap<>();
+ if (StringUtils.isNotBlank(op.getIndex())) {
+ actionBody.put("_index", op.getIndex());
+ }
+ if (StringUtils.isNotBlank(op.getType())) {
+ actionBody.put("_type", op.getType());
+ }
+ if (StringUtils.isNotBlank(op.getId())) {
+ actionBody.put("_id", op.getId());
+ }
+ if (op.getDynamicTemplates() != null &&
!op.getDynamicTemplates().isEmpty()) {
+ actionBody.put("dynamic_templates",
op.getDynamicTemplates());
+ }
+ if (op.getHeaderFields() != null) {
+ actionBody.putAll(op.getHeaderFields());
+ }
- return errorDocuments;
+ // Upsert maps to "update" in the ES bulk API
+ final String actionName = op.getOperation() ==
IndexOperationRequest.Operation.Upsert
+ ? "update" : op.getOperation().getValue();
+ final Map<String, Object> actionLine = Map.of(actionName,
actionBody);
+ out.write(mapper.writeValueAsBytes(actionLine));
+ out.write('\n');
+
+ // Document line (delete has no document)
+ if (op.getOperation() !=
IndexOperationRequest.Operation.Delete) {
+ if (op.getRawJsonBytes() != null) {
+ // Index/Create with rawJson path — write directly, no
Map round-trip
+ out.write(op.getRawJsonBytes());
+ } else {
+ final Map<String, Object> docLine = new
LinkedHashMap<>();
+ if (op.getOperation() ==
IndexOperationRequest.Operation.Update
+ || op.getOperation() ==
IndexOperationRequest.Operation.Upsert) {
+ if (op.getScript() != null &&
!op.getScript().isEmpty()) {
+ docLine.put("script", op.getScript());
+ if (op.isScriptedUpsert()) {
+ docLine.put("scripted_upsert", true);
+ }
+ if (op.getFields() != null &&
!op.getFields().isEmpty()) {
+ docLine.put("upsert", op.getFields());
+ }
+ } else {
+ docLine.put("doc", op.getFields());
+ if (op.getOperation() ==
IndexOperationRequest.Operation.Upsert) {
+ docLine.put("doc_as_upsert", true);
+ }
+ }
+ } else {
+ docLine.putAll(op.getFields());
+ }
+ out.write(mapper.writeValueAsBytes(docLine));
+ }
+ out.write('\n');
+ }
+ }
+ } catch (final IOException e) {
+ getLogger().warn("Failed to write bulk request FlowFile for
inspection.", e);
+ session.remove(bulkRequestFF);
+ return;
+ }
+ bulkRequestFF = session.putAttribute(bulkRequestFF,
"elasticsearch.bulk.operation.count", String.valueOf(operations.size()));
+ session.transfer(bulkRequestFF, REL_BULK_REQUEST);
}
- private void handleResponse(final ProcessContext context, final
ProcessSession session, final List<FlowFile> errorDocuments, final
List<FlowFile> originals) {
- // clone FlowFiles to be transferred to errors/successful as the
originals are pass through to REL_ORIGINAL
- final List<FlowFile> copiedErrors =
errorDocuments.stream().map(session::clone).toList();
+ /**
+ * Extracts the document ID from a raw JSON string using a streaming
parser.
+ * Stops as soon as the target field is found, avoiding a full tree parse.
+ * Used for Index/Create operations to avoid the Map allocation overhead.
+ */
+ private String extractId(final String rawJson, final String idAttribute,
final String flowFileIdAttribute) throws IOException {
+ if (StringUtils.isBlank(idAttribute)) {
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+ try (final JsonParser p = mapper.getFactory().createParser(rawJson)) {
+ while (p.nextToken() != null) {
+ if (idAttribute.equals(p.currentName()) && p.nextToken() !=
null && !p.currentToken().isStructStart()) {
+ final String value = p.getText();
+ return StringUtils.isNotBlank(value) ? value
+ : (StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null);
+ }
+ }
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Extracts the document ID from a pre-parsed JsonNode.
+ * Used for JSON Array Index/Create operations where the node is already
available.
+ */
+ private String extractId(final JsonNode node, final String idAttribute,
final String flowFileIdAttribute) {
+ if (StringUtils.isBlank(idAttribute)) {
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+ final JsonNode idNode = node.get(idAttribute);
+ if (idNode != null && !idNode.isNull()) {
+ return idNode.asText();
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Resolves the document ID for Update/Delete/Upsert operations from the
already-parsed content Map.
+ * Falls back to the FlowFile attribute value when the field is absent
from the document.
+ */
+ private String resolveId(final Map<String, Object> contentMap, final
String idAttribute, final String flowFileIdAttribute) {
+ if (StringUtils.isBlank(idAttribute)) {
+ return null;
+ }
+ final Object idObj = contentMap.get(idAttribute);
+ if (idObj != null) {
+ return idObj.toString();
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Reads a processor property as a JSON Object string and deserializes it
into a Map.
+ * Returns an empty Map when the property is blank. Throws {@link
ProcessException} if the
+ * value is not valid JSON or not a JSON Object.
+ */
+ private Map<String, Object> getMapFromAttribute(final PropertyDescriptor
propertyDescriptor, final ProcessContext context, final FlowFile input) {
+ final String propertyValue =
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+ try {
+ return StringUtils.isNotBlank(propertyValue) ?
mapper.readValue(propertyValue, new TypeReference<Map<String, Object>>() { }) :
Collections.emptyMap();
+ } catch (final JsonProcessingException jpe) {
+ throw new ProcessException(propertyDescriptor.getDisplayName() + "
must be a String parsable into a JSON Object", jpe);
+ }
+ }
+
+ /**
+ * Routes all successfully processed FlowFiles to their final
relationships after all
+ * _bulk requests for the trigger have completed.
+ * <ul>
+ * <li>FlowFiles with no errors are cloned to {@code REL_SUCCESSFUL}
without re-reading.</li>
+ * <li>FlowFiles with errors are re-read exactly once to split records
by failed indices:
+ * failed records go to a clone on {@code REL_ERRORS}; successful
records (if any) go
+ * to a clone on {@code REL_SUCCESSFUL}. For SINGLE_JSON the
FlowFile is always a single
+ * document so a direct clone is used instead of re-reading.</li>
+ * <li>All FlowFiles go to {@code REL_ORIGINAL}.</li>
+ * </ul>
+ */
+ private void handleFinalResponse(final ProcessContext context, final
ProcessSession session,
+ final Set<FlowFile> errorFlowFiles, final
Set<FlowFile> allFlowFiles,
+ final Map<FlowFile, Set<Integer>>
pendingErrorRecordIndices,
+ final InputFormat inputFormat) {
+ final List<FlowFile> copiedErrors = new ArrayList<>();
+ final List<FlowFile> successfulDocuments = new ArrayList<>();
+
+ for (final FlowFile ff : allFlowFiles) {
+ if (!errorFlowFiles.contains(ff)) {
+ // All records succeeded: clone the original without re-reading
+ successfulDocuments.add(session.clone(ff));
+ } else if (inputFormat == InputFormat.SINGLE_JSON) {
+ // One document per FlowFile — it failed entirely; clone
directly
+ copiedErrors.add(session.clone(ff));
+ } else {
+ // NDJSON or JSON Array: re-read once and split records into
error/success by index
+ final Set<Integer> failedIndices =
pendingErrorRecordIndices.getOrDefault(ff, Collections.emptySet());
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(ff).getValue();
+ final ByteArrayOutputStream errorBaos = new
ByteArrayOutputStream();
+ final ByteArrayOutputStream successBaos = new
ByteArrayOutputStream();
+ session.read(ff, in -> splitRecords(in, failedIndices,
charset, inputFormat, errorBaos, successBaos));
+
+ if (errorBaos.size() > 0) {
+ final byte[] errorBytes = errorBaos.toByteArray();
+ FlowFile errorFf = session.clone(ff);
+ errorFf = session.write(errorFf, out ->
out.write(errorBytes));
+ copiedErrors.add(errorFf);
+ }
+ if (successBaos.size() > 0) {
+ final byte[] successBytes = successBaos.toByteArray();
+ FlowFile successFf = session.clone(ff);
Review Comment:
fixed by removing elasticsearch.bulk.error from the success clone after
splitting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]