亚马逊AWS官方博客

借助AWS Secrets Manager管理特权凭证

背景介绍

对于一个企业而言,特权账号广泛存在于IT环境中,其包括了分布在 IT 环境中的各种账户,例如 UNIX 的Root账户、Windows 的 administrator账户和数据库管理账户等等。这些帐户通常用于设置 IT 基础架构、安装新硬件和软件、运行关键服务以及执行维护操作等。

特权账号的保护对一个企业而言至关重要。一旦特权账号发生泄漏,被不法分子所利用,它将成为企业安全中的致命弱点,造成不可弥补的损失。例如,如果特权账号保管不善,导致登录凭证泄露、丢失,被恶意攻击者或者别有用心者获取,然后攻击者就有机会登录非授权访问的业务系统,进而可能导致系统数据被删除、恶意增加管理员权限或者非法下载大量数据等。

当我们对特权账号进行管理的时候,主要可以针对以下几个方面进行管理,实现对于特权账号的保护和控制。

  • 集中化的凭证管理仓库。将特权帐户存储在安全存储库中,通过单一访问点强化多因素身份验证,从而完全控制特权帐户的使用。
  • 按规则的凭证轮换计划。通过自动化计划让 IT 团队免去耗时的手动任务(例如批量密码更新),从而提高 IT 生产力。
  • 强制使用强密码和密钥。对特权账户的密码凭证设置强密码规则,降低特权账号被破解的潜在风险。
  • 以不同访问权限、范围控制用户对特权账户的访问。缩小攻击面,有效应对不断增长的外部攻击、身份盗用和内部威胁的风险。
  • 凭证签出和签入的通知或警报。通过审批工作流程,以及特权帐户使用情况的实时警报,建立预防性和检测性安全控制机制。

方案介绍

本文中将会介绍如何借助于AWS Secrets Manager来对特权账号进行管理,所管理的特权账号涵盖了两种类型,第一种是Windows实例的本地管理员账号,第二种是Windows实例的域管理员账号。

下文我们会讨论如何利用AWS Secrets Manager等一系列服务,实现下面几个对特权账号的管理功能:

  • 使用AWS Secrets Manager存储特权账号
  • 用户通过AWS Secrets Manager获取特权账号
  • 使用CloudTrail对特权账号的访问进行审计
  • 自动轮转特权账号的密码

使用AWS Secrets Manager存储特权账号

您可以参考以下文档,在Secrets Manager 中创建密钥,用于存储特权账号的密码。在创建密钥时,请选择Other type of secret(其他密钥类型)作为Secret type(密钥类型),并在Key/value pairs(键值对)中,输入password作为Key,以及账号的密码作为value。

https://docs.aws.amazon.com/zh_cn/secretsmanager/latest/userguide/create_secret.html

在这里,为了方便后面对特权账号密码的轮换,请参考下面的规则来设定密钥的名称。对于AD用户,密钥的名称格式为{Directory ID}/{AD User Name},例如d-836739e1d4/admin。对于Windows本地用户,密钥的名称格式为{Instance ID}/{Local User Name},例如i-07b4f7936dd54add0/Administrator。

用户通过AWS Secrets Manager获取特权账号

当您在Secrets Manager中创建了用于存储特权账号的密钥之后,您就可以在Secrets Manager服务中检索到该特权账号对应的密钥,并打开该密钥,并通过Retrieve secret value(获取密钥值)的方法来获取特权账号的密码。

此外,为了保护存储在 Secrets Manager服务中的特权账号,您可以结合AWS Identity and Access Management (IAM) 来对密钥进行访问权限的控制,从而实现只有授权的用户才能对Secrets Manager中的特权账号进行访问或者其他操作。具体操作请参考下面的文档。

https://docs.aws.amazon.com/zh_cn/secretsmanager/latest/userguide/auth-and-access.html

使用CloudTrail对特权账号的访问进行审计

AWS CloudTrail 是一项 AWS 服务,可帮助对您的 AWS 账户进行监管、合规性检查、操作审核和风险审核。用户、角色或 AWS 服务执行的操作将记录为 CloudTrail 中的事件。为了对于存储在Secrets Manager中的特权账号进行审计,您可以创建CloudTrail日志,从而记录所有对特权账号的操作行为。

您可以参考以下文档来创建CloudTrail日志。在这里,为了可以实现下面“自动轮转转特权账号的密码”的功能,请打开将日志发送到CloudWatch Logs的选项。在您创建了CloudTrail日志并将日志发送到CloudWatch之后,您就可以在CloudWatch中查询到对特权账号的访问行为了。

https://docs.aws.amazon.com/zh_cn/secretsmanager/latest/userguide/retrieve-ct-entries.html

自动轮转特权账号的密码

为了更好地对特权账号进行保护,需要对Secrets Manager中管理特权账号的密码进行自动的轮转。在本文中,我们会实现一种场景,当任何特权账号的密码被用户获取后,该特权账号的密码会在8个小时之后被自动轮转,被轮转的用户密码会被再次更新到Secrets Manager的密钥中。

