This is an automated email from the ASF dual-hosted git repository.
xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push:
new 669cae0 update dlpredictor
new 1e66a17 Merge pull request #4 from radibnia77/main
669cae0 is described below
commit 669cae028f642e0c4a31614a18a9d70a29d696b6
Author: Reza <[email protected]>
AuthorDate: Wed Apr 21 16:58:01 2021 -0700
update dlpredictor
---
Processes/dlpredictor/README.md | 2 +-
Processes/dlpredictor/conf/config.yml | 20 ++--
Processes/dlpredictor/dag/dlpredictor_dag.py | 75 +++++++++++++
.../dlpredictor/{log.py => configutil.py} | 29 +++--
Processes/dlpredictor/dlpredictor/log.py | 6 +-
Processes/dlpredictor/dlpredictor/main_es_push.py | 83 ++++++++++++++
Processes/dlpredictor/dlpredictor/main_spark_es.py | 119 +++++++++------------
Processes/dlpredictor/dlpredictor/show_config.py | 89 +++++++++++++++
.../dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar | Bin 0 -> 1012419 bytes
Processes/dlpredictor/run.sh | 14 ++-
Processes/dlpredictor/setup.py | 16 ---
11 files changed, 345 insertions(+), 108 deletions(-)
diff --git a/Processes/dlpredictor/README.md b/Processes/dlpredictor/README.md
index 171fbbf..4de8fcd 100644
--- a/Processes/dlpredictor/README.md
+++ b/Processes/dlpredictor/README.md
@@ -14,7 +14,7 @@ pip install -r requirements.txt
2. Transfer the dlpredictor directory to ~/code/dlpredictor on a machine
which also has Spark Client.
3. cd dlpredictor
4. pip install -r requirements.txt (to install required packages)
-5. python setup install (to install predictor_dl_model package)
+5. python setup.py install (to install predictor_dl_model package)
6. Run run.sh
### Documentation
diff --git a/Processes/dlpredictor/conf/config.yml
b/Processes/dlpredictor/conf/config.yml
index b10ebcb..cbb5513 100644
--- a/Processes/dlpredictor/conf/config.yml
+++ b/Processes/dlpredictor/conf/config.yml
@@ -1,12 +1,18 @@
-log_level: 'INFO'
-product_tag: 'dlpm'
-pipeline_tag: '11092020'
-factdata_table: 'factdata_hq_09222020_bucket_0'
-distribution_table: '{product_tag}_{pipeline_tag}_tmp_distribution'
-norm_table: '{product_tag}_{pipeline_tag}_trainready'
-model_stat_table: '{product_tag}_{pipeline_tag}_model_stat'
+log_level: 'WARN'
+product_tag: 'dlpredictor'
+pipeline_tag: '04212021'
+
+# input tables from the dlpm pipeline
+factdata_table: 'factdata_hq_09222020_r_ipl_mapped_11052020' # NOTE(review): naming suggests this table belongs to the dlpm_03182021 pipeline run — confirm before use
+distribution_table: 'dlpm_03182021_tmp_distribution'
+norm_table: 'dlpm_03182021_trainready'
+model_stat_table: 'dlpm_03182021_model_stat'
bucket_size: 10
bucket_step: 1
+
+yesterday: '2020-05-31'
+serving_url: 'http://10.193.217.105:8505/v1/models/dlpm6:predict'
+
es_host: '10.213.37.41'
es_port: '9200'
es_predictions_index: '{product_tag}_{pipeline_tag}_predictions'
diff --git a/Processes/dlpredictor/dag/dlpredictor_dag.py
b/Processes/dlpredictor/dag/dlpredictor_dag.py
new file mode 100644
index 0000000..6fd74e0
--- /dev/null
+++ b/Processes/dlpredictor/dag/dlpredictor_dag.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import DAG
+import datetime as dt
+from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
+from datetime import timedelta
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.bash_operator import BashOperator
+
+default_args = {
+ 'owner': 'dlpredictor',
+ 'depends_on_past': False,
+ 'start_date': dt.datetime(2021, 3, 15),
+ 'retries': 0,
+ 'retry_delay': timedelta(minutes=1),
+}
+
+dag = DAG(
+ 'dlpredictor',
+ default_args=default_args,
+ schedule_interval=None,
+
+)
+
+def sparkOperator(
+ file,
+ task_id,
+ executor_cores=5,
+ num_executors=10,
+ **kwargs
+):
+ return SparkSubmitOperator(
+
application='/home/airflow/airflow-apps/dlpredictor/dlpredictor/{}'.format(file),
+
application_args=['/home/airflow/airflow-apps/dlpredictor/conf/config.yml'],
+ conn_id='spark_default',
+ executor_memory='16G',
+ conf={'spark.driver.maxResultSize': '8g'},
+ driver_memory='16G',
+ executor_cores=executor_cores,
+ num_executors=num_executors,
+ task_id=task_id,
+ dag=dag,
+ **kwargs
+ )
+
+
+show_config = sparkOperator('show_config.py', 'show_config')
+
+dlpredictor = sparkOperator('main_spark_es.py',
+ 'dlpredictor',
+
py_files='/home/airflow/airflow-apps/dlpredictor/dist/dlpredictor-1.6.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/imscommon-2.0.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg',
+
jars='/home/airflow/airflow-apps/dlpredictor/lib/elasticsearch-hadoop-6.8.0.jar')
+
+es_push = sparkOperator('main_es_push.py',
+ 'es_push',
+ 3,
+ 3,
+
py_files='/home/airflow/airflow-apps/dlpredictor/dist/dlpredictor-1.6.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/imscommon-2.0.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg',
+
jars='/home/airflow/airflow-apps/dlpredictor/lib/elasticsearch-hadoop-6.8.0.jar')
+
+show_config >> dlpredictor >> es_push
\ No newline at end of file
diff --git a/Processes/dlpredictor/dlpredictor/log.py
b/Processes/dlpredictor/dlpredictor/configutil.py
similarity index 52%
copy from Processes/dlpredictor/dlpredictor/log.py
copy to Processes/dlpredictor/dlpredictor/configutil.py
index 1190a7c..7a069b0 100644
--- a/Processes/dlpredictor/dlpredictor/log.py
+++ b/Processes/dlpredictor/dlpredictor/configutil.py
@@ -14,14 +14,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging.config
-import os
+import re
-path = 'conf/log.conf'
+def resolve_placeholder(in_dict):
+ stack = []
+ for key in in_dict.keys():
+ stack.append((in_dict, key))
+ while len(stack) > 0:
+ (_dict, key) = stack.pop()
+ value = _dict[key]
+ if type(value) == dict:
+ for _key in value.keys():
+ stack.append((value, _key))
+ elif type(value) == str:
+ z = re.findall('\{(.*?)\}', value)
+ if len(z) > 0:
+ new_value = value
+ for item in z:
+ if item in in_dict and type(in_dict[item]) == str:
+ new_value = new_value.replace(
+ '{'+item+'}', in_dict[item])
+ _dict[key] = new_value
-logging.config.fileConfig(path)
-logger_operation = logging.getLogger('operation')
-logger_run = logging.getLogger('run')
-logger_security = logging.getLogger('security')
-logger_user = logging.getLogger('user')
-logger_interface = logging.getLogger('interface')
diff --git a/Processes/dlpredictor/dlpredictor/log.py
b/Processes/dlpredictor/dlpredictor/log.py
index 1190a7c..5da9479 100644
--- a/Processes/dlpredictor/dlpredictor/log.py
+++ b/Processes/dlpredictor/dlpredictor/log.py
@@ -17,9 +17,13 @@
import logging.config
import os
+# Derive the base path of this installation (four directory levels up from
+# this module), assuming this file is packaged inside a .egg archive.
+basedir =
os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
path = 'conf/log.conf'
+fullpath = os.path.join(basedir, path)
-logging.config.fileConfig(path)
+logging.config.fileConfig(fullpath)
logger_operation = logging.getLogger('operation')
logger_run = logging.getLogger('run')
logger_security = logging.getLogger('security')
diff --git a/Processes/dlpredictor/dlpredictor/main_es_push.py
b/Processes/dlpredictor/dlpredictor/main_es_push.py
new file mode 100644
index 0000000..351bfb6
--- /dev/null
+++ b/Processes/dlpredictor/dlpredictor/main_es_push.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import sys
+import yaml
+
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+
+from dlpredictor.configutil import *
+from dlpredictor.log import *
+from dlpredictor import transform
+
+
+def run (cfg):
+ sc = SparkContext()
+ hive_context = HiveContext(sc)
+ sc.setLogLevel(cfg['log_level'])
+
+ # Load the data frame from Hive.
+ table_name = cfg['es_predictions_index']
+ command = """select * from {}""".format(table_name)
+ df = hive_context.sql(command)
+
+ # Select the columns to push to elasticsearch.
+ rdd = df.rdd.map(lambda x: transform.format_data(x, 'ucdoc'))
+
+ # Write the data frame to elasticsearch.
+ es_write_conf = {"es.nodes": cfg['es_host'],
+ "es.port": cfg['es_port'],
+ "es.resource":
cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
+ "es.batch.size.bytes": "1000000",
+ "es.batch.size.entries": "100",
+ "es.input.json": "yes",
+ "es.mapping.id": "uckey",
+ "es.nodes.wan.only": "true",
+ "es.write.operation": "upsert"}
+ rdd.saveAsNewAPIHadoopFile(
+ path='-',
+ outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
+ keyClass="org.apache.hadoop.io.NullWritable",
+ valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
+ conf=es_write_conf)
+
+ sc.stop()
+
+
+if __name__ == '__main__':
+
+ # Get the execution parameters.
+ parser = argparse.ArgumentParser(description='Prepare data')
+ parser.add_argument('config_file')
+ args = parser.parse_args()
+
+ # Load config file.
+ try:
+ with open(args.config_file, 'r') as ymlfile:
+ cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
+ resolve_placeholder(cfg)
+ logger_operation.info("Successfully open
{}".format(args.config_file))
+ except IOError as e:
+ logger_operation.error("Open config file unexpected error: I/O
error({0}): {1}".format(e.errno, e.strerror))
+ except:
+ logger_operation.error("Unexpected error:{}".format(sys.exc_info()[0]))
+ raise
+
+ # Run this module.
+ run(cfg)
+
diff --git a/Processes/dlpredictor/dlpredictor/main_spark_es.py
b/Processes/dlpredictor/dlpredictor/main_spark_es.py
index c51ea37..a8be99f 100644
--- a/Processes/dlpredictor/dlpredictor/main_spark_es.py
+++ b/Processes/dlpredictor/dlpredictor/main_spark_es.py
@@ -15,43 +15,23 @@
# limitations under the License.
import argparse
-import re
# -*- coding: UTF-8 -*-
import sys
from datetime import datetime, timedelta
-
import yaml
+
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, expr, collect_list, struct
from pyspark.sql.types import StringType, ArrayType, MapType, FloatType,
StructField, StructType
from dlpredictor import transform
+from dlpredictor.configutil import *
from dlpredictor.log import *
from dlpredictor.prediction.forecaster import Forecaster
from dlpredictor.util.sparkesutil import *
-def resolve_placeholder(in_dict):
- stack = []
- for key in in_dict.keys():
- stack.append((in_dict, key))
- while len(stack) > 0:
- (_dict, key) = stack.pop()
- value = _dict[key]
- if type(value) == dict:
- for _key in value.keys():
- stack.append((value, _key))
- elif type(value) == str:
- z = re.findall('\{(.*?)\}', value)
- if len(z) > 0:
- new_value = value
- for item in z:
- if item in in_dict and type(in_dict[item]) == str:
- new_value = new_value.replace('{'+item+'}',
in_dict[item])
- _dict[key] = new_value
-
-
def sum_count_array(hour_counts):
'''
[{14: [u'1:3']}, {13: [u'1:3']}, {11: [u'1:3']}, {15: [u'1:5']}, {22:
[u'1:8']}, {23: [u'1:6']}, {19: [u'1:1']}, {18: [u'1:1']}, {12: [u'1:5']}, {17:
[u'1:5']}, {20: [u'1:3']}, {21: [u'1:21']}]
@@ -122,20 +102,20 @@ def __save_as_table(df, table_name, hive_context,
create_table):
hive_context.sql(command)
-def run(cfg, yesterday, serving_url):
+def run(cfg):
# os.environ[
# 'PYSPARK_SUBMIT_ARGS'] = '--jars
/home/reza/eshadoop/elasticsearch-hadoop-6.5.2/dist/elasticsearch-hadoop-6.5.2.jar
pyspark-shell'
- es_write_conf = {"es.nodes": cfg['es_host'],
- "es.port": cfg['es_port'],
- "es.resource":
cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
- "es.batch.size.bytes": "1000000",
- "es.batch.size.entries": "100",
- "es.input.json": "yes",
- "es.mapping.id": "uckey",
- "es.nodes.wan.only": "true",
- "es.write.operation": "upsert"}
+ # es_write_conf = {"es.nodes": cfg['es_host'],
+ # "es.port": cfg['es_port'],
+ # "es.resource":
cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
+ # "es.batch.size.bytes": "1000000",
+ # "es.batch.size.entries": "100",
+ # "es.input.json": "yes",
+ # "es.mapping.id": "uckey",
+ # "es.nodes.wan.only": "true",
+ # "es.write.operation": "upsert"}
sc = SparkContext()
hive_context = HiveContext(sc)
@@ -151,6 +131,9 @@ def run(cfg, yesterday, serving_url):
traffic_dist = cfg['traffic_dist']
model_stat_table = cfg['model_stat_table']
+ yesterday = cfg['yesterday']
+ serving_url = cfg['serving_url']
+
model_stats = get_model_stats(hive_context, model_stat_table)
# Read dist
@@ -163,6 +146,9 @@ def run(cfg, yesterday, serving_url):
FROM {} AS DIST
""".format(distribution_table)
df_dist = hive_context.sql(command)
+ df_dist = df_dist.repartition('uckey')
+ df_dist.cache()
+ df_dist.count()
# Read norm table
# DataFrame[uckey: string, ts: array<int>, p: float, a__n: float, a_1_n:
float, a_2_n: float, a_3_n: float, a_4_n: float, a_5_n: float, a_6_n: float,
t_UNKNOWN_n: float, t_3G_n: float, t_4G_n: float, t_WIFI_n: float, t_2G_n:
float, g__n: float, g_g_f_n: float, g_g_m_n: float, g_g_x_n: float,
price_cat_1_n: float, price_cat_2_n: float, price_cat_3_n: float, si_vec_n:
array<float>, r_vec_n: array<float>, p_n: float, ts_n: array<float>]
@@ -216,38 +202,33 @@ def run(cfg, yesterday, serving_url):
df = hive_context.sql(command)
+ # add partition_group
+ df = df.repartition("uckey")
+
# [Row(count_array=[u'1:504'], day=u'2019-11-02', hour=2,
uckey=u'magazinelock,04,WIFI,g_m,1,CPM,78', hour_price_imp_map={2: [u'1:504']})]
- df = df.withColumn('hour_price_imp_map',
- expr("map(hour, count_array)"))
+ df = df.withColumn('hour_price_imp_map', expr("map(hour,
count_array)"))
#
[Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19',
day=u'2019-11-02', hour_price_imp_map_list=[{15: [u'3:3']}, {7: [u'3:5']}, {10:
[u'3:3']}, {9: [u'3:1']}, {16: [u'3:2']}, {22: [u'3:11']}, {23: [u'3:3']}, {18:
[u'3:7']}, {0: [u'3:4']}, {1: [u'3:2']}, {19: [u'3:10']}, {8: [u'3:4']}, {21:
[u'3:2']}, {6: [u'3:1']}])]
- df = df.groupBy('uckey', 'day').agg(
-
collect_list('hour_price_imp_map').alias('hour_price_imp_map_list'))
+ df = df.groupBy('uckey',
'day').agg(collect_list('hour_price_imp_map').alias('hour_price_imp_map_list'))
#
[Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19',
day=u'2019-11-02', day_price_imp=[u'3:58'])]
df = df.withColumn('day_price_imp', udf(
sum_count_array,
ArrayType(StringType()))(df.hour_price_imp_map_list)).drop('hour_price_imp_map_list')
#
[Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19',
day=u'2019-11-02', day_price_imp=[u'3:58'], day_price_imp_map={u'2019-11-02':
[u'3:58']})]
- df = df.withColumn('day_price_imp_map', expr(
- "map(day, day_price_imp)"))
+ df = df.withColumn('day_price_imp_map', expr("map(day,
day_price_imp)"))
# [Row(uckey=u'native,z041bf6g4s,WIFI,g_f,1,CPM,71',
day_price_imp_map_list=[{u'2019-11-02': [u'1:2', u'2:261']}, {u'2019-11-03':
[u'2:515']}])])
- df = df.groupBy('uckey').agg(collect_list(
- 'day_price_imp_map').alias('day_price_imp_map_list'))
+ df =
df.groupBy('uckey').agg(collect_list('day_price_imp_map').alias('day_price_imp_map_list'))
# [Row(uckey=u'native,z041bf6g4s,WIFI,g_f,1,CPM,71',
day_price_imp_map_list=[{u'2019-11-02': [u'1:2', u'2:261']}, {u'2019-11-03':
[u'2:515']}], ratio=0.09467455744743347, cluster_uckey=u'892', price_cat=u'1')]
df = df.join(df_dist, on=['uckey'], how='inner')
# df_uckey_cluster keeps the ratio and cluster_key for only uckeys
that are being processed
if not df_uckey_cluster:
- df_uckey_cluster = df.select(
- 'uckey', 'cluster_uckey', 'ratio', 'price_cat')
- df_uckey_cluster.cache()
+ df_uckey_cluster = df.select('uckey', 'cluster_uckey', 'ratio',
'price_cat')
else:
- df_uckey_cluster = df.select(
- 'uckey', 'cluster_uckey', 'ratio',
'price_cat').union(df_uckey_cluster)
- df_uckey_cluster.cache()
+ df_uckey_cluster = df.select('uckey', 'cluster_uckey', 'ratio',
'price_cat').union(df_uckey_cluster)
# [Row(cluster_uckey=u'2469', price_cat=u'2',
cluster_day_price_imp_list=[[{u'2019-11-02': [u'2:90']}, {u'2019-11-03':
[u'2:172']}]])])
df = df.groupBy('cluster_uckey', 'price_cat').agg(
@@ -261,18 +242,15 @@ def run(cfg, yesterday, serving_url):
if not df_prediction_ready:
df_prediction_ready = df
- df_prediction_ready.cache()
else:
df = df_prediction_ready.union(df)
- df = df.groupBy('cluster_uckey', 'price_cat').agg(
- collect_list('ts').alias('ts_list'))
+ df = df.groupBy('cluster_uckey',
'price_cat').agg(collect_list('ts').alias('ts_list'))
df = df.withColumn('ts', udf(sum_day_count_array,
ArrayType(MapType(StringType(),
ArrayType(StringType()))))(df.ts_list))
df = df.drop('ts_list')
# [Row(cluster_uckey=u'magazinelock,03,WIFI,g_f,1,CPM,60',
ts=[{u'2019-11-02': [u'1:2']}])]
df_prediction_ready = df
- df_prediction_ready.cache()
# [Row(cluster_uckey=u'1119', price_cat=u'2', ts=[{u'2019-11-02':
[u'1:862', u'3:49', u'2:1154'], u'2019-11-03': [u'1:596', u'3:67',
u'2:1024']}])]
df = df_prediction_ready
@@ -280,18 +258,15 @@ def run(cfg, yesterday, serving_url):
df = df.join(df_norm, on=['cluster_uckey', 'price_cat'], how='inner')
# [Row(cluster_uckey=u'1119', price_cat=u'2', ts=[{u'2019-11-02':
[u'1:862', u'3:49', u'2:1154'], u'2019-11-03': [u'1:596', u'3:67',
u'2:1024']}], a__n=-0.005224577616900206, a_1_n=0.6089736819267273,
a_2_n=-0.21013110876083374, a_3_n=0.16884993016719818,
a_4_n=-0.3416250944137573, a_5_n=0.15184317529201508,
a_6_n=-0.16529197990894318, t_UNKNOWN_n=-0.4828081429004669,
t_3G_n=1.2522615194320679, t_4G_n=-0.15080969035625458,
t_WIFI_n=-0.35078370571136475, t_2G_n=1.991615653038025, g__n [...]
- df = df.join(df_uckey_cluster, on=[
- 'cluster_uckey', 'price_cat'], how='inner')
+ df = df.join(df_uckey_cluster, on=['cluster_uckey', 'price_cat'],
how='inner')
predictor_udf = udf(transform.predict_daily_uckey(days=day_list,
serving_url=serving_url,
forecaster=forecaster, model_stats=model_stats, columns=df.columns),
MapType(StringType(), FloatType()))
- df = df.withColumn('day_prediction_map',
- predictor_udf(struct([df[name] for name in
df.columns])))
+ df = df.withColumn('day_prediction_map', predictor_udf(struct([df[name]
for name in df.columns])))
# [Row(cluster_uckey=u'1119', price_cat=u'2',
day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0},
ratio=0.11989551782608032,
uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')]
- df = df.select('cluster_uckey', 'price_cat',
- 'day_prediction_map', 'ratio', 'uckey')
+ df = df.select('cluster_uckey', 'price_cat', 'day_prediction_map',
'ratio', 'uckey')
# [Row(ucdoc_elements=Row(price_cat=u'2', ratio=0.11989551782608032,
day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0}),
uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')]
ucdoc_elements_type = StructType([StructField('price_cat', StringType(),
False), StructField(
@@ -300,19 +275,25 @@ def run(cfg, yesterday, serving_url):
(price_cat, ratio,
day_prediction_map), ucdoc_elements_type)(df.price_cat, df.ratio,
df.day_prediction_map)).select('ucdoc_elements_pre_price_cat', 'uckey')
# [Row(uckey=u'splash,d971z9825e,WIFI,g_m,1,CPT,74',
ucdoc_elements=[Row(price_cat=u'1', ratio=0.5007790923118591,
day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0})])]
- df =
df.groupBy('uckey').agg(collect_list('ucdoc_elements_pre_price_cat').alias('ucdoc_elements'))
+ df = df.groupBy('uckey').agg(collect_list(
+ 'ucdoc_elements_pre_price_cat').alias('ucdoc_elements'))
df = df.withColumn('prediction_output',
udf(transform.generate_ucdoc(traffic_dist), StringType())(
df.uckey, df.ucdoc_elements))
df_predictions_doc = df.select('uckey', 'prediction_output')
- rdd = df_predictions_doc.rdd.map(lambda x: transform.format_data(x,
'ucdoc'))
- rdd.saveAsNewAPIHadoopFile(
- path='-',
- outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
- keyClass="org.apache.hadoop.io.NullWritable",
- valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
- conf=es_write_conf)
+
+ # Save the predictions to Hive.
+ table_name = cfg['es_predictions_index']
+ df_predictions_doc.write.option("header", "true").option("encoding",
"UTF-8").mode('overwrite').format('hive').saveAsTable(table_name)
+
+ # rdd = df_predictions_doc.rdd.map(lambda x: transform.format_data(x,
'ucdoc'))
+ # rdd.saveAsNewAPIHadoopFile(
+ # path='-',
+ # outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
+ # keyClass="org.apache.hadoop.io.NullWritable",
+ # valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
+ # conf=es_write_conf)
sc.stop()
@@ -321,8 +302,6 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Prepare data')
parser.add_argument('config_file')
- parser.add_argument('yesterday')
- parser.add_argument('serving_url')
args = parser.parse_args()
# Load config file
@@ -330,13 +309,11 @@ if __name__ == '__main__':
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
resolve_placeholder(cfg)
- logger_operation.info(
- "Successfully open {}".format(args.config_file))
+ logger_operation.info("Successfully open
{}".format(args.config_file))
except IOError as e:
- logger_operation.error(
- "Open config file unexpected error: I/O error({0}):
{1}".format(e.errno, e.strerror))
+ logger_operation.error("Open config file unexpected error: I/O
error({0}): {1}".format(e.errno, e.strerror))
except:
logger_operation.error("Unexpected error:{}".format(sys.exc_info()[0]))
raise
- run(cfg, args.yesterday, args.serving_url)
+ run(cfg)
diff --git a/Processes/dlpredictor/dlpredictor/show_config.py
b/Processes/dlpredictor/dlpredictor/show_config.py
new file mode 100644
index 0000000..cd66fdf
--- /dev/null
+++ b/Processes/dlpredictor/dlpredictor/show_config.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import yaml
+import argparse
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+
+def run(cfg, hive_context):
+
+ log_level = cfg['log_level']
+ product_tag = cfg['product_tag']
+ pipeline_tag = cfg['pipeline_tag']
+
+ factdata = cfg['factdata_table']
+ distribution_table = cfg['distribution_table']
+ norm_table = cfg['norm_table']
+ model_stat_table = cfg['model_stat_table']
+ bucket_size = cfg['bucket_size']
+ bucket_step = cfg['bucket_step']
+
+ es_host = cfg['es_host']
+ es_port = cfg['es_port']
+ es_predictions_index = cfg['es_predictions_index']
+ es_predictions_type = cfg['es_predictions_type']
+ holiday_list = cfg['holiday_list']
+ traffic_dist = cfg['traffic_dist']
+
+ for key in cfg:
+ print('{}: {}'.format(key, cfg[key]))
+ print('')
+
+ print('Output index:')
+ print(es_predictions_index.format(product_tag=product_tag,
pipeline_tag=pipeline_tag))
+ print('')
+
+ command = "SELECT * FROM {}"
+ df = hive_context.sql(command.format(factdata))
+ df_factdata_schema = df.schema
+ print('Factdata schema')
+ df.printSchema()
+
+ command = "SELECT * FROM {}"
+ df = hive_context.sql(command.format(distribution_table))
+ df_distribution_schema = df.schema
+ print('Distribution schema')
+ df.printSchema()
+
+ command = "SELECT * FROM {}"
+ df = hive_context.sql(command.format(norm_table))
+ df_norm_schema = df.schema
+ print('Norm schema')
+ df.printSchema()
+
+ command = "SELECT * FROM {}"
+ df = hive_context.sql(command.format(model_stat_table))
+ df_model_schema = df.schema
+ print('Model stat schema')
+ df.printSchema()
+
+
+
+if __name__ == '__main__':
+
+ parser = argparse.ArgumentParser(description="DLPredictor")
+ parser.add_argument('config_file')
+ args = parser.parse_args()
+ with open(args.config_file, 'r') as yml_file:
+ cfg = yaml.safe_load(yml_file)
+
+ sc = SparkContext.getOrCreate()
+ sc.setLogLevel('WARN')
+ hive_context = HiveContext(sc)
+
+
+ run(cfg=cfg, hive_context= hive_context)
\ No newline at end of file
diff --git a/Processes/dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar
b/Processes/dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar
new file mode 100644
index 0000000..e07213d
Binary files /dev/null and
b/Processes/dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar differ
diff --git a/Processes/dlpredictor/run.sh b/Processes/dlpredictor/run.sh
index a5c5f64..2500d54 100644
--- a/Processes/dlpredictor/run.sh
+++ b/Processes/dlpredictor/run.sh
@@ -1,9 +1,17 @@
#!/bin/bash
-#Start the predictor
+SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
+
+# Start the predictor
if true
then
# spark-submit --num-executors 10 --executor-cores 5 --jars
lib/elasticsearch-hadoop-6.5.2.jar dlpredictor/main_spark_es.py conf/config.yml
'2019-11-03' 's32' '1' 'http://10.193.217.105:8501/v1/models/faezeh:predict'
# spark-submit --master yarn --py-files
dist/dlpredictor-2.0.0-py2.7.egg,dist/imscommon-2.0.0-py2.7.egg,dist/predictor_dl_model-1.0.0-py2.7.egg
--num-executors 3 --executor-cores 3 --jars lib/elasticsearch-hadoop-6.8.0.jar
dlpredictor/main_spark_es.py conf/config.yml '2020-02-08' 's32' '1'
'http://10.193.217.105:8501/v1/models/faezeh:predict'
- spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 16G --driver-memory 16G --py-files
dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg
--jars lib/elasticsearch-hadoop-6.8.0.jar dlpredictor/main_spark_es.py
conf/config.yml '2020-05-31' 'http://10.193.217.105:8503/v1/models/dl3:predict'
-fi
\ No newline at end of file
+ spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 16G --driver-memory 16G --py-files
$SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg,$SCRIPTPATH/lib/imscommon-2.0.0-py2.7.egg,$SCRIPTPATH/lib/predictor_dl_model-1.6.0-py2.7.egg
--jars $SCRIPTPATH/lib/elasticsearch-hadoop-6.8.0.jar
$SCRIPTPATH/dlpredictor/main_spark_es.py $SCRIPTPATH/conf/config.yml
+fi
+
+# Push the data to elasticsearch
+if true
+then
+ spark-submit --master yarn --num-executors 3 --executor-cores 3
--executor-memory 16G --driver-memory 16G --py-files
$SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg,$SCRIPTPATH/lib/imscommon-2.0.0-py2.7.egg
--jars $SCRIPTPATH/lib/elasticsearch-hadoop-6.8.0.jar
$SCRIPTPATH/dlpredictor/main_es_push.py $SCRIPTPATH/conf/config.yml
+fi
diff --git a/Processes/dlpredictor/setup.py b/Processes/dlpredictor/setup.py
index 27d2291..cb3f5d1 100644
--- a/Processes/dlpredictor/setup.py
+++ b/Processes/dlpredictor/setup.py
@@ -1,19 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
from setuptools import setup, find_packages
with open("README.md", "r") as fh: