Hi Daniel,

This is the user mailing list for Apache Hadoop, not Apache Spark.
Please use <[email protected]> instead.
https://spark.apache.org/community.html

-Akira

On Tue, Dec 3, 2019 at 1:00 AM Daniel Zhang <[email protected]> wrote:

> Hi, Spark Users:
>
> I have a question about the way I use the Spark Dataset API in my
> case.
>
> Suppose the "ds_old" dataset has 100 records with 10 unique $"col1" values,
> and consider the following pseudo-code:
>
> val ds_new = ds_old
>   .repartition(5, $"col1")
>   .sortWithinPartitions($"col2")
>   .mapPartitions(new MergeFun)
>
> class MergeFun extends MapPartitionsFunction[InputCaseClass, OutputCaseClass] {
>   // Pseudo-code: the merge logic would go in the body of call().
>   override def call(input: util.Iterator[InputCaseClass]): util.Iterator[OutputCaseClass] = ???
> }
>
>
> I have some questions related to "partition" defined in the above API, and
> below is my understanding:
>
> 1) repartition(5, $"col1") distributes all 100 records across 5 partitions
> based on the 10 unique col1 values. There is no guarantee how many or which
> unique col1 values each of these 5 partitions will get, but with a
> well-balanced hash algorithm and a large number of unique values, each
> partition will hold close to the average count (here 10/5 = 2).
> 2) sortWithinPartitions($"col2") is one of the parts I want to clear up
> here. What exactly does sortWithinPartitions mean here? I want the data
> sorted by "col2" within each unique value of "col1", but the Spark API
> uses the term "partition" so much in this case. I DON'T WANT the 100
> records sorted within each of the 5 partitions, but within each unique
> value of "col1". I believe this assumption is right, as we use
> "repartition" with "col1" first. Please confirm this.
> 3) mapPartitions(new MergeFun) is another part I want to clear up. I
> originally assumed that my merge function would be invoked once per unique
> col1 value (which would mean 10 partitions in this case). But after
> testing, I found that it is actually called ONCE per partition, for each of
> the 5 partitions. So in this sense, the meaning of "partition" in
> mapPartitions IS DIFFERENT from the meaning of "partition" in
> sortWithinPartitions, correct? Or is my understanding of "partition" in
> sortWithinPartitions also WRONG?
>
> In summary, here are my questions:
> 1) We don't want to use the "aggregation" API because in my case some
> unique values of "col1" COULD contain a large number of records, and
> sorting the data in a specified order per col1 helps our business case for
> the merge logic a lot.
> 2) We don't want to use a "window" function, as the merge logic is really
> an aggregation: there will be only one output record per group (col1). So
> even though a "window" function comes with sorting, it doesn't fit this
> case.
> 3) The unique value count of "col1" is unpredictable for Spark, I
> understand that. But I wonder if there is an API that is invoked once per
> group (per col1), instead of once per partition (5 partitions in this
> case).
> 4) If such an API doesn't exist and we have to use MapPartitionsFunction
> (the Iterator is much preferred here, as we don't need to worry about OOM
> due to data skew), my next question is whether Spark guarantees that the
> data within each partition arrives in (col1, col2) order with the API usage
> shown above. That is, will Spark deliver the data of each partition sorted
> by "col2" for the first unique value of col1, then sorted by "col2" for the
> second unique value of col1, and so on?
> Another challenge: if our merge function can expect the data in this
> order but has to generate one output per col1 group, in Iterator form,
> does Spark have an existing example to refer to?
>
> Thanks
>
> Yong
>
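
A note on the last point in the quoted question (producing one output per
col1 group from an Iterator): the following is a minimal sketch in plain
Scala, not an existing Spark example. It assumes the rows of a partition
arrive ordered by (col1, col2). The quoted sortWithinPartitions($"col2") call
alone would not provide that, since it orders each whole partition by col2
only; sorting by both columns, e.g. sortWithinPartitions($"col1", $"col2"),
is one way to get that order. The names In, Out, MergeSketch and mergePerKey
are hypothetical, and the string concatenation is only a placeholder for the
real merge logic.

```scala
// Hypothetical stand-ins for InputCaseClass / OutputCaseClass.
final case class In(col1: String, col2: Int)
final case class Out(col1: String, merged: String)

object MergeSketch {
  /** Lazily emits one Out per run of consecutive rows sharing the same col1.
    * Assumption: `rows` is already ordered by (col1, col2).
    */
  def mergePerKey(rows: Iterator[In]): Iterator[Out] = {
    val buffered = rows.buffered // lets us peek at the next row's key
    new Iterator[Out] {
      def hasNext: Boolean = buffered.hasNext

      def next(): Out = {
        val key = buffered.head.col1
        // Placeholder merge: join the col2 values in arrival order.
        val acc = new StringBuilder
        while (buffered.hasNext && buffered.head.col1 == key) {
          if (acc.nonEmpty) acc.append(',')
          acc.append(buffered.next().col2)
        }
        Out(key, acc.toString)
      }
    }
  }
}
```

Because the result is itself a lazy Iterator, only the accumulator for the
current group is held in memory at any time. A function of this shape
(Iterator[In] => Iterator[Out]) matches what the Scala overload of
Dataset.mapPartitions accepts, given an Encoder for the output type.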
