Different modes are a good idea.

A. Current processors and current flows should continue to work without any changes. This means a correlation Id is neither required nor used. This is the default 'ordinary mode'. (I hope an input port can have a name that is visible in the UI, but this doesn't affect functionality in any way.)
B. Processors can be marked as 'correlated inputs' (or whatever the correct term is), probably by implementing a different abstract base class. In this case incoming FlowFiles should contain a correlation Id (its exact name may be configurable using properties), and the NiFi framework will use a different Flow Controller (scheduler) for such processors.

2) I'd like to avoid pulling a FlowFile from the FlowFile Repository unless we are ready to consume it. Pulling it and putting it back is inefficient. Sorting all the input queues on correlation identifiers is better. But is it possible to receive only an event saying that a new FlowFile is available for this processor? In that case the Flow Controller (scheduler) just adds the event data to its internal data structure. Event data should contain the correlationId, the dataType (which should match the port name) and the FlowFile unique Id.

The Flow Controller maintains a Map where the key is the correlationId and the value is a ParametersReferencesList. A ParametersReferencesList contains the timestamp of when it was created, the correlationId (maybe not needed, because the Map key is also the correlationId) and placeholders for parameter references. Parameter references are FlowFile unique Ids. For each event the Flow Controller finds the matching ParametersReferencesList, or creates a new one if none exists, adds the parameter reference for that particular input parameter, and checks whether all required parameters are present. If so, the actual FlowFiles can be pulled from the FlowFile Repository and the processor invoked. If not, the Flow Controller does nothing and just returns.

There are some special cases which must be handled. The first is the case when data for some input port never arrives. One possibility is to use a separate 'timer thread' which regularly scans the Flow Controller's ParametersReferencesList map and removes entries that are older than some configurable timeout value. Of course some sort of error handling must happen as well: at least logging, but also sending a notice to the processor's 'Failed' output port.
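To make the bookkeeping above a bit more concrete, here is a minimal sketch in Python. It is only an illustration of the idea, not NiFi code: the class name CorrelationTracker, its method names, and the choice to let a second FlowFile on the same port overwrite the first are all my assumptions. It keeps the Map of correlationId to ParametersReferencesList, reports when all required ports have a reference, and has an expire step that the timer thread could call.

```python
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ParametersReferencesList:
    """References collected so far for one correlation id."""
    correlation_id: str
    created_at: float
    # port name (dataType) -> FlowFile unique id
    references: Dict[str, str] = field(default_factory=dict)


class CorrelationTracker:
    """Hypothetical scheduler-side bookkeeping; not an existing NiFi class."""

    def __init__(self, required_ports, timeout_seconds, clock=time.monotonic):
        self.required_ports = frozenset(required_ports)
        self.timeout_seconds = timeout_seconds
        self.clock = clock
        self.pending: Dict[str, ParametersReferencesList] = {}
        # The timer thread and the event path both touch `pending`.
        self._lock = threading.Lock()

    def on_flowfile_available(self, correlation_id: str, data_type: str,
                              flowfile_id: str) -> Optional[Dict[str, str]]:
        """Record one 'FlowFile available' event.

        Returns the complete port -> FlowFile id map once every required
        port has a reference, otherwise None (nothing to do yet).
        """
        if data_type not in self.required_ports:
            raise ValueError(f"unknown port: {data_type}")
        with self._lock:
            entry = self.pending.get(correlation_id)
            if entry is None:
                entry = ParametersReferencesList(correlation_id, self.clock())
                self.pending[correlation_id] = entry
            # Assumption: a later FlowFile on the same port replaces the earlier one.
            entry.references[data_type] = flowfile_id
            if self.required_ports.issubset(entry.references):
                del self.pending[correlation_id]
                return dict(entry.references)
            return None

    def expire(self) -> List[ParametersReferencesList]:
        """Remove entries older than the timeout and return them, so the
        caller can log them and notify the 'Failed' output port."""
        with self._lock:
            now = self.clock()
            expired = [e for e in self.pending.values()
                       if now - e.created_at > self.timeout_seconds]
            for e in expired:
                del self.pending[e.correlation_id]
            return expired


tracker = CorrelationTracker(["left", "right"], timeout_seconds=30.0)
first = tracker.on_flowfile_available("order-1", "left", "ff-100")    # None: still waiting
ready = tracker.on_flowfile_available("order-1", "right", "ff-101")   # both ports present
```

Only when on_flowfile_available returns a map would the real FlowFiles be pulled from the FlowFile Repository, so nothing is pulled and put back while a set is still incomplete.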
Also, during the timeout step, the unused FlowFiles should be discarded so that they are no longer considered as input for this processor.

I don't know NiFi internals very well, so some of my ideas may not make much sense.

Thanks
Toivo

--
View this message in context: http://apache-nifi-incubating-developer-list.39713.n7.nabble.com/How-to-implement-Scatter-Gather-tp1944p1997.html
Sent from the Apache NiFi (incubating) Developer List mailing list archive at Nabble.com.
