亚马逊AWS官方博客
Amazon EMR 对 Spark 集群提升弹性并增强恢复能力的实现
为了避免其中的某些问题并帮助客户在使用 Spark 时充分利用 Amazon EMR 的弹性功能,Amazon EMR 对开源 Spark 进行了定制,增强了节点丢失的恢复能力。将重新计算的工作量将至最低,并且加快了作业从节点故障和 EC2 实例终止恢复的速度。Amazon EMR 发行版 5.9.0 及更高版本提供这些改进功能。
本博文概述了开源 Spark 处理节点丢失时存在的问题,以及为了解决这些问题对 Amazon EMR 做出的改进。
Spark 如何处理节点丢失
在 Spark 作业运行期间,如果节点关闭,会带来以下风险:
- 该节点上正在运行的任务可能无法完成,而必须转移到其他节点上运行。
- 节点上的Cached RDDs(弹性分布式数据集resilient distributed dataset)可能会丢失。虽然这确实会影响性能,但不会导致故障或影响应用程序的稳定性。
- 内存中的Shuffle output文件或节点上写入磁盘的文件可能会丢失。由于 Amazon EMR 默认启用External Shuffle Service务,因此shuffle output将写入磁盘。丢失shuffle output可能会导致应用程序停止运行,直到在另一个活跃节点上重新计算才会继续运行,因为后续任务可能依赖于它们。有关shuffle操作的更多信息,请参阅Shuffle operations。
要想从节点丢失恢复,Spark 应执行以下操作:
- 如果丢失了正在运行的任务,则必须将这些任务安排到其他节点上。另外,还必须恢复对未安排的剩余任务进行计算。
- 必须重新执行产生这些shuffle output的任务,从而重新计算在丢失的节点上计算的shuffle output。
以下是节点丢失后 Spark 恢复事件的顺序:
- Spark 将节点上正在运行的任务视为失败,然后在另一个活跃节点上重新运行这些任务。
- 如果该节点具有后续任务所需的shuffle output文件,则其他活跃节点上的目标执行程序在尝试从故障节点获取缺少的shuffle blocks时,会收到 FetchFailedException。
- 发生 FetchFailedException 时,目标执行程序会在一段时间内重新尝试从故障节点获取数据块,具体时间由 spark.shuffle.io.maxRetries 和 spark.shuffle.io.retryWait 配置值决定。执行完所有重试尝试后,会将故障传播到driver。
- Driver收到 FetchFailedException 后,会将故障发生时正在运行的shuffle stage标记为失败,然后停止执行。它还会把无法从中获取shuffle数据块的节点或执行程序上的shuffle output标记为不可用/丢失,以便可以对其执行重新计算。这将触发上一个map stage重新尝试重新计算缺少的shuffle数据块。
- 计算出缺少的shuffle output后,将触发对失败的shuffle stage的重新尝试,以从停止的位置恢复作业。然后,它会运行失败或尚未安排的任务。
Spark 在处理节点丢失方面存在的问题
Spark 的恢复过程可帮助其恢复在任何云环境中可能发生的+执行程序和节点故障。但是,Spark 仅在节点已发生故障且在尝试获取shuffle数据块时收到了 FetchFailedException 之后,才会开始恢复进程。这会导致本部分所述的一些问题。
Amazon EMR 可以根据手动调整大小、EC2 触发的 Spot 实例终止或自动扩展事件,判断哪些节点即将关闭,因此可以尽早开始恢复。它可以立即向 Spark 发出有关这些节点的通知,以便 Spark 可以主动采取措施来妥善处理节点丢失问题并尽早开始恢复。但是,Spark 目前没有用来接收节点即将关闭(例如 YARN 停用)的通知的机制。所以,它不能立即采取相关措施来帮助更快地恢复。因此,Spark 恢复存在一些问题:
- 节点在map stage过程中关闭,如下图所示:
在这种情况下,不必安排shuffle stage,且应用程序必须等收到 FetchFailedException 后,才能重新计算丢失的shuffle。这非常耗时。因此,最好是在map stage立即重新计算所有丢失的shuffle输出,然后再继续执行shuffle stage。
- 节点在shuffle stage进行过程中关闭,如下图所示:
如果可以立即就节点丢失向 Spark 发出通知,而不是依靠 FetchFailedException 并重试获取,则可以节省恢复时间。
- 当获得第一个 FetchFailedException 时,Spark Driver将开始重新计算。它将丢失的节点上的shuffle文件视为缺少的文件。但是,如果多个节点同时关闭,则在上一个map stage的第一次重试中,Spark Driver仅重新计算从其收到 FetchFailedException 的第一个节点的shuffle输出。从首次收到获取失败消息到开始重试之间的短暂时间内,driver可能会从其他故障节点收到获取失败的消息。因此,它可以在同一次重试中为多个丢失的节点重新计算shuffle输出,但无法保证一定能做到。在大多数情况下,即使节点同时关闭,Spark 也需要对映射和shuffle stage进行多次重试,以重新计算所有丢失的shuffle输出。这很容易导致作业长时间阻塞。理想情况下,Spark 可以在一次重试中重新计算在大致在同一时间丢失的所有节点上的shuffle输出。
- 只要 Spark 可以访问即将关闭的节点,就可以继续在其上安排更多任务。这导致要计算更多的shuffle输出,最终导致可能需要重新计算。理想情况下,可以将这些任务重定向到运行状况良好的节点,以防止重新计算并缩短恢复时间。
- Spark 对中止作业之前某个stage允许的连续失败尝试设置了次数限制。次数可以通过 spark.stage.maxConsecutiveAttempts 进行配置。当节点发生故障并发生 FetchFailedException 时,Spark 会将正在运行的shuffle stage标记为失败,并在计算缺少的shuffle输出之后触发重试。在shuffle stage频繁扩展节点很容易导致stage故障,致使达到阈值并中止作业。理想情况下,当某个stage因有效理由(如手动缩减、自动扩展事件或 EC2 触发 Spot 实例终止)失败时,应该可以通知 Spark 不要将其计入该stage的spark.stage.maxConsecutiveAttempts。
Amazon EMR 如何解决这些问题
本部分介绍了 Amazon EMR 为解决上一部分指出的问题,而对 Spark 进行的三项主要改进。
与 YARN 的停用机制集成
Amazon EMR 上的 Spark 使用 YARN 作为集群资源的基础管理器。Amazon EMR 自身拥有适用于 YARN 的从容停用机制(Graceful Decommission),该机制无需在即将停用的节点上调度新容器即可从容关闭 YARN 节点管理器。对于正在运行的容器上的现有任务,Amazon EMR 通过在节点停用前等待其完成或超时来实现这一机制。目前,该停用机制已回馈给开源 Hadoop。
我们将 Spark 与 YARN 的停用机制集成在一起,以便当节点在 YARN 中进入即将停用或已停用状态时通知 Spark Driver。如下图所示:
此通知使Driver可以采取适当的措施并尽早开始恢复,因为所有节点在删除之前都要经历停用过程。
扩展 Spark 的黑名单机制
YARN 的停用机制非常适合 Hadoop MapReduce 作业,因为它不会在即将停用的节点上再启动容器。这有助于防止在该节点上安排更多 Hadoop MapReduce 任务。但是,它不太适合 Spark 作业,因为在 Spark 中,为每一个执行程序都分配了一个 YARN 容器,YARN 容器是一直存在并持续接收任务的。
阻止启动新容器只会阻止将更多执行程序分配给节点。处于活跃状态的执行程序/容器将继续安排新任务,直到该节点关闭。它们最终可能会失败,并且必须重新运行。同样,如果这些任务写入shuffle输出,它们也会丢失。这增加了重新计算工作量和恢复所需的时间。
为了解决这个问题,Amazon EMR 扩展了 Spark 的黑名单机制,以在 Spark Driver收到 YARN 停用信号时将其列入黑名单。如下图所示:
这样可以防止在列入黑名单的节点上安排新任务,转而将任务安排在运行状况良好的节点上。节点上运行的任务完成后即可安全地停用该节点,而不会产生任务失败或丢失的风险。由于即将关闭的节点没有产生新的shuffle输出,因此加快了恢复过程。这有助于减少需要重新计算的shuffle输出的数量。如果节点从即将停用状态恢复正常,并再次处于活跃状态,则 Amazon EMR 将从黑名单中删除该节点,以便可以为其安排新任务。
在 Amazon EMR 中,此“黑名单扩展”默认是启用的,而且 spark.blacklist.decommissioning.enabled 属性设置为 true。您可以使用 spark.blacklist.decommissioning.timeout 属性控制将节点列入黑名单的时间,默认设置为 1 小时,等于 yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs 的默认值。我们建议将 spark.blacklist.decommissioning.timeout 设置为等于或大于 yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs 的值,以确保 Amazon EMR 在整个停用期间将节点列入黑名单。
适用于已停用节点的操作
节点停用后,将不会再为其安排新任务,且活跃容器会变为空闲(或超时到期),节点进入停用状态。 当 Spark Driver接收到已停用的信号后,它可以额外采取以下措施来加快恢复过程,而不是等待获取失败:
- 已停用节点上的所有shuffle输出均未注册,因此将其标记为不可用。Amazon EMR 默认启用该功能,而且 spark.resourceManager.cleanupExpiredHost 设置为 true。这具有以下优势:
- 如果节点在map stage丢失并停用,Spark 会启动恢复过程并重新计算已停用节点上丢失的shuffle输出,然后再执行下一stage。这可以防止shuffle stage获取失败,因为 Spark 在map stage结束时已计算了所有shuffle数据块并且这些数据块处于可用状态,从而显著提高了恢复速度。
- 如果节点在shuffle stage丢失,则尝试从丢失的节点获取shuffle数据块的目标执行程序会立即发出shuffle输出不可用的通知。然后,它会将故障发送给Driver,而不是多次重新尝试获取并多次失败。随后,Driver立即停止执行stage,并开始重新计算丢失的shuffle输出。这减少了尝试从丢失的节点获取shuffle数据块所花费的时间。
- 当集群节点数量扩展到很大时,取消注册shuffle输出的优势最明显。因为所有节点都在大致相同的时间关闭,所以它们会在大致相同的时间停用,并且会取消注册它们的shuffle输出。当 Spark 安排第一次重新尝试计算缺少的数据块时,它将发出有关已停用节点缺少数据块的通知,并且仅尝试一次即可恢复。这可以显著加快开源 Spark 的恢复过程,在这一过程中可以多次重新安排stage,以重新计算所有节点上缺少的shuffle输出,并防止作业长时间卡在故障和重新计算状态。
- 默认情况下,当由于从停用的节点获取失败而导致stage失败时,Amazon EMR 不会将stage失败计入 spark.stage.maxConsecutiveAttempts 设置的stage允许的最大故障数。这一点,由 spark.stage.attempt.ignoreOnDecommissionFetchFailure 的设置决定,设置为True可实现上述功能。在stage由于一些有效原因(例如,手动调整大小、自动扩展事件或 EC2 触发的 Spot 实例终止)而多次失败时,这可以防止作业失败。
小结
本文介绍了 Spark 如何处理节点丢失以及在 Spark 作业运行期间扩展集群时可能发生的一些问题。还介绍了 Amazon EMR 针对 Spark 定制的功能,以及可使 Spark 在 Amazon EMR 上提高恢复能力的配置,可帮助您充分利用 Amazon EMR 提供的弹性功能。
如果您有任何问题或建议,请留言。