[
https://issues.apache.org/jira/browse/PIG-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100087#comment-16100087
]
Adam Szita commented on PIG-3655:
---------------------------------
It seems like this is an effect of code in JoinGroupSparkConverter.java and
PairRDDFunctions.scala.
If the POPackage operator returns
[POStatus.STATUS_NULL|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java#L211],
the method in the Spark converter returns null instead of skipping it and
awaiting the next tuple [as MR mode
does|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java#L438].
This null is then picked up by the caller in Spark code and [gets written into
the (FS) output
stream|https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1113].
This is how we end up with NULLs in the files.
The fix I can think of is to return a special kind of Tuple that the record
writer knows it has to skip, as can be seen in [^PIG-3655.sparkNulls.2.patch].
Although this seems like a hack, the only other option I can think of is to
skip NULLs while reading in InterRecordReader - but that just handles the
symptoms, not the root cause.
Any ideas [~rohini], [~kellyzly]?
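The sentinel idea can be sketched in plain Java like this (a minimal,
illustrative sketch only - SkipSentinel, Converter and Writer are hypothetical
names, not the classes in the patch, which work with Pig's Tuple type and the
Hadoop RecordWriter):

```java
// Sketch of the "skip sentinel" pattern: instead of emitting null for a
// STATUS_NULL result, the converter returns a single shared marker object
// that the record writer recognizes (by identity) and silently drops.
final class SkipSentinel {
    // One shared instance; identity comparison is enough to detect it.
    static final Object SKIP = new Object();
}

final class Converter {
    // Stands in for the converter handling a POPackage result:
    // a null input simulates POStatus.STATUS_NULL.
    static Object convert(Integer value) {
        if (value == null) {
            return SkipSentinel.SKIP;   // instead of returning null
        }
        return value;
    }
}

final class Writer {
    final java.util.List<Object> written = new java.util.ArrayList<>();

    // Stands in for the record writer: sentinels never reach the output.
    void write(Object record) {
        if (record == SkipSentinel.SKIP) {
            return;                     // skip, do not write
        }
        written.add(record);
    }
}
```

With this, a STATUS_NULL result costs one identity check per record in the
writer, and no null ever reaches the output stream.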
> BinStorage and InterStorage approach to record markers is broken
> ----------------------------------------------------------------
>
> Key: PIG-3655
> URL: https://issues.apache.org/jira/browse/PIG-3655
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.2.0, 0.3.0, 0.4.0, 0.5.0, 0.6.0, 0.7.0, 0.8.0, 0.8.1,
> 0.9.0, 0.9.1, 0.9.2, 0.10.0, 0.11, 0.10.1, 0.12.0, 0.11.1
> Reporter: Jeff Plaisance
> Assignee: Adam Szita
> Fix For: 0.18.0
>
> Attachments: PIG-3655.0.patch, PIG-3655.1.patch, PIG-3655.2.patch,
> PIG-3655.3.patch, PIG-3655.4.patch, PIG-3655.5.patch,
> PIG-3655.sparkNulls.2.patch, PIG-3655.sparkNulls.patch
>
>
> The way that the record readers for these storage formats seek to the first
> record in an input split is to find the byte sequence 1 2 3 110 for
> BinStorage or 1 2 3 19-21|28-30|36-45 for InterStorage. If this sequence
> occurs in the data for any reason other than to mark the start of a tuple
> (for example, the integer 16909166 stored big-endian encodes to the
> BinStorage byte sequence), it can cause mysterious failures in pig jobs
> because the record reader will try to decode garbage and fail.
> For this approach of using an unlikely sequence to mark record boundaries, it
> is important to reduce the probability of the sequence occurring naturally in
> the data by ensuring that your record marker is sufficiently long. Hadoop
> SequenceFile uses 128 bits for this and randomly generates the sequence for
> each file (selecting a fixed, predetermined value opens up the possibility of
> a mean person intentionally sending you that value). This makes it extremely
> unlikely that collisions will occur. In the long run I think that pig should
> also be doing this.
> As a quick fix it might be good to save the current position in the file
> before entering readDatum, and if an exception is thrown seek back to the
> saved position and resume trying to find the next record marker.
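The big-endian collision mentioned in the description is easy to verify
(MarkerCollision is an illustrative name; ByteBuffer's default byte order is
big-endian):

```java
import java.nio.ByteBuffer;

// Checks the collision described above: the int 16909166, written
// big-endian, yields exactly the BinStorage record-marker bytes 1 2 3 110.
public class MarkerCollision {
    static byte[] bigEndian(int v) {
        // ByteBuffer's default order is BIG_ENDIAN, so putInt writes
        // the most significant byte first.
        return ByteBuffer.allocate(4).putInt(v).array();
    }
}
```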
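The quick fix suggested above (save the position before readDatum, and on a
decode failure seek back and resume the marker search) can be sketched over a
byte array - RecoveringScanner and its toy readDatum are assumptions for
illustration; the real reader operates on an FS input stream with Pig's datum
format:

```java
import java.io.IOException;

// Sketch of the suggested quick fix, simplified to scanning a byte array:
// remember the position before decoding, and if decoding throws, seek back
// to just past the false marker and keep searching.
public class RecoveringScanner {
    static final byte[] MARKER = {1, 2, 3, 110};   // BinStorage-style marker

    // Returns offsets of markers that are followed by a decodable datum.
    static java.util.List<Integer> scan(byte[] data) {
        java.util.List<Integer> records = new java.util.ArrayList<>();
        int pos = 0;
        while (pos <= data.length - MARKER.length) {
            if (matchesMarker(data, pos)) {
                int saved = pos;                    // save current position
                try {
                    readDatum(data, pos + MARKER.length);
                    records.add(pos);
                    pos += MARKER.length;
                } catch (IOException e) {
                    pos = saved + 1;                // seek back, resume search
                }
            } else {
                pos++;
            }
        }
        return records;
    }

    static boolean matchesMarker(byte[] data, int pos) {
        for (int i = 0; i < MARKER.length; i++) {
            if (data[pos + i] != MARKER[i]) return false;
        }
        return true;
    }

    // Toy decoder: a datum is a single non-negative byte; anything else
    // is "garbage" and throws, like a failed readDatum.
    static void readDatum(byte[] data, int pos) throws IOException {
        if (pos >= data.length || data[pos] < 0) {
            throw new IOException("not a record");
        }
    }
}
```

A false marker (one that happens to occur inside a datum) then costs one
failed decode attempt instead of a job failure.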
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)