亚马逊AWS官方博客

使用 Amazon Neptune 与 Amazon Redshift 构建客户 360 度知识库

Original Link: https://aws.amazon.com/cn/blogs/database/building-a-customer-360-knowledge-repository-with-amazon-neptune-and-amazon-redshift/

组织如今可以构建并部署各类大型数据平台(包括数据湖、数据仓库以及LakeHouse),借以捕捉并分析客户旅程中的整体视图。这类数据平台的核心意义,在于了解影响客户满意度的重要指标,并提取出对参与度贡献最大的客户行为模式。目前的应用场景虽然能够捕捉客户的各个联系点,但这些数据往往分布在不同位置(例如外部合作伙伴、聊天机器人或呼叫中心等SaaS平台、本地系统、以及Google Analytics等公有云环境),各数据点之间几乎没有链接能够直通。因此,其中的最大挑战就是如何在由最初接触到客户演进至当前状态期间,将这些不同数据源连接起来。我们还需要通过知识库对客户进行360度全方位审视,其中应包含客户旅程中各项关键属性以及指向对应数据的连接。最后,这套知识库还必须灵活、敏捷且可扩展,保证其能够随着数据格局的发展而不断支持更多新型数据模式。

Amazon Neptune是一项专门为此而生的图数据库服务,可用于存储相互关联的数据模式,帮助用户以近实时方式捕捉来自客户旅程的数据并建立起之前提到的客户360度视图。这种全关联数据解决方案还将为您的客户服务代表提供业务洞见、协助推动新的销售举措、为客户提供建议、支持客户360度仪表板,或者直接作为机器学习分析解决方案的数据存储库使用。

在本文中,我们将展示如何使用Neptune为家庭保险场景下的客户旅程建立客户360度知识库解决方案。此次演练中涉及的基本步骤如下:

  1. 构建一套沙箱环境,用于显示关联数据平台的价值证明(POV)。
  2. 从初始接触到实际参与,定义活跃客户在旅程中的各个步骤。
  3. 使用Amazon Redshift SQL与分析功能,借此理解客户计算器中来自不同来源的数据之间的关联链接与数据模式。
  4. 在实体关系(ER)示意图中定义业务状态实体及其关联关系。
  5. 创建一套图模型,通过ER示意图与沙箱数据确定其顶点、边与属性。
  6. 通过Amazon Redshift在Amazon S3上生成Neptune加载文件。
  7. 在Neptune中批量加载关联图数据。
  8. 构建客户360度应用,以证明关联数据的商业价值。
  9. 将Neptune与数据管道集成起来,借此连接数据存储并将新的事实发布在专用数据库当中。

下图所示,为以上提到的整个流程。

解决方案概述

企业数据平台使用多种数据捕捉机制(Web前端、移动应用、呼叫中心、外部馈送、外部数据代理与合作伙伴、云前端与内部托管应用等)以提取客户旅程数据。在典型的数据平台当中,要了解客户的行为模式,我们需要提取、转换并管理不同类型数据存储库内所保留的全部数据类型。我们可以大致将数据源模式划分为外部结构化、外部非结构化、内部结构化与内部非结构化四种。随着数据模式的增长与变化(新的或经过修改的、以客户为中心的产品),客户旅程数据往往会在多种数据存储体系之间发生脱节。一旦脱节,我们将很难将所有客户数据关联起来以建立起整体数据状态。数据湖将沦为数据沼泽,而我们必须清理并建立新的专用存储体系以应对以往无法解决的业务挑战、减少客户的不满。

您可以使用Tinkerpop属性图在企业数据湖的策划层(curated layer)中捕捉各数据实体(即顶点)与相互连接(即边)之间的关系。借助Neptune提供的可变schema(新属性)与新实体(顶点与边)的可扩展添加功能,您可以近实时存储来自新馈送的客户数据,并更快发布客户360度视图。下图对这些架构方案做出了比较。

演练

