Hi Daniel,

This is the user mailing list for Apache Hadoop, not Apache Spark. Please
use <[email protected]> instead. See https://spark.apache.org/community.html
-Akira

On Tue, Dec 3, 2019 at 1:00 AM Daniel Zhang <[email protected]> wrote:

> Hi, Spark Users:
>
> I have a question about the way I am using the Spark Dataset API for my
> case.
>
> Suppose the dataset "ds_old" has 100 records with 10 unique values of
> $"col1", and consider the following pseudo-code:
>
>     import java.util
>     import org.apache.spark.api.java.function.MapPartitionsFunction
>     import org.apache.spark.sql.Encoders
>     import spark.implicits._
>
>     class MergeFun extends MapPartitionsFunction[InputCaseClass, OutputCaseClass] {
>       override def call(input: util.Iterator[InputCaseClass]): util.Iterator[OutputCaseClass] = { ... }
>     }
>
>     val ds_new = ds_old
>       .repartition(5, $"col1")
>       .sortWithinPartitions($"col2")
>       .mapPartitions(new MergeFun, Encoders.product[OutputCaseClass])
>
> I have some questions about what "partition" means in the API calls
> above. Here is my understanding:
>
> 1) repartition(5, $"col1") distributes all 100 records, based on the 10
> unique col1 values, into 5 partitions. There is no guarantee of how many
> (or which) unique col1 values each of the 5 partitions receives, but
> with a well-balanced hash function each partition should get close to
> the average (10/5 = 2 unique values) when the number of unique values is
> large.
>
> 2) sortWithinPartitions($"col2") is one of the parts I want to clear up.
> What exactly does sortWithinPartitions mean here? I want the data sorted
> by "col2" within each unique value of "col1", but the Spark API uses the
> term "partition" heavily here. I DON'T WANT the 100 records sorted
> within each of the 5 partitions, but within each unique value of "col1".
> I believe this assumption is right, since we call "repartition" on
> "col1" first. Please confirm.
>
> 3) mapPartitions(new MergeFun) is the other part I want to clear up. I
> originally assumed my merge function would be invoked once per unique
> col1 value (10 times in this case). But after testing, I found it is
> actually called ONCE per partition, i.e. 5 times. So the meaning of
> "partition" in mapPartitions IS DIFFERENT from the meaning of
> "partition" in sortWithinPartitions, correct? Or is my understanding of
> "partition" in sortWithinPartitions also WRONG?
>
> In summary, here are my questions:
>
> 1) We don't want to use the "aggregation" API because, in our case, some
> unique values of "col1" COULD contain a very large number of records,
> and receiving the data in a specified order per col1 value helps our
> merge logic a lot.
>
> 2) We don't want to use a "window" function, because the merge logic is
> really an aggregation: there is only one output record per group (per
> col1 value). So even though window functions come with sorting, they
> don't fit this case.
>
> 3) I understand that the number of unique values of "col1" is
> unpredictable for Spark. But I wonder whether there is an API whose
> function is called once per group (per col1 value) instead of once per
> partition (5 partitions in this case); see the sketch below.
>
> 4) If no such API exists and we have to use MapPartitionsFunction (the
> Iterator is much preferred here, as we don't need to worry about OOM due
> to data skew), my follow-up question is whether Spark guarantees, in the
> usage shown above, that the data arrives within each partition in (col1,
> col2) order, i.e. sorted by "col2" for the first unique value of col1,
> then sorted by "col2" for the second unique value of col1, and so on.
>
> A further challenge: if our merge function can expect the data in this
> order but has to produce one output per col1 group, in Iterator form,
> does Spark have an existing example to refer to?
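>
> To make questions 3) and 4) concrete, here is a minimal sketch of the
> kind of per-group processing I am hoping for. I am assuming that
> groupByKey/flatMapGroups is the closest existing API, and I am not sure
> whether Spark guarantees any ordering inside the per-group iterator; the
> case classes, path, and merge logic are just placeholders:
>
>     import org.apache.spark.sql.SparkSession
>
>     case class InputCaseClass(col1: String, col2: Long, payload: String)
>     case class OutputCaseClass(col1: String, merged: String)
>
>     val spark = SparkSession.builder.getOrCreate()
>     import spark.implicits._
>
>     val ds_old = spark.read.parquet("/path/to/input").as[InputCaseClass]
>
>     // flatMapGroups is called once per unique col1 value and hands the
>     // merge logic an Iterator over that group's records, so a skewed
>     // group does not have to be materialized up front.
>     val ds_new = ds_old
>       .groupByKey(_.col1)
>       .flatMapGroups { (key, rows) =>
>         // Placeholder merge: concatenate payloads in arrival order.
>         Iterator(OutputCaseClass(key, rows.map(_.payload).mkString("|")))
>       }
>
> If mapPartitions turns out to be the only option, would sorting by both
> columns, i.e. sortWithinPartitions($"col1", $"col2"), give the (col1,
> col2) order I asked about in 4)?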
>
> Thanks
>
> Yong
