AWS 기술 블로그

AWS DMS를 이용한 CDC 데이터 실시간 수집 및 분석 데이터 파이프라인 구축하기

관계형 데이터베이스(RDBMS)는 데이터를 저장하고, 분석하고자 할 때 가장 많이 사용되는 솔루션 입니다. 하지만, RDBMS로 처리하기 어려울 만큼 많은 양의 데이터를 저장하고 관리해야 하는 상황이 자주 발생합니다. 또한, MSA (마이크로 서비스아키텍처)를 도입해서 비즈니스 도메인 단위로 RDBMS를 물리적으로 격리하는 경우에 데이터 분석을 위해서 여러 RDBMS의 데이터를 한 곳에 모아서 저장하고, 관리해야할 필요가 있습니다. 즉, 데이터 레이크를 구축해서 다양한 데이터를 저장하고, 분석할 수 있는 환경을 만들고, 여러 RDBMS의 데이터를 데이터 레이크(Data Lake)에 저장하기 위한 데이터 파이프라인을 구축해야하는 경우가 많습니다.

이 블로그에서 소개 드릴 솔루션은 RDBMS의 데이터를 Amazon S3를 이용한 데이터 레이크에 실시간으로 저장할 수 있는 데이터 파이프라인 입니다. AWS Database Migration Service(AWS DMS)는 데이터베이스 및 분석 워크로드를 AWS로 빠르고 안전하게 이동하여 가동 중단 시간 및 데이터 손실을 방지하는 데 도움이 되는 관리형 마이그레이션 및 복제 서비스입니다. AWS DMS는 20개 이상의 데이터베이스 및 분석 엔진 간의 마이그레이션을 지원합니다.

AWS DMS를 이용하면, RDMS에 Insert, Update, 또는 Delete되는 데이터(Chang Data Capture)를 실시간으로 Amazon S3에 저장할 수 있습니다. 여기에 Amazon Kinesis Data StreamsAmazon Data Firehose를 사용하면, Change Data Capture(CDC) 데이터를 Amazon S3에 저장하는 동시에, Amazon OpenSearch Service에 저장해서 실시간으로 데이터를 분석할 수 있습니다.
또한, 많은 양의 데이터를 배치 방식으로 분석하고 싶은 경우에는 서버리스 쿼리엔진 (Serverless Query engine)인 Amazon Athena를 이용해서 Amazon S3에 저장된 데이터를 조회하고 분석할 수 있습니다.
이렇게 실시간으로 발생하는 데이터를 수집해서, 저장 한 후에 분석을 할 수 있는 일괄 처리 (Batch Processing) 방식으로 데이터를 처리하는 배치 레이어와 실시간으로 데이터를 조회하고 분석하는 스피드 레이어(Speed Layer)로 구성된 데이터 분석 아키텍처를 람다 아키텍처(Lambda Architecture)라고 합니다.

이 블로그에서는 AWS DMS, Amazon Kinesis Data Streams, Amazon Data Firehose, Amazon OpenSearch Service, Amazon S3를 사용해서 람다 아키텍처의 Batch Layer와 Speed Layer를 구현한 사례를 소개하고자 합니다.

그림 1. 람다 아키텍처 (Lambda Architecture)

솔루션 개요

이번 블로그 포스팅에서 소개할 솔루션은 데이터베이스 CDC 데이터를 실시간으로 분석하기 위해 AWS DMS, Amazon Kinesis Data Streams, Amazon Data Firehose, Amazon OpenSearch Service를 사용합니다. 데이터의 수집, 처리, 그리고 분석 과정은 다음과 같습니다.

  1. AWS DMS는 Amazon Aurora MySQL 데이터베이스의 binlog를 읽어서 insert, update, 또는 delete 되는 CDC 데이터를 실시간으로 Kinesis Data Streams에 저장합니다.
  2. Kinesis Data Streams에 수집된 CDC 데이터는 Amazon Data Firehose를 이용해서 Amazon S3와 OpenSearch Service에 저장됩니다.
  3. Amazon OpenSearch를 이용해서 실시간으로 데이터를 분석할 수 있습니다. 동시에 Amazon S3는 데이터 레이크로 사용해서 대용량 데이터 분석을 하거나 머신 러닝에 필요한 학습 데이터 준비 작업에도 활용할 수 있습니다.