在本文的用例中,我们假定某家家庭保险公司希望了解刚刚推出的保险范式给客户行为带来了怎样的影响。新范式将使用现代应用程序签约新客户,帮助他们购买家庭或公寓安全险。您需要根据访问路径(网络浏览器、移动应用或呼叫中心)了解客户旅程的当前状态,并以适当的价格与赔偿范围组织保险单条款,从而提升客户的长期满意度与忠诚度。

本文将探讨四种不同的数据摄取模式,分析由无关数据带来的运营挑战,如何使用Amazon Redshift分析并创建图数据,以及Neptune如何增加商业价值、通过数据之间的关联点以近实时方式为客户建立360度视图。下表对这四种摄取模式做出了总结:

源来源 数据类型 源类型 数据描述
外部 结构化数据 呼叫中心数据

呼叫中心由SaaS解决方案提供支持。服务供应商提供关于客户呼叫与客户代理交互方式的每日呼叫记录。

这套系统主要强调高效处理客户呼叫。

外部 非结构化数据 Web分析数据

由第三方捕捉获得的Web分析结果。主要为每日网页浏览量与事件。

收集数据以了解客户的Web访问模式,并改善Web应用程序的使用体验。

内部 结构化数据 核心交易数据

从关系数据库内捕捉到的报价、保单与客户信息等保险业务数据。

这些数据用于生成新政策影响指标,以及客户旅程中断期间的指标。

内部 非结构化数据 应用服务器事件

Amazon CloudWatch当中捕捉到的应用服务器事件,其中包含跟踪关键应用实体所获得的浏览信息。

使用日志分析,我们可以扫描数据以提高Web应用程序的可靠性与可用性。

构建沙箱环境

在本示例中,我们使用AWS平台各组件(Amazon S3、Amazon Redshift、Neptune)构建一套沙箱环境,用于进一步实现对关联数据平台的价值证明。关于相关命令与程序的更多详细信息,请参阅AWS说明文档

要建立这样一套环境,大家需要完成以下步骤:

  1. 创建一个S3存储桶(cjsandboxpov<last five digits of account>),其中应包含以下文件夹以存储来自现有数据平台的示例数据:
    • /landingarea – “着陆区”文件夹,用于从四个来源处收集CSV或Parquet格式的数据。
    • /neptune_vertex – Neptune负载顶点文件夹,用以转储与图模型中顶点数据相关的CSV负载文件。
    • /neptune_edge – Neptune负载边文件夹,用以转储与图模型中边数据相关的CSV负载文件。

本文所使用的S3存储桶名称为 cjsandboxpov99999。

  1. 直接使用各项默认存储桶安全策略。
  2. 在VPC沙箱环境中创建一个Amazon Redshift集群以上传源数据。
    • 关于集群大小,其数据存储容量应达到原始数据量的两倍(例如,如果原始数据最高为2 TB,则应创建一个双节点ds2.xlarge集群)。
    • 将IAM角色与Amazon Redshift相关联,借此面向S3存储桶实现读取与写入。

关于更多详细信息,请参阅《使用IAM角色授权COPY、UNLOAD以及CREATE EXTERNAL SCHEMA等操作。

  1. 使用Amazon Redshift查询编辑器接入Amazon Redshift数据库,而后创建沙箱schema cjsandbox以定义各输入源对象。
  2. 创建一个Neptune集群,用于捕捉图属性以构建知识库:
    • 选择最新版本,创建一个db.r5large集群,无需创建读取副本。
    • 选择 cjsandbox 作为数据库标识符,其他选项全部保留默认值。
    • 开始创建Neptune集群。

探索客户旅程并捕捉关键状态

下图所示,为客户在旅程当中可以采取的各种路径。

在房屋、公寓或财产保险等业务领域当中,用户可以选择多种方式与保险服务商进行合作。通过移动应用,我们可以方便快捷地为公寓财产购买保险;但如果您拥有更多具体的保险需求,那么财产的面积、地理位置等因素也会让保险条款制定变得非常复杂。

保险公司会通过多种选项(移动应用、Web应用或者呼叫中心等)与潜在客户进行实效接触,借此支持不断发展的多元化市场。我们的目标是提供最佳客户体验,并在客户遇到麻烦或者打算放弃时迅速提供帮助。

