Different modes are a good idea.

A. Current processors should continue to work without any changes,
and current flows should continue to work without any changes.
This means a correlation Id is neither required nor used.
This is the default 'ordinary mode'.
(I hope an input port can have a name that is visible in the UI, but this
doesn't affect functionality in any way.)

B. Processors can be marked as 'correlated inputs' (or whatever the correct
term is), probably by implementing a different abstract base class.
In this case incoming FlowFiles should contain some correlation Id (the exact
attribute name may be configurable using properties),
and the NiFi framework will use a different Flow Controller (scheduler) for
such processors.
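A minimal sketch of what "marking" a processor could look like, in plain Java and deliberately not using any real NiFi classes: `CorrelatedInputProcessor`, `requiredInputPorts()`, `correlationAttributeName()`, the default attribute name `correlation.id`, `JoinOrderWithCustomer` and `SchedulerSelection` are all hypothetical names used only for illustration.

```java
import java.util.Set;

// Hypothetical marker base class for processors that want correlated inputs.
// This is NOT part of the NiFi API; it only illustrates the proposed split.
abstract class CorrelatedInputProcessor {
    // Names of the input ports (data types) that must all be present
    // before the processor can be invoked for a given correlation Id.
    abstract Set<String> requiredInputPorts();

    // Name of the FlowFile attribute holding the correlation Id.
    // The proposal makes this configurable; the default here is an assumption.
    String correlationAttributeName() {
        return "correlation.id";
    }
}

// Hypothetical example: a processor joining "order" and "customer" data.
class JoinOrderWithCustomer extends CorrelatedInputProcessor {
    @Override
    Set<String> requiredInputPorts() {
        return Set.of("order", "customer");
    }
}

class SchedulerSelection {
    // The framework would pick the correlation-aware scheduler only for
    // processors extending the marker class; all others keep the ordinary mode.
    static String schedulerFor(Object processor) {
        return (processor instanceof CorrelatedInputProcessor) ? "correlating" : "ordinary";
    }

    public static void main(String[] args) {
        System.out.println(schedulerFor(new JoinOrderWithCustomer())); // correlating
        System.out.println(schedulerFor(new Object()));                // ordinary
    }
}
```

Using a separate base class keeps option A untouched: existing processors never extend it, so they stay on the ordinary scheduler.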

2) I'd like to avoid pulling a FlowFile from the FlowFile Repository unless we
are ready to consume it.
Pulling it and putting it back is inefficient.
Sorting all the input queues on correlation identifiers would be better.

But is it possible to receive only an event saying that a new FlowFile is
available for this processor?
In that case the Flow Controller (scheduler) just adds the event data to its
internal data structure.
The event data should contain the correlationId, the dataType (which should
match the port name) and the FlowFile's unique Id.
The Flow Controller maintains a Map where the key is the correlationId and the
value is a ParametersReferencesList.
A ParametersReferencesList contains the timestamp when it was created, the
correlationId (maybe not needed, because the Map key is also the
correlationId) and placeholders for parameter references.
Parameter references are FlowFile unique Ids.

The Flow Controller finds the correct ParametersReferencesList, or creates a
new one if none exists, and adds the parameter reference for the particular
input parameter. Then it checks whether all required parameters are present.
If so, the actual FlowFiles can be pulled from the FlowFile Repository and the
processor invoked.
If not, the Flow Controller does nothing and just returns.
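The event handling and the correlationId map described above could be sketched roughly like this. It is a single-process illustration under assumptions: `FlowFileAvailableEvent`, `ParametersReferencesList`, `CorrelatingScheduler` and `onEvent` are hypothetical names, FlowFile Ids are plain strings, and actually pulling FlowFiles from the FlowFile Repository is left as a comment because that part depends on NiFi internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical event the framework would emit when a FlowFile becomes available.
record FlowFileAvailableEvent(String correlationId, String dataType, String flowFileId) {
    FlowFileAvailableEvent {
        Objects.requireNonNull(correlationId, "correlationId");
        Objects.requireNonNull(dataType, "dataType");
        Objects.requireNonNull(flowFileId, "flowFileId");
    }
}

// One pending entry per correlation Id: creation time plus one slot per input
// port, each slot holding a FlowFile unique Id (a reference, not the content).
final class ParametersReferencesList {
    final long createdAtMillis;
    final Map<String, String> flowFileIdsByPort = new HashMap<>();

    ParametersReferencesList(long createdAtMillis) {
        this.createdAtMillis = createdAtMillis;
    }

    boolean isComplete(Set<String> requiredPorts) {
        return flowFileIdsByPort.keySet().containsAll(requiredPorts);
    }
}

final class CorrelatingScheduler {
    private final Set<String> requiredPorts;
    // Key: correlationId, value: the references collected so far.
    private final Map<String, ParametersReferencesList> pending = new HashMap<>();

    CorrelatingScheduler(Set<String> requiredPorts) {
        this.requiredPorts = Set.copyOf(requiredPorts);
    }

    // Records the reference; returns all FlowFile Ids (by port) once every
    // required input is present, otherwise returns empty (nothing to do yet).
    synchronized Optional<Map<String, String>> onEvent(FlowFileAvailableEvent event, long nowMillis) {
        ParametersReferencesList refs = pending.computeIfAbsent(
                event.correlationId(), id -> new ParametersReferencesList(nowMillis));
        refs.flowFileIdsByPort.put(event.dataType(), event.flowFileId());
        if (!refs.isComplete(requiredPorts)) {
            return Optional.empty();
        }
        pending.remove(event.correlationId());
        // Here the real framework would pull these FlowFiles from the
        // FlowFile Repository and invoke the processor.
        return Optional.of(Map.copyOf(refs.flowFileIdsByPort));
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}

class CorrelationDemo {
    public static void main(String[] args) {
        CorrelatingScheduler s = new CorrelatingScheduler(Set.of("order", "customer"));
        System.out.println(s.onEvent(new FlowFileAvailableEvent("c1", "order", "ff-1"), 0L).isPresent()); // false
        System.out.println(s.onEvent(new FlowFileAvailableEvent("c1", "customer", "ff-2"), 5L).get());
    }
}
```

In this sketch a second FlowFile arriving on the same port for the same correlation Id simply overwrites the earlier reference; a real implementation would have to decide whether that is an error.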

There are some special cases which must be handled.
The first is the case when data for some input port never arrives.
One possibility is to use a separate 'timer thread' which regularly scans the
Flow Controller's ParametersReferencesList map and removes entries which are
older than some configurable timeout value. Of course some sort of error
handling must also happen: at least logging, but also sending some notices to
the processor's 'Failed' output port.
Also, during the timeout step, the unused FlowFiles referenced by an expired
entry should be discarded so that they are not used by this processor.
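A possible shape for the 'timer thread', again as a hypothetical sketch in plain Java: `PendingEntry`, `TimeoutSweeper` and the `onExpired` callback are illustrative names, and the callback stands in for the logging, the notice to the 'Failed' output port and the discarding of unused FlowFiles. The periodic scan uses the JDK's `ScheduledExecutorService`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Pending entry: when it was created and which FlowFile Ids it references so far.
record PendingEntry(long createdAtMillis, Map<String, String> flowFileIdsByPort) {}

final class TimeoutSweeper {
    private final ConcurrentMap<String, PendingEntry> pending;
    private final long timeoutMillis;
    // Hypothetical hook called for each expired entry: log it, send a notice to
    // the 'Failed' output, discard the referenced FlowFiles (not a NiFi API).
    private final BiConsumer<String, PendingEntry> onExpired;

    TimeoutSweeper(ConcurrentMap<String, PendingEntry> pending, long timeoutMillis,
                   BiConsumer<String, PendingEntry> onExpired) {
        this.pending = pending;
        this.timeoutMillis = timeoutMillis;
        this.onExpired = onExpired;
    }

    // Removes entries older than the timeout and returns their correlation Ids.
    List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, PendingEntry> e : pending.entrySet()) {
            PendingEntry entry = e.getValue();
            // remove(key, value) only succeeds if no other thread has already
            // removed or replaced this entry in the meantime.
            if (nowMillis - entry.createdAtMillis() > timeoutMillis
                    && pending.remove(e.getKey(), entry)) {
                expired.add(e.getKey());
                onExpired.accept(e.getKey(), entry);
            }
        }
        return expired;
    }

    // Runs sweep() on a separate 'timer thread' at a fixed interval.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> sweep(System.currentTimeMillis()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }
}

class SweeperDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, PendingEntry> pending = new ConcurrentHashMap<>();
        pending.put("c1", new PendingEntry(0L, Map.of("order", "ff-1")));
        TimeoutSweeper sweeper = new TimeoutSweeper(pending, 500L,
                (id, entry) -> System.out.println("expired " + id + " " + entry.flowFileIdsByPort()));
        System.out.println(sweeper.sweep(1_000L)); // [c1]
    }
}
```

Passing the current time into `sweep(nowMillis)` keeps the expiry rule testable without waiting for real timeouts; `start(periodMillis)` simply calls it with the wall-clock time.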

I don't know NiFi internals very well, so some of my ideas may not make
much sense.

Thanks
Toivo



--
View this message in context: 
http://apache-nifi-incubating-developer-list.39713.n7.nabble.com/How-to-implement-Scatter-Gather-tp1944p1997.html
Sent from the Apache NiFi (incubating) Developer List mailing list archive at 
Nabble.com.
