亚马逊AWS官方博客

滴水不漏 — AWS:SQS 助力FREEWHEEL BI报表分布式发布

一. Freewheel BI系统介绍

FreeWheel 是Comcast旗下一家集成了视频广告交易市场、广告库存预测及决策支持、受众定向及跨屏支持等一系列技术产品的视频广告投放解决方案公司。

作为Freewheel的客户,可以使用我们在Analytics应用服务中所提供的数据模型来获取自己的广告投放,展现,计费信息报表用于广告效果的评估,并可订阅按周期发布这些报表来追踪历史广告效果或与自己的商业伙伴分享。

二. 业务上云后的技术债务与解决方案

作为第一批 Freewheel 业务上云的服务产品,Analytics 团队稳定高效地将上一代 Freewheel BI 产品 MRMA 整体迁入到 AWS 基础设施中去并持续为客户提供准确业务数据,并且接入Freewheel在 AWS:EC2 及其AutoScaling 服务上所部属的:

  •  Looker BI 数据模型开发服务,来增强Analytics模型的灵活度
  •  Presto 和 Clickhouse 等 OLAP 查询引擎,来提升数据查询效率

在新的 Analytics 服务中,运行于 AWS:EC2 之上的 WEB / API 服务,通过 AWS:RDS 持久化保存业务状态信息,并将输出报表固化在 AWS:S3 上,最后再通过定时脚本同步至 Freewheel FTP 服务器以供授权的客户下载。

在第一个版本的 AWS 迁移完成后,为了兼容  Freewheel 与客户之间的FTP接口,Analytics 延续了往自有机房内 FTP 定期 Rsync 的方式来传递数据的方式。然而一些私有云与自有机房之间架构、网络延迟的差异所带来的问题一直为我们的日常运营带来困扰:

  • 对待发布文件进行批处理,未处理完成的单个大文件会阻塞服务进程获取下一处理批次(客户定制文件大小不一,从几KB到几GB不等)。
  • 在源地址的 S3 存储与最终目标 FTP 地址之间,需要在EC2所挂载的EBS存储设备上产生临时存储用于文件传输。
  • 由于使用定时 rsync 的脚本批量同步目录, 无法精确获取指定文件的发布状况。

目标

为了偿还快速上云后文件发布不稳定的技术债务,Analytics团队计划借助AWS架构的丰富服务,继续产品服务的高可用性高扩展性演进:

  • 文件发布功能模块的解耦合;
  • 文件发布功能模块的可伸缩性改造;
  • 弃用批处理和文件夹同步等工作方式来,保证单个文件发布状态的准确性;
  • 作为长期方案为客户提供以 S3 Bucket 为目标地址的文件发布的支持;

解决方案

在对AWS一些可用于分布式应用搭建的服务进行调研后,我们根据 Freewheel 数据产品团队的AWS技术使用现状:

  • AWS:EKS 容器化集群服务用于团队微服务应用的部署
  • AWS:Elasticache 提供的Redis服务用于团队Airflow等公共服务的数据存储

在此基础上 Analytics 团队引入了 AWS:SQS 来作为应用服务与文件发布模块之间的消息传递接口,从而使新的文件发布功能模块进化成一个高可用高扩展的微服务子系统,它应具有如下特性:

    • 部署在 AWS:EKS 集群之上;
    • 借助 AWS:Elasticache 的redis数据库进行服务发现;
    • 使用 AWS: SQS 进行任务消息传递;
    • 支持以S3 Bucket为目的地址的文件发布。

三. AWS:SQS 在项目中的应用示例

在文件发布子系统模块的 Daemon 在 EKS 上启动时,会去 AWS:Elasticache 上使用自己的应用名称查找模块的全局配置用来:

  1. 实例的worker manager根据全局最大需求 worker 数量和本实例最大 worker 数量来灵活启动 worker;
  2. 运行实例获取消息的AWS:SQS地址,用于task manager:
      1. 从 task queue 获取 task 信息并分配给各个 worker;
      2. 将 worker 处理 task 的结果推送到 brief queue;