最佳客户体验要求我们第一时间捕捉到客户旅程的当前状态,以更快的速度提供决策选项,从而引导客户接受并签署保险条款。我们的目标在于尽可能减少负面用户体验、消除多种选项带来的复杂性、提供更多服务/成本层面的最优建议、并建立起更加简洁清晰的界面导航机制,这一切都是为了避免客户在操作过程中选择放弃。另外,我们还需要让企业代表(销售/服务代表或者应用程序开发人员)及时掌握与客户状态及指标相关的最新趋势,引导客户成功签署保单。

在客户旅程当中,我们可以捕捉到以下几项关键状态:

  • 初始客户信息——电子邮件、地址、电话等。
  • 呼叫中心数据——电话号码、客服代理、状态等。
  • Web应用程序日志——会话、报价、最新状态等。
  • Web分析日志——访问、页面浏览、页面事件等。
  • 核心旅程数据——政策、报价、客户放弃或拒绝等。

用顶点与边捕捉客户旅程并生成ER图与模型图

在定义了客户旅程状态之后,我们接下来需要确定交易实体、并捕捉其中的数据以实现交互。下图所示,为四种相互隔离的客户视图。

每个数据存储都由关键事实表组成。下表总结了事件表及其相关键属性。

数据存储 关键ID 关键属性 其他事实参考列
Web Analytics VISIT VISIT_ID Visit time, browser ID SESSION_ID generated by application (SESSION)
PAGE_VIEW PAGE_ID VISIT_ID, PAGE INFO VISIT_ID to VISIT
PAGE_EVENTS EVENT_IG PAGE_ID, EVENT_INFO PAGE_ID to PAGE_VIEW
CALL Data CALL CALL_ID AGENT_ID, PHONE_NO, CALL_DETAILS

PHONE_NO to CUSTOMER

AGENT_ID to AGENT

AGENT AGENT_ID AGENT_INFO AGENT_ID to CALL
LOG Data SESSION SESSION_ID QUOTE_ID, EMAIL, CUSTOMER_INFO

QUOTE ID to QUOTE

EMAIL to Customer

SESSION_ID to VISIT

CORE Data CUSTOMER CUSTOMER_ID

PHONE, EMAIL, DEMOGRAPHICS,

ADDRESS_INFO

PHONE to CALL

EMAIL to SESSION

ADDRESS_ID to ADDRESS

ADDRESS ADDRESS_ID City, Zip, State, Property address ADDRESS_ID to CUSTOMER
QUOTE QUOTE_ID

Quote info, Email,

CALL_ID , SESSION_ID

Email to CUSTOMER

CALL_ID to CALL

SESSION_ID to SESSION

SESSION_ID to VISIT

POLICY POLICY_ID

Policy info, QUOTE_ID,

CUSTOMER_ID

CUSTOMER_ID to CUSTOMER

QUOTE_ID to QUOTE

在查看表中的数据及其关联关系之后,我们可以分析ER图以获取图模型的关键实体。每一项关键表事实都可作为主顶点的来源,例如CUSTOMER、ADDRESS、CALL、QUOTE、SESSION、 VISIT、PAGE_VIEW以及 PAGE EVENT。此外,我们还需要确定其他作为顶点的关键实体,用于连接更多不同实体(例如PHONE与ZIP)的关键属性。而引用各表之间的关系,则可帮助我们确定不同顶点之间的边。下图所示,为基于确定顶点与边的图模型。

接下来,我们需要设计图模型以支持Neptune TInkerpop属性图的要求。Neptune数据库中的每一个顶点与边实体都需要唯一的ID。大家可以将实体的前缀值与当前唯一业务标识符组合起来以生成该ID。下表所示,为在Neptune数据库内创建唯一标识顶点及边的部分命名标准示例。顶点实体与由三个大写字母表示的首字母缩写词关联,而该缩写词又与数据库唯一标识符相关联。边名称由边的起始点决定,一条边沿指向方向分别为〈起始顶点〉-〈指向顶点〉-〈子唯一id〉,用于描述两个顶点间的一对多关系。由于单一实体对应唯一的主键值,因此只要使用前缀进行连接,即可保证该值在Neptune数据库内保持唯一性。

