This is an automated email from the ASF dual-hosted git repository.
xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push:
new e9316a4 Update lookalike application
new 371e4b2 Merge pull request #13 from radibnia77/main
e9316a4 is described below
commit e9316a40cac096e6de44730e1c49acd448e899c0
Author: Reza <[email protected]>
AuthorDate: Thu Sep 16 15:09:50 2021 -0700
Update lookalike application
These are changes for the lookalike application:
1. User consolidation for fast processing
2. Add Integration tests
3. Refactoring
---
.../application/pipeline/dags => }/README.md | 0
Model/lookalike-model/doc/ssd-v2.pdf | Bin 406365 -> 0 bytes
Model/lookalike-model/doc/ssd-v3.pdf | Bin 0 -> 431277 bytes
.../{pipeline/dags/README.md => __init__.py} | 0
.../application/legacy_files/distance_table.py | 95 -------
.../legacy_files/distance_table_list.py | 144 -----------
.../application/pipeline/__init__.py | 15 ++
.../application/pipeline/config.yml | 34 ++-
.../lookalike_model/application/pipeline/run.sh | 14 +-
.../application/pipeline/score_generator.py | 21 +-
...vector_rebucketing.py => score_matrix_table.py} | 47 ++--
.../application/pipeline/score_vector_table.py | 17 +-
.../application/pipeline/seed_user_selector.py | 64 -----
.../pipeline/top_n_similarity_table_generator.py | 111 ++++----
.../lookalike_model/application/rest_client.py | 89 -------
.../validation_plan_0.py} | 10 +-
.../application/validations/validation_plan_1.py | 94 +++++++
Model/lookalike-model/lookalike_model/config.yml | 6 +-
.../lookalike_model/pipeline/main_clean.py | 16 +-
.../lookalike_model/pipeline/main_keywords.py | 113 +++++++++
.../lookalike_model/pipeline/main_trainready.py | 59 +----
Model/lookalike-model/lookalike_model/run.sh | 7 +
.../pipeline/dags => tests/application}/README.md | 0
.../pipeline/config_score_matrix_table.yml | 38 +++
.../pipeline/config_top_n_similarity.yml | 38 +++
.../pipeline/test_score_matrix_table.py | 224 ++++++++++++++++
.../test_top_n_similarity_table_generator_1.py | 183 +++++++++++++
.../test_top_n_similarity_table_generator_2.py | 282 +++++++++++++++++++++
.../pipeline/dags => tests/pipeline}/README.md | 0
.../tests/pipeline/config_clean.yml | 2 +
.../tests/pipeline/config_keywords.yml | 22 ++
.../tests/pipeline/data_generator.py | 87 +++++--
.../tests/pipeline/test_main_clean.py | 24 +-
.../tests/pipeline/test_main_keywords.py | 88 +++++++
Model/lookalike-model/tests/run_test.sh | 23 +-
35 files changed, 1376 insertions(+), 591 deletions(-)
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
b/Model/lookalike-model/README.md
similarity index 100%
copy from
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
copy to Model/lookalike-model/README.md
diff --git a/Model/lookalike-model/doc/ssd-v2.pdf
b/Model/lookalike-model/doc/ssd-v2.pdf
deleted file mode 100644
index 0c0502d..0000000
Binary files a/Model/lookalike-model/doc/ssd-v2.pdf and /dev/null differ
diff --git a/Model/lookalike-model/doc/ssd-v3.pdf
b/Model/lookalike-model/doc/ssd-v3.pdf
new file mode 100644
index 0000000..3729198
Binary files /dev/null and b/Model/lookalike-model/doc/ssd-v3.pdf differ
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
b/Model/lookalike-model/lookalike_model/application/__init__.py
similarity index 100%
copy from
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
copy to Model/lookalike-model/lookalike_model/application/__init__.py
diff --git
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
deleted file mode 100644
index e82c018..0000000
---
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import yaml
-import argparse
-from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.functions import lit, col, udf
-from pyspark.sql.types import FloatType, StringType, StructType, StructField,
ArrayType, MapType
-# from rest_client import predict, str_to_intlist
-import requests
-import json
-import argparse
-from pyspark.sql.functions import udf
-from math import sqrt
-import time
-
-
-
-
-def distance(l1):
- def _distance(l2):
- dist = sum([l1[el]*l2[el] for el,value in l1.items()])
- return dist
- return _distance
-
-def x(l1):
- _udf_distance = udf(distance(l1), FloatType() )
- return _udf_distance
-
-def run(hive_context, cfg):
-
- # input tables
- keywords_table = cfg["input"]["keywords_table"]
- seeduser_table = cfg["input"]["seeduser_table"]
- lookalike_loaded_table_norm = cfg['output']['gucdocs_loaded_table_norm']
-
- # output dataframes
- lookalike_score_table = cfg["output"]["score_table"]
-
- command = "SELECT * FROM {}"
- df = hive_context.sql(command.format(lookalike_loaded_table_norm))
- df_keywords = hive_context.sql(command.format(keywords_table))
- df_seed_user = hive_context.sql(command.format(seeduser_table))
-
-
- #### creating a tuple of did and kws for seed users
- df_seed_user = df_seed_user.join(df.select('did','kws_norm'), on=['did'],
how='left')
- # df_seed_user = df_seed_user.withColumn("seed_user_list", zip_("did",
"kws"))
- seed_user_list = df_seed_user.select('did','kws_norm').collect()
- # seed_user list = [(did1, {k1:0, k2:0.2, ...}), (did2, )]
- # user =
- c = 0
- temp_list = []
- for item in seed_user_list:
-
- c+= 1
- if c > 850 :
- break
- df = df.withColumn(item[0],x(item[1])(col('kws_norm')))
-
- df.write.option("header", "true").option(
- "encoding",
"UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_score_table)
-
-
-
-if __name__ == "__main__":
- start = time.time()
- parser = argparse.ArgumentParser(description=" ")
- parser.add_argument('config_file')
- args = parser.parse_args()
- with open(args.config_file, 'r') as yml_file:
- cfg = yaml.safe_load(yml_file)
-
- sc = SparkContext.getOrCreate()
- sc.setLogLevel('WARN')
- hive_context = HiveContext(sc)
-
- run(hive_context=hive_context, cfg=cfg)
- sc.stop()
- end = time.time()
- print('Runtime of the program is:', (end - start))
diff --git
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
deleted file mode 100644
index 2217994..0000000
---
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import yaml
-from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.functions import lit, col, udf, array, mean
-from pyspark.sql.types import FloatType, StringType, StructType, StructField,
ArrayType, MapType
-import argparse
-from pyspark.sql.functions import udf
-import time
-import math
-
-'''
-spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16
--executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g
distance_table_list.py config.yml
-'''
-
-
-def euclidean(l1):
- def _euclidean(l2):
- list = []
- for item in l1:
- similarity = 1 - (math.sqrt(sum([(item[i]-l2[i]) ** 2 for i in
range(len(item))]))/math.sqrt(len(item)))
- list.append(similarity)
- return list
- return _euclidean
-
-
-def dot(l1):
- def _dot(l2):
- list = []
- for item in l1:
- similarity = sum([item[i]*l2[i] for i in range(len(item))])
- list.append(similarity)
- return list
- return _dot
-
-
-def ux(l1):
- if alg == "euclidean":
- _udf_similarity = udf(euclidean(l1), ArrayType(FloatType()))
- if alg =="dot":
- _udf_similarity = udf(dot(l1), ArrayType(FloatType()))
- return _udf_similarity
-
-
-
-def l(d):
- s = [value for key, value in d.items()]
- return s
-udf_tolist = udf(l, ArrayType(FloatType()))
-
-def top_n(l):
- #### top 10
- n = 10
- l.sort()
- return l[-n:]
-udf_top_n = udf(top_n, ArrayType(FloatType()))
-
-def _top_n(l1, l2):
- n = 10
- l = sorted(l1+l2)
- return l[-n:]
-
-_udf_top_n = udf(_top_n, ArrayType(FloatType()))
-
-def _mean(l):
- ave = sum(l)/len(l)
- return ave
-udf_mean = udf(_mean, FloatType())
-
-def run(hive_context, cfg):
-
- ## load dataframes
- lookalike_score_table_norm = cfg['output']['score_norm_table']
- keywords_table = cfg["input"]["keywords_table"]
- seeduser_table = cfg["input"]["seeduser_table"]
- lookalike_similarity_table = cfg["output"]["similarity_table"]
-
- command = "SELECT * FROM {}"
- df = hive_context.sql(command.format(lookalike_score_table_norm))
- df_keywords = hive_context.sql(command.format(keywords_table))
- df_seed_user = hive_context.sql(command.format(seeduser_table))
-
-
- #### creating a tuple of did and kws for seed users
- if alg == "dot":
- df = df.withColumn('kws_norm_list', udf_tolist(col('kws_norm')))
- if alg == "euclidean":
- df = df.withColumn('kws_norm_list', udf_tolist(col('kws')))
- df_seed_user = df_seed_user.join(df.select('did','kws_norm_list'),
on=['did'], how='left')
- seed_user_list = df_seed_user.select('did', 'kws_norm_list').collect()
-
-## batch 1 : 0-100 801 seed
- batch_length = 800
- c = 0
- #### i=0, c=0 , batched_user=[0,200], top_10
- total_c = len(seed_user_list)
- df = df.withColumn('top_10', array(lit(0.0)))
- while total_c > 0 :
- len_tobe_p = min(batch_length,total_c)
- total_c-= len_tobe_p
- batched_user = [item[1] for item in seed_user_list[c:
c+len_tobe_p]]
- df =
df.withColumn("similarity_list",ux(batched_user)(col('kws_norm_list')))
- df = df.withColumn("top_10",
_udf_top_n(col("similarity_list"),col("top_10")))
- c+=len_tobe_p
-
- df = df.withColumn("mean_score",udf_mean(col("top_10")))
- df.write.option("header", "true").option(
- "encoding",
"UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_similarity_table)
- extended_did = df.sort(col('mean_score').desc()).select('did',
'mean_score')
-
-
-
-if __name__ == "__main__":
- start = time.time()
- parser = argparse.ArgumentParser(description=" ")
- parser.add_argument('config_file')
- args = parser.parse_args()
- with open(args.config_file, 'r') as yml_file:
- cfg = yaml.safe_load(yml_file)
-
- sc = SparkContext.getOrCreate()
- sc.setLogLevel('WARN')
- hive_context = HiveContext(sc)
-
- ## select similarity algorithm
- alg = cfg["input"]["alg"]
- run(hive_context=hive_context, cfg=cfg)
- sc.stop()
- end = time.time()
- print('Runtime of the program is:', (end - start))
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py
b/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py
new file mode 100644
index 0000000..3e46a55
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
index cd941f6..485f7be 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
@@ -1,35 +1,31 @@
-product_tag: 'lookalike'
-pipeline_tag: '08042021'
+product_tag: 'lookalike_application'
+pipeline_tag: '08172021_1000'
score_generator:
input:
- log_table : "lookalike_03042021_logs"
- did_table: "lookalike_03042021_trainready"
+ log_table : "lookalike_08172021_1000_logs"
+ did_table: "lookalike_08172021_1000_trainready"
keywords_table: "din_ad_keywords_09172020"
- test_table: "lookalike_trainready_jimmy_test"
+ significant_keywords_table: "lookalike_08172021_1000_keywords"
din_model_tf_serving_url:
"http://10.193.217.105:8506/v1/models/lookalike:predict"
din_model_length: 20
- seeduser_table : "lookalike_seeduser"
- number_of_seeduser: 1000
extend: 2000
alg: "euclidean" ##### currently just support "euclideand" and "dot"
output:
- did_score_table: "{product_tag}_score_{pipeline_tag}"
- score_norm_table: "{product_tag}_score_norm_{pipeline_tag}"
- normalize: False
-
+ score_table: "{product_tag}_{pipeline_tag}_score"
+ normalize: False
score_vector:
keywords_table: "din_ad_keywords_09172020"
- score_norm_table: "{product_tag}_score_norm_{pipeline_tag}"
- score_vector_table: "{product_tag}_score_vector_{pipeline_tag}"
+ score_table: "{product_tag}_{pipeline_tag}_score"
+ score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
did_bucket_size: 2
did_bucket_step: 2
-score_vector_rebucketing:
+score_matrix_table:
did_bucket_size: 2
did_bucket_step: 2
- alpha_did_bucket_size: 20 #default=1000
- score_vector_alpha_table: '{product_tag}_score_vector_alpha_{pipeline_tag}'
+ score_matrix_table: '{product_tag}_{pipeline_tag}_score_matrix'
top_n_similarity:
- did_bucket_step: 1
- alpha_did_bucket_step: 10
+ did_bucket_size: 100
+ did_bucket_step: 100
+ cross_bucket_size: 1 # in production this should be the same as
did_bucket_size
top_n: 10
- similarity_table: "{product_tag}_similarity_{pipeline_tag}"
\ No newline at end of file
+ similarity_table: "{product_tag}_{pipeline_tag}_similarity"
\ No newline at end of file
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
index d5bf875..a141ebc 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
@@ -1,16 +1,10 @@
#!/bin/bash
-# Not used as part of pipeline
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict seed_user_selector.py
config.yml "29"
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py
config.yml
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py
config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py
config.yml
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py
config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py
config.yml
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
score_vector_rebucketing.py config.yml
-
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
top_n_similarity_table_generator.py config.yml
-
-# Not used as part of pipeline
-spark-submit ---master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py
config.yml "29"
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
top_n_similarity_table_generator.py config.yml
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
index 2dbebda..6dd86b2 100644
---
a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
+++
b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
@@ -29,7 +29,7 @@ from util import resolve_placeholder
from lookalike_model.pipeline.util import write_to_table,
write_to_table_with_partition
'''
-This process generates the score-norm-table with the following format.
+This process generates the score-table with the following format.
DataFrame[age: int, gender: int, did: string, did_index: bigint,
interval_starting_time: array<string>, interval_keywords: array<string>,
@@ -55,7 +55,7 @@ def str_to_intlist(table):
return ji
-def inputData(record, keyword, length):
+def input_data(record, keyword, length):
if len(record['show_counts']) >= length:
hist = flatten(record['show_counts'][:length])
instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j':
keyword, 'sl': len(hist)}
@@ -69,7 +69,7 @@ def inputData(record, keyword, length):
def predict(serving_url, record, length, new_keyword):
body = {'instances': []}
for keyword in new_keyword:
- instance = inputData(record, keyword, length)
+ instance = input_data(record, keyword, length)
body['instances'].append(instance)
body_json = json.dumps(body)
result = requests.post(serving_url, data=body_json).json()
@@ -174,18 +174,21 @@ if __name__ == '__main__':
hive_context = HiveContext(sc)
# load dataframes
- did_table, keywords_table, din_tf_serving_url, length =
cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
- 'keywords_table'],
cfg['score_generator']['input']['din_model_tf_serving_url'],
cfg['score_generator']['input']['din_model_length']
+ did_table, keywords_table, significant_keywords_table, din_tf_serving_url,
length = cfg['score_generator']['input']['did_table'],
cfg['score_generator']['input'][
+ 'keywords_table'], cfg['score_generator']['input'][
+ 'significant_keywords_table'],
cfg['score_generator']['input']['din_model_tf_serving_url'],
cfg['score_generator']['input']['din_model_length']
command = 'SELECT * FROM {}'
df_did = hive_context.sql(command.format(did_table))
- df_keywords = hive_context.sql(command.format(keywords_table))
+
+ command = 'SELECT T1.keyword,T1.spread_app_id,T1.keyword_index FROM {} AS
T1 JOIN {} AS T2 ON T1.keyword=T2.keyword'
+ df_keywords = hive_context.sql(command.format(keywords_table,
significant_keywords_table))
# temporary adding to filter based on active keywords
df_keywords = df_keywords.filter((df_keywords.keyword == 'video') |
(df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') |
(df_keywords.keyword == 'social') |
(df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') |
(df_keywords.keyword == 'entertainment'))
- did_loaded_table = cfg['score_generator']['output']['did_score_table']
- score_norm_table = cfg['score_generator']['output']['score_norm_table']
+
+ score_table = cfg['score_generator']['output']['score_table']
# create a CTR score generator instance and run to get the loaded did
ctr_score_generator = CTRScoreGenerator(df_did, df_keywords,
din_tf_serving_url, length)
@@ -198,4 +201,4 @@ if __name__ == '__main__':
df = df.withColumn('kws_norm', udf_normalize(col('kws')))
# save the loaded did to hive table
- write_to_table_with_partition(df, score_norm_table,
partition=('did_bucket'), mode='overwrite')
+ write_to_table_with_partition(df, score_table, partition=('did_bucket'),
mode='overwrite')
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
b/Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py
similarity index 62%
rename from
Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
rename to
Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py
index 7cae65c..ee82c06 100644
---
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
+++
b/Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py
@@ -17,8 +17,8 @@
import yaml
import argparse
from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.functions import lit, col, udf
+from pyspark.sql import HiveContext, SparkSession, Window
+from pyspark.sql.functions import lit, col, udf, collect_list
from pyspark.sql.types import FloatType, StringType, StructType, StructField,
ArrayType, MapType, IntegerType
# from rest_client import predict, str_to_intlist
import requests
@@ -36,41 +36,41 @@ from lookalike_model.pipeline.util import write_to_table,
write_to_table_with_pa
'''
To run, execute the following in application folder.
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
score_vector_rebucketing.py config.yml
+spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py
config.yml
-This process generates added secondary buckects ids (alpha-did-bucket).
+This process consolidates bucket score vectors into matrices.
'''
-def assign_new_bucket_id(df, n, new_column_name):
- def __hash_sha256(s):
- hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
- return int(hex_value, 16)
- _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType())
- df = df.withColumn(new_column_name, _udf(df.did))
- return df
-
-
-def run(hive_context, cfg):
+def run(spark_session, hive_context, cfg):
score_vector_table = cfg['score_vector']['score_vector_table']
- bucket_size = cfg['score_vector_rebucketing']['did_bucket_size']
- bucket_step = cfg['score_vector_rebucketing']['did_bucket_step']
- alpha_bucket_size =
cfg['score_vector_rebucketing']['alpha_did_bucket_size']
- score_vector_alpha_table =
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+ bucket_size = cfg['score_matrix_table']['did_bucket_size']
+ bucket_step = cfg['score_matrix_table']['did_bucket_step']
+ score_matrix_table = cfg['score_matrix_table']['score_matrix_table']
first_round = True
+ num_batches = (bucket_size + bucket_step - 1) / bucket_step
+ batch_num = 1
for did_bucket in range(0, bucket_size, bucket_step):
- command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE
did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket,
did_bucket+bucket_step-1)
+ print('Processing batch {} of {} bucket number:
{}'.format(batch_num, num_batches, did_bucket))
+
+ max_bucket = min(did_bucket+bucket_step-1, bucket_size)
+ command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE
did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket, max_bucket)
+ # command = "SELECT did_bucket, collect_list(struct(did, score_vector,
c1)) AS item FROM {} WHERE did_bucket BETWEEN {} AND {} GROUP BY
did_bucket".format(score_vector_table, did_bucket,
min(did_bucket+bucket_step-1, bucket_size))
df = hive_context.sql(command)
- df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_did_bucket')
+ df = df.groupBy('did_bucket').agg(
+ collect_list('did').alias('did_list'),
+ collect_list('score_vector').alias('score_matrix'),
+ collect_list('c1').alias('c1_list'))
mode = 'overwrite' if first_round else 'append'
- write_to_table_with_partition(df.select('did', 'score_vector', 'c1',
'did_bucket', 'alpha_did_bucket'),
- score_vector_alpha_table,
partition=('did_bucket', 'alpha_did_bucket'), mode=mode)
+ write_to_table_with_partition(df.select('did_list', 'score_matrix',
'c1_list', 'did_bucket'),
+ score_matrix_table,
partition=('did_bucket'), mode=mode)
first_round = False
+ batch_num += 1
if __name__ == "__main__":
@@ -84,8 +84,9 @@ if __name__ == "__main__":
sc = SparkContext.getOrCreate()
sc.setLogLevel('WARN')
hive_context = HiveContext(sc)
+ spark_session = SparkSession(sc)
- run(hive_context=hive_context, cfg=cfg)
+ run(spark_session, hive_context=hive_context, cfg=cfg)
sc.stop()
end = time.time()
print('Runtime of the program is:', (end - start))
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
index 5c13fc8..aa8ee51 100644
---
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
+++
b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
@@ -36,7 +36,7 @@ from util import resolve_placeholder
'''
To run, execute the following in application folder.
-spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py
config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py
config.yml
This process generates the score_vector_table table.
@@ -50,10 +50,11 @@ The top-n-similarity table is
'''
+
def run(hive_context, cfg):
keywords_table = cfg["score_vector"]["keywords_table"]
- score_norm_table = cfg['score_vector']['score_norm_table']
+ score_table = cfg['score_vector']['score_table']
score_vector_table = cfg['score_vector']['score_vector_table']
bucket_size = cfg['score_vector']['did_bucket_size']
bucket_step = cfg['score_vector']['did_bucket_step']
@@ -65,8 +66,12 @@ def run(hive_context, cfg):
# add score-vector iterativly
first_round = True
+ num_batches = (bucket_size + bucket_step - 1) / bucket_step
+ batch_num = 1
for did_bucket in range(0, bucket_size, bucket_step):
- command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket
BETWEEN {} AND {}".format(score_norm_table, did_bucket,
did_bucket+bucket_step-1)
+ print('Processing batch {} of {} bucket number:
{}'.format(batch_num, num_batches, did_bucket))
+
+ command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket
BETWEEN {} AND {}".format(score_table, did_bucket,
min(did_bucket+bucket_step-1, bucket_size))
# |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0
|
# [social -> 0.24231663, entertainment -> 0.20828941, reading ->
0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804,
info -> 0.1978679]|
@@ -75,10 +80,12 @@ def run(hive_context, cfg):
udf(lambda kws: [kws[keyword] if keyword in kws
else 0.0 for keyword in keywords], ArrayType(FloatType()))(df.kws))
df = df.withColumn('c1', udf(lambda x:
float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
-
+
mode = 'overwrite' if first_round else 'append'
write_to_table_with_partition(df.select('did', 'score_vector', 'c1',
'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
+ # write_to_table_with_partition(df.select('did', 'did_bucket'),
score_vector_table, partition=('did_bucket'), mode=mode)
first_round = False
+ batch_num += 1
if __name__ == "__main__":
@@ -90,7 +97,7 @@ if __name__ == "__main__":
cfg = yaml.safe_load(yml_file)
resolve_placeholder(cfg)
sc = SparkContext.getOrCreate()
- sc.setLogLevel('WARN')
+ sc.setLogLevel('INFO')
hive_context = HiveContext(sc)
run(hive_context=hive_context, cfg=cfg)
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
b/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
deleted file mode 100644
index 691d588..0000000
---
a/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from pyspark import SparkContext
-from pyspark.sql import HiveContext
-import yaml
-import argparse
-from util import resolve_placeholder
-
-
-'''
-input: logfile
-output: lookalike_seeduser table
-
-
-'''
-
-
-def run(hive_context, cfg, kwi):
- seed_user_table = cfg['input']['seeduser_table']
- log_table = cfg['input']['log_table']
- number_of_seeduser = cfg['input']['number_of_seeduser']
-
- # command = "select * from (select * from {} where is_click=1 and
keyword_index=29) as s join (select * from {} where is_click=1 and
keyword_index=26) as b on b.did = s.did where s.gender = 1"
- command = "SELECT * FROM {} WHERE is_click=1 AND keyword_index={}"
- df = hive_context.sql(command.format(log_table, kwi))
- user_list =
df.select('did').alias('did').distinct().limit(number_of_seeduser)
- user_list.cache()
-
- user_list.write.option("header", "true").option(
- "encoding",
"UTF-8").mode("overwrite").format('hive').saveAsTable(seed_user_table)
-
-
-if __name__ == "__main__":
- """
- select seed users
- """
- parser = argparse.ArgumentParser(description=" ")
- parser.add_argument('config_file')
- parser.add_argument('kwi')
- args = parser.parse_args()
- kwi = args.kwi
- with open(args.config_file, 'r') as yml_file:
- cfg = yaml.safe_load(yml_file)
- resolve_placeholder(cfg)
- sc = SparkContext.getOrCreate()
- sc.setLogLevel('WARN')
- hive_context = HiveContext(sc)
-
- run(hive_context=hive_context, cfg=cfg, kwi=kwi)
- sc.stop()
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
index 66363c5..60071b6 100644
---
a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
+++
b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
@@ -19,22 +19,16 @@ import argparse
import pyspark.sql.functions as fn
from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.types import FloatType, StringType, StructType, StructField,
ArrayType, MapType, StructType
+from pyspark.sql import HiveContext, SparkSession
+from pyspark.sql.types import FloatType, StringType, StructType, StructField,
ArrayType, MapType, IntegerType
+from pyspark.sql.functions import udf, col, explode
-# from rest_client import predict, str_to_intlist
-import requests
-import json
-import argparse
-from pyspark.sql.functions import udf
from math import sqrt
import time
import numpy as np
import itertools
import heapq
from util import resolve_placeholder
-
-
from lookalike_model.pipeline.util import write_to_table,
write_to_table_with_partition
'''
@@ -53,65 +47,87 @@ The top-n-similarity table is
'''
-def run(sc, hive_context, cfg):
+def run(spark_session, hive_context, cfg):
- score_vector_alpha_table =
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+ score_matrix_table = cfg['score_matrix_table']['score_matrix_table']
similarity_table = cfg['top_n_similarity']['similarity_table']
- N = cfg['top_n_similarity']['top_n']
-
- did_bucket_size = cfg['score_vector_rebucketing']['did_bucket_size']
+ did_bucket_size = cfg['top_n_similarity']['did_bucket_size']
did_bucket_step = cfg['top_n_similarity']['did_bucket_step']
-
- alpha_bucket_size =
cfg['score_vector_rebucketing']['alpha_did_bucket_size']
- alpha_bucket_step = cfg['top_n_similarity']['alpha_did_bucket_step']
+ cross_bucket_size = cfg['top_n_similarity']['cross_bucket_size']
+ top_n_value = cfg['top_n_similarity']['top_n']
first_round = True
+ num_batches = (did_bucket_size + did_bucket_step - 1) / did_bucket_step
+ batch_num = 1
for did_bucket in range(0, did_bucket_size, did_bucket_step):
+ print('Processing batch {} of {} bucket number:
{}'.format(batch_num, num_batches, did_bucket))
- command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE
did_bucket BETWEEN {} AND {}".format(
- score_vector_alpha_table, did_bucket, did_bucket + did_bucket_step
- 1)
+ command = "SELECT did_list, did_bucket, score_matrix, c1_list FROM {}
WHERE did_bucket BETWEEN {} AND {}".format(
+ score_matrix_table, did_bucket, min(did_bucket + did_bucket_step -
1, did_bucket_size))
# |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0
|
# [0.24231663, 0.20828941, 0.0]|
df = hive_context.sql(command)
df = df.withColumn('top_n_similar_user', fn.array())
- for alpha_bucket in range(0, alpha_bucket_size, alpha_bucket_step):
- command = """SELECT did, score_vector, c1, alpha_did_bucket
- FROM {} WHERE alpha_did_bucket BETWEEN {} AND {}"""
- command = command.format(score_vector_alpha_table,
- alpha_bucket, alpha_bucket +
alpha_bucket_step - 1)
+ for cross_bucket in range(0, cross_bucket_size):
+ print('Processing batch {}, alpha bucket {}'.format(batch_num,
cross_bucket))
+
+ command = """SELECT did_list, score_matrix, c1_list, did_bucket
+ FROM {} WHERE did_bucket = {} """
+ command = command.format(score_matrix_table, cross_bucket)
df_user = hive_context.sql(command)
- block_user = df_user.select('did', 'score_vector', 'c1').collect()
- block_user_did_score = ([_['did'] for _ in block_user],
[_['score_vector'] for _ in block_user])
- block_user_broadcast = sc.broadcast(block_user_did_score)
-
- c2 = np.array([_['c1'] for _ in block_user])
- c2 = np.square(np.linalg.norm(c2)).tolist()
- c2_broadcast = sc.broadcast(c2)
-
- def calculate_similarity(user_score_vector, top_n_user_score, c1):
- m = len(user_score_vector)
- user_score_vector = np.array(user_score_vector)
- dids, other_score_vectors = block_user_broadcast.value
- other_score_vectors = np.array(other_score_vectors)
- cross_mat = np.matmul(user_score_vector,
other_score_vectors.transpose())
- c2 = np.array(c2_broadcast.value)
- similarity = np.sqrt(m) - np.sqrt(c1 + c2 - 2 * cross_mat)
- user_score_s = list(itertools.izip(dids, similarity.tolist()))
- user_score_s.extend(top_n_user_score)
- user_score_s = heapq.nlargest(N, user_score_s, key=lambda x:
x[1])
- return user_score_s
+ cross_users = df_user.select('did_list', 'score_matrix',
'c1_list').collect()
+
+ if len(cross_users) == 0:
+ continue
+
+ cross_users_did_score = (cross_users[0]['did_list'],
cross_users[0]['score_matrix'])
+ c2 = np.array(cross_users[0]['c1_list'])
+
+ def calculate_similarity(cross_users_did_score, c2):
+ def __helper(user_score_matrix, top_n_user_score, c1_list):
+ user_score_matrix = np.array(user_score_matrix)
+ m = user_score_matrix.shape[1]
+ cross_dids, cross_score_matrix = cross_users_did_score
+ cross_score_matrix = np.array(cross_score_matrix)
+ cross_mat = np.matmul(user_score_matrix,
cross_score_matrix.transpose())
+
+ similarity = np.sqrt(m) -
np.sqrt(np.maximum(np.expand_dims(c1_list, 1) + c2 - (2 * cross_mat), 0.0))
+ result = []
+ for cosimilarity, top_n in
itertools.izip_longest(similarity, top_n_user_score, fillvalue=[]):
+ user_score_s = list(itertools.izip(cross_dids,
cosimilarity.tolist()))
+ user_score_s.extend(top_n)
+ user_score_s = heapq.nlargest(top_n_value,
user_score_s, key=lambda x: x[1])
+ result.append(user_score_s)
+ return result
+ return __helper
elements_type = StructType([StructField('did', StringType(),
False), StructField('score', FloatType(), False)])
# update top_n_similar_user field
- df = df.withColumn('top_n_similar_user', udf(calculate_similarity,
ArrayType(elements_type))(df.score_vector, df.top_n_similar_user, df.c1))
+ df = df.withColumn('top_n_similar_user',
udf(calculate_similarity(cross_users_did_score, c2),
+
ArrayType(ArrayType(elements_type)))(df.score_matrix, df.top_n_similar_user,
df.c1_list))
+
+ # Unpack the matrices into individual users.
+ # Note: in Spark 2.4, the udf can be replaced with arrays_zip().
+ def combine(x, y):
+ return list(zip(x, y))
+ df = df.withColumn("new", udf(combine,
ArrayType(StructType([StructField("did", StringType()),
+
StructField("top_n_similar_user", ArrayType(StructType([
+
StructField("did", StringType(), True),
+
StructField("score", FloatType(), True), ]), True)),
+
])))(df.did_list, df.top_n_similar_user))
+ df = df.withColumn("new", explode("new"))
+ df = df.select(col("new.did").alias("did"),
+
col("new.top_n_similar_user").alias("top_n_similar_user"),
+ "did_bucket")
mode = 'overwrite' if first_round else 'append'
# use the partitioned field at the end of the select. Order matters.
- write_to_table_with_partition(df.select('did', 'top_n_similar_user',
'did_bucket'), similarity_table, partition=('did_bucket'), mode=mode)
+ write_to_table_with_partition(df, similarity_table,
partition=('did_bucket'), mode=mode)
first_round = False
+ batch_num += 1
if __name__ == "__main__":
@@ -126,8 +142,9 @@ if __name__ == "__main__":
sc = SparkContext.getOrCreate()
sc.setLogLevel('INFO')
hive_context = HiveContext(sc)
+ spark_session = SparkSession(sc)
- run(sc=sc, hive_context=hive_context, cfg=cfg)
+ run(spark_session, hive_context, cfg)
sc.stop()
end = time.time()
print('Runtime of the program is:', (end - start))
diff --git a/Model/lookalike-model/lookalike_model/application/rest_client.py
b/Model/lookalike-model/lookalike_model/application/rest_client.py
deleted file mode 100644
index c3b7a02..0000000
--- a/Model/lookalike-model/lookalike_model/application/rest_client.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-THE script gets the data, process it and send the request to
-the rest client and print out the response from the the rest API
-"""
-import requests
-import json
-import argparse
-import yaml
-
-
-def flatten(lst):
- f = [y for x in lst for y in x]
- return f
-
-
-def str_to_intlist(table):
- ji = []
- for k in [table[j].split(",") for j in range(len(table))]:
- s = []
- for a in k:
- b = int(a.split(":")[0])
- s.append(b)
- ji.append(s)
- return ji
-
-
-def inputData(record, keyword, length):
- if len(record['show_counts']) >= length:
- hist = flatten(record['show_counts'][:length])
- instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j':
keyword, 'sl': len(hist)}
- else:
- hist = flatten(record['show_counts'])
- # [hist.extend([0]) for i in range(length - len(hist))]
- instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j':
keyword, 'sl': len(hist)}
- return instance
-
-
-def predict(serving_url, record, length, new_keyword):
- body = {"instances": []}
- for keyword in new_keyword:
- instance = inputData(record, keyword, length)
- body['instances'].append(instance)
- body_json = json.dumps(body)
- result = requests.post(serving_url, data=body_json).json()
- if 'error' in result.keys():
- predictions = result['error']
- else:
- predictions = result['predictions']
- return predictions
-
-
-def run(cfg):
- length = cfg['input']['din_model_length']
- url = cfg['input']['din_model_tf_serving_url']
- ##time_interval, did, click_counts, show_counts, media_category,
net_type_index, gender, age, keyword
- record = {"did": 0, "show_counts": ['25:3', '29:6,25:2', '29:1,25:2,14:2',
'14:1,29:2,25:2',
- '29:1', '26:1,14:2,25:4',
'14:1,25:3'], "show_clicks": [], "age": '10', "gender": '3'}
- record['show_counts'] = str_to_intlist(record['show_counts'])
- new_keyword = [26, 27, 29]
- response = predict(serving_url=url, record=record, length=length,
new_keyword=new_keyword)
-
- print(response)
-
-
-if __name__ == '__main__': # record is equal to window size
- parser = argparse.ArgumentParser(description='Prepare data')
- parser.add_argument('config_file')
- args = parser.parse_args()
-
- with open(args.config_file, 'r') as ymlfile:
- cfg = yaml.safe_load(ymlfile)
-
- run(cfg)
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/validation.py
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py
similarity index 87%
rename from
Model/lookalike-model/lookalike_model/application/pipeline/validation.py
rename to
Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py
index 5409472..08f1f5a 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/validation.py
+++
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py
@@ -20,6 +20,10 @@ from pyspark import SparkContext
import argparse, yaml
from util import resolve_placeholder
+'''
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation_plan_0.py
config.yml "29"
+'''
+
def counting(click,kwi):
count_click = 0
@@ -31,10 +35,10 @@ def counting(click,kwi):
def run(hive_context,cfg, kwi):
lookalike_score_table = cfg["output"]["similarity_table"]
- seed_user_table = cfg['input']['seeduser_table']
+ seed_user_table = 'lookalike_seeduser'
extend = cfg['input']['extend']
- test_table = cfg['input']['test_table']
- number_of_seeduser = cfg['input']['number_of_seeduser']
+ test_table = 'lookalike_trainready_jimmy_test'
+ number_of_seeduser = 1000
######### filtering the df and removing seed users
command = "select * from {}"
diff --git
a/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py
new file mode 100644
index 0000000..e92ea97
--- /dev/null
+++
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyspark.sql.functions import lit, col, udf, rand
+from pyspark.sql import HiveContext
+from pyspark import SparkContext
+import argparse
+import yaml
+from util import resolve_placeholder
+
+'''
+
+spark-submit --master yarn --num-executors 10 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation_plan_1.py
config.yml '29'
+
+1. Randomly select 1000 users. This is user_list.
+2. Calculate click/imp for kw for each user. This is actual_interest.
+3. Sort user_list by actual_interest and take the first 500. This is list N.
+4. Sort user_list by kw score and take the first 500. This is list M.
+5. The final result is (size of common(M, N)) / (size of M).
+
+
+'''
+
+
+def process(user, kwi):
+ l = user['kwi_show_counts']
+ imp_for_kwi = sum([int(j.split(':')[1]) if j.split(':')[0] == kwi else 0
for _ in l for j in _.split(',')])
+
+ l = user['kwi_click_counts']
+ click_for_kwi = sum([int(j.split(':')[1]) if j.split(':')[0] == kwi else 0
for _ in l for j in _.split(',')])
+
+ actual_interest = click_for_kwi * 1.0 / imp_for_kwi if imp_for_kwi != 0
else 0
+
+ kwi_score = 0
+ if kwi in user['kws']:
+ kwi_score = user['kws']['kwi']
+
+ return (user['did'], actual_interest, kwi_score)
+
+
+def run(hive_context, cfg, kwi):
+ RANDOM_USERS_SIZE = 1000
+ lookalike_score_table = cfg['score_generator']['output']['score_table']
+
+ # Randomly select 1000 users. It is user_list.
+ command = "SELECT * FROM {}"
+ df = hive_context.sql(command.format(lookalike_score_table))
+ user_list = df.orderBy(rand()).limit(RANDOM_USERS_SIZE).collect()
+ user_metrics = [process(user, kwi) for user in user_list]
+
+ n = sorted(user_metrics, key=lambda x: x[1],
reverse=True)[:RANDOM_USERS_SIZE//2]
+ m = sorted(user_metrics, key=lambda x: x[2],
reverse=True)[:RANDOM_USERS_SIZE//2]
+
+ n = set([_[0] for _ in n])
+ m = set([_[0] for _ in m])
+
+ size_of_common = len(n.intersection(m))
+ result = size_of_common*1.0/(RANDOM_USERS_SIZE//2)
+
+ print(result)
+
+
+if __name__ == "__main__":
+ """
+ validate the result
+ """
+ parser = argparse.ArgumentParser(description=" ")
+ parser.add_argument('config_file')
+ parser.add_argument('kwi')
+ args = parser.parse_args()
+ kwi = args.kwi
+ with open(args.config_file, 'r') as yml_file:
+ cfg = yaml.safe_load(yml_file)
+ resolve_placeholder(cfg)
+
+ sc = SparkContext.getOrCreate()
+ sc.setLogLevel('WARN')
+ hive_context = HiveContext(sc)
+
+ run(hive_context=hive_context, cfg=cfg, kwi=kwi)
+ sc.stop()
diff --git a/Model/lookalike-model/lookalike_model/config.yml
b/Model/lookalike-model/lookalike_model/config.yml
index dbfdf08..fe05afc 100644
--- a/Model/lookalike-model/lookalike_model/config.yml
+++ b/Model/lookalike-model/lookalike_model/config.yml
@@ -99,6 +99,9 @@ features:
'task_id',
'pps_inside_exprmt_ab_tag']
pipeline:
+ main_keywords:
+ keyword_output_table: '{product_tag}_{pipeline_tag}_keywords'
+ keyword_threshold: 0.01 # Portion of showlog traffic that a keyword must
exceed to be included.
main_clean:
did_bucket_num: 2 # Number of partitions for did
load_logs_in_minutes: 14400 #1440/day, original=14400
@@ -161,9 +164,6 @@ pipeline:
logs_output_table_name: '{product_tag}_{pipeline_tag}_logs'
main_trainready:
trainready_output_table: '{product_tag}_{pipeline_tag}_trainready'
- show_threshold_low: 0.5 # per day (set to -1 for no low threshold)
- show_threshold_high: -1 # per day (set to -1 for no high threshold)
- active_interval_threshold: 0.2 # proportion of days (set to -1 for no
active day filter)
tfrecords:
tfrecords_statistics_path:
'{product_tag}_{pipeline_tag}_tfrecord_statistics.pkl'
tfrecords_hdfs_path: '{product_tag}_{pipeline_tag}_tfrecord' # it is hdfs
location for tfrecords, over-writes the existing files
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
index 9efa3c6..ffa38e9 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
@@ -20,7 +20,7 @@ from datetime import datetime, timedelta
import timeit
from pyspark import SparkContext
-from pyspark.sql.functions import col, udf
+from pyspark.sql.functions import col, udf, collect_set
from pyspark.sql.types import BooleanType, IntegerType, StringType
from pyspark.sql import HiveContext
from util import load_config, load_batch_config, load_df
@@ -90,6 +90,12 @@ def clean_batched_log(df, df_persona, conditions,
df_keywords, did_bucket_num):
df = add_did_bucket(df, did_bucket_num)
return df
+def filter_keywords(df, keywords):
+ # User-defined function that returns whether the keyword is in the inclusion set.
+ _udf = udf(lambda x: x in keywords, BooleanType())
+
+ # Return the filtered dataframe.
+ return df.filter(_udf(col('keyword')))
def clean_logs(cfg, df_persona, df_keywords, log_table_names):
sc = SparkContext.getOrCreate()
@@ -185,6 +191,8 @@ def run(hive_context, cfg):
did_bucket_num = cfg_clean['did_bucket_num']
+ keywords_effective_table =
cfg['pipeline']['main_keywords']['keyword_output_table']
+
command = """SELECT did,
gender_new_dev AS gender,
forecast_age_dev AS age
@@ -201,6 +209,12 @@ def run(hive_context, cfg):
df_keywords = load_df(hive_context, keywords_table)
#[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')]
+ # Use the effective keyword table to filter the keyword table which
+ # will serve to filter the show and click log tables.
+ df_effective_keywords = load_df(hive_context, keywords_effective_table)
+ effective_keywords =
df_effective_keywords.select(collect_set('keyword')).first()[0]
+ df_keywords = filter_keywords(df_keywords, effective_keywords)
+
log_table_names = (showlog_table, showlog_new_table, clicklog_table,
clicklog_new_table)
clean_logs(cfg, df_persona, df_keywords, log_table_names)
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py
b/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py
new file mode 100644
index 0000000..e09e16b
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+from pyspark import SparkContext
+
+from util import load_config, load_batch_config, print_batching_info
+from util import write_to_table, generate_add_keywords, resolve_placeholder
+
+
+def run(hive_context, showlog_table, keywords_mapping_table,
create_keywords_mapping,
+ start_date, end_date, load_minutes, keyword_threshold,
effective_keywords_table):
+ """
+ # This script goes through the showlog and identifies all the
+ # keywords that comprise a portion of the overall traffic greater
+ # than the specified threshold.
+ """
+
+ # Create the ad keywords mapping table when create_keywords_mapping is set.
+ if create_keywords_mapping:
+ generate_add_keywords(keywords_mapping_table)
+ #[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')]
+
+ starting_time = datetime.strptime(start_date, "%Y-%m-%d")
+ ending_time = datetime.strptime(end_date, "%Y-%m-%d")
+
+ # In batches, get the show counts for all of the keywords.
+ keyword_totals = {}
+ batched_round = 1
+ while starting_time < ending_time:
+ time_start = starting_time.strftime("%Y-%m-%d %H:%M:%S")
+ batch_time_end = starting_time + timedelta(minutes=load_minutes)
+ batch_time_end = min(batch_time_end, ending_time)
+ time_end = batch_time_end.strftime("%Y-%m-%d %H:%M:%S")
+ print_batching_info("Main keywords", batched_round, time_start,
time_end)
+
+ # Get the impressions for the time window joined with the keywords.
+ command = """SELECT
+ logs.spread_app_id,
+ logs.show_time,
+ kw.keyword
+ FROM {log_table} as logs inner join {keyword_table} as kw
on logs.spread_app_id = kw.spread_app_id
+ WHERE logs.show_time >= '{time_start}' AND show_time <
'{time_end}' """
+ df_showlog_batched =
hive_context.sql(command.format(log_table=showlog_table,
+ keyword_table=keywords_mapping_table, time_start=time_start,
time_end=time_end))
+
+ # Get the number of impressions for each keyword.
+ df = df_showlog_batched.groupby('keyword').count().collect()
+
+ # Add the impression count for each keyword to the dictionary.
+ for row in df:
+ keyword_totals[row['keyword']] =
keyword_totals.get(row['keyword'], 0) + int(row['count'])
+ starting_time = batch_time_end
+ batched_round += 1
+
+ # With the total keyword counts calculated, identify the keywords that meet
+ # the threshold to be included.
+ # Get the total and calculate the count threshold for effective keywords.
+ total_impressions = sum(keyword_totals.values())
+ impression_threshold = keyword_threshold * total_impressions
+
+ # For each keyword, if its count is greater than the threshold, add
+ # it to the effective keyword list.
+ effective_keywords = []
+ for key, value in keyword_totals.items():
+ if value > impression_threshold:
+ effective_keywords.append((key,)) # Append as a tuple
+
+ # Create the dataframe with the results and save to Hive.
+ sc = SparkContext.getOrCreate()
+ df_effective_keywords =
sc.parallelize(effective_keywords).toDF(['keyword'])
+ write_to_table(df_effective_keywords, effective_keywords_table)
+
+
+if __name__ == "__main__":
+
+ """
+ main_keywords is a process to identify the effective keywords that
+ comprise a percentage of the traffic above a given threshold.
+ """
+ sc, hive_context, cfg = load_config(
+ description="clean data of persona, clicklog and showlog.")
+ resolve_placeholder(cfg)
+
+ cfg_clean = cfg['pipeline']['main_clean']
+ showlog_table = cfg['showlog_table_name']
+ keywords_mapping_table = cfg['keywords_table']
+ create_keywords_mapping = cfg_clean['create_keywords']
+
+ cfg_keywords = cfg['pipeline']['main_keywords']
+ keyword_threshold = cfg_keywords['keyword_threshold']
+ effective_keywords_table = cfg_keywords['keyword_output_table']
+
+ start_date, end_date, load_minutes = load_batch_config(cfg)
+
+ run(hive_context, showlog_table, keywords_mapping_table,
create_keywords_mapping,
+ start_date, end_date, load_minutes, keyword_threshold,
effective_keywords_table)
+
+ sc.stop()
+
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
index cf8218f..0fc3247 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
@@ -23,7 +23,7 @@ from pyspark import SparkContext
from pyspark.sql import functions as fn
from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws,
first, create_map, monotonically_increasing_id, row_number
from pyspark.sql.window import Window
-from pyspark.sql.types import FloatType, IntegerType, ArrayType, StringType,
LongType
+from pyspark.sql.types import IntegerType, ArrayType, StringType, LongType
from pyspark.sql import HiveContext
from datetime import datetime, timedelta
from util import write_to_table, write_to_table_with_partition,
print_batching_info, resolve_placeholder, load_config, load_batch_config,
load_df
@@ -36,54 +36,10 @@ def date_to_timestamp(dt):
epoch = datetime.utcfromtimestamp(0)
return int((dt - epoch).total_seconds())
-def filter_users(df, show_threshold_low, show_threshold_high,
active_interval_threshold, total_num_intervals):
- def list_map_count(x):
- value_total = 0
- for cell in x:
- key_values = cell.split(',')
- for key_value in key_values:
- print('list_map_count: {}'.format(key_value))
- _, value = key_value.split(':')
- value_total += int(value)
- return float(value_total)/total_num_intervals
-
- # Returns the ratio of items in the list that have non-zero values.
- # def list_map_nonzero_count(x):
- def list_map_nonzero_ratio(x):
- active_intervals = 0
- for cell in x:
- key_values = cell.split(',')
- for key_value in key_values:
- print('list_map_nonzero_ratio: {}'.format(key_value))
- _, value = key_value.split(':')
- if int(value) != 0:
- active_intervals += 1
- break
- return float(active_intervals)/total_num_intervals
-
- # Aggregate user activity by impressions.
- df = df.withColumn('total_show_count', udf(list_map_count,
FloatType())(col('kwi_show_counts')))
-
- # Filter out users below set activity level.
- if show_threshold_low >= 0:
- df = df.filter(df.total_show_count > show_threshold_low)
- if show_threshold_high >= 0:
- df = df.filter(df.total_show_count < show_threshold_high)
-
- # Calculate the number of active intervals.
- df = df.withColumn('show_active_interval_ratio',
udf(list_map_nonzero_ratio, FloatType())(col('kwi_show_counts')))
-
- # Filter out users below set number of active intervals.
- if (active_interval_threshold >= 0):
- df = df.filter(df.show_active_interval_ratio >
active_interval_threshold)
-
- return df
-
def generate_trainready(hive_context, batch_config,
interval_time_in_seconds,
- logs_table_name, trainready_table, did_bucket_num,
- show_threshold_low, show_threshold_high,
active_interval_threshold):
+ logs_table_name, trainready_table, did_bucket_num):
def group_batched_logs(df_logs):
# group logs from did + interval_time + keyword.
@@ -240,9 +196,6 @@ def generate_trainready(hive_context, batch_config,
for i, feature_name in enumerate(['interval_starting_time',
'interval_keywords', 'kwi', 'kwi_show_counts', 'kwi_click_counts']):
df = df.withColumn(feature_name, col('metrics_list').getItem(i))
- # Filter the users by activity level.
- df = filter_users(df, show_threshold_low, show_threshold_high,
active_interval_threshold, len(all_intervals))
-
# Add did_index
w = Window.orderBy("did_bucket", "did")
df = df.withColumn('row_number', row_number().over(w))
@@ -265,17 +218,11 @@ def run(hive_context, cfg):
cfg_train = cfg['pipeline']['main_trainready']
trainready_table = cfg_train['trainready_output_table']
- show_threshold_low = cfg_train['show_threshold_low']
- show_threshold_high = cfg_train['show_threshold_high']
- active_interval_threshold = cfg_train['active_interval_threshold']
-
did_bucket_num = cfg_clean['did_bucket_num']
batch_config = load_batch_config(cfg)
- generate_trainready(hive_context, batch_config, interval_time_in_seconds,
- logs_table_name, trainready_table, did_bucket_num,
- show_threshold_low, show_threshold_high, active_interval_threshold)
+ generate_trainready(hive_context, batch_config, interval_time_in_seconds,
logs_table_name, trainready_table, did_bucket_num)
if __name__ == "__main__":
diff --git a/Model/lookalike-model/lookalike_model/run.sh
b/Model/lookalike-model/lookalike_model/run.sh
index b24adab..201fa0a 100644
--- a/Model/lookalike-model/lookalike_model/run.sh
+++ b/Model/lookalike-model/lookalike_model/run.sh
@@ -1,5 +1,12 @@
#!/bin/bash
+# main_keywords: identify the keywords with proportion of traffic above set
threshold.
+if false
+then
+ # generate the effective keywords table.
+ spark-submit --master yarn --num-executors 20 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
pipeline/main_keywords.py config.yml
+fi
+
# main_clean: preparing cleaned persona, click and show logs data.
if false
then
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
b/Model/lookalike-model/tests/application/README.md
similarity index 100%
copy from
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
copy to Model/lookalike-model/tests/application/README.md
diff --git
a/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml
b/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml
new file mode 100644
index 0000000..eec98fc
--- /dev/null
+++
b/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml
@@ -0,0 +1,38 @@
+product_tag: 'lookalike_application'
+pipeline_tag: 'unittest_score_matrix_table'
+score_generator:
+ input:
+ log_table : "lookalike_03042021_logs"
+ did_table: "lookalike_03042021_trainready"
+ keywords_table: "din_ad_keywords_09172020"
+ din_model_tf_serving_url:
"http://10.193.217.105:8506/v1/models/lookalike:predict"
+ din_model_length: 20
+ extend: 2000
+ alg: "euclidean" ##### currently only "euclidean" and "dot" are supported
+ output:
+ score_table: "{product_tag}_score_{pipeline_tag}"
+ normalize: False
+
+score_vector:
+ keywords_table: "din_ad_keywords_09172020"
+ score_table: "{product_tag}_{pipeline_tag}_score"
+ score_vector_table: '{product_tag}_{pipeline_tag}_input_score_vector'
+ did_bucket_size: 2
+ did_bucket_step: 2
+score_vector_rebucketing:
+ did_bucket_size: 2
+ did_bucket_step: 1
+ alpha_did_bucket_size: 4
+ score_vector_alpha_table:
'{product_tag}_{pipeline_tag}_input_score_vector_alpha'
+score_matrix_table:
+ did_bucket_size: 1
+ did_bucket_step: 1
+ score_matrix_table: '{product_tag}_{pipeline_tag}_output_score_matrix'
+top_n_similarity:
+ did_bucket_size: 1
+ did_bucket_step: 1
+ cross_bucket_size: 1
+ cross_bucket_step: 1
+ alpha_did_bucket_step: 1
+ top_n: 10
+ similarity_table: "{product_tag}_{pipeline_tag}_output_similarity"
\ No newline at end of file
diff --git
a/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml
b/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml
new file mode 100644
index 0000000..d18d6e9
--- /dev/null
+++
b/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml
@@ -0,0 +1,38 @@
+product_tag: 'lookalike_application'
+pipeline_tag: 'unittest_top_n_similarity'
+score_generator:
+ input:
+ log_table : "lookalike_03042021_logs"
+ did_table: "lookalike_03042021_trainready"
+ keywords_table: "din_ad_keywords_09172020"
+ din_model_tf_serving_url:
"http://10.193.217.105:8506/v1/models/lookalike:predict"
+ din_model_length: 20
+ extend: 2000
+ alg: "euclidean" ##### currently only "euclidean" and "dot" are supported
+ output:
+ score_table: "{product_tag}_score_{pipeline_tag}"
+ normalize: False
+
+score_vector:
+ keywords_table: "din_ad_keywords_09172020"
+ score_table: "{product_tag}_{pipeline_tag}_score"
+ score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
+ did_bucket_size: 2
+ did_bucket_step: 2
+score_vector_rebucketing:
+ did_bucket_size: 2
+ did_bucket_step: 1
+ alpha_did_bucket_size: 4
+ score_vector_alpha_table:
'{product_tag}_{pipeline_tag}_input_score_vector_alpha'
+score_matrix_table:
+ did_bucket_size: 2
+ did_bucket_step: 1
+ score_matrix_table: '{product_tag}_{pipeline_tag}_input_score_matrix'
+top_n_similarity:
+ did_bucket_size: 1
+ did_bucket_step: 1
+ cross_bucket_size: 1
+ cross_bucket_step: 1
+ alpha_did_bucket_step: 1
+ top_n: 10
+ similarity_table: "{product_tag}_{pipeline_tag}_output_similarity"
\ No newline at end of file
diff --git
a/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py
b/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py
new file mode 100644
index 0000000..035c994
--- /dev/null
+++
b/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py
@@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.types import DataType, StringType, StructField, StructType,
FloatType, IntegerType, ArrayType, MapType
+from lookalike_model.application.pipeline import util, score_matrix_table
+
+
+class TestTopNSimilarityTableGenerator(unittest.TestCase):
+
+ def setUp (self):
+ # Set the log level.
+ self.sc = SparkContext.getOrCreate()
+ self.sc.setLogLevel('ERROR')
+
+ # Initialize the Spark session
+ self.spark = SparkSession.builder.appName('unit
test').enableHiveSupport().getOrCreate()
+ self.hive_context = HiveContext(self.sc)
+
+ def test_run(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_score_matrix_table.yml', 'r')
as ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+
+ # Get the names of the input and output tables.
+ vector_table = cfg['score_vector']['score_vector_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+
+ # Create the input table.
+ self.create_vector_table(vector_table, True)
+
+ # Run the function being tested.
+ score_matrix_table.run(self.spark, self.hive_context, cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by
did_bucket""".format(matrix_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ command = """select * from {} order by
did_bucket""".format(vector_table)
+ df_vector = self.hive_context.sql(command)
+
+ # Validate the output.
+ self.validate_similarity_table(df, df_vector, True)
+
+ def test_run2(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run2 ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_score_matrix_table.yml', 'r')
as ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+
+ # Get the names of the input and output tables.
+ vector_table = cfg['score_vector']['score_vector_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+
+ cfg['score_matrix_table']['did_bucket_size'] = 2
+
+ # Create the input table.
+ self.create_vector_table(vector_table, False)
+
+ # Run the function being tested.
+ score_matrix_table.run(self.spark, self.hive_context, cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by
did_bucket""".format(matrix_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ command = """select * from {} order by
did_bucket""".format(vector_table)
+ df_vector = self.hive_context.sql(command)
+
+ # Validate the output.
+ self.validate_similarity_table(df, df_vector, False)
+
+ def test_run3(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run3 ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_score_matrix_table.yml', 'r')
as ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+
+ # Get the names of the input and output tables.
+ vector_table = cfg['score_vector']['score_vector_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+
+ cfg['score_matrix_table']['did_bucket_size'] = 2
+ cfg['score_matrix_table']['did_bucket_step'] = 2
+
+ # Create the input table.
+ self.create_vector_table(vector_table, False)
+
+ # Run the function being tested.
+ score_matrix_table.run(self.spark, self.hive_context, cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by
did_bucket""".format(matrix_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ command = """select * from {} order by
did_bucket""".format(vector_table)
+ df_vector = self.hive_context.sql(command)
+
+ # Validate the output.
+ self.validate_similarity_table(df, df_vector, False)
+
+ def create_vector_table (self, table_name, same_bucket=False):
+ later_bucket = 1
+ if same_bucket:
+ later_bucket = 0
+
+ data = [
+ ('0000001', [0.1, 0.8, 0.9], 1.46, 0),
+ ('0000002', [0.1, 0.1, 0.1], 0.03, 0),
+ ('0000003', [0.1, 0.8, 0.9], 1.46, later_bucket),
+ ('0000004', [0.1, 0.2, 0.3], 0.14, later_bucket),
+ ]
+
+ schema = StructType([
+ StructField("did", StringType(), True),
+ StructField("score_vector", ArrayType(FloatType(), True), True),
+ StructField("c1", FloatType(), True),
+ StructField("did_bucket", IntegerType(), True)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ util.write_to_table(df, table_name)
+
+ def create_matrix(self):
+ data = [
+ (['0000001', '0000002', '0000003', '0000004'],
+ [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1], [0.1, 0.8, 0.9], [0.1, 0.2,
0.3]],
+ [1.46, 0.03, 1.46, 0.14],
+ 0)
+ ]
+
+ schema = StructType([
+ StructField("did_list", ArrayType(StringType(), True)),
+ StructField("score_matrix", ArrayType(ArrayType(FloatType(),
True)), True),
+ StructField("c1_list", ArrayType(FloatType(), True)),
+ StructField("did_bucket", IntegerType(), True)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ return df
+
+ def create_matrix2(self):
+ data = [
+ (['0000001', '0000002'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]],
[1.46, 0.03], 0),
+ (['0000003', '0000004'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]],
[1.46, 0.14], 1)
+ ]
+
+ schema = StructType([
+ StructField("did_list", ArrayType(StringType(), True)),
+ StructField("score_matrix", ArrayType(ArrayType(FloatType(),
True)), True),
+ StructField("c1_list", ArrayType(FloatType(), True)),
+ StructField("did_bucket", IntegerType(), True)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ return df
+
+ def validate_similarity_table (self, df, df_vector, same_bucket=False):
+ if same_bucket:
+ df_ref = self.create_matrix()
+ else:
+ df_ref = self.create_matrix2()
+
+ # Verify the column names.
+ self.assertEqual(len(df.columns), len(df_ref.columns))
+ for name in df.columns:
+ self.assertIn(name, df_ref.columns)
+
+ # Verify the number of rows.
+ self.assertEqual(df.count(), df_ref.count())
+
+ # Index the score vector by did_bucket and did for quick reference.
+ vectors = {}
+ for row in df_vector.collect():
+ did = row['did']
+ score_vector = row['score_vector']
+ c1 = row['c1']
+ did_bucket = row['did_bucket']
+ if did_bucket not in vectors:
+ vectors[did_bucket] = {did: (score_vector, c1)}
+ else:
+ vectors[did_bucket][did] = (score_vector, c1)
+
+ # Check the row values.
+ for row in df.collect():
+ did_bucket = row['did_bucket']
+ self.assertIn(did_bucket, vectors)
+ did_map = vectors[did_bucket]
+ for did, vector, c1 in zip(row['did_list'], row['score_matrix'],
row['c1_list']):
+ self.assertIn(did, did_map)
+ self.assertEqual(vector, did_map[did][0])
+ self.assertEqual(c1, did_map[did][1])
+
+
+# Runs the tests.
+if __name__ == '__main__':
+ # Run the unit tests.
+ unittest.main()
\ No newline at end of file
diff --git
a/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py
new file mode 100644
index 0000000..ddecac8
--- /dev/null
+++
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py
@@ -0,0 +1,183 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import HiveContext, Row, SparkSession
+from pyspark.sql.functions import col, udf, collect_set
+from pyspark.sql.types import IntegerType, BooleanType, StructType,
StructField, StringType, StructType, ArrayType, FloatType
+from lookalike_model.pipeline import main_clean, util
+from lookalike_model.pipeline.util import write_to_table
+from lookalike_model.application.pipeline import
top_n_similarity_table_generator
+import random
+import string
+
+'''
+spark-submit --master yarn --num-executors 2 --executor-cores 5
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
test_top_n_similarity_table_generator_1.py
+'''
+
+
+def random_string_generator(str_size):
+ PREFIX = 'lookalike_application_unittest_'
+ return PREFIX+''.join(random.choice(string.ascii_letters) for _ in
range(str_size))
+
+
+class TestMainClean(unittest.TestCase):
+
+ def setUp(self):
+ # Set the log level.
+ self.sc = SparkContext.getOrCreate()
+ self.sc.setLogLevel('ERROR')
+
+ # Initialize the Spark session
+ self.spark = SparkSession.builder.appName('unit
test').enableHiveSupport().getOrCreate()
+ self.hive_context = HiveContext(self.sc)
+
+ def drop_table(self, table_name):
+ self.hive_context.sql('DROP TABLE {}'.format(table_name))
+
+ def create_matrix_table(self, data, table_name):
+ schema = StructType([
+ StructField("did_list", ArrayType(StringType(), False)),
+ StructField("score_matrix", ArrayType(ArrayType(FloatType(),
False)), False),
+ StructField("c1_list", ArrayType(FloatType(), False)),
+ StructField("did_bucket", IntegerType(), False)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ util.write_to_table(df, table_name)
+
+ def run_top_n_similarity_table_generator(self, cfg, _input):
+ self.create_matrix_table(_input,
cfg['score_matrix_table']['score_matrix_table'])
+ top_n_similarity_table_generator.run(self.sc, self.hive_context, cfg)
+ result = self.hive_context.sql('SELECT
did,top_n_similar_user,did_bucket FROM
{}'.format(cfg['top_n_similarity']['similarity_table']))
+ return result
+
+ def compare_output_and_expected_and_cleanup(self, cfg, test_name,
df_output, _expected_output):
+ elements_type = StructType([StructField('did', StringType(), False),
StructField('score', FloatType(), False)])
+ _schema = StructType([StructField('did', StringType(), False),
StructField('top_n_similar_user',
+
ArrayType(elements_type), False), StructField('did_bucket', IntegerType(),
False)])
+ df_expected_output =
self.hive_context.createDataFrame(_expected_output, _schema)
+
+ df_output = df_output.sort('did')
+ df_expected_output = df_expected_output.sort('did')
+
+ print('Test name : {}'.format(test_name))
+
+ print('Expected')
+ df_expected_output.show(10, False)
+
+ print('Output')
+ df_output.show(10, False)
+
+ diff = df_output.subtract(df_expected_output)
+ print('Difference')
+ diff.show(10, False)
+ diff_count = diff.count()
+
+ # Clean up: Remove tmp tables
+ self.drop_table(cfg['score_matrix_table']['score_matrix_table'])
+ self.drop_table(cfg['top_n_similarity']['similarity_table'])
+
+ return diff_count == 0
+
+ def test_run_1(self):
+ cfg = {
+ 'score_matrix_table': {
+ 'did_bucket_size': 2,
+ 'did_bucket_step': 2,
+ 'score_matrix_table': random_string_generator(10)
+ },
+
+ 'top_n_similarity': {'did_bucket_size': 2,
+ 'did_bucket_step': 2,
+ 'cross_bucket_size': 2,
+ 'top_n': 10,
+ 'similarity_table':
random_string_generator(10)}
+ }
+
+ _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.8, 0.9]], [1.46,
1.46], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.8, 0.9]], [1.46, 1.46], 1)]
+ _expected_output = [
+ ('1', [Row(did='1', score=1.7316996), Row(did='2',
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4',
score=1.7316996)], 0),
+ ('2', [Row(did='1', score=1.7316996), Row(did='2',
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4',
score=1.7316996)], 0),
+ ('3', [Row(did='1', score=1.7316996), Row(did='2',
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4',
score=1.7316996)], 1),
+ ('4', [Row(did='1', score=1.7316996), Row(did='2',
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4',
score=1.7316996)], 1),
+ ]
+
+ df_output = self.run_top_n_similarity_table_generator(cfg, _input)
+ are_equal = self.compare_output_and_expected_and_cleanup(cfg,
'test_run_1', df_output, _expected_output)
+ self.assertTrue(are_equal)
+
+ def test_run_2(self):
+ cfg = {
+ 'score_matrix_table': {
+ 'did_bucket_size': 2,
+ 'did_bucket_step': 2,
+ 'score_matrix_table': random_string_generator(10)
+ },
+
+ 'top_n_similarity': {'did_bucket_size': 2,
+ 'did_bucket_step': 2,
+ 'cross_bucket_size': 2,
+ 'top_n': 10,
+ 'similarity_table':
random_string_generator(10)}
+ }
+
+ _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46,
0.03], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1)]
+ _expected_output = [
+ ('1', [Row(did='1', score=1.7316996), Row(did='3',
score=1.7316996), Row(did='4', score=0.8835226), Row(did='2',
score=0.6690362)], 0),
+ ('2', [Row(did='2', score=1.7320508), Row(did='4',
score=1.5084441), Row(did='3', score=0.6690362), Row(did='1',
score=0.6690362)], 0),
+ ('3', [Row(did='1', score=1.7316996), Row(did='3',
score=1.7316996), Row(did='4', score=0.8835226), Row(did='2',
score=0.6690362)], 1),
+ ('4', [Row(did='4', score=1.7320508), Row(did='2',
score=1.5084441), Row(did='3', score=0.8835226), Row(did='1',
score=0.8835226)], 1),
+ ]
+
+ df_output = self.run_top_n_similarity_table_generator(cfg, _input)
+ are_equal = self.compare_output_and_expected_and_cleanup(cfg,
'test_run_2', df_output, _expected_output)
+ self.assertTrue(are_equal)
+
+ def test_run_3(self):
+ cfg = {
+ 'score_matrix_table': {
+ 'did_bucket_size': 2,
+ 'did_bucket_step': 2,
+ 'score_matrix_table': random_string_generator(10)
+ },
+
+ 'top_n_similarity': {'did_bucket_size': 2,
+ 'did_bucket_step': 2,
+ 'cross_bucket_size': 2,
+ 'top_n': 2,
+ 'similarity_table':
random_string_generator(10)}
+ }
+
+ _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46,
0.03], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1)]
+ _expected_output = [
+ ('1', [Row(did='1', score=1.7316996), Row(did='3',
score=1.7316996)], 0),
+ ('2', [Row(did='2', score=1.7320508), Row(did='4',
score=1.5084441)], 0),
+ ('3', [Row(did='1', score=1.7316996), Row(did='3',
score=1.7316996)], 1),
+ ('4', [Row(did='4', score=1.7320508), Row(did='2',
score=1.5084441)], 1),
+ ]
+
+ df_output = self.run_top_n_similarity_table_generator(cfg, _input)
+ are_equal = self.compare_output_and_expected_and_cleanup(cfg,
'test_run_3', df_output, _expected_output)
+ self.assertTrue(are_equal)
+
+
+# Runs the tests.
+if __name__ == '__main__':
+ # Run the unit tests.
+ unittest.main()
diff --git
a/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py
new file mode 100644
index 0000000..d7a56ae
--- /dev/null
+++
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py
@@ -0,0 +1,282 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.types import DataType, StringType, StructField, StructType,
FloatType, IntegerType, ArrayType, MapType
+from lookalike_model.application.pipeline import util,
top_n_similarity_table_generator
+
+
+class TestTopNSimilarityTableGenerator(unittest.TestCase):
+
+ def setUp (self):
+ # Set the log level.
+ self.sc = SparkContext.getOrCreate()
+ self.sc.setLogLevel('ERROR')
+
+ # Initialize the Spark session
+ self.spark = SparkSession.builder.appName('unit
test').enableHiveSupport().getOrCreate()
+ self.hive_context = HiveContext(self.sc)
+
+ def test_run(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_top_n_similarity.yml', 'r') as
ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+
+ # Get the names of the input and output tables.
+ # alpha_table =
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+ top_n_table = cfg['top_n_similarity']['similarity_table']
+
+ # Create the input table.
+ # self.create_alpha_table(alpha_table)
+ self.create_matrix_table(matrix_table)
+
+ # Run the function being tested.
+ top_n_similarity_table_generator.run(self.spark, self.hive_context,
cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by did""".format(top_n_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ # Validate the output.
+ self.validate_similarity_table(df, True)
+
+ def test_run2(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run2 ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_top_n_similarity.yml', 'r') as
ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+ cfg['top_n_similarity']['did_bucket_size'] = 2
+ cfg['top_n_similarity']['cross_bucket_size'] = 2
+
+ # Get the names of the input and output tables.
+ # alpha_table =
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+ top_n_table = cfg['top_n_similarity']['similarity_table']
+
+ # Create the input table.
+ # self.create_alpha_table(alpha_table)
+ self.create_matrix_table2(matrix_table)
+
+ # Run the function being tested.
+ top_n_similarity_table_generator.run(self.spark, self.hive_context,
cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by did""".format(top_n_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ # Validate the output.
+ self.validate_similarity_table(df)
+
+ def test_run3(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run3 ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_top_n_similarity.yml', 'r') as
ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+ cfg['top_n_similarity']['did_bucket_size'] = 2
+ cfg['top_n_similarity']['did_bucket_step'] = 2
+ cfg['top_n_similarity']['cross_bucket_size'] = 2
+
+ # Get the names of the input and output tables.
+ # alpha_table =
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+ top_n_table = cfg['top_n_similarity']['similarity_table']
+
+ # Create the input table.
+ # self.create_alpha_table(alpha_table)
+ self.create_matrix_table2(matrix_table)
+
+ # Run the function being tested.
+ top_n_similarity_table_generator.run(self.spark, self.hive_context,
cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by did""".format(top_n_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ # Validate the output.
+ self.validate_similarity_table(df)
+
+ def test_run4(self):
+ print('*** Running TestTopNSimilarityTableGenerator.test_run4 ***')
+
+ # Load the test configuration.
+ with open('application/pipeline/config_top_n_similarity.yml', 'r') as
ymlfile:
+ cfg = yaml.safe_load(ymlfile)
+ util.resolve_placeholder(cfg)
+ cfg['top_n_similarity']['did_bucket_size'] = 2
+ cfg['top_n_similarity']['did_bucket_step'] = 2
+ cfg['top_n_similarity']['cross_bucket_size'] = 2
+ cfg['top_n_similarity']['cross_bucket_step'] = 2
+
+ # Get the names of the input and output tables.
+ # alpha_table =
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+ matrix_table = cfg['score_matrix_table']['score_matrix_table']
+ top_n_table = cfg['top_n_similarity']['similarity_table']
+
+ # Create the input table.
+ # self.create_alpha_table(alpha_table)
+ self.create_matrix_table2(matrix_table)
+
+ # Run the function being tested.
+ top_n_similarity_table_generator.run(self.spark, self.hive_context,
cfg)
+
+ # Load the output of the function.
+ command = """select * from {} order by did""".format(top_n_table)
+ df = self.hive_context.sql(command)
+ df.show()
+
+ # Validate the output.
+ self.validate_similarity_table(df)
+
+ def create_matrix_table(self, table_name):
+ data = [
+ (['0000001', '0000002', '0000003', '0000004'],
+ [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1], [0.1, 0.8, 0.9], [0.1, 0.2,
0.3]],
+ [1.46, 0.03, 1.46, 0.14],
+ 0)
+ ]
+
+ schema = StructType([
+ StructField("did_list", ArrayType(StringType(), True)),
+ StructField("score_matrix", ArrayType(ArrayType(FloatType(),
True)), True),
+ StructField("c1_list", ArrayType(FloatType(), True)),
+ StructField("did_bucket", IntegerType(), True)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ util.write_to_table(df, table_name)
+
+ def create_matrix_table2(self, table_name):
+ data = [
+ (['0000001', '0000002'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]],
[1.46, 0.03], 0),
+ (['0000003', '0000004'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]],
[1.46, 0.14], 1)
+ ]
+
+ schema = StructType([
+ StructField("did_list", ArrayType(StringType(), True)),
+ StructField("score_matrix", ArrayType(ArrayType(FloatType(),
True)), True),
+ StructField("c1_list", ArrayType(FloatType(), True)),
+ StructField("did_bucket", IntegerType(), True)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ util.write_to_table(df, table_name)
+
+ def create_alpha_table (self, table_name):
+ data = [
+ ('0000001', [0.1, 0.8, 0.9], 1.46, 0, 0),
+ ('0000002', [0.1, 0.1, 0.1], 0.03, 0, 1),
+ ('0000003', [0.1, 0.8, 0.9], 1.46, 1, 2),
+ ('0000004', [0.1, 0.2, 0.3], 0.14, 1, 3),
+ ]
+
+ schema = StructType([
+ StructField("did", StringType(), True),
+ StructField("score_vector", ArrayType(FloatType(), True), True),
+ StructField("c1", FloatType(), True),
+ StructField("did_bucket", IntegerType(), True),
+ StructField("alpha_did_bucket", IntegerType(), True)
+ ])
+
+ df =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ util.write_to_table(df, table_name)
+
+ def validate_similarity_table (self, df, same_bucket=False):
+ later_bucket = 1
+ if same_bucket:
+ later_bucket = 0
+ data = [
+ ('0000001', [{'did':'0000001', 'score':1.73205081},
{'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267},
{'did':'0000002', 'score':0.66903623}], 0),
+ ('0000002', [{'did':'0000002', 'score':1.73205081},
{'did':'0000004', 'score':1.50844401}, {'did':'0000001', 'score':0.66903623},
{'did':'0000003', 'score':0.66903623}], 0),
+ ('0000003', [{'did':'0000001', 'score':1.73205081},
{'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267},
{'did':'0000002', 'score':0.66903623}], later_bucket),
+ ('0000004', [{'did':'0000004', 'score':1.73205081},
{'did':'0000002', 'score':1.50844401}, {'did':'0000001', 'score':0.88532267},
{'did':'0000003', 'score':0.88532267}], later_bucket)
+ ]
+
+ # data = [
+ # (['0000001', '0000002', '0000003', '0000004'],
+ # [[{'did':'0000001', 'score':1.73205081}, {'did':'0000003',
'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002',
'score':0.66903623}],
+ # [{'did':'0000002', 'score':1.73205081}, {'did':'0000004',
'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, {'did':'0000003',
'score':0.66903623}],
+ # [{'did':'0000001', 'score':1.73205081}, {'did':'0000003',
'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002',
'score':0.66903623}],
+ # [{'did':'0000004', 'score':1.73205081}, {'did':'0000002',
'score':1.50844401}, {'did':'0000001', 'score':0.88532267}, {'did':'0000003',
'score':0.88532267}]],
+ # 0)
+ # ]
+
+ schema = StructType([
+ StructField("did", StringType(), True),
+ StructField("top_n_similar_user", ArrayType(MapType(StringType(),
StringType(), True), True)),
+ StructField("did_bucket", IntegerType(), True)
+ ])
+
+ df_ref =
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+ # util.write_to_table(df, table_name)
+
+ # Verify the column names.
+ self.assertEqual(len(df.columns), len(df_ref.columns))
+ for name in df.columns:
+ self.assertIn(name, df_ref.columns)
+
+ # Verify the number of rows.
+ self.assertEqual(df.count(), df_ref.count())
+
+ # Check the row values.
+ for row, row_ref in zip(df.collect(), df_ref.collect()):
+ self.assertEqual(row['did'], row_ref['did'])
+ self.assertEqual(row['did_bucket'], row_ref['did_bucket'])
+ top_n = row['top_n_similar_user']
+ top_n_ref = row_ref['top_n_similar_user']
+ top_n_ordered = []
+
+ # Convert the reference list into a list of score/[did] tuples.
+ for ref in top_n_ref:
+ if len(top_n_ordered) == 0 or float(ref['score']) !=
top_n_ordered[-1][0]:
+ top_n_ordered.append((float(ref['score']), [ ref['did'] ]))
+ else:
+ top_n_ordered[-1][1].append(ref['did'])
+ print(top_n_ordered)
+
+ # Verify the similarity order and values.
+ index = 0
+ index_count = 0
+ for item in top_n:
+ self.assertAlmostEqual(item['score'], top_n_ordered[index][0],
2)
+ self.assertIn(item['did'], top_n_ordered[index][1])
+ index_count += 1
+ if (index_count == len(top_n_ordered[index][1])):
+ index += 1
+ index_count = 0
+
+
+
+
+
+
+# Runs the tests.
+if __name__ == '__main__':
+ # Run the unit tests.
+ unittest.main()
\ No newline at end of file
diff --git
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
b/Model/lookalike-model/tests/pipeline/README.md
similarity index 100%
rename from
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
rename to Model/lookalike-model/tests/pipeline/README.md
diff --git a/Model/lookalike-model/tests/pipeline/config_clean.yml
b/Model/lookalike-model/tests/pipeline/config_clean.yml
index 8ae2c63..546a0b5 100644
--- a/Model/lookalike-model/tests/pipeline/config_clean.yml
+++ b/Model/lookalike-model/tests/pipeline/config_clean.yml
@@ -7,6 +7,8 @@ keywords_table: 'lookalike_unittest_clean_input_keywords'
log:
level: 'ERROR' # log level for spark and app
pipeline:
+ main_keywords:
+ keyword_output_table: 'lookalike_unittest_clean_input_effective_keywords'
main_clean:
did_bucket_num: 2 # Number of partitions for did
load_logs_in_minutes: 1440 #1440/day, original=14400
diff --git a/Model/lookalike-model/tests/pipeline/config_keywords.yml
b/Model/lookalike-model/tests/pipeline/config_keywords.yml
new file mode 100644
index 0000000..4f95b65
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/config_keywords.yml
@@ -0,0 +1,22 @@
+product_tag: 'lookalike'
+pipeline_tag: 'unittest'
+persona_table_name: 'lookalike_unittest_keywords_input_persona'
+showlog_table_name: 'lookalike_unittest_keywords_input_showlog'
+clicklog_table_name: 'lookalike_unittest_keywords_input_clicklog'
+keywords_table: 'lookalike_unittest_keywords_input_keywords'
+log:
+ level: 'ERROR' # log level for spark and app
+pipeline:
+ main_keywords:
+ keyword_output_table: 'lookalike_unittest_keywords_output_keywords'
+ keyword_threshold: 0.1 # Portion of showlog traffic that a keyword must
reach to be included.
+ main_clean:
+ did_bucket_num: 2 # Number of partitions for did
+ load_logs_in_minutes: 1440 #1440/day, original=14400
+ create_keywords: False # set True for first run, then keep False to use
the created table.
+ conditions: {
+ 'starting_date': '2020-01-01',
+ 'ending_date': '2020-01-11'
+ }
+ cutting_date: 1584748800
+ length: 10
diff --git a/Model/lookalike-model/tests/pipeline/data_generator.py
b/Model/lookalike-model/tests/pipeline/data_generator.py
index 45711ce..59fc0ce 100644
--- a/Model/lookalike-model/tests/pipeline/data_generator.py
+++ b/Model/lookalike-model/tests/pipeline/data_generator.py
@@ -68,6 +68,20 @@ def create_unified_log_table (spark, table_name):
df = create_unified_log(spark)
write_to_table(df, table_name)
+# Creates raw showlog data for the keywords tests and writes it to Hive.
+# The rows come from create_keywords_raw_log(); three of its columns are
+# renamed to adv_type, adv_bill_mode_cd and show_time before writing.
+def create_keywords_showlog_table (spark, table_name):
+ df = create_keywords_raw_log(spark)
+ df = df.withColumnRenamed('media', 'adv_type')
+ df = df.withColumnRenamed('price_model', 'adv_bill_mode_cd')
+ df = df.withColumnRenamed('action_time', 'show_time')
+ # Print the renamed schema to stdout as a debugging aid.
+ df.printSchema()
+ write_to_table(df, table_name)
+
+# Creates the effective keywords data and writes it to Hive.
+# The rows come from create_effective_keywords().
+def create_effective_keywords_table(spark, table_name):
+ df = create_effective_keywords(spark)
+ write_to_table(df, table_name)
+
#==========================================
# Create dataframes for the unit tests
#==========================================
@@ -164,20 +178,6 @@ def create_raw_log (spark):
def create_cleaned_log (spark):
data = [
('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 13:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 14:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 15:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 15:59:59.00', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 15:59:59.99', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 16:00:00.00', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 16:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 17:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 18:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 19:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 20:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 21:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 22:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
- # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 23:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1',
'2020-01-01', '1', ),
('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1',
'2020-01-02', '1', ),
('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1',
'2020-01-03', '1', ),
('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G',
'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2',
'2020-01-04', '1', ),
@@ -227,7 +227,17 @@ def create_keywords(spark):
('reading', 'C021', 3),
('reading', 'C022', 3),
('reading', 'C023', 3),
- ('reading', 'C024', 3)
+ ('reading', 'C024', 3),
+ ('shopping', 'C030', 4),
+ ('shopping', 'C031', 4),
+ ('shopping', 'C032', 4),
+ ('shopping', 'C033', 4),
+ ('shopping', 'C034', 4),
+ ('education', 'C040', 5),
+ ('education', 'C041', 5),
+ ('education', 'C042', 5),
+ ('education', 'C043', 5),
+ ('education', 'C044', 5),
]
schema = StructType([
@@ -360,6 +370,53 @@ def create_trainready_filter_user_data (spark):
return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+# Returns a dataframe of raw (uncleaned) log rows used by the keywords tests.
+# All nine columns are nullable strings. The trailing comment on each row
+# names the keyword that row's spread_app_id is expected to map to and, where
+# relevant, why that keyword should be included in or excluded from the
+# effective keywords (presumably per the test keyword mapping and the
+# configured keyword_threshold -- confirm against the test config).
+def create_keywords_raw_log (spark):
+ data = [
+ ('0000001', '1000', 'splash', 'abcdef0', 'C000', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-01 12:34:56.78'), # travel
+ ('0000002', '1000', 'splash', 'abcdef1', 'C001', 'DUB-AL00', 'WIFI',
'CPC', '2020-01-02 12:34:56.78'), # travel
+ ('0000003', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000005', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000006', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000007', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000008', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000009', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000010', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000011', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000012', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000013', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # travel
+ ('0000014', '1001', 'native', 'abcdef2', 'C010', 'ABC-AL00', '4G',
'CPD', '2020-01-03 12:34:56.78'), # game-avg
+ ('0000004', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00', '4G',
'CPD', '2020-01-04 12:34:56.78'), # game-avg
+ ('0000005', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00', '4G',
'CPD', '2020-01-05 12:34:56.78'), # game-avg
+ ('0000007', '1003', 'splash', 'abcdef6', 'C020', 'XYZ-AL00', '4G',
'CPT', '2020-01-07 12:34:56.78'), # reading; only one entry for this keyword so
will be excluded.
+ ('0000008', '1003', 'splash', 'abcdef6', 'C030', 'XYZ-AL00', '4G',
'CPT', '2020-01-08 12:34:56.78'), # shopping; only one entry for this keyword
so will be excluded.
+ ('0000009', '1003', 'splash', 'abcdef6', 'C040', 'XYZ-AL00', '4G',
'CPT', '2020-01-09 12:34:56.78'), # education; just enough entries to be
included.
+ ('0000009', '1003', 'splash', 'abcdef6', 'C040', 'XYZ-AL00', '4G',
'CPT', '2020-01-09 12:34:56.78'), # education; just enough entries to be
included.
+ ('0000010', '1003', 'splash', 'abcdef6', 'C050', 'XYZ-AL00', '4G',
'CPT', '2020-01-10 12:34:56.78'), # no mapping; only one entry for this keyword
so will be excluded.
+ ('0000001', '1000', 'native', 'abcde10', 'C020', 'JKL-AL00', '4G',
'CPD', '2020-01-11 12:34:56.78'), # reading; outside the date range.
+ ]
+
+ schema = StructType([
+ StructField("did", StringType(), True),
+ StructField("adv_id", StringType(), True),
+ StructField("media", StringType(), True),
+ StructField("slot_id", StringType(), True),
+ StructField("spread_app_id", StringType(), True),
+ StructField("device_name", StringType(), True),
+ StructField("net_type", StringType(), True),
+ StructField("price_model", StringType(), True),
+ StructField("action_time", StringType(), True)
+ ])
+
+ return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+# Returns a dataframe with the effective keyword data: a single nullable
+# string column named 'keyword' holding travel, game-avg and education.
+def create_effective_keywords(spark):
+ data = [('travel',), ('game-avg',), ('education',)]
+ schema = StructType([
+ StructField("keyword", StringType(), True)
+ ])
+ return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
# Prints to screen the code to generate the given data frame.
diff --git a/Model/lookalike-model/tests/pipeline/test_main_clean.py
b/Model/lookalike-model/tests/pipeline/test_main_clean.py
index 3102fa3..c302479 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_clean.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_clean.py
@@ -18,8 +18,8 @@ import unittest
import yaml
from pyspark import SparkContext
from pyspark.sql import SparkSession, HiveContext
-from pyspark.sql.functions import col
-from pyspark.sql.types import IntegerType
+from pyspark.sql.functions import col, udf, collect_set
+from pyspark.sql.types import IntegerType, BooleanType
from lookalike_model.pipeline import main_clean, util
from data_generator import *
@@ -163,10 +163,12 @@ class TestMainClean(unittest.TestCase):
keywords_table = cfg['keywords_table']
showlog_table = cfg['showlog_table_name']
clicklog_table = cfg['clicklog_table_name']
+ effective_keywords_table =
cfg['pipeline']['main_keywords']['keyword_output_table']
create_persona_table(self.spark, persona_table)
create_keywords_table(self.spark, keywords_table)
create_clicklog_table(self.spark, clicklog_table)
create_showlog_table(self.spark, showlog_table)
+ create_effective_keywords_table(self.spark, effective_keywords_table)
# Drop the output tables
showlog_output_table =
cfg['pipeline']['main_clean']['showlog_output_table']
@@ -184,22 +186,32 @@ class TestMainClean(unittest.TestCase):
bucket_num = cfg['pipeline']['main_clean']['did_bucket_num']
df_keywords = util.load_df(self.hive_context, keywords_table)
+ # run() does filtering on the effective keywords so we need to filter
+ # the raw logs with the spread app ids when validating the output.
+ effective_spread_app_ids = ['C000', 'C001', 'C002', 'C003', 'C004',
'C010', 'C011', 'C012', 'C013', 'C014', ]
+ df_log = create_raw_log(self.spark)
+ df_log = self.filter_spread_app_ids(df_log, effective_spread_app_ids)
+
# Validate the cleaned persona table.
df_persona = util.load_df(self.hive_context, persona_output_table)
self.validate_clean_persona(df_persona, bucket_num)
# Validate the cleaned clicklog table.
df_clicklog = util.load_df(self.hive_context, clicklog_output_table)
- print_df_generator_code(df_clicklog.sort('did'))
- df_log = create_raw_log(self.spark)
self.validate_cleaned_log(df_clicklog, conditions, df_persona,
df_keywords, df_log, bucket_num)
+ print_df_generator_code(df_clicklog.sort('did'))
# Validate the cleaned showlog table.
df_showlog = util.load_df(self.hive_context, clicklog_output_table)
- print_df_generator_code(df_showlog.sort('did'))
- df_log = create_raw_log(self.spark)
self.validate_cleaned_log(df_showlog, conditions, df_persona,
df_keywords, df_log, bucket_num)
+ print_df_generator_code(df_showlog.sort('did'))
+
+    def filter_spread_app_ids(self, df, spread_app_ids):
+        # Returns the rows of df whose spread_app_id is one of spread_app_ids.
+        #
+        # df:             dataframe with a 'spread_app_id' column.
+        # spread_app_ids: iterable of the spread app ids to keep.
+        #
+        # The built-in Column.isin() predicate replaces the previous Python
+        # UDF: it selects the same rows (a null spread_app_id is still
+        # dropped) without shipping every row through a Python worker.
+        # list() is applied because isin() only unpacks list/set arguments.
+        return df.filter(col('spread_app_id').isin(list(spread_app_ids)))
#========================================
# Helper methods
diff --git a/Model/lookalike-model/tests/pipeline/test_main_keywords.py
b/Model/lookalike-model/tests/pipeline/test_main_keywords.py
new file mode 100644
index 0000000..6a50389
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/test_main_keywords.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from lookalike_model.pipeline import main_keywords, util
+from data_generator import *
+
+class TestMainKeywords(unittest.TestCase):
+    # Integration test for the main_keywords pipeline step: it builds the
+    # input tables, runs main_keywords.run() and validates the resulting
+    # effective keywords table.
+
+    def setUp(self):
+        # Set the log level.
+        sc = SparkContext.getOrCreate()
+        sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session and the Hive context passed to util.
+        self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(sc)
+
+    def test_run(self):
+        print('*** Running test_run ***')
+        # The config path is relative, so the test has to be launched from
+        # the tests directory.
+        with open('pipeline/config_keywords.yml', 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+        util.resolve_placeholder(cfg)
+
+        # Create the input tables.
+        cfg_clean = cfg['pipeline']['main_clean']
+        showlog_table = cfg['showlog_table_name']
+        keywords_mapping_table = cfg['keywords_table']
+        create_keywords_mapping = cfg_clean['create_keywords']
+        create_keywords_table(self.spark, keywords_mapping_table)
+        create_keywords_showlog_table(self.spark, showlog_table)
+
+        # Drop the output table so that stale results from a previous run
+        # cannot satisfy the validation below.
+        cfg_keywords = cfg['pipeline']['main_keywords']
+        keyword_threshold = cfg_keywords['keyword_threshold']
+        effective_keywords_table = cfg_keywords['keyword_output_table']
+        util.drop_table(self.hive_context, effective_keywords_table)
+
+        start_date, end_date, load_minutes = util.load_batch_config(cfg)
+
+        # Run the pipeline step under test.
+        main_keywords.run(
+            self.hive_context, showlog_table, keywords_mapping_table,
+            create_keywords_mapping, start_date, end_date, load_minutes,
+            keyword_threshold, effective_keywords_table)
+
+        # Load the generated table and validate its contents.
+        df_keywords = util.load_df(self.hive_context, effective_keywords_table)
+        self.validate_effective_keywords(df_keywords)
+
+    def validate_effective_keywords(self, df):
+        # Asserts that df holds exactly the expected effective keywords.
+        #
+        # df: dataframe loaded from the effective keywords output table.
+
+        # Check the column names.
+        self.assertIn('keyword', df.columns)
+
+        # Expected keywords in the dataframe.
+        keywords_match = ['education', 'travel', 'game-avg']
+
+        # Show the table first so its content is visible if a check fails.
+        df.show()
+
+        # Compare the complete contents, ignoring row order. Checking only
+        # the row count and per-row membership would also accept duplicates
+        # (e.g. three 'travel' rows), so every expected keyword has to be
+        # present exactly once.
+        keywords = [row['keyword'] for row in df.collect()]
+        self.assertEqual(sorted(keywords), sorted(keywords_match))
+
+# Script entry point: run the unit tests only when this file is executed
+# directly, not when it is imported.
+if __name__ == '__main__':
+ # Hand control to the unittest command-line runner.
+ unittest.main()
diff --git a/Model/lookalike-model/tests/run_test.sh
b/Model/lookalike-model/tests/run_test.sh
index e4fdff9..e360cdf 100644
--- a/Model/lookalike-model/tests/run_test.sh
+++ b/Model/lookalike-model/tests/run_test.sh
@@ -3,6 +3,12 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd $DIR
+# test_main_keywords: identifies the keywords that have traffic greater than a set
percentage.
+if false
+then
+ spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf
spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
pipeline/test_main_keywords.py
+fi
+
# test_main_clean: preparing cleaned persona, click and show logs data.
if false
then
@@ -15,8 +21,21 @@ then
spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf
spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
pipeline/test_main_logs.py
fi
-# test_main_logs: merges click and show log data.
-if true
+# test_main_trainready: aggregates the click and show log data by user.
+if false
then
spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf
spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
pipeline/test_main_trainready.py
fi
+
+# test_score_matrix_table: Converts the score vector table into matrices.
+if true
+then
+ spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf
spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
application/pipeline/test_score_matrix_table.py
+fi
+
+# test_top_n_similarity_table_generator: Finds the top n users similar to the
given user.
+if false
+then
+ spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf
spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
application/pipeline/test_top_n_similarity_table_generator_1.py
+ spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf
spark.hadoop.hive.exec.dynamic.partition=true --conf
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
application/pipeline/test_top_n_similarity_table_generator_2.py
+fi