当 Daemon 完成上述配置后,Deamon内的 worker manager会启动若干 worker routine,每一个 worker 为一个单独的go routine,根据从SQS中获取的消息来进行文件处理:

  • 从查询结果文件源地址读取 parquet/ orc 文件内容;
  • 发布文件的格式  csv / excel 来格式化文件;
  • 根据待发布文件的地址 ( FTP / S3 Bucket )来创建连接写出结果文件。

同时 Daemon 还会会暴露一个 HTTP POST API 用来直接接收重试的 task。文件发布子系统为Analytics应用服务提供一个可在编译时引用的Adaptor组件:

  1. 将应用服务中需要发布的文件序列化成Task消息放入 Task Queue;
  2. 定时从 Brief Queue 中获取Task的trace信息;
  3. 定时从绑定于 Task Queue的 死信队列中备份处理失败的 task 用于故障恢复后的人工重试。

控制信息与业务信息分离实现快速的开发和部署

在消息队列传输的报文中,我们使用了:

  • 使用protobuf来对任务消息进行序列化和反序列化并进行Base64Encoding来传输
syntax = "proto3";
import "google/protobuf/any.proto";

message TaskEnvelop {
	string	App = 1;
	string	TraceID =2;
	string	TaskType =3;
}

message TaskMessage {
	TaskEnvelop Envelop = 1;
	google.protobuf.Any	Body = 2; // the task body could decode by the worker with the TaskEnvelop.TaskType.
}