实体 实体类型 ID格式 示例ID
Customer 顶点 CUS|| <CUSTOMER_ID> CUS12345789
Session 顶点 SES||<SESSION_ID> SES12345678
Quote 顶点 QTE||<QUOTE_ID> QTE12345678
VISIT 顶点 GAV||<VISIT_ID> GAEabcderfgtg
PAGE VIEW 顶点 GAP||<PAGE_ID> GAPadbcdefgh
PAGE EVENT 顶点 GAE||<PAGE_EVENT_ID> GAEabcdefight
ADDRESS 顶点 ADD||<ADDRESS_ID> ADD123344555
CALL 顶点 CLL||<CALL_ID> CLL45467890
AGENT 顶点 AGT||<AGENT_ID> AGT12345467
PHONE 顶点 PHN||<PHONEID> PHN7035551212
ZIP 顶点 ZIP||<ZIPCODE> ZIP20191

Customer->Session

(1 -> Many)

CUS_SES||<SESSION_ID> CUS_SESS12345678

Session-> Customer

(Many -> 1)

SES_CUS||<SESSION_ID> SES_CUST12345678

ZIP->ADDRESS

(1 -> Many)

ZIP_ADD||<ADDRESS_ID> ZIP_ADD1234567

ADDRESS-> ZIP

Many-> 1)

ADD_ZIP||<ADDRESS_ID> ADD_ZIP1234567

在Amazon Redshift中对策划数据进行关系分析

Amazon Redshift提供一套大规模并发处理(MPP)平台,用以查看大量数据中的数据模式。您也可以使用其他编程选项(例如AWS Glue)为存储在经过策划的Amazon S3数据湖内的数据创建Neptune批量加载文件。但是,Amazon Redshift提供的数据库平台能够分析数据模式并查找不同数据源之间的关联性,同时提供SQL接口以生成批量加载文件。对于以数据库为中心的分析类用户而言,单是SQL接口一项就足以成为选择它的理由,这就避免了我们另花时间学习如何构建PySpark库。本文假定数据存储中的策划数据以CSV或Parquet格式发布在基于Amazon S3的数据湖内。您可以使用COPY命令将数据中经过整理的数据加载至Amazon Redshift数据库当中。

要在沙箱schema中创建客户表,请使用以下操作代码:

Create table sandbox.customer (customer_id number,email. varchar(40),phone varchar(20),address_id number,

..);

要从Amazon S3将数据复制至该客户表,请使用以下操作代码:

copy customerfrom ‘s3://cjsandboxpov9999/landingarea/customer.dat’ iam_role ‘arn:aws:iam::0123456789012:role/MyRedshiftRole’;

将顶点与边文件转储至Amazon S3

Neptune数据库支持对CSV文件的批量加载,其中每个文件都需要一条头记录。根据需要,顶点文件应包含顶点ID属性与标签属性。其余属性则用于为头记录中的各属性指定属性格式。边转储文件则需要具有一个ID(起始顶点ID与终止顶点ID),并在标签中包含必要属性。边数据中还可包含其他必要属性。

以下代码为会话顶点转储记录示例:

~id,session_id:int,marketing_id:String,quote_id:int,date_created:String,~label
SES29557958,29557958,8016200001,16524635,"2019-02-04 21:51:33.229275",session
SES29557961,29557961,pgrcrp,16524645,"2019-02-04 21:51:33.229275",session
SES29557963,29557963,,16524639,"2019-02-04 21:51:33.229275",session
SES29557965,29557965,InternetW,16524638,"2019-02-04 21:51:33.229275",session
SES29557966,29557966,pgrcrp,16524637,"2019-02-04 21:51:33.229275",session
SES29557967,29557967,9003849004,16524642,"2019-02-04 21:51:33.229275",session
SES29557968,29557968,,16524641,"2019-02-04 21:51:33.229275",session
SES29557969,29557969,OLS,16524653,"2019-02-04 21:51:33.229275",session