그림 2. 실시간 CDC 스트림 데이터 파이프라인 아키텍처

이 아키텍처에서 AWS DMS의 마이그레션 대상(Target Endpoint)으로 Amazon OpenSearch Service를 지정하지 않고, Amazon Kinesis Data Streams를 사용했습니다. CDC 데이터의 발생 빈도나 데이터 양이 갑자기 커질 경우에 Amazon OpenSearch Service에 데이터 적재에 대한 부하가 커질 수 있습니다. 그래서, AWS DMS와 Amazon OpenSearch Service 사이에 버퍼 역할을 해줄 수 있는 Amazon Kinesis Data Streams을 사용합니다. 이렇게 Amazon Kinesis Data Streams를 AWS DMS의 마이그레이션 대상으로 사용함으로써 CDC 데이터를 다양한 데이터 저장소로 복제할 수 있습니다.

이 솔루션에서는 실시간 분석을 위해서 Amazon OpenSearch Service로 데이터를 적재하는 동시에 장기간의 데이터 보관 및 배치 분석을 위해서 Amazon S3에도 데이터를 저장합니다. 그리고, 이 아키텍처에서는 Amazon Kinesis Data Streams의 데이터를 실시간으로 Amazon OpenSearch와 S3에 전송하기 위해서 Amazon Data Firehose를 도입했습니다.

람다 아키텍처 관점에서 AWS DMS, Kinesis Data Streams, Data Firehose, OpenSearch Service, S3 등은 다음과 같은 역할을 수행합니다.

  • Stream Ingestion – AWS DMS
  • Stream Storage – Amazon Kinesis Data Streams
  • Stream Delivery – Amazon Data Firehose
  • Raw Data Storage (Data Lake) – Amazon S3
  • Real-time View – Amazon OpenSearch Service

단계 요약

  • 단계 1: CDK 스택 배포 준비
  • 단계 2: AWS DMS의 Source endpoint 사용할 Aurora MySQL Cluster 생성 및 데이터 준비
  • 단계 3: AWS DMS의 Target endpoint로 사용할 Amazon Kinesis Data Streams 생성
  • 단계 4: AWS DMS Migration Task 생성
  • 단계 5: CDC 데이터의 실시간 수집과 분석을 위한 Amazon OpenSearch Service와 Amazon Data Firehose 생성
  • 단계 6: Amazon Data Firehose의 Amazon OpenSearch Service 접근 권한 설정

사전 준비 사항

솔루션 배포를 위해 AWS Cloud Development Kit (AWS CDK)를 사용합니다. 실습을 진행하기 위해 아래와 같이 준비합니다.

  • AWS 계정
  • AWS CLI: AWS CLI 사용을 위한 credential 설정
    aws configure --profile [your-profile]
    AWS Access Key ID [None]: xxxxxx
    AWS Secret Access Key [None]:yyyyyyyyyy
    Default region name [None]: us-east-1
    Default output format [None]: json
  • AWS CDK >= 2.128.0
  • Amazon EC2 Key Pair
  • jq – JSON 처리 Linux Uitility

단계 1: CDK 스택 배포 준비

  1. GitHub 리포지토리에서 솔루션 코드를 다운로드 받습니다.
    git clone https://github.com/aws-samples/aws-dms-cdc-data-pipeline
    cd aws-dms-cdc-data-pipeline
  2. CDK로 솔루션을 배포할 수 있도록 Python virtualenv를 이용해서 가상 환경을 구성하고, 필요한 Python 패키지를 설치합니다.
    python3 -m venv .venv
    source .venv/bin/activate
    pip install -r requirements.txt
  3. CDK 스택을 배포하는데, 필요한 설정 값을 저장하기 위해서 cdk.context.json 파일을 아래와 같이 생성합니다.
    {
      "db_cluster_name": "dms-src",
      "dms_data_source": {
        "database_name": "testdb",
        "table_name": "retail_trans"
      },
      "kinesis_stream_name": "retail_trans_stream",
      "opensearch_domain_name": "retail-trans",
      "opensearch_index_name": "trans",
      "ec2_key_pair_name": "bastion"
    }

    ec2_key_pair_name 에는 EC2 Key Pair 파일의 확장자 (pem)를 제외 시켜야 합니다. 예를 들어 KeyPair 파일 이름이bastion.pem 라면, bastion 으로 설정해야 합니다.

  4. 배포할 CDK 스택들은 cdk list 명령어를 이용해서 확인할 수 있습니다.
    (.venv) $ cdk list
    VpcStack
    AuroraMysqlStack
    AuroraMysqlBastionHost
    DMSTargetKinesisDataStreamStack
    DMSRequiredIAMRolesStack
    DMSAuroraMysqlToKinesisStack
    OpenSearchStack
    FirehoseStack