下面是该方案的架构图。该方案中主要使用到了Secrets Manager、CloudTrail、CloudWatch、Lambda和Step Function这些服务。方案中的主要流程如下,当任何授权IAM用户通过Secrets Manager来获取特权账号的密码时,CloudTrail会记录该用户的访问行为,并将该记录发送到指定的CloudWatch日志组中,通过对CloudWatch日志组对GetSecretValue事件的订阅,从而触发Lambda函数,并在Lambda函数中调用一个Step Function,完成在8个小时后对特权账号的密码的修改。

首先,在本方案中,会通过AWS Systems Manager的SendCommand方法,在Windows EC2实例上运行命令行,从而完成对本地管理员密码的修改。为了使得Systems Manager服务可以管理您的EC2实例,请确保您已经完成了以下步骤的配置工作。

https://docs.aws.amazon.com/zh_cn/systems-manager/latest/userguide/systems-manager-setting-up-ec2.html

其次,本方案会通过Boto3 SDK来修改AWS Directory Service中域管理员账号的密码,具体使用到的函数说明如下。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ds.html#DirectoryService.Client.reset_user_password

为了实现该方案,请参考以下步骤:

1.  创建Lambda函数RotateUserPassword-ChangePassword创建一个Lambda函数,名称为RotateUserPassword-ChangePassword。该函数用于修改Windows本地账号或者AD账号的密码,并将新的密码更新到Secrets Manager的密钥中。函数代码如下所示:

import json
import gzip
import json
import base64
import boto3
import uuid
import time
import random
import string
import array

def lambda_handler(event, context):
    secret_id = event["secretid"]
    user_type = event["usertype"]

    #get the information of the secret
    sm_client = boto3.client('secretsmanager')
    sm_response = sm_client.get_secret_value(
        SecretId=secret_id
    )
    secret_name = sm_response["Name"]
    str_length = len(secret_name)   
    slash_index = secret_name.find("/")
    platform_id = secret_name[0:slash_index]
    user_name = secret_name[slash_index+1:str_length]
    
    #generate a random password
    new_passwd = get_random_password()
    
    #change AD user's password
    if(user_type=="AD"):
        change_ad_user_password(platform_id, user_name, new_passwd)
        
    #change local user's password
    if(user_type=="Local"):
        change_local_user_password(platform_id, user_name, new_passwd)
        
    #change password's value in Secret Manager
    change_secret_value(secret_id, new_passwd)
    
    
def change_ad_user_password(ad_id, user_name, new_passwd):
    ad_client = boto3.client('ds')
    ad_response = ad_client.reset_user_password(
        DirectoryId=ad_id,
        UserName=user_name,
        NewPassword=new_passwd
    )
    
def change_local_user_password(instance_id, user_name, new_passwd):
    #change local user password by SSM's SendCommand
    command = "& ""${env:SYSTEMROOT}\\system32\\net.exe"" user "+ user_name +" " + new_passwd + " /ACTIVE:YES /LOGONPASSWORDCHG:NO /EXPIRES:NEVER /PASSWORDREQ:YES"
    ssm_client = boto3.client('ssm')
    response = ssm_client.send_command(
        InstanceIds=[
            instance_id,
        ],
        DocumentName='AWS-RunPowerShellScript',
        Parameters={
            'commands': [
                command,
            ]
        }
    )
    
def change_secret_value(secret_id, new_passwd):
    secret_string = '{"password":"'+new_passwd+'"}'
    sm_client = boto3.client('secretsmanager')
    sm_response = sm_client.put_secret_value(
        SecretId=secret_id,
        SecretString=secret_string
    )

def get_random_password():
    # maximum length of password needed
    # this can be changed to suit your password length
    MAX_LEN = 16
 
    # declare arrays of the character that we need in out password
    # Represented as chars to enable easy string concatenation
    DIGITS = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] 
    LOCASE_CHARACTERS = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h',
                         'i', 'j', 'k', 'm', 'n', 'o', 'p', 'q',
                         'r', 's', 't', 'u', 'v', 'w', 'x', 'y',
                         'z']
 
    UPCASE_CHARACTERS = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
                         'I', 'J', 'K', 'M', 'N', 'O', 'P', 'Q',
                         'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y',
                         'Z']
     
    SYMBOLS = ['@', '#', '$', '%', '=', ':', '?', '.', '/', '|', '~', '>',
               '*', '(', ')', '<']
 
    # combines all the character arrays above to form one array
    COMBINED_LIST = DIGITS + UPCASE_CHARACTERS + LOCASE_CHARACTERS + SYMBOLS
     
    # randomly select at least one character from each character set above
    rand_digit = random.choice(DIGITS)
    rand_upper = random.choice(UPCASE_CHARACTERS)
    rand_lower = random.choice(LOCASE_CHARACTERS)
    rand_symbol = random.choice(SYMBOLS)
 
    # combine the character randomly selected above
    # at this stage, the password contains only 4 characters but
    # we want a 12-character password
    temp_pass = rand_digit + rand_upper + rand_lower + rand_symbol
     
     
    # now that we are sure we have at least one character from each
    # set of characters, we fill the rest of
    # the password length by selecting randomly from the combined
    # list of character above.
    for x in range(MAX_LEN - 4):
        temp_pass = temp_pass + random.choice(COMBINED_LIST)
     
        # convert temporary password into array and shuffle to
        # prevent it from having a consistent pattern
        # where the beginning of the password is predictable
        temp_pass_list = array.array('u', temp_pass)
        random.shuffle(temp_pass_list)
 
    # traverse the temporary password array and append the chars
    # to form the password
    password = ""
    for x in temp_pass_list:
            password = password + x
            
    return password