以下代码为会话报价边转储记录示例:

~id,~from,~to,~label
"SESQTE16524635",SES29557958,QTE16524635,has_quote
"SESQTE16524645",SES29557961,QTE16524645,has_quote
"SESQTE16524639",SES29557963,QTE16524639,has_quote
"SESQTE16524638",SES29557965,QTE16524638,has_quote
"SESQTE16524637",SES29557966,QTE16524637,has_quote
"SESQTE16524642",SES29557967,QTE16524642,has_quote

由于所有相关数据都位于Amazon Redshift当中,因此您可以使用视图定义以简化各转储文件的创建流程,借此实现Neptune批量加载。示例定义将保证头记录始终为转储文件中的第一条记录。但在向Amazon S3创建转储文件时,我们需要考虑以下限制:

  • 列中的NULL值会令整体字符串为null。视图会使用NVL函数以避免当某项属性为NULL时,该顶点被直接跳过的问题。
  • 在属性数据中使用逗号或者其他唯一字符,会导致转储文件的数据格式无效。视图会使用replace函数以消除数据流中的此类字符。
  • 这里的unload命令最大只能生成6 GB大小的单一转储文件。任何大小溢出的文件都将没有头记录,并导致Neptune无法正常加载该格式。通过视图,我们可以按键列对数据进行排序以分块实现数据转载,其中每个数据块都将具有符合Neptune格式要求的头记录。

以下示例代码,为建立顶点源视图的视图定义:

create or replace view sandbox.v_session asSelect * from.                 --- HEADER FORMAT(select 'header:string,’||. – sort column to ensure header is first record~id,‘session_id:int,start_date:string,,quote_id:string,'||. ‘date_created:string,last_interaction_date:string,policy_effective_date:string,’||'~label' session_data     -- LABEL attributefrom sandbox.dummy.   – DUMMY table with 1 rowunion allselect 'row,’||‘SES’||id||','||id||’,’||start_date||’,’||quote_id||','||date_created||’,’||last_interaction_date||’,’||policy_effective_date||’,’||'session'. – LABEL for vertexfrom sandbox.session )

order by 1, ID;

以下示例代码,为建立边源视图的视图定义:

create or replace view sandbox.v_session_quote as
  SELECT * FROM (
    SELECT 'header:string,~id,~from,~to,' || '~label' AS session_quote_data
    FROM sandbox.dummy
    UNION ALL
    SELECT 'row,SESQTE'|| sessionid || ',SES' || sessionid || ',QTE' || quote_id|| ',' ||
    'sess_quote'
    FROM (SELECT c.sessionid, s.quote_id
          FROM sandbox.customer c,
               sandbox.session s,
              sandbox.quote q
              WHERE c.sessionid= s.id AND
                   q.quote_number = s.quote_id )
  ) WHERE 
   ORDER BY 1,2;

要创建顶点与边转储文件,请在Amazon Redshift当中使用UNLOAD命令。详见以下操作代码:

unload ('select * from sandbox.v_session')
to 's3://cjsandboxpov9999/neptune_vertex/v_session.dat' 
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
PARALLEL OFF;

unload ('select * from sandbox.v_session_quote')
to 's3://cjsandboxpov9999/neptune_edge/v_session_quote.dat' 
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
NOPARALLEL OFF;

将顶点与边转储文件加载至Neptune数据库

顶点与边转储文件被创建在两个不同的Amazon S3文件夹当中。顶点转储文件首先以边的形式进行加载,这就要求数据库中预先存在边的起点与终点。您可以一次加载一个文件,也可以整体加载一个文件夹。以下代码为Neptune中的加载命令:

curl -X POST \
-H 'Content-Type: application/json' \
https://<instance_name>>.us-east-1.neptune.amazonaws.com:8182/loader -d '
{
"source" : "s3://<<<location>> ",
"format" : "csv",
"iamRoleArn" : "ARN for IAM role with access to read from S3 Bucket",
"region" : "us-east-1",
"failOnError" : "FALSE",
"parallelism" : "HIGH"
}’

加载作业会为每一项作业创建一个不同的加载器ID。您可以将各加载器ID以变量形式传递给wget命令,借此监控加载作业的执行进度。

要查看加载状态,请使用以下操作代码:

wget https://<<instance>.us-east-1.neptune.amazonaws.com:8182/loader/{loader_id}?details=true

要检查加载状态中的错误,请使用以下操作代码:

wget ‘https://<<instance>.us-east-1.neptune.amazonaws.com:8182/loader/{loader_id}?details=true&errors=true’

对关联数据库执行价值证明

加载完成并不是终点,我们还需要验证新解决方案能否切实解决业务问题。现在,关联后的Neptune数据库可以为客户服务代表创建一套整体视图。本文中使用的示例Python代码,描述了如何与最终用户共享一套根据关联数据创建的客户360度视图。请完成以下操作步骤:

  1. 使用客户ID(例如电子邮件)标记起始顶点。以下代码为Gremlin查询示例:
  1. g.V().hasLabel(‘customer’). \\ select customer vertex has(email,’xxxx@mail.com’). \\ identify individual through properties

valueMap().fold().next()) \\ get all properties

  1. 导航至对接客户顶点的地址顶点,获取全部关联地址信息。参阅以下代码示例:

