[ 
https://issues.apache.org/jira/browse/PIO-105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090851#comment-16090851
 ] 

ASF GitHub Bot commented on PIO-105:
------------------------------------

Github user takezoe commented on a diff in the pull request:

    
https://github.com/apache/incubator-predictionio/pull/412#discussion_r127853016
  
    --- Diff: 
core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.predictionio.workflow
    +
    +import java.io.Serializable
    +
    +import com.twitter.bijection.Injection
    +import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
    +import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
    +import grizzled.slf4j.Logging
    +import org.apache.predictionio.controller.{Engine, Utils}
    +import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
    +import org.apache.predictionio.data.storage.{EngineInstance, Storage}
    +import 
org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
    +import org.apache.spark.rdd.RDD
    +import org.json4s._
    +import org.json4s.native.JsonMethods._
    +import scala.language.existentials
    +
    +case class BatchPredictConfig(
    +  inputFilePath: String = "batchpredict-input.json",
    +  outputFilePath: String = "batchpredict-output.json",
    +  queryPartitions: Option[Int] = None,
    +  engineInstanceId: String = "",
    +  engineId: Option[String] = None,
    +  engineVersion: Option[String] = None,
    +  engineVariant: String = "",
    +  env: Option[String] = None,
    +  verbose: Boolean = false,
    +  debug: Boolean = false,
    +  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
    +
    +object BatchPredict extends Logging {
    +
    +  class KryoInstantiator(classLoader: ClassLoader) extends 
ScalaKryoInstantiator {
    +    override def newKryo(): KryoBase = {
    +      val kryo = super.newKryo()
    +      kryo.setClassLoader(classLoader)
    +      SynchronizedCollectionsSerializer.registerSerializers(kryo)
    +      kryo
    +    }
    +  }
    +
    +  object KryoInstantiator extends Serializable {
    +    def newKryoInjection : Injection[Any, Array[Byte]] = {
    +      val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
    +      KryoInjection.instance(kryoInstantiator)
    +    }
    +  }
    +
    +  val engineInstances = Storage.getMetaDataEngineInstances
    +  val modeldata = Storage.getModelDataModels
    +
    +  def main(args: Array[String]): Unit = {
    +    val parser = new 
scopt.OptionParser[BatchPredictConfig]("BatchPredict") {
    +      opt[String]("input") action { (x, c) =>
    +        c.copy(inputFilePath = x)
    +      } text("Path to file containing input queries; a " +
    +        "multi-object JSON file with one object per line.")
    +      opt[String]("output") action { (x, c) =>
    +        c.copy(outputFilePath = x)
    +      } text("Path to file containing output predictions; a " +
    +        "multi-object JSON file with one object per line.")
    +      opt[Int]("query-partitions") action { (x, c) =>
    +        c.copy(queryPartitions = Some(x))
    +      } text("Limit concurrency of predictions by setting the number " +
    +        "of partitions used internally for the RDD of queries.")
    +      opt[String]("engineId") action { (x, c) =>
    +        c.copy(engineId = Some(x))
    +      } text("Engine ID.")
    +      opt[String]("engineVersion") action { (x, c) =>
    +        c.copy(engineVersion = Some(x))
    +      } text("Engine version.")
    +      opt[String]("engine-variant") required() action { (x, c) =>
    +        c.copy(engineVariant = x)
    +      } text("Engine variant JSON.")
    +      opt[String]("env") action { (x, c) =>
    +        c.copy(env = Some(x))
    +      } text("Comma-separated list of environmental variables (in 
'FOO=BAR' " +
    +        "format) to pass to the Spark execution environment.")
    +      opt[String]("engineInstanceId") required() action { (x, c) =>
    +        c.copy(engineInstanceId = x)
    +      } text("Engine instance ID.")
    +      opt[Unit]("verbose") action { (x, c) =>
    +        c.copy(verbose = true)
    +      } text("Enable verbose output.")
    +      opt[Unit]("debug") action { (x, c) =>
    +        c.copy(debug = true)
    +      } text("Enable debug output.")
    +      opt[String]("json-extractor") action { (x, c) =>
    +        c.copy(jsonExtractor = JsonExtractorOption.withName(x))
    +      }
    +    }
    +
    +    parser.parse(args, BatchPredictConfig()) map { config =>
    +      WorkflowUtils.modifyLogging(config.verbose)
    +      engineInstances.get(config.engineInstanceId) map { engineInstance =>
    +
    +        val engine = getEngine(engineInstance)
    +
    +        run(config, engineInstance, engine)
    +
    +      } getOrElse {
    +        error(s"Invalid engine instance ID. Aborting batch predict.")
    +      }
    +    }
    +  }
    +
    +  def getEngine(engineInstance: EngineInstance): Engine[_, _, _, _, _, _] 
= {
    +
    +    val engineFactoryName = engineInstance.engineFactory
    +
    +    val (engineLanguage, engineFactory) =
    +      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
    +    val maybeEngine = engineFactory()
    +
    +    // EngineFactory return a base engine, which may not be deployable.
    +    if (!maybeEngine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
    +      throw new NoSuchMethodException(
    +        s"Engine $maybeEngine cannot be used for batch predict")
    +    }
    +
    +    maybeEngine.asInstanceOf[Engine[_,_,_,_,_,_]]
    +  }
    +
    +  def run[Q, P](
    +    config: BatchPredictConfig,
    +    engineInstance: EngineInstance,
    +    engine: Engine[_, _, _, Q, P, _]): Unit = {
    +
    +    val engineParams = engine.engineInstanceToEngineParams(
    +      engineInstance, config.jsonExtractor)
    +
    +    val kryo = KryoInstantiator.newKryoInjection
    +
    +    val modelsFromEngineInstance =
    +      kryo.invert(modeldata.get(engineInstance.id).get.models).get.
    +      asInstanceOf[Seq[Any]]
    +
    +    val prepareSparkContext = WorkflowContext(
    +      batch = engineInstance.engineFactory,
    +      executorEnv = engineInstance.env,
    +      mode = "Batch Predict (model)",
    +      sparkEnv = engineInstance.sparkConf)
    +
    +    val models = engine.prepareDeploy(
    +      prepareSparkContext,
    +      engineParams,
    +      engineInstance.id,
    +      modelsFromEngineInstance,
    +      params = WorkflowParams()
    +    )
    +
    +    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
    +      Doer(engine.algorithmClassMap(n), p)
    +    }
    +
    +    val servingParamsWithName = engineParams.servingParams
    +
    +    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
    +      servingParamsWithName._2)
    +
    +    val runSparkContext = WorkflowContext(
    +      batch = engineInstance.engineFactory,
    +      executorEnv = engineInstance.env,
    +      mode = "Batch Predict (runner)",
    +      sparkEnv = engineInstance.sparkConf)
    +
    +    val inputRDD: RDD[String] = 
runSparkContext.textFile(config.inputFilePath)
    +    val queriesRDD: RDD[String] = config.queryPartitions match {
    +      case Some(p) => inputRDD.repartition(p)
    +      case None => inputRDD
    +    }
    +
    +    val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
    +      val jsonExtractorOption = config.jsonExtractor
    +      // Extract Query from Json
    +      val query = JsonExtractor.extract(
    +        jsonExtractorOption,
    +        queryString,
    +        algorithms.head.queryClass,
    +        algorithms.head.querySerializer,
    +        algorithms.head.gsonTypeAdapterFactories
    +      )
    +      // Deploy logic. First call Serving.supplement, then Algo.predict,
    +      // finally Serving.serve.
    +      val supplementedQuery = serving.supplementBase(query)
    +      // TODO: Parallelize the following.
    +      val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
    +        a.predictBase(models(ai), supplementedQuery)
    +      }
    --- End diff --
    
    Zipping predictions and models directly is simpler.
    
    ```scala
    val predictions = algorithms.zip(models).map { case (a, m) =>
      a.predictBase(m, supplementedQuery)
    }
    ```


> Batch Predictions
> -----------------
>
>                 Key: PIO-105
>                 URL: https://issues.apache.org/jira/browse/PIO-105
>             Project: PredictionIO
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Mars Hall
>            Assignee: Mars Hall
>
> Implement a new {{pio batchpredict}} command to enable massive, fast, batch 
> predictions from a trained model. Read a multi-object JSON file as the input 
> format, with one query object per line. Similarly, write results to a 
> multi-object JSON file, with one prediction result + its original query per 
> line.
> Currently getting bulk predictions from PredictionIO is possible with either:
> * a {{pio eval}} script, which will always train a fresh, unvalidated model 
> before getting predictions
> * a custom script that hits the {{queries.json}} HTTP API, which is a serious 
> bottleneck when requesting hundreds-of-thousands or millions of predictions
> Neither of these existing bulk-prediction hacks is adequate, for the reasons 
> mentioned.
> It's time for this use-case to be a first-class command :D
> Pull request https://github.com/apache/incubator-predictionio/pull/412



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to