단계 2: AWS DMS의 Source endpoint 사용할 Aurora MySQL Cluster 생성 및 데이터 준비

2.1 AWS DMS의 데이터 소스로 사용할 Aurora MySQL cluster를 생성합니다.

cdk deploy VpcStack AuroraMysqlStack AuroraMysqlBastionHost

2.2 AWS DMS가 데이터베이스의 CDC를 읽을 수 있도록 binlog가 켜져 있는지 확인하고, binlog 보존 기간을 설정합니다.

  1. Bastion Host를 이용해서 Aurora MySQL에 접속합니다.
     $ BASTION_HOST_ID=$(aws cloudformation describe-stacks --stack-name AuroraMysqlBastionHost | \
     jq -r '.Stacks[0].Outputs | .[] | select(.OutputKey | endswith("EC2InstanceId")) | .OutputValue')
    
     $ aws ec2-instance-connect ssh --instance-id ${BASTION_HOST_ID} --os-user ec2-user
    
     [ec2-user@ip-172-31-7-186 ~]$ mysql -h {db-cluster-name.cluster-xxxxxxxxxxxx.region-name.rds.amazonaws.com} -uadmin -p
     Enter password:
     Welcome to the MariaDB monitor.  Commands end with ; or \g.
     Your MySQL connection id is 20
     Server version: 8.0.23 Source distribution
    
     Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
    
     Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    
     MySQL [(none)]>
    

    MySQL 접속에 필요한 username, passwordAWS Secrets Manager 에서 확인할 수 있습니다.
    아래와 같이 DatabaseSecret-xxxxxxxxxxxx 와 같은 형태의 Secret name을 가지고 있는 Secret를 선택합니다.

    그리고, Secret value에서 MySQL 접속에 필요한 정보를 확인할 수 있습니다.

  2. 데이터베이스의 binlog가 활성화 되어있는지 확인합니다.
    MySQL [(none)]> SHOW GLOBAL VARIABLES LIKE "log_bin";
     +---------------+-------+
     | Variable_name | Value |
     +---------------+-------+
     | log_bin       | ON    |
     +---------------+-------+
     1 row in set (0.00 sec)

    binlog가 활성화 되어 있지 않은 경우에는 Amazon Aurora 개발자 가이드 문서를 참고해서 binlog을 활성화 시킵니다.

  3. binlog 보존 기간을 24시간으로 설정합니다.
    MySQL [(none)]> CALL mysql.rds_set_configuration('binlog retention hours', 24);
     Query OK, 0 rows affected (0.01 sec)

2.3 테스트용 데이터베이스(testdb)와 테이블(retail_trans)을 생성합니다.

CREATE DATABASE IF NOT EXISTS testdb;
USE testdb;

CREATE TABLE IF NOT EXISTS testdb.retail_trans (
    trans_id BIGINT(20) AUTO_INCREMENT,
    customer_id VARCHAR(12) NOT NULL,
    event VARCHAR(10) DEFAULT NULL,
    sku VARCHAR(10) NOT NULL,
    amount INT DEFAULT 0,
    device VARCHAR(10) DEFAULT NULL,
    trans_datetime DATETIME DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(trans_id),
    KEY(trans_datetime)
) ENGINE=InnoDB AUTO_INCREMENT=0;

테스트용 데이터베이스에 대한 설정을 모두 마쳤습니다. 이제 CDK 스택을 배포 했던 Terminal로 다시 되돌아갑니다.

단계 3: AWS DMS의 Target endpoint로 사용할 Amazon Kinesis Data Streams 생성