g.V().hasLabel(‘customer’).has(’email’,’xxxxxx@mail.com’).out().hasLabel(‘address’).project(‘address’,’city’,’zip’,’state’).by(‘address’).by(‘city’).by(‘zip’).by(‘state’).toList()

  1. 导航至全部电话号码顶点,按时间顺序捕捉所有呼叫数据。
  2. 导航至会话与Web分析访问,捕捉所有Web活动。
  3. 导航至相关报价,而后查看保险选择模式。
  4. 识别客户的当前状态。

对客户数据中连接体系的可视化,有助于Neptune图数据库实现最大业务价值。我们可以分析Neptune中的图以发现新的数据集与数据模式,而后将这些新模式作为机器学习模型的输入素材,帮助优化客户旅程。图数据库还能缩短发现时间,理解种子数据中的数据模式以构建机器学习模型。

您可以使用图数据库提供的图形化浏览器界面以查看不同实体之间的关联关系。在大多数图浏览器中,Show Neighborhood选项可帮助大家遍历客户所采用的不同路径。请完成以下步骤:

  1. 首先使用gremlin查询以获取客户顶点。
  2. 在当前客户顶点中选择Show neighborhood 以查看相关电话号码、地址、Web会话以及报价。
  3. 要查看某项会话的全部访问,请在该会话中选择Show neighborhood。
  4. 要查看与某一客户相关的全部呼叫,请在电话号码中选择Show neighborhood。
  5. 要查看与特定报价相关的全部保险服务政策,请在该报价中选择Show neighborhood。
  6. 要查看全部相关页面浏览记录,请在访问中选择Show neighborhood。

您可以使用关联客户旅程视图以增强用户界面(移动或Web端),借此针对人口统计覆盖面最广的选项设置默认的承保范围方案(一键式保险)。在遍历图中的邮政编码或城市名称时,您可以标识客户及相关的报价与保险政策,借此了解客户最乐于接受的免赔额度与承保范围模式。在对客户的中途放弃问题进行分析时,大家可以单纯选择报价(不带任何具体政策)以导航至Web分析,了解客户放弃是否源自过度复杂的用户界面、表单或事件。要减少呼叫中心的呼叫量,我们可以遍历图以捕捉触发客户呼叫的各类事件性趋势。

将数据湖与Neptune集成起来

在确定关联数据的价值之后,我们可以将关联Neptune数据库引入日常数据管理以实现业务增强。在完成数据策划并将其发布至数据湖后,我们即可收集关联数据以添加新的顶点、边与属性。大家可以触发AWS Glue作业以捕捉新客户、将新应用会话与客户关联起来,并将增量Web分析数据与会话及客户信息相集成。下面,我们将通过AWS Glue作业示例解释如何维护关联体系中的各顶点与边。

