[
https://issues.apache.org/jira/browse/SPARK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013623#comment-15013623
]
Herman van Hovell commented on SPARK-3947:
------------------------------------------
Hello Milad,
Could you be a bit more specific? What is the problem you are having? Is there
a difference between local mode and cluster mode? What version of Spark are you
using?
I have adapted your code:
{noformat}
import java.math.BigDecimal
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, DataType,
DoubleType, LongType}
// Example UDAF computing the geometric mean of a Double column.
// State is a running (count, product) pair; the final value is the
// count-th root of the product.
class GeometricMean extends UserDefinedAggregateFunction {
  // Single Double input column named "value".
  def inputSchema: StructType = StructType(Seq(StructField("value", DoubleType)))

  // Aggregation buffer: number of elements seen and their running product.
  def bufferSchema: StructType = StructType(Seq(
    StructField("count", LongType),
    StructField("product", DoubleType)
  ))

  // The aggregate evaluates to a Double.
  def dataType: DataType = DoubleType

  // Identical input always yields an identical result.
  def deterministic: Boolean = true

  // Identity state: zero elements, multiplicative identity for the product.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  // Fold one input row into the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  // Combine two partial buffers: counts add, products multiply.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  // Geometric mean = product ^ (1 / count).
  def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(0)
    val product = buffer.getDouble(1)
    math.pow(product, 1.0d / count)
  }
}
// Register the UDAF so it can be referenced as `gm` in SQL text.
// NOTE(review): `sqlContext` comes from the spark-shell session — this snippet
// assumes a REPL context, not a standalone program.
sqlContext.udf.register("gm", new GeometricMean)
// Sample receipts; `amount` is a java.math.BigDecimal (valueOf(unscaled, scale)),
// so e.g. valueOf(505, 1) is 50.5.
val df = Seq(
(1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
(2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
(3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
(4, "italy", "emilia", 42, BigDecimal.valueOf(75 ,0), "jack"),
(5, "uk", "london", 42, BigDecimal.valueOf(200 ,0), "carl"),
(6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")).
toDF("receipt_id", "store_country", "store_region", "store_id", "amount",
"seller_name")
// Expose the DataFrame to SQL under the name "receipts".
df.registerTempTable("receipts")
// Mix built-in aggregates (avg, sum) with the custom `gm` UDAF; rows with
// amount <= 50 and non-italy countries are filtered out before grouping.
val q = sql("""
select store_country,
store_region,
avg(amount),
sum(amount),
gm(amount)
from receipts
where amount > 50
and store_country = 'italy'
group by store_country, store_region
""")
// Print the grouped results (see the table below).
q.show
// Result (SPARK 1.5.2):
+-------------+------------+----+--------------------+-----------------+
|store_country|store_region| _c2| _c3| _c4|
+-------------+------------+----+--------------------+-----------------+
| italy| emilia|87.5|175.0000000000000...|86.60254037844386|
| italy| toscana|50.5|50.50000000000000...| 50.5|
| italy| puglia| 70|70.00000000000000...| 70.0|
+-------------+------------+----+--------------------+-----------------+
{noformat}
And I really cannot find a problem.
> Support Scala/Java UDAF
> -----------------------
>
> Key: SPARK-3947
> URL: https://issues.apache.org/jira/browse/SPARK-3947
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Pei-Lun Lee
> Assignee: Yin Huai
> Fix For: 1.5.0
>
> Attachments: spark-udaf.zip
>
>
> Right now only Hive UDAFs are supported. It would be nice to have UDAF
> similar to UDF through SQLContext.registerFunction.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]