AWS DMS에서 전송하는 CDC 데이터를 실시간으로 수집하기 위한 Amazon Kinesis Data Streams를 생성합니다.

cdk deploy DMSTargetKinesisDataStreamStack

단계 4: AWS DMS Migration Task 생성

이전 단계에서 생성한 MySQL 데이터베이스(예: testdb)의 테이블(예: retail_trans)에서 생성되는 CDC 데이터를 Amazon Kinesis Data Streams로 실시간으로 전송하기 위한 DMS Migration Task를 생성합니다.

cdk deploy DMSRequiredIAMRolesStack DMSAuroraMysqlToKinesisStack

단계 5: CDC 데이터의 실시간 수집과 분석을 위한 Amazon OpenSearch Service와 Amazon Data Firehose 생성

MySQL CDC 데이터를 수집해서 실시간으로 분석하기 위해서 Amazon OpenSearch Service를 생성합니다. 그리고, Amazon Kinesis Data Streams에서 Amazon OpenSearch Service로 데이터를 전송할 수 있도록 Amazon Data Firehose를 차례로 생성합니다.

5.1 Amazon OpenSearch Service 생성하기

cdk deploy OpenSearchStack

만약, OpenSearchStack을 배포하는 중에 아래와 같은 에러가 발생할 수 있습니다. 이러한 에러가 발생하는 이유는 Amazon OpenSearch Domain 생성에 필요한 AWSServiceRoleForAmazonOpenSearchService 라는 Service-Linked Role(SLR)이 없기 때문입니다.

11:11:30 AM | CREATE_FAILED        | AWS::OpenSearchService::Domain      | OpenSearch587998CD
Resource handler returned message: "Invalid request provided: Before you can proceed,
you must enable a service-linked role to give Amazon OpenSearch Service permissions to access your VPC.
(Service: OpenSearch, Status Code: 400, Request ID: 8e9618af-1554-4605-93a2-8c4cc22e2412)"
(RequestToken: ccad0316-8daa-5c2a-89a1-056e1e88f23a, HandlerErrorCode: InvalidRequest)

AWSServiceRoleForAmazonOpenSearchService 은 aws cli를 이용해서 아래와 같은 명령어로 생성가능합니다.

aws iam create-service-linked-role --aws-service-name opensearchservice.amazonaws.com

5.2 Amazon Data Firehose 생성하기

cdk deploy FirehoseStack 

단계 6: Amazon Data Firehose의 Amazon OpenSearch Service 접근 권한 설정

Amazon Data Firehose에서 Amazon OpenSearch Service로 데이터를 적재하기 위해서는 OpenSearch접근 권한이 필요합니다. OpenSearch에 대한 접근 권한은 Amazon IAM과 별개로 OpenSearch에서 직접 관리합니다. 따라서, 다음과 같은 순서로 Amazon Data Firehose에 OpenSearch에 대한 접근 권한을 부여합니다.

6.1 Amazon OpenSearch Service 클러스터가 private subnet에 있기 때문에 웹 브라우저에서 OpenSearch 대시보드에 접근하기 위해서 SSH 터널링을 사용합니다.

ssh config 파일에 아래와 같이 추가합니다. (Mac 사용자의 경우 ~/.ssh/config 파일에 해당됩니다.)

Host opstunnel
    HostName {Bastion Host Public IP}
    User ec2-user
    IdentitiesOnly yes
    IdentityFile {/path/to/ec2_key_pair_name}.pem
    LocalForward 9200 {OpenSearch Service Endpoint}:443

예시)

~$ ls -1 .ssh/
config
my-ec2-key-pair.pem

~$ tail .ssh/config
Host opstunnel
    HostName 214.132.71.219
    User ec2-user
    IdentitiesOnly yes
    IdentityFile ~/.ssh/my-ec2-key-pair.pem
    LocalForward 9200 vpc-retail-trans-qvwlxanar255vswqna37p2l2cy.us-east-1.es.amazonaws.com:443

6.2 Terminal에서 ssh -N opstunnel 명령어를 실행하고 웹 브라우저에서 https://localhost:9200/_dashboards/ 로 접속합니다.


OpenSearch Service 접속에 필요한 usernamepasswrod는 OpenSearch Cluster를 생성하는 동안에 자동으로 되어 Amazon Secrets Manager에 저장되어있습니다.