在发布新的策划数据事实之后,触发AWS Lambda函数以启动AWS Glue作业并捕捉新近添加或修改的实体,保证其相关数据的正常发布,而后据此添加或修改顶点、边或属性信息。例如,当新的会话数据抵达数据湖内的指定区域时,系统会启动一项作业、将与客户会话相关的各实体添加至Neptune数据库中。

代码示例

以下代码示例,用于通过Python及Gremlin创建一套客户360度知识库:

#!/usr/bin/python
from __future__  import print_function  # Python 2/3 compatibility
from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.traversal import *
from gremlin_python.process.strategies import *
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
import json
graph = Graph()
### create database connection
remoteConn = DriverRemoteConnection('wss://cjsandbox.xxxxx.us-east-1.neptune.amazonaws.com:8182/gremlin','g')
g = graph.traversal().withRemote(DriverRemoteConnection('wss://cjsandbox.xxxxxx..us-east-1---neptune.amazonaws.com:8182/gremlin','g'))
### run Gremlin queries and store detail in json variables
## get customer data
vcust = g.V().hasLabel('customer').has('email','xxxxxxx@aol.com').project('email','dob').by('email').by('dob').toList()
## get session data
vcount = g.V().hasLabel('customer').has('email','xxxxxxx@aol.com').out().hasLabel('session').count().toList()
vqcount = g.V().hasLabel('customer').has('email','xxxxxxx@aol.com').out().hasLabel('quote').count().toList()
## get address count and  data
vaddcount = g.V().hasLabel('customer').has('email','xxxxxxxx@aol.com').out().hasLabel('address').count().toList()
vaddress = g.V().hasLabel('customer').has('email','xxxxxx@aol.com').out().hasLabel('address').project('address','city','zip','state').by('address').by('city').by('zip').by('state').toList()
v_fullcust=format(g.V().hasLabel('customer').has('email','xxxxx@aol.com').valueMap().fold().next())
vsess= g.V().hasLabel('customer').has('email','XXXX@aol.com').out().hasLabel('session').order().by('start_date').project('start_date','session_id','partner_id').by('start_date').by('session_id').by('partner_id').toList()
vcust_json = json.dumps(vcust[0])
vcount_json = json.dumps(vcount[0])
vaddcount_json = json.dumps(vaddcount[0])
vsesscount = json.loads(vcount_json)
vaddrcount = json.loads(vaddcount_json)
### print customer information
print("====== Customer Information=========================")
print("====== Customer Information=========================")
print(" Email:  %s " % json.loads(vcust_json)['email'])
print(" Date of birth: %s " % json.loads(vcust_json)['dob'])
print(" Number of sessions:   %s sessions  " % vsesscount)
print(" Number of properties:   %s   " % vaddcount)
print(" Property Address details: :    " )
for i in range(vaddrcount,0,-1):
        address_json= json.dumps(vaddress[i-1])
        print(address_json)
print(" Customer misc details :  ")
print(v_fullcust)
print("-----------------------------------------------------\n\n")
print("====== Custoner session Information =====sorted by date creation descending ========== " )
### print session by looping
for i in range(vsesscount,0,-1):
        vsess_json= json.dumps(vsess[i-1])
        print("Session : ")
        vsessid=json.loads(vsess_json)['session_id']
        print(vsess_json)
        vsessinfo=g.V().has('session','session_id',vsessid).valueMap().select('date_created','policy_effective_date','marketing_id','form_number').fold().next()
        print(vsessinfo)
        print("--------------------------\n ")
remoteConn.close()

以下代码示例中的AWS Glue程序,用于创建Neptune CSV文件以容纳经过策划的增量数据:


