Amazon Personalize 是一个全托管式的机器学习服务,开发人员在没有机器学习背景的情况下,可以使用自己熟悉语言的 SDK 或者控制台 GUI 点击轻松地构建一个实时推荐系统任务来满足对应业务的需要。目前 Amazon Personalize 已经在个性化推荐,相似物品推荐,个性化物品排名,以及个性化促销和通知等使用场景均有比较好的使用案例。在 2021 年 11 月 29 日,Amazon Personalize 也新推出了智能用户细分的功能。使用用户细分的这个新功能不仅可以帮助运营团队进行精细化运营,也可以帮助销售和市场团队进行客户分群来做到精准营销以提高 ROI。在电商场景下,可以针对不同的商品,对潜在用户进行划分,以便更有针对性地进行投放活动。本文将以一个广告投放的使用场景为例,使用 Python Boto3 的 SDK 介绍如何使用 Amazon Personalize 的新功能来帮助运营人员和市场人员完成一个用户细分的任务。
数据集构建
为了完成一个 Amazon Personalize 的训练任务,我们首先需要构建数据集。ETL 工程师从数仓中导出的 Raw Data 可能是以下形式。其中 uid 代表了完成广告浏览并产生交互行为的用户 ID,event_track 代表相应用户在应用内产生的用户行为,ad_id 代表影响用户的相应广告 ID(或标签),generated_time 代表用户完成用户行为时被记录的时间戳。
uid |
event_track |
ad_id |
generated_time |
3591406 |
创建订单 |
ad-label1b |
2021-12-18 03-16-24 |
3591417 |
创建订单 |
ad-label2a |
2021-12-17 17-40-02 |
3591406 |
支付 |
ad-label2b |
2021-12-20 17-59-20 |
3592961 |
支付 |
ad-label4c |
2021-12-17 17-14-19 |
3592961 |
购买金币 |
ad-label4c |
2021-12-17 17-14-17 |
3578063 |
注册完成 |
ad-label7b |
2021-12-17 17-14-16 |
Amazon Personalize 支持三种数据集,分别是用户数据集,物品数据集,以及用户交互数据集。
- Users – User Dataset 用于记录用户的 metadata。例如用户的年龄,性别,会员信息等。
- Items – Item Dataset 用于记录物品的 metadata。例如商品的 SKU,价格等,在广告投放中,可以记录广告的品类以及标签。
- Interactions – Interaction Dataset 包含了 User 和 Item 的历史和实时数据。例如用户的点击, 浏览,点赞等行为。在 Amazon Personalize 中,每一次用户事件都作为一条记录会被用于作为训练数据任务。
为了完成 Amazon Personalize 的作业至少需要创建一个交互数据集来完成训练。Amazon Personalize 的交互数据集需要至少一列 USER_ID, 一列 ITEM_ID,一列 TIMESTAMP来完成训练任务,我们也可以提供 EVENT_TYPE 和 EVENT_VALUE 数据来显式地对模型部署时提高精准投放能力。
首先我们需要对数据集进行清洗和数据预处理,以满足 Amazon Personalize 的数据格式需要。这里我们将使用 Pandas 完成这个步骤。你可以使用云上的 SageMaker Notebook Instance 来运行代码,也可以使用本地的 Jupyter Notebook 来完成。
import pandas as pd
import datetime
import time
from numpy import float32, uint32
def string_to_unix_time_long(date_string):
casted_time = date_string[:13]+":"+date_string[14:16]+":"+date_string[17:]
return uint32(time.mktime(pd.to_datetime(casted_time).timetuple())).item()
raw_data["generated_time"] = raw_data["generated_time"].apply(string_to_unix_time_long)
以上代码可以将事例中 generated_time 的格式替换成为 Amazon Personalize 要求的 UNIX epoch 时间格式。为了实现精准投放,我们将 event_track 中的用户行为转化为点击事件,并对其中的各种用户行为按照各种行为的重要程度进行赋值,将 categorical data 转化为 numerical data,以便后续训练模型时进行针对训练。
event_map = {
'创建订单':3,
'支付':4,
'加入购物车': 2,
'注册完成': 1
}
def event_mapping_method(event):
return event_map[event]
raw_data["event_tracking"] = raw_data["event_tracking"].apply(event_mapping_method)
raw_data['EVENT_TYPE'] = "click"
以上代码对EVENT_VALUE 按照用户行为的重要性进行赋值,并追加 click事件列。
至此,我们完成了一个简易的数据集组的构建。接下来我们将对 Amazon Personalize 导入并创建数据集。
import boto3
from time import sleep
import json
personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')
create_dataset_group_response = personalize.create_dataset_group(
name = "segmentation-dataset"
)
dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))
我们使用以下代码来监控数据集创建任务。待创建完成后,方可继续后续步骤。
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
describe_dataset_group_response = personalize.describe_dataset_group(
datasetGroupArn = dataset_group_arn
)
status = describe_dataset_group_response["datasetGroup"]["status"]
print("DatasetGroup: {}".format(status))
if status == "ACTIVE" or status == "CREATE FAILED":
break
time.sleep(60)
接下来,我们创建数据集的 Schema 以便 Amazon Personalize 可以完成数据集的构建。
interactions_schema = schema = {
"type": "record",
"name": "Interactions",
"namespace": "com.amazonaws.personalize.schema",
"fields": [
{
"name": "USER_ID",
"type": "string"
},
{
"name": "ITEM_ID",
"type": "string"
},
{ "name": "EVENT_TYPE",
"type": "string"
},
{
"name": "EVENT_VALUE",
"type": "float"
},
{
"name": "TIMESTAMP",
"type": "long"
}
],
"version": "1.0"
}
create_schema_response = personalize.create_schema(
name = "segmentation-schema",
schema = json.dumps(interactions_schema)
)
interaction_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))
之后,我们将创建一个 S3 桶,以及相应的桶访问策略。用于存放 Amazon Personalize 的数据集,以及做用户细分时所需要的输入输出文件路径。请注意,S3 桶创建的地区需要和 Amazon Personalize 的地区保持一致,以便 Amazon Personalize 能够正常使用 S3 中的数据来进行作业。
s3 = boto3.client('s3')
region = "us-east-2"
bucket_name = "personalize-user-segmentation-poc-2022-1-1"
s3.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': region})
policy = {
"Version": "2012-10-17",
"Id": "PersonalizeS3BucketAccessPolicy",
"Statement": [
{
"Sid": "PersonalizeS3BucketAccessPolicy",
"Effect": "Allow",
"Principal": {
"Service": "personalize.amazonaws.com"
},
"Action": [
"s3:*Object",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::{}".format(bucket_name),
"arn:aws:s3:::{}/*".format(bucket_name)
]
}
]
}
s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))
接下来,我们将数据集上传到云上的 S3,以便后续使用。
raw_data.to_csv("segmentation.csv",index=False)
boto3.Session().resource('s3').Bucket(bucket_name).Object("segmentation.csv").upload_file("segmentation.csv")
我们使用以下代码创建一个 IAM 角色,并附给 Amazon Personalize 用于访问 S3 存储桶读写权限。
iam = boto3.client("iam")
role_name = "Personalize-User-Segmentation-Role"
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "personalize.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
create_role_response = iam.create_role(
RoleName = role_name,
AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
RoleName = role_name,
PolicyArn = policy_arn
)
iam.attach_role_policy(
PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',
RoleName=role_name
)
time.sleep(60) # 等待 IAM 角色附加完成
role_arn = create_role_response["Role"]["Arn"]
print(role_arn)
接下来我们将创建数据集并进行数据导入作业。
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
name = "personalize-user-segmentation-data-1",
datasetType = dataset_type,
datasetGroupArn = dataset_group_arn,
schemaArn = interaction_schema_arn
)
# 创建数据集
interactions_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))
time.sleep(100) # 等待数据集创建完成
# 创建数据集导入任务
create_dataset_import_job_response = personalize.create_dataset_import_job(
jobName = "personalize-segmentation-import-1",
datasetArn = interactions_dataset_arn,
dataSource = {
"dataLocation": "s3://{}/{}".format(bucket_name, "segmentation.csv")
},
roleArn = role_arn
)
dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))
至此我们完成了数据集导入的任务。接下来我们将创建用户分群的任务。
用户分群
接下来我们将创建用户分群的任务。目前用户分群是 Amazon Personalize 的一项批处理任务,因此我们需要提前准备好需要针对分群的 Item ID,并按照如下格式创建好输入的 Json 文件,上传到 S3 中作为输入文件。
{"itemId": "ad-label1b"}
{"itemId": "ad-label1c"}
本文,我们以广告 ID ad-label1b 和 ad-label1c 为例,来选取对这两个广告有兴趣的潜在用户,以便市场部门进行针对性广告投放。我们使用以下代码创建名为 ad_id_list.json 的广告 ID文件,并上传至 S3,以便后续创建分群作业使用。
!echo -e "{"itemId": "ad-label1b"}\n{"itemId": "ad-label1c"}" > ad_id_list.json
boto3.Session().resource('s3').Bucket(bucket_name).Object("ad_id_list.json").upload_file("ad_id_list.json")
接下来我们将创建用户分群的解决方案以及方案版本,此方案创建中,我们定义 eventValue 值大于 2 的行为用于训练。即,对本例中广告投放的用户行为中至少点击了 “加入购物车” 的行为来进行精准地训练。
user_segmentation_recipe_arn ="arn:aws:personalize:::recipe/aws-item-affinity"
user_personalization_create_solution_response = personalize.create_solution(
name = "personalize-user-segmentation",
datasetGroupArn = dataset_group_arn,
recipeArn = user_segmentation_recipe_arn,
eventType = "click",
solutionConfig ={
"eventValueThreshold":"2"
}
)
user_segmentation_solution_arn = user_personalization_create_solution_response['solutionArn']
print(json.dumps(user_segmentation_solution_arn, indent=2))
time.sleep(60) # 等待方案创建完成后创建版本
user_segmentation_create_solution_version_response = personalize.create_solution_version(
solutionArn = user_segmentation_solution_arn
)
solution_version_arn = user_segmentation_create_solution_version_response["solutionVersionArn"]
print(json.dumps(solution_version_arn, indent=2))
# 以下代码会轮训方案创建状态,待方案创建完成后方可进行作业任务
max_time = time.time() + 10*60*60 # 10 hours
while time.time() < max_time:
version_response = personalize.describe_solution_version(
solutionVersionArn = solution_version_arn
)
status = version_response["solutionVersion"]["status"]
if status == "ACTIVE" or status == "CREATE FAILED":
break
time.sleep(60)
接下来我们将进行用户分群作业,并输入先前创建的广告 ID 文件,以针对每个广告投放生成的潜在用户数。分群的输出结果将写入 S3 output 文件夹中。
batch_job_arn = personalize.create_batch_segment_job (
solutionVersionArn = solution_version_arn,
jobName = "personalize-user-segmentation-job-6",
numResults = 25, # 生成 1 and 5,000,000 个对投放广告对象的潜在用户数
roleArn = role_arn,
jobInput =
{"s3DataSource": {"path": "s3://personalize-user-segmentation-poc-2022-1-1/ad_id_list.json"}},
jobOutput =
{"s3DataDestination": {"path": "s3://personalize-user-segmentation-poc-2022-1-1/output/"}}
)["batchSegmentJobArn"]
接下来我们会轮训地监控任务状态,任务完成后会在 S3 目录中生成 ad_id_list.json.out 文件,等待任务完成后拉去并展示结果。
# 以下代码会轮训任务作业创建状态,待作业完成后方可取回结果
max_time = time.time() + 10*60*60 # 10 hours
while time.time() < max_time:
job_response = personalize.describe_batch_segment_job(
batchSegmentJobArn= batch_job_arn
)
print(job_status)
job_status = job_response["batchSegmentJob"]["status"]
if job_status == "ACTIVE" or job_status == "CREATE FAILED":
break
time.sleep(60)
s3.download_file('personalize-user-segmentation-poc-2022-1-1', 'output/ad_id_list.json.out', 'potential_user.json')
!cat potential_user.json
至此我们完成了用户行为数据集的数据清洗和预处理,数据集的构建,用户分群方案和方案版本的构建,以及针对广告 ID 的用户分群的批处理以及潜在用户的结果生成。用户分群的使用场景非常广泛,在电商场景中的数据应用本方案,可以将电商中的商品对应本文中的广告 ID,寻找对潜在商品感兴趣的用户进行针对推荐,也可以导入 ITEM 数据集,轻松构建商品的品类和属性,创建针对不同品类/SKU 的商品的推荐用户列表。取得的用户 ID ,可以进行针对性推送通知推荐和优惠活动,进行用户激活,引导潜在用户的购买行为。
除此之外,Amazon Personalize 也可以支持增量的数据训练,以满足不同场景的用户需要。
本篇作者