아래와 같이 OpenSearchMasterUserSecret-xxxxxxxxxxxx 와 같은 형태의 Secret name을 가지고 있는 Secret를 선택합니다.


Secret value에서 OpenSearch Service 접속에 필요한 정보를 확인할 수 있습니다.

6.3 OpenSearch의 Security 메뉴에서 Amazon Data Firehose를 위한 Role을 생성하고, 필요한 권한을 부여합니다.

  1.  아래와 같이 OpenSearch Dashboards의 Security 메뉴에 접속합니다.
  2. Amazon Data Firehose가 OpenSearch에 Index를 생성하고, 데이터를 적재할 수 있도록 아래와 같은 권한을 갖는 Role을 생성합니다.
    • Cluster permissions: cluster_composite_ops, cluster_monitor
    • Index permissions:
      • Index: retail-trans*
      • Index permissions: crud, create_index, manage
  3. 이전 단계에서 생성한 Role(예: firehose-role)을 Amazon Data Firehose에 매핑하기 위해서 Mapped users 설정으로 이동해서 Manage mapping 을 클릭합니다.
  4. Map user 설정에서 Backend roles 에 Amazon Data Firehose의 ARN을 입력합니다.
    참고로 Amazon Data Firehose에 OpenSearch 접근 권한을 설정하지 않는 경우, Amazon Data Firehose에서 OpenSearch에 데이터를 전송할 때, 다음과 같은 에러 메시지가 발생합니다.

    Error received from the Amazon OpenSearch Service cluster or OpenSearch Serverless collection.
    If the cluster or collection is behind a VPC, ensure network configuration allows connectivity.
    
    {
      "error": {
        "root_cause": [
          {
            "type": "security_exception",
            "reason": "no permissions for [indices:data/write/bulk] and User [name=arn:aws:iam::123456789012:role/KinesisFirehoseServiceRole-retail-trans-us-east-1, backend_roles=[arn:aws:iam::123456789012:role/KinesisFirehoseServiceRole-retail-trans-us-east-1], requestedTenant=null]"
          }
        ],
        "type": "security_exception",
        "reason": "no permissions for [indices:data/write/bulk] and User [name=arn:aws:iam::123456789012:role/KinesisFirehoseServiceRole-retail-trans-us-east-1, backend_roles=[arn:aws:iam::123456789012:role/KinesisFirehoseServiceRole-retail-trans-us-east-1], requestedTenant=null]"
      },
      "status": 403
    }

6.4 (선택 사항) MySQL의 Time Zone을 UTC로 설정했기 때문에, OpenSearch Dashboard에서 UTC 기준으로 데이터를 조회할 수 있도록 Advance Settings 에서 Timezone for date formatting을 Etc/UTC 로 설정합니다.

테스트

이제 AWS DMS를 이용해서 MySQL 데이터베이스의 CDC 데이터를 실시간으로 Amazon OpenSearch Service와 S3에 저장할 수 있는 데이터 파이프라인 구축을 완료했습니다.