import sys
import boto3
import logging
###awsglue libs
from awsglue.utils import getResolvedOptions
from awsglue.transforms import ApplyMapping
from awsglue.transforms import RenameField
from awsglue.transforms import SelectFields
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
##pyspark libs
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.functions import format_string
## add lib
sc = SparkContext()
#sc.addPyFile("s3://aws-glue-scripts-999999-us-east-1/glue_neptune/glue_neptune.zip")
### gremlin/glue_neptune needs zip library
from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import *
from glue_neptune.NeptuneConnectionInfo import NeptuneConnectionInfo
from glue_neptune.NeptuneGremlinClient import NeptuneGremlinClient
from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms
####
###Logging Context
MSG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(format=MSG_FORMAT, datefmt=DATETIME_FORMAT)
logger = logging.getLogger("cjsandboxglue")
logger.setLevel(logging.INFO)
logger.info("Hello from glue message")
####
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_PATH','DATABASE_NAME','TABLE_NAME','PIPELINE_ID','PIPELINE_JOB_ID"])
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
nodes_path = '{}nodes'.format(args['S3_OUTPUT_PATH'])
edges_path = '{}edges'.format(args['S3_OUTPUT_PATH'])
jobname= args['JOB_NAME']
database = args['DATABASE_NAME']
session_table = args['TABLE_NAME']
pipelinename = args['PIPELINE_ID']
jpbid = args['PIPLEINE_JOB_ID']
def writeCsvFile(datasource, path):
    dataframe = DynamicFrame.toDF(datasource).repartition(1)
    datasource = DynamicFrame.fromDF(dataframe, glueContext, 'write-csv')
    glueContext.write_dynamic_frame.from_options(frame = datasource, connection_type = "s3", connection_options = {"path": path}, format = "csv", transformation_ctx = "write-csv")
# Arguments
print ("job name =", jobname)
print ("s3 path =",nodes_path)
print ("table prefix= ",session_table)
print ("catalog database= ",database)
print ("pipeline= ",pipelinename)
print ("job id = ",jobid)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = session_table, transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("session_id", "bigint", "session_id:int", "int"), ("marketing_id", "string", "marketing_id:String", "string"), ("quote_id", "bigint", "quote_id:int", "int"), ("date_created", "string", "date_created:String", "string")], transformation_ctx = "applymapping1")
applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'session_id:int', 'SES'),('~to', 'quote_id:int', 'QTE')])
selectfields1 = SelectFields.apply(frame = applymapping1, paths = ["~id", "session_id:int", "marketing_id:String", "quote_id:int", "date_created:String"], transformation_ctx = "selectfields1")
writeCsvFile(GremlinCsvTransforms.addLabel(selectfields1, 'session'), nodes_path)
# Edges
applymapping1 = RenameField.apply(applymapping1, "~id", "~from")
applymapping1 = GremlinCsvTransforms.create_edge_id_column(applymapping1, '~from', '~to')
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["~id", "~from", "~to"], transformation_ctx = "selectfields2")
writeCsvFile(GremlinCsvTransforms.addLabel(selectfields2, 'has_quote'), edges_path)
# End
job.commit()

总结

大家可以使用Neptune为客户360度解决方案建立企业知识库,并将这套解决方案与不同数据源关联起来,据此建立各类客户状态看板、增强分析方案、在线推荐引擎乃至组群趋势分析等。

Neptune数据库支持的这类客户360度解决方案具有以下突出优势:

  • 以近实时方式跟踪客户旅程中发生的所有活动。您可以遍历图关系以提供更好的客户支持体验。
  • 能够建立推荐引擎,根据相似人群的总体偏好(基于人口统计、地理位置及偏好等指标)提供反馈,在客户旅程中及时为客户提供针对性帮助。
  • 基于地理位置与人口统计进行细分分析,了解当前市场中存在的挑战与新的商业机遇。
  • 商业案例分析,通过遍历neighborhood缩小业务影响因素的具体范围。
  • 测试并验证新应用的部署效果,进而改善客户旅程中的体验。

大家也可以使用这套解决方案建立其他类似的知识库,例如车辆知识库或账户知识库等。以车辆360度知识库为例,我们能够借此贯穿售前、经销商交互、定期维护、保修、召回以及二手车销售等整个客户旅程,据此计算出汽车产品的核心吸引力与商业价值。

本篇作者

Ram Bhandarkar

AWS专业服务部门高级数据架构师。Ram专门负责设计及部署大规模AWS数据平台(数据湖、NoSQL、Amazon Redshift等),还协助众多客户将大型遗留Oracle数据库与数据仓库平台迁移至AWS云。