Github user pferrel commented on a diff in the pull request:
https://github.com/apache/incubator-predictionio/pull/269#discussion_r73793158
--- Diff:
core/src/main/scala/io/prediction/core/SelfCleaningDataSource.scala ---
@@ -0,0 +1,313 @@
+package io.prediction.core
+
+import grizzled.slf4j.Logger
+import io.prediction.annotation.DeveloperApi
+import io.prediction.data.storage.{DataMap, Event,Storage}
+import io.prediction.data.store.{Common, LEventStore, PEventStore}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.joda.time.DateTime
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+
+/** :: DeveloperApi ::
+ * Base class of cleaned data source.
+ *
+ * A cleaned data source provides tools for removing events that happened
+ * earlier than a specified duration (in seconds) before training time. It
+ * can also remove duplicates and compress properties (flattening a series
+ * of $set/$unset events into one).
+ *
+ */
+@DeveloperApi
+trait SelfCleaningDataSource {
+
  // Orders DateTime values in DESCENDING order — note the reversed
  // arguments. Being implicit, it is picked up by the `sortBy(_.eventTime)`
  // calls below, so sorted event lists have the newest events first.
  // NOTE(review): combined with the left fold in `compress`, newest-first
  // ordering makes *older* $set values win on key conflicts — confirm this
  // is intended.
  implicit object DateTimeOrdering extends Ordering[DateTime] {
    def compare(d1: DateTime, d2: DateTime) = d2.compareTo(d1)
  }

  // Lazily-initialised handles to the parallel (Spark) and local
  // event storage backends; @transient so they are not serialised
  // with closures and are re-created on each JVM.
  @transient lazy private val pEventsDb = Storage.getPEvents()
  @transient lazy private val lEventsDb = Storage.getLEvents()
+
  /** :: DeveloperApi ::
   * Name of the app whose events will be cleaned.
   *
   * @return app name
   */
  @DeveloperApi
  def appName: String

  /** :: DeveloperApi ::
   * Parameters used for cleanup.
   *
   * @return the event window used to clean up events, or None when no
   *         cleaning should be performed (the default).
   */
  @DeveloperApi
  def eventWindow: Option[EventWindow] = None

  // Logger for this concrete data source type.
  @transient lazy val logger = Logger[this.type]
+
+ /** :: DeveloperApi ::
+ *
+ * Returns RDD of events happend after duration in event window params.
+ *
+ * @return RDD[Event] most recent PEvents.
+ */
+ @DeveloperApi
+ def getCleanedPEvents(pEvents: RDD[Event]): RDD[Event] = {
+
+ eventWindow
+ .flatMap(_.duration)
+ .map { duration =>
+ val fd = Duration(duration)
+ pEvents.filter(e =>
+ e.eventTime.isAfter(DateTime.now().minus(fd.toMillis))
+ )
+ }.getOrElse(pEvents)
+ }
+
+ /** :: DeveloperApi ::
+ *
+ * Returns Iterator of events happend after duration in event window
params.
+ *
+ * @return Iterator[Event] most recent LEvents.
+ */
+ @DeveloperApi
+ def getCleanedLEvents(lEvents: Iterable[Event]): Iterable[Event] = {
+
+ eventWindow
+ .flatMap(_.duration)
+ .map { duration =>
+ val fd = Duration(duration)
+ lEvents.filter(e =>
+ e.eventTime.isAfter(DateTime.now().minus(fd.toMillis))
+ )
+ }.getOrElse(lEvents).toIterable
+ }
+
+ def compressPProperties(sc: SparkContext, rdd: RDD[Event]): RDD[Event] =
{
+ rdd.filter(isSetEvent)
+ .groupBy(_.entityType)
+ .flatMap { pair =>
+ val (_, ls) = pair
+ ls.groupBy(_.entityId).map { anotherpair =>
+ val (_, anotherls) = anotherpair
+ compress(anotherls)
+ }
+ } ++ rdd.filter(!isSetEvent(_))
+ }
+
+ def compressLProperties(events: Iterable[Event]): Iterable[Event] = {
+ events.filter(isSetEvent).toIterable
+ .groupBy(_.entityType)
+ .map { pair =>
+ val (_, ls) = pair
+ compress(ls)
+ } ++ events.filter(!isSetEvent(_))
+ }
+
+ def removePDuplicates(sc: SparkContext, rdd: RDD[Event]): RDD[Event] = {
+ val now = DateTime.now()
+ rdd.map(x =>
+ (recreateEvent(x, None, now), (x.eventId, x.eventTime)))
+ .groupByKey
+ .map{case (x, y) => recreateEvent(x, y.head._1, y.head._2)}
+
+ }
+
+ def recreateEvent(x: Event, eventId: Option[String], creationTime:
DateTime): Event = {
+ Event(eventId = eventId, event = x.event, entityType = x.entityType,
+ entityId = x.entityId, targetEntityType = x.targetEntityType,
+ targetEntityId = x.targetEntityId, properties = x.properties,
+ eventTime = creationTime, tags = x.tags, prId= x.prId,
+ creationTime = creationTime)
+ }
+
+ def removeLDuplicates(ls: Iterable[Event]): Iterable[Event] = {
+ val now = DateTime.now()
+ ls.toList.map(x =>
+ (recreateEvent(x, None, now), (x.eventId, x.eventTime)))
+ .groupBy(_._1).mapValues( _.map( _._2 ) )
+ .map(x => recreateEvent(x._1, x._2.head._1, x._2.head._2))
+
+ }
+
+ /** :: DeveloperApi ::
+ *
+ * Filters most recent, compress properties and removes duplicates of
PEvents
+ *
+ * @return RDD[Event] most recent PEvents
+ */
+ @DeveloperApi
+ def cleanPersistedPEvents(sc: SparkContext): Unit ={
+ eventWindow match {
+ case Some(ew) =>
+ val result = cleanPEvents(sc)
+ val originalEvents = PEventStore.find(appName)(sc)
+ val newEvents = result subtract originalEvents
+ val eventsToRemove = (originalEvents subtract result).map { case e
=>
+ e.eventId.getOrElse("")
+ }
+
+ wipePEvents(newEvents, eventsToRemove, sc)
+ case None =>
+ }
+ }
+
+ /**
+ * Replace events in Event Store
+ *
+ */
+
+ def wipePEvents(
+ newEvents: RDD[Event],
+ eventsToRemove: RDD[String],
+ sc: SparkContext
+ ): Unit = {
+ val (appId, channelId) = Common.appNameToId(appName, None)
+
+ pEventsDb.write(newEvents, appId)(sc)
+
+ removePEvents(eventsToRemove, appId, sc)
+ }
+
+ def removeEvents(eventsToRemove: Set[String], appId: Int) {
+ val listOfFuture: List[Future[Boolean]] = eventsToRemove.filter(x =>
x != "").toList.map { case eventId =>
+ lEventsDb.futureDelete(eventId, appId)
+ }
+
+ val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture)
+ Await.result(futureOfList, scala.concurrent.duration.Duration(60,
"minutes"))
+ }
+
+ def removePEvents(eventsToRemove: RDD[String], appId: Int, sc:
SparkContext) {
+ pEventsDb.delete(eventsToRemove.filter(x => x != ""), appId, None)(sc)
+ }
+
+
+ /**
+ * Replace events in Event Store
+ *
+ * @param events new events
+ * @param appId delete all events of appId
+ * @param channelId delete all events of channelId
+ */
+ def wipe(
+ newEvents: Set[Event],
+ eventsToRemove: Set[String]
+ ): Unit = {
+ val (appId, channelId) = Common.appNameToId(appName, None)
+
+ val listOfFutureNewEvents: List[Future[String]] = newEvents.toList.map
{ case event =>
+ lEventsDb.futureInsert(recreateEvent(event, None,
event.eventTime), appId)
+ }
+
+ val futureOfListNewEvents: Future[List[String]] =
Future.sequence(listOfFutureNewEvents)
+ Await.result(futureOfListNewEvents,
scala.concurrent.duration.Duration(60, "minutes"))
+
+ removeEvents(eventsToRemove, appId)
+ }
+
+
+ /** :: DeveloperApi ::
+ *
+ * Filters most recent, compress properties of PEvents
+ */
+ @DeveloperApi
+ def cleanPEvents(sc: SparkContext): RDD[Event] = {
+ val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime)
+
+ val rdd = eventWindow match {
+ case Some(ew) =>
+ var updated =
+ if (ew.compressProperties) compressPProperties(sc, pEvents) else
pEvents
+
+ val deduped = if (ew.removeDuplicates)
removePDuplicates(sc,updated) else updated
+ deduped
+ case None =>
+ pEvents
+ }
+ getCleanedPEvents(rdd)
+ }
+
+ /** :: DeveloperApi ::
+ *
+ * Filters most recent, compress properties and removes duplicates of
LEvents
+ *
+ * @return Iterator[Event] most recent LEvents
+ */
+ @DeveloperApi
+ def cleanPersistedLEvents: Unit = {
+ eventWindow match {
+ case Some(ew) =>
+
+ val result = cleanLEvents().toSet
+ val originalEvents = LEventStore.find(appName).toSet
+ val newEvents = result -- originalEvents
+ val eventsToRemove = (originalEvents -- result).map { case e =>
+ e.eventId.getOrElse("")
+ }
+
+ wipe(newEvents, eventsToRemove)
+
+ case None =>
+ }
+ }
+
+ /** :: DeveloperApi ::
+ *
+ * Filters most recent, compress properties of LEvents
+ */
+ @DeveloperApi
+ def cleanLEvents(): Iterable[Event] = {
+ val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime)
+
+ val events = eventWindow match {
+ case Some(ew) =>
+ var updated =
+ if (ew.compressProperties) compressLProperties(lEvents) else
lEvents
+ val deduped = if (ew.removeDuplicates)
removeLDuplicates(updated) else updated
+ deduped
+ case None =>
+ lEvents
+ }
+ getCleanedLEvents(events)
+ }
+
+
+ private def isSetEvent(e: Event): Boolean = {
+ e.event == "$set" || e.event == "$unset"
+ }
+
+ private def compress(events: Iterable[Event]): Event = {
+ events.find(_.event == "$set") match {
+
+ case Some(first) =>
+ events.reduce { (e1, e2) =>
+ val props = e2.event match {
+ case "$set" =>
+ e1.properties.fields ++ e2.properties.fields
+ case "$unset" =>
+ e1.properties.fields
+ .filterKeys(f => !e2.properties.fields.contains(f))
+ }
+ e1.copy(properties = DataMap(props))
+ }
+
+ case None =>
+ events.reduce { (e1, e2) =>
+ e1.copy(properties =
+ DataMap(e1.properties.fields ++ e2.properties.fields)
+ )
+ }
+ }
+ }
+}
+
/** Cleanup configuration for a self-cleaning data source.
 *
 * @param duration how far back from training time to keep events, as a
 *                 scala.concurrent.duration string (e.g. "30 days");
 *                 None keeps everything
 * @param removeDuplicates whether to collapse duplicate events
 * @param compressProperties whether to flatten $set/$unset event chains
 *                           into a single properties event per entity
 */
case class EventWindow(
  duration: Option[String] = None,
  removeDuplicates: Boolean = false,
  compressProperties: Boolean = false
)
--- End diff --
Add the notion of an EventWindow to the Datasource. This provides for
deduping events, compressing $set/$unset events, and trimming older events at
the desired watermark.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---