지금부터는 아래와 같은 방법으로 AWS DMS를 이용한 데이터 파이프라인이 정상적으로 작동 하는지 확인해 보겠습니다.

  1. DMS Replication Task를 실행 시켜서 MySQL CDC 데이터를 실시간으로 가져올 수 있도록 준비합니다.
    $ DMS_TASK_ARN=$(aws cloudformation describe-stacks --stack-name DMSAuroraMysqlToKinesisStack \
    | jq -r '.Stacks[0].Outputs | map(select(.OutputKey == "DMSReplicationTaskArn")) | .[0].OutputValue')
    
    $ aws dms start-replication-task \
              --replication-task-arn ${DMS_TASK_ARN} \
              --start-replication-task-type start-replication
  2. Bastion Host에 접속해서, 테스트 데이터를 생성하는 Python 스크립트를 실행합니다. 만약, Python 모듈 Import Error가 발생하는 경우에는 아래와 같은 Python 패키지를 추가로 설치합니다.
    python3 gen_fake_mysql_data.py \
        --database testdb \
        --table retail_trans \
        --user {Database Username} \
        --password {Database Password} \
        --host {Database Endpoint}\
        --max-count 200

    만약, Python 모듈 Import Error가 발생하는 경우에는 아래와 같은 Python 패키지를 추가로 설치합니다.

    boto3
    dataset==1.5.2
    Faker==13.3.1
    PyMySQL==1.0.2
  3. 테스트 데이터 생성 Python 스크립트에 의해 MySQL에 저장되는 데이터는 Amazon Kinesis Data Streams의 Data Viewer를 이용해서 실시간으로 확인할 수 있습니다. Amazon Kinesis Data Streams 콘솔 접속해서, AWS DMS의 타켓 Kinesis Data Streams(예: retail_trans_stream)을 선택한 다음 Data Viewer 탭을 선택합니다. Data Viewer 탭에서 레코드를 보려는 샤드(shard)를 선택하고 시작 위치(Starting position)를 선택한 다음 레코드 가져오기(Get records)를 클릭하면, 아래와 같이 AWS DMS를 통해서 Kinesis Data Streams에 전송된 MySQL의 CDC 데이터를 확인할 수 있습니다.
  4. MySQL의 CDC 데이터가 OpenSearch에 실시간으로 저장되는 것을 확인하기 위해서 OpenSearch Dashboards에 접속합니다. 아래 그림과 같이 OpenSearch Dashboards 메뉴에서 Dev Tools를 선택합니다.
    OpenSearch Dashboards의 Dev Tools에서 다음과 같이 검색 쿼리를 실행합니다.

    GET retail-trans/_search
    {
        "query": {
            "match_all": {}
        }
    }

    검색 쿼리를 실행하면, 아래와 같이 OpenSearch에 저장된 CDC 데이터를 확인할 수 있습니다.

리소스 정리하기

실습을 완료한 후 불필요한 과금을 방지하기 위해 사용했던 리소스를 모두 삭제합니다.

  1. 리소스 삭제를 위해서 DMS Replication task를 중지 시킵니다.
    $ DMS_TASK_ARN=$(aws cloudformation describe-stacks --stack-name DMSAuroraMysqlToKinesisStack \
    | jq -r '.Stacks[0].Outputs | map(select(.OutputKey == "DMSReplicationTaskArn")) | .[0].OutputValue')
    
    $ aws dms stop-replication-task --replication-task-arn ${DMS_TASK_ARN}
  2. cdk destroy 명령어를 이용해서 CDK로 배포했던 모든 스택들을 삭제합니다.
    cdk destroy --force --all

맺음말

이상으로 Amazon DMS, Kinesis Data Streams, Data Firehose, OpenSearch Service, 그리고, S3를 이용한 실시간 CDC 데이터 파이프라인을 구축해봤습니다. 이 솔루션에서 사용된 AWS 서비스들은 서버리스 또는 관리형 서비스 입니다. 따라서, 이 솔루션을 사용하면, 운영 부담이 거의 없고, 안전하게 데이터를 RDBMS에서 Amazon OpenSearch Service와 S3로 실시간으로 옮길 수 있습니다. 특히, Amazon DMS를 Amazon DMS Serverless로 Amazon OpenSearch Service 대신 Amazon OpenSearch Serverless를 사용한다면, 서버리스 데이터 파이프라인을 이용한 람다 아키텍처를 구현할 수 있습니다. RDBMS의 CDC 데이터를 S3로 실시간으로 저장해서 관리하고, 분석하고 싶은 분들에게 이 솔루션이 좋은 시작점이 될 것이라고 생각됩니다.

CDC 데이터를 S3에 저장할 때, Update와 Delete를 In-place로 처리하고 싶은 경우에는 S3에 Apache Iceberg, Hudi, Delta Lake와 같은 오픈 테이블 포맷(Open Table Format)을 사용해서 처리할 수 있습니다. 이 부분에 대해서 궁금하신 분들은 아래 참고 자료를 살펴보시는 것을 추천 드립니다.

참고 자료

Sungmin Kim

Sungmin Kim

김성민님은 AWS의 솔루션즈 아키텍트 입니다. Startup 고객들과 협력하여 비즈니스 성과를 실현하는데 도움을 드리고 있습니다.

Jisoo Min

Jisoo Min

Jisoo Min is a Startup Solutions Architect at Amazon Web Services (AWS). She loves working with startups and data analytics services.