So I finally figured it out on my own. I had to write my custom ordering
function in a separate Scala project and then call that from Clojure.
My Scala file looks like this:
package org.formcept.wisdom  // same package as in the Clojure :import

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Composite key: customer id plus the four values to sort on.
case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)

// Sends every record of a customer to the same partition.
class RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    // hashCode() can be negative; keep the result in [0, numPartitions)
    val mod = k.cId.hashCode() % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}

object RFMCKey {
  // R ascending, then F, M and C descending.
  implicit def orderingBycId[A <: RFMCKey]: Ordering[A] =
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
}

class rfmcSort {
  // The key is built inside Scala, so scalac resolves the implicit Ordering here.
  def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))],
                   parts: Int): RDD[(RFMCKey, String)] =
    a.map { case (custId, (((rVal, fVal), mVal), cVal)) =>
      (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal + "," + fVal + "," + mVal + "," + cVal)
    }.repartitionAndSortWithinPartitions(new RFMCPartitioner(parts))
}
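A side note on getPartition, as a minimal sketch: Spark expects the returned
index to lie in [0, numPartitions), but on the JVM the % operator keeps the
sign of its left operand, so a raw hashCode() % n comes out negative for keys
whose hash is negative. The names nonNegativeMod and PartitionIndexDemo are
made up for illustration. Math.abs is avoided on purpose, because
Math.abs(Int.MinValue) is still negative.

```scala
object PartitionIndexDemo {
  // Maps any Int hash into [0, n). Assumes n > 0.
  def nonNegativeMod(hash: Int, n: Int): Int = {
    val raw = hash % n // keeps the sign of `hash` on the JVM
    if (raw < 0) raw + n else raw
  }

  def main(args: Array[String]): Unit = {
    println(-7 % 4)                // -3
    println(nonNegativeMod(-7, 4)) // 1
  }
}
```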
I compiled it as a Scala project and used it in my Clojure code this way:

(:import [org.formcept.wisdom rfmcSort]
         [org.apache.spark.rdd RDD])

;; inside a let binding
sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.)
                                               (.rdd rfmc-records)
                                               num_partitions))

Note how I call sortWithRFMC on an instance of the rfmcSort class that I
created. One very important thing: when you pass your JavaPairRDD to the
Scala function, you first have to convert it into a plain Spark RDD by
calling the .rdd method on it. Afterwards you have to convert the returned
Spark RDD back into a Java RDD (.toJavaRDD above, which gives a JavaRDD of
key/value tuples) to work with it in Clojure.
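To illustrate why this works, here is a plain-Scala sketch with no Spark
involved. An implicit Ordering is just an extra argument that scalac fills in
at compile time. When the call is compiled inside Scala, as in sortWithRFMC,
the ordering from the RFMCKey companion is passed along automatically. A
caller that does not go through scalac, such as Clojure, has to pass it
explicitly; otherwise the sort falls back to natural ordering, which is what
the ClassCastException in the original question points to. sortKeys and
ImplicitOrderingDemo are hypothetical names, and the sample keys are made up.

```scala
case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)

object RFMCKey {
  // Same ordering as in the post: R ascending, then F, M and C descending.
  implicit def orderingBycId[A <: RFMCKey]: Ordering[A] =
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
}

object ImplicitOrderingDemo {
  // Hypothetical stand-in for a library method that takes its Ordering implicitly.
  def sortKeys[K](ks: List[K])(implicit ord: Ordering[K]): List[K] = ks.sorted(ord)

  val keys: List[RFMCKey] = List(
    RFMCKey("a", 2.0, 1L, 1.0, 1.0),
    RFMCKey("b", 1.0, 5L, 1.0, 1.0),
    RFMCKey("c", 1.0, 9L, 1.0, 1.0)
  )

  // What you write in Scala: the compiler finds the Ordering in the companion.
  val sortedImplicit: List[RFMCKey] = sortKeys(keys)

  // The same call with the implicit parameter spelled out as an ordinary argument.
  val sortedExplicit: List[RFMCKey] = sortKeys(keys)(RFMCKey.orderingBycId[RFMCKey])

  def main(args: Array[String]): Unit = {
    println(sortedImplicit.map(_.cId)) // List(c, b, a)
    println(sortedExplicit.map(_.cId)) // List(c, b, a)
  }
}
```

Both calls compile to the same thing; only the first one relies on the
compiler to supply the argument.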
And sorry that I got your name wrong *Blake :)
On Tuesday, July 12, 2016 at 12:16:42 AM UTC+5:30, Punit Naik wrote:
>
> Hi Black
>
> Thanks for the reply, but I figured it out on my own. I'll post the answer
> after this.
>
> On Monday, July 11, 2016 at 11:42:10 PM UTC+5:30, Blake Miller wrote:
>>
>> Hi Punit
>>
>> The behavior you are referring to is a feature of the Scala compiler,
>> which is why it does not happen automatically when you try to use it from
>> Clojure.
>>
>> Please see the note here:
>>
>>
>> https://github.com/t6/from-scala/blob/4e1752aaa2ef835dd67a8404273bee067510a431/test/t6/from_scala/guide.clj#L161-L166
>>
>> You may find that library a useful resource, either as a dependency or
>> simply as reference material.
>>
>> What you want to do is find the full method signature, including the
>> implicits, and invoke _that_ from Clojure, passing values for all implicit
>> parameters (in this case, your custom ordering function).
>>
>> HTH
>>
>> On Saturday, July 9, 2016 at 6:13:17 AM UTC, Punit Naik wrote:
>>>
>>> Hi Ashish
>>>
>>> The "package" is indeed the full package name.
>>> On 09-Jul-2016 11:02 AM, "Ashish Negi" <[email protected]> wrote:
>>>
>>>> Shouldn't `package` in `:import` be the actual package name of
>>>> `RFMCPartitioner`?
>>>>
>>>> see examples at https://clojuredocs.org/clojure.core/import
>>>>
>>>> like :
>>>>
>>>> (ns foo.bar
>>>> (:import (java.util Date
>>>> Calendar)
>>>> (java.util.logging Logger
>>>> Level)))
>>>>
>>>>
>>>>
>>>> (ns xyz
>>>> (:import
>>>> [** RFMCPartitioner]
>>>> [** RFMCKey]
>>>> )
>>>> )
>>>>
>>>>
>>>> where ** is package full name.
>>>>
>>>>
>>>>
>>>> On Friday, 8 July 2016 21:31:27 UTC+5:30, Punit Naik wrote:
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I have a scala program in which I have implemented a secondary sort
>>>>> which works perfectly. The way I have written that program is:
>>>>>
>>>>> object rfmc {
>>>>> // Custom Key and partitioner
>>>>>
>>>>> case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
>>>>> class RFMCPartitioner(partitions: Int) extends Partitioner {
>>>>> require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
>>>>> override def numPartitions: Int = partitions
>>>>> override def getPartition(key: Any): Int = {
>>>>> val k = key.asInstanceOf[RFMCKey]
>>>>> k.cId.hashCode() % numPartitions
>>>>> }
>>>>> }
>>>>> object RFMCKey {
>>>>> implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
>>>>> Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
>>>>> }
>>>>> }
>>>>> // The body of the code
>>>>> //
>>>>> //
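>>>>> // assumes each rdd element is a (cust, r, f, m, c) tuple
>>>>> val x = rdd.map { case (cust, r, f, m, c) =>
>>>>>   (RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c) }
>>>>> val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
>>>>> }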
>>>>>
>>>>> I wanted to implement the same thing using Clojure's DSL for Spark,
>>>>> called flambo. Since I can't write the partitioner in Clojure, I reused
>>>>> the code defined above, compiled it, and used it as a dependency in my
>>>>> Clojure code.
>>>>>
>>>>> Now I am importing the partitioner and the key in my clojure code the
>>>>> following way:
>>>>>
>>>>> (ns xyz
>>>>> (:import
>>>>> [package RFMCPartitioner]
>>>>> [package RFMCKey]
>>>>> )
>>>>> )
>>>>>
>>>>> But when I try to create RFMCKey by doing (RFMCKey. cust_id r f m c),
>>>>> it throws the following error:
>>>>>
>>>>> java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast
>>>>> to java.lang.Comparable
>>>>> at
>>>>> org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
>>>>> at
>>>>> scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
>>>>> at
>>>>> org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
>>>>> at
>>>>> org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
>>>>> at
>>>>> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
>>>>> at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
>>>>> at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>>>>> at
>>>>> org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
>>>>> at
>>>>> org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
>>>>> at
>>>>> org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
>>>>> at
>>>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
>>>>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>> at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My guess is that it's not able to find the ordering that I have defined
>>>>> after the partitioner. But if it works in Scala, why doesn't it work in
>>>>> Clojure?
>>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Clojure" group.
>>>> To post to this group, send email to [email protected]
>>>> Note that posts from new members are moderated - please be patient with
>>>> your first post.
>>>> To unsubscribe from this group, send email to
>>>> [email protected]
>>>> For more options, visit this group at
>>>> http://groups.google.com/group/clojure?hl=en
>>>> ---
>>>> You received this message because you are subscribed to a topic in the
>>>> Google Groups "Clojure" group.
>>>> To unsubscribe from this topic, visit
>>>> https://groups.google.com/d/topic/clojure/ZoLWl_vbcdU/unsubscribe.
>>>> To unsubscribe from this group and all its topics, send an email to
>>>> [email protected].
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>