message TaskBrief { // TaskBrief to send order process brief to data application
	TaskEnvelop Envelop = 1;
	enum State {
		UNKOWN = 0;
		SUCCEED = 1;
		FAILED = 2;
		TIMEOUT = 3;
	}
	State	Result = 2; // the process result, auto set
    ...
  • TaskMessage::TaskEnvlop 将与具体业务无关的Trace信息封装,在微服务的 task manager 读取

这种流程控制与业务信息分离的设计使得我们在实现的过程中获得了很多好处:

  • Daemon 与 Adaptor 的框架部分代码完全复用;
  • S3和FTP的发布功能仅仅需要实现同功能的worker及定义对应的 TaskMessage::Body 来实现;
  • 同时所有类型的 worker 在返回结果都遵循统一接口 TaskBrief 消除状态的歧义;
package demo;

message DemoTask {
	string foo = 1;
	string bar = 2;
}
// worker interface 
...
// RWorker defines the interface to process the tasks
type RWorker interface {
    // BoundTask implements to return the bounding task name to verify the order message
    BoundTask() string

    /*Process implements to process the order with custom result string and predefined status
     * OStatus Could be UNKNOWN, SUCCEED & FAILED
     * OStatus DUPLICATED & TIMEOUT have handled by framework already. */
    Process(context []byte, logger *logrus.Entry) (string, OStatus, error)

    // Terminate implements to terminate the current processing when server going to shutdown   Terminate(logger *logrus.Entry) error
    Terminate(logger *logrus.Entry) (err error)
}

Demo worker 的实现示例

这样控制和业务分离的框架与策略:

  1. 通过简单的接口实现就能定义新的任务,并使得业务功能测试完全不依赖于具体 AWS 框架,在单元测试阶段既可保证业务逻辑。
  2. 将 worker 加入到 daemon runtime 框架编译后,使用 helmchart 部署启动成为EKS上的实例,并自动注册进 AWS:Elasticache 数据库中 task → worker 服务列表;
  3. 自动绑定到应用名称和 Task 类型的自动化部署代码,使得 AWS:SQS 的管理接近0成本;

利用死信队列建立重试和失败回收机制

在创建 AWS:SQS 的时候通过指定 redrivePolicy 来将所创建队列中被读取超过 maxReceiveCount 次数的 message 转移至 ARN 为 deadLetterTargetArn 的队列中去。通过这一设置提高了SQS 队列的可用性:使得不稳定运行的task既有了maxReceiveCount 次数的重试机会,又能在多次重试均无法恢复的情况下移出队列避免时间和运算资源的浪费。

          ...
          dqAttr, err = svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
              QueueUrl: aws.String(*DeadLetterQueueURL),
              AttributeNames: []*string{
                  aws.String("All"),
              },
          })
          if err == nil {
              redrivePolicy = aws.String(fmt.Sprintf("{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":%d}", *dqAttr.Attributes["QueueArn"], taskRetry))
          }
          ...
          input = &sqs.CreateQueueInput{
          QueueName: aws.String(name),
          Attributes: map[string]*string{
              "RedrivePolicy":             *dqAttr.Attributes["QueueArn"],
              ...
          },
      }
     ...

队列创建代码示例:

利用 VisibilityTimeout 来实现重试逻辑

在使用死信建立重试和失败回收机制的基础上,我们需要对队列消息的消费生命周期来进行管理。在这一步我们使用了AWS:SQS的message的一些操作并加上一些自定义来解决消息重复消费的问题:

    1. 每个微服务实例中的 task manager 在获取调用 AWS:SQS 的 recieveMessage API获取消息时,会根据 task 的单任务运行最大时间 taskTimeout 来设置这个 message 在队列中的 VisibilityTimeout
    2. 在 task 被 worker 成功处理时,task manager 会使用对应消息的 ReceiptHandle 来在SQS队列中删除对应的消息
    3. 如果 task 处理失败或者超时,task manager不对SQS队列进行任何操作,VisibilityTimeout 超时后 message 继续在队列内可以进行下一次重试
    4. 当 message 被尝试次数超过 maxReceiveCount 后被转移到死信队列,表示着task处理重试失败需要人工介入
     ...
     // Create the task query with default visibility configure
     // specifies taskDefaultTimeout as the dafault max duration for signle task
     // specifies taskVisiableGap 10 seconds as the visiableGap in the queue for the proceed task
     input = &sqs.CreateQueueInput{
          QueueName: aws.String(name),
          Attributes: map[string]*string{
              "VisibilityTimeout":             aws.String(strconv.FormatInt(taskDefaultTimeout + taskVisiableGap, 10)),
               ...
          },
      }
     ...
	 output, err = svc.CreateQueue(input)
     ...
     ...
      // Specified an overwritedTaskTimeout with an overwrited configure
      // specifies taskVisiableGap 10 seconds as the visiableGap in the queue for the proceed task
      rMsgInput = &sqs.ReceiveMessageInput{
          QueueUrl:            aws.String(qURL),
          MaxNumberOfMessages: aws.Int64(maxMsg),
          VisibilityTimeout:   aws.Int64(overwritedTaskTimeout + taskVisiableGap),
          WaitTimeSeconds:     aws.Int64(rcvWaitSecond),
      }

设置visibility timeout的golang代码示例:

四. 结语

在AWS的SQS, EKS, ElasticCache等服务的支持下,Freewheel BI 服务目前每日为位于6个时区的数百个客户公司提供日均1000+的格式订阅报表。保证每个时区在前一天的数据结算和当天办公时间开始之间的数小时时间窗口内,完成当前时区的当日所有报表发布任务。新的文件发布服务相比以前的批处理脚本代码有着如下优势:

  • 高可用性:依托 EKS 无状态的任务处理worker可以随时启动停止,而 SQS 的特性又保证了没有处理完成的任务不会丢失;
  • 高扩展性:EKS, ElasticCache的支持使得Worker Daemon实例在EKS集群中能够随时启停,并且支持在Redis上修改worker的最大数量来增加或减少worker routine的个数,使得我们可以根据任务的堆积数量来随时调整系统处理能力;
  • 鲁棒性:基于 SQS 的特性实现的重试与故障排除机制与任务中的幂等性实现,保证了由于网络带宽或者其他偶发性故障有机会自动重试和归档,从而使整个系统的运作有了更强的容错能力。

本篇作者

陈寅

陈寅,Freewheel业务数据产品Lead Sofetware Engineer;2011年毕业于中国科学技术大学,曾就职于百度凤巢检索端团队;全栈开发者,长期专注于高可用服务优化、持续集成及数据产品设计;秉持“以简洁高效的代码实现优雅直观的设计”这一原则来促进产品不断演进。