由于Lambda函数RotateUserPassword-ChangePassword中需要对secretsmanager、System Manager和Directory Service服务进行相应的操作,故请将以下Policy添加到该Lambda函数的Execution role中。

{
            "Effect": "Allow",
            "Action": [
                "secretsmanager:PutSecretValue",
                "secretsmanager:GetSecretValue",
                "ssm:SendCommand",
                "ds:ResetUserPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": [
                "*"
            ]
 }

2. 创建Step Function RotateUserPasswordStepFunction。创建一个Step Function,名称为 RotateUserPasswordStepFunction。在该Step Function添加两个任务,第一个任务使用Wait State,配置该任务等待8个小时;第二个任务使用Amazon Lambda: Invoke a function,调用上面创建的Lambda函数RotateUserPassword-ChangePassword。
下面是该Step Function的定义,供参考。请注意将下面代码中的{AWS Account ID}替换为您的AWS Account ID。

{
  "Comment": "A description of my state machine",
  "StartAt": "Wait 8 hours",
  "States": {
    "Wait 8 hours": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "Lambda - Change Password",
      "OutputPath": "$"
    },
    "Lambda - Change Password": {
      "Type": "Task",
      "Resource": "arn:aws-cn:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws-cn:lambda:cn-northwest-1:{AWS Account ID}:function:RotateUserPassword-ChangePassword:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

3. 创建Lambda函数RotateUserPassword-RunStepFunction。创建一个Lambda函数,名称为RotateUserPassword-RunStepFunction。该函数用于调用上面创建的Step Function,触发特权账号的轮转。请注意将下面代码中的{AWS Account ID}替换为您的AWS Account ID。

import json
import gzip
import json
import base64
import boto3
import uuid
import time

def lambda_handler(event, context):
    cw_data = event['awslogs']['data']
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    payload = json.loads(uncompressed_payload)
    #print('how many events:', len(payload['logEvents']))
    log_events = payload['logEvents']

    #iterate all log events    
    for event in log_events:
        output = event['message']
        data = json.loads(output)
        secret_id = data['requestParameters']['secretId']
        user_type = ""
        #Run step function to change user password
        if ":secret:d-" in secret_id:
            user_type = "AD"
        if ":secret:i-" in secret_id:
            user_type = "Local"
        if user_type == "AD" or user_type == "Local":
            client = boto3.client('stepfunctions')
            sf_input = "{\"secretid\" : \""+ secret_id +"\", \"usertype\" : \""+ user_type + "\"}"
            response = client.start_execution(
                stateMachineArn='arn:aws-cn:states:cn-northwest-1: {AWS Account ID}:stateMachine:RotateUserPasswordStepFunction',
                input = sf_input
            )

由于该Lambda函数中需要对Step Function服务进行相应的操作,请将以下Policy添加到该Lambda函数的Execution role中。

{
            "Effect": "Allow",
            "Action": [
                "states:StartExecution"
            ],
            "Resource": [
                "*"
            ]
 }

4. 配置CloudWatch日志组的Subscription Filter(订阅筛选条件)在CloudWatch日志组中创建Subscription Filter(订阅筛选条件),从而实现当有GetSecretValue事件发生时触发Lambda函数RotateUserPassword-RunStepFunction。关于如何创建Subscription Filter(订阅筛选条件),请参考以下文档。https://docs.aws.amazon.com/zh_cn/AmazonCloudWatch/latest/logs/SubscriptionFilters.html

在创建Subscription Filter(订阅筛选条件)的时候,请将filter-pattern配置为{ ($.eventName = "GetSecretValue") && ($.userIdentity.type = "IAMUser") },并将触发的目标设为Lambda函数RotateUserPassword-RunStepFunction

结论

本文提供了一个借助于AWS Secrets Manager等服务对特权账号进行管理的云原生方案,实现了对于AWS云上Windows本地账号和AD账号的控制和保护。一方面,帮助客户将特权账号安全的存储起来,并通过IAM权限进行访问控制,防止特权账号的非授权泄漏。另一方面,也利用对Secrets Manager中密钥访问事件的监控,自动触发对特权账号的密码轮转,从而进一步地提高了特权账号的安全性。

本篇作者

姚亮

AWS高级解决方案架构师,负责跨国企业客户的解决方案咨询,应用架构设计和优化,同时致力于AWS数据分析类服务的应用和推广。