AWS Official Blog

Implementing an IP Camera Cloud Storage Project with Amazon Kinesis Video Streams

In recent years, the prices of IP camera products have kept falling and hardware margins are thin, so many vendors are pushing value-added services. The most mature value-added offering is cloud storage: when a device detects an alarm event, it uploads a video clip to the cloud, and end users pay to play it back later. Amazon Kinesis Video Streams has clear advantages for this cloud storage business: it provides a device-side Producer SDK to ingest video and upload it to the service endpoint, and end users can watch the footage over mainstream protocols such as HLS and DASH.

This post implements the cloud storage solution in a serverless way. While a device is streaming, it records metadata for each video segment; the video metadata is persisted through Amazon API Gateway, AWS Lambda, and Amazon DynamoDB; the front-end application then uses the video metadata to generate a playback URL and plays it with an HLS player.

 

Prerequisites:

  • Download the amazon-kinesis-video-streams-producer-sdk-cpp source and build libKinesisVideoProducerJNI
  • Download the amazon-kinesis-video-streams-producer-sdk-java source
  • Install IntelliJ IDEA

 

Architecture diagram

Create the Amazon Kinesis Video Streams stream

Sign in to the console and select the Singapore Region (ap-southeast-1). Open the Kinesis Video Streams service, create a video stream named "kvs-stream", and set Data retention to 7 days (7 days of cloud storage).
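If you prefer to create the stream programmatically instead of through the console, a minimal boto3 sketch is below (it assumes credentials for the Singapore Region are already configured; the 7-day retention is expressed in hours):

import boto3

# Create a Kinesis Video Stream with 7 days (168 hours) of retention
kvs = boto3.client("kinesisvideo", region_name="ap-southeast-1")
kvs.create_stream(
    StreamName="kvs-stream",
    DataRetentionInHours=7 * 24,
)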

Create access keys

Create a new user named "CloudStorageUser" and check "Programmatic access". Add the two permissions shown in the figure below; this setup is only for testing and demonstration, and least-privilege permissions are recommended for production. Copy and save the Access Key ID and Secret Access Key.

 

Open the AWS Secrets Manager service and add a secret. Create the Secret key/value pairs, fill in the AK/SK, and name the secret "ipc-cloudstorage-access-kvs-secretkey".
Copy the Python 3 sample code; it will be used in the Lambda function later.
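The sample code generated by Secrets Manager boils down to a get_secret_value call; a minimal sketch is below (the JSON key names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are assumptions — use whatever keys you entered for the secret):

import json
import boto3

SECRET_NAME = "ipc-cloudstorage-access-kvs-secretkey"
REGION_NAME = "ap-southeast-1"

def get_kvs_credentials():
    # Read the AK/SK stored as a key/value secret in Secrets Manager
    client = boto3.client("secretsmanager", region_name=REGION_NAME)
    response = client.get_secret_value(SecretId=SECRET_NAME)
    secret = json.loads(response["SecretString"])
    return secret["AWS_ACCESS_KEY_ID"], secret["AWS_SECRET_ACCESS_KEY"]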

 

Create the Lambda function

Create a function named "save_devices_video_metadata" and choose Python 3.6 as the Runtime.

When httpMethod is PUT, the handler receives the metadata and writes it to the table:

if event['httpMethod'] == 'PUT':
    # _body is the JSON request body parsed from the API Gateway proxy event
    id = _body['clientID']
    deviceID = _body['deviceID']
    streamName = _body['streamName']
    begTime = _body['begTime']
    endTime = _body['endTime']
    duration = _body['duration']
    res = save_dynamodb_tb(id, deviceID, begTime, endTime, duration, streamName)
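The helper save_dynamodb_tb is not shown in the excerpt above. A minimal sketch is below (treating clientID as the table's partition key and adding a write-time attribute are assumptions based on the table description later in this post):

import time
import boto3

dynamodb = boto3.resource("dynamodb", region_name="ap-southeast-1")
table = dynamodb.Table("tb_device_metadata")

def save_dynamodb_tb(id, deviceID, begTime, endTime, duration, streamName):
    # Persist one video segment's metadata together with the write time (epoch ms)
    return table.put_item(
        Item={
            "clientID": id,
            "deviceID": deviceID,
            "streamName": streamName,
            "begTime": begTime,
            "endTime": endTime,
            "duration": duration,
            "writeTime": str(int(time.time() * 1000)),
        }
    )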

When httpMethod is GET, the handler looks up the stream and returns the HLS URL:

if event['httpMethod'] == 'GET':
    streamName = _body['streamName']
    begTime = _body['begTime']
    endTime = _body['endTime']
    duration = _body['duration']
    # Resolve the stream ARN and the data endpoint for HLS playback
    descStream = kvs.describe_stream(StreamName=streamName)
    Stream_ARN = descStream['StreamInfo']['StreamARN']
    get_hls_response = kvs.get_data_endpoint(
        APIName="GET_HLS_STREAMING_SESSION_URL", StreamARN=Stream_ARN)
    hls_endpoint = get_hls_response['DataEndpoint']
    kvs_client = boto3.client("kinesis-video-archived-media",
                              endpoint_url=hls_endpoint,
                              region_name=REGION_NAME,
                              aws_access_key_id=AWS_ACCESS_KEY_ID,
                              aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    # Get the HLS URL for the requested segment
    res = get_hls_url(kvs_client, streamName, begTime, endTime, duration)
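The helper get_hls_url is not shown either; a minimal sketch of what it might do with the archived-media client is below (ON_DEMAND playback over the segment's server timestamps and a one-hour session lifetime are assumptions):

from datetime import datetime

def get_hls_url(kvs_client, streamName, begTime, endTime, duration):
    # begTime/endTime are stored as epoch milliseconds (strings) in DynamoDB
    start = datetime.fromtimestamp(int(begTime) / 1000.0)
    end = datetime.fromtimestamp(int(endTime) / 1000.0)
    response = kvs_client.get_hls_streaming_session_url(
        StreamName=streamName,
        PlaybackMode="ON_DEMAND",
        HLSFragmentSelector={
            "FragmentSelectorType": "SERVER_TIMESTAMP",
            "TimestampRange": {"StartTimestamp": start, "EndTimestamp": end},
        },
        Expires=3600,  # the session URL stays valid for one hour
    )
    return response["HLSStreamingSessionURL"]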

Lambda source code: https://github.com/beiyue/save_devices_video_metadata/blob/main/lambda_function.py

 

Lambda authorization

In IAM, create a policy, choose JSON, paste the content below, and save the policy as "getSecretValueForIPC".

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "<SECRET_ARN>"
    }
}

 

In the Lambda function's Configuration, edit the execution role and attach the policy "getSecretValueForIPC".

Also attach the managed policy "AmazonKinesisVideoStreamsReadOnlyAccess".
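The same policy creation and role attachment can be scripted with boto3; a sketch is below (the execution role name is hypothetical — use the role that Lambda created for your function, and substitute your secret's ARN):

import json
import boto3

iam = boto3.client("iam")

# Customer-managed policy that allows reading the KVS access-key secret
policy = iam.create_policy(
    PolicyName="getSecretValueForIPC",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "<SECRET_ARN>",
        },
    }),
)

ROLE_NAME = "save_devices_video_metadata-role"  # hypothetical execution role name
iam.attach_role_policy(RoleName=ROLE_NAME, PolicyArn=policy["Policy"]["Arn"])
iam.attach_role_policy(
    RoleName=ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonKinesisVideoStreamsReadOnlyAccess",
)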

 

Create the DynamoDB table

Create a DynamoDB table named "tb_device_metadata". The metadata includes the device ID, the Kinesis stream name, the video segment's start and end time, the video duration, and the write time.
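A minimal boto3 sketch for creating the table is below (using clientID, the segment's unique identifier, as the partition key and on-demand capacity are assumptions; the remaining metadata attributes are schemaless and need no definition):

import boto3

dynamodb = boto3.client("dynamodb", region_name="ap-southeast-1")
dynamodb.create_table(
    TableName="tb_device_metadata",
    AttributeDefinitions=[{"AttributeName": "clientID", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "clientID", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",  # on-demand capacity
)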

 

Create the API

In Amazon API Gateway, choose "Create API", choose "REST API" and Build, name the API "video_record_metadata", and set the Endpoint Type to "Regional".

When that is done, choose "Actions", then "Create Resource", and name the resource "ipc-video-metadata".


Choose "Actions", then "Create Method", and select PUT.

Choose Lambda Function as the integration type, check "Use Lambda Proxy integration", select the Lambda Region "ap-southeast-1", choose the Lambda function "save_devices_video_metadata", and save.

Create the GET method the same way.

 

Create the API model

In the left navigation, choose Models, create a model, and name it "IPCMetaData".

Copy the model schema below:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "IPCMetaData",
  "type": "object",
  "properties": {
    "clientID": { "type": "string" },
    "deviceID": { "type": "string" },
    "streamName": { "type": "string" },
    "begTime": { "type": "string" },
    "endTime": { "type": "string" },
    "duration": { "type": "string" }
  }
}

 

Add a request model to the PUT method

Choose Resources, choose PUT, choose Method Request, choose Request Body, and add the model.

Deploy the API

Choose Actions, choose Deploy API, set the Stage name to "test", and choose Deploy.
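After deployment you can quickly exercise the PUT method; a sketch with the Python requests library is below (the invoke URL is a placeholder, the field values are made up for illustration, and the example assumes the method is left open without authorization, as in this demo):

import requests

# Placeholder invoke URL of the "test" stage
API_URL = "https://xxxxxx.execute-api.ap-southeast-1.amazonaws.com/test/ipc-video-metadata"

metadata = {
    "clientID": "1623692150000000001",  # hypothetical segment ID
    "deviceID": "ipc-demo-device",      # hypothetical device ID
    "streamName": "kvs-stream",
    "begTime": "1623728297399",
    "endTime": "1623728321043",
    "duration": "22",
}

response = requests.put(API_URL, json=metadata)
print(response.status_code, response.text)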

 

Generate the Java SDK

In the test Stage Editor, choose SDK Generation, set the Platform to "Java SDK", and enter the parameters as follows:

Choose Generate SDK, download the generated .zip file, unzip it, open a Terminal, and run:
mvn install

Install the generated artifact into the local Maven repository:

mvn install:install-file -Dfile=./target/cloudstorage-demo-1.0.jar -DgroupId=ipc-sdk -DartifactId=cloudstorage-demo -Dversion=1.0 -Dpackaging=jar

Add the dependency to pom.xml:

<dependency>
    <groupId>ipc-sdk</groupId>
    <artifactId>cloudstorage-demo</artifactId>
    <version>1.0</version>
</dependency>

 

Create the producer (streaming client)

Download amazon-kinesis-video-streams-producer-sdk-java and modify DemoAppMain.java:

public static void main(final String[] args) {
    try {
        final KinesisVideoClient kinesisVideoClient = KinesisVideoJavaClientFactory
                .createKinesisVideoClient(
                        Regions.AP_SOUTHEAST_1, // use the Singapore Region
                        AuthHelper.getSystemPropertiesCredentialsProvider());
        // file-based media source
        final MediaSource mediaSource = createFileMediaSource();
        kinesisVideoClient.registerMediaSource(mediaSource);
        mediaSource.start();

    } catch (final KinesisVideoException e) {
        throw new RuntimeException(e);
    }
}

Add a putDurationMetaData method to NativeKinesisVideoProducerStream.java to submit the metadata. It uses the Java SDK generated earlier; IPCMetaData is the class that wraps the metadata.

// Record the metadata of one video segment
private void putDurationMetaData(@Nonnull final KinesisVideoFrame frame){

    // Each video segment starts at a key frame
    if(startTime == 0L && frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME){
        startTime = frame.getDecodingTs();

    // Segments are recorded in units of roughly 20 seconds
    }else if(frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME && (frame.getDecodingTs() - startTime) > 20 * Time.HUNDREDS_OF_NANOS_IN_A_SECOND ){
        long index = frame.getIndex();
        long endTime = frame.getDecodingTs();
        long durationTime = (endTime - startTime)/Time.HUNDREDS_OF_NANOS_IN_A_SECOND;

        IPCCloudStorageSdk client = IPCCloudStorageSdk.builder().connectionConfiguration(
                new ConnectionConfiguration()
                        .maxConnections(100)
                        .connectionMaxIdleMillis(1000))
                .timeoutConfiguration(
                        new TimeoutConfiguration()
                                .httpRequestTimeout(5000)
                                .totalExecutionTimeout(10000)
                                .socketTimeout(3000))
                .build();

        // Model class generated by the API Gateway Java SDK
        IPCMetaData metaData = new IPCMetaData();
        // Unique identifier of the video segment
        metaData.setClientID(String.valueOf(Time.getCurrentTime()) + String.format("%06d", index));
        // Unique identifier of the device
        metaData.setDeviceID(mDeviceInfo.getName());
        // Start time, in milliseconds
        metaData.setBegTime(String.valueOf(startTime / Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND));
        // Stream name, matching the KVS stream name
        metaData.setStreamName(mStreamInfo.getName());
        // End time, in milliseconds
        metaData.setEndTime(String.valueOf(endTime / Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND));
        // Duration, in seconds
        metaData.setDuration(String.valueOf(durationTime));

        // Request class generated by the API Gateway Java SDK
        MetaDataInputRequest req = new MetaDataInputRequest().iPCMetaData(metaData);
        MetaDataInputResult result = client.metaDataInput(req);
        mLog.info("Duration Metadata : %s, %s, %s ,%s ,%s ,%s ", metaData.getClientID(), metaData.getDeviceID(), metaData.getStreamName(), metaData.getBegTime(), metaData.getEndTime(), metaData.getDuration());

        // Reset the start time: the end of the previous segment becomes the start of the next one
        startTime = frame.getDecodingTs();
    }
}

Producer source code: https://github.com/beiyue/amazon-kinesis-video-streams-producer-sdk-java.git

 

Streaming test

Note that the AK/SK approach used here is only for testing and demonstration. For production, use a more secure mechanism, such as obtaining temporary credentials with an IoT certificate; see https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-iot.html

In Amazon DynamoDB you can see the metadata generated for the video segments.

Get the playback URL

Pick a random item from Amazon DynamoDB, take its client ID, stream name, start time, end time, and duration, and run the following in a terminal:

curl -v -X GET  'https://xxxxxx.execute-api.ap-southeast-1.amazonaws.com/test/ipc-video-metadata'  -d ' {  "clientID": "162369215xxxxxxxx52","streamName": "kvs-stream", "begTime":"1623728297399","endTime":"1623728321043","duration":"22"}'

*   Trying 52.221.1XX.1XX...
* TCP_NODELAY set
* Connected to xxxxxx.execute-api.ap-southeast-1.amazonaws.com (52.221.1XX.1XX) port 443 (#0)
* ALPN, offering h2
* ALPN, offering http/1.1
* Cipher selection: ALL:!EXPORT:!EXPORT40:!EXPORT56:!aNULL:!LOW:!RC4:@STRENGTH
* successfully set certificate verify locations:
*   CAfile: /etc/ssl/cert.pem
  CApath: none
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Client hello (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS change cipher, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-RSA-AES128-GCM-SHA256
* ALPN, server accepted to use h2
* Server certificate:
*  subject: CN=*.execute-api.ap-southeast-1.amazonaws.com
*  start date: Aug 29 00:00:00 2020 GMT
*  expire date: Sep 29 12:00:00 2021 GMT
*  subjectAltName: host " xxxxxx.execute-api.ap-southeast-1.amazonaws.com" matched cert's "*.execute-api.ap-southeast-1.amazonaws.com"
*  issuer: C=US; O=Amazon; OU=Server CA 1B; CN=Amazon
*  SSL certificate verify ok.
* Using HTTP2, server supports multi-use
* Connection state changed (HTTP/2 confirmed)
* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0
* Using Stream ID: 1 (easy handle 0x7f80e8005600)
> GET /test/ipc-video-metadata HTTP/2
> Host: xxxxxx.execute-api.ap-southeast-1.amazonaws.com
> User-Agent: curl/7.54.0
> Accept: */*
> content-type: application/json
> day: Thursday
> Content-Length: 138
>
* Connection state changed (MAX_CONCURRENT_STREAMS updated)!
* We are completely uploaded and fine
< HTTP/2 200
< date: Tue, 15 Jun 2021 08:00:11 GMT
< content-type: application/json
< content-length: 261
< x-amzn-requestid: 9323XXX-4ff1-4dcf-ae57-365551b74636
< x-amz-apigw-id: A9Oh0FJWXXXX_Q=
< x-amzn-trace-id: Root=1-60c85e0b-01f095137c54211e74c3e1f3;Sampled=0
<
* Connection #0 to host xxxxxx.execute-api.ap-southeast-1.amazonaws.com left intact
"https://b-xxxxxx.kinesisvideo.ap-southeast-1.amazonaws.com/hls/v1/getHLSMasterPlaylist.m3u8?SessionToken=XXXXdf9Ro85h4AeC_n2yjc96_YLu1vigKX5qterUpPzxIQYibs4go4_KCqXEiWnXXXXWng92QY5HiGbEIkSa38f9d3XXXXX:~ "

Open https://www.hlsplayer.org/ and paste in the URL.

Playback


To summarize, this solution shows an IP camera using Amazon Kinesis Video Streams to push video to the cloud, using Amazon API Gateway + AWS Lambda + Amazon DynamoDB to persist the video metadata, and then using that metadata to obtain an HLS session URL for a specific time range, which a player can play back normally. Many customers want to build cloud storage scenarios, and combining these services yields a complete cloud storage solution.

 

About the author

周晓明

Solutions Architect at Amazon Web Services, responsible for consulting on and designing cloud architectures on Amazon Web Services, focused on IoT research and adoption, with extensive hands-on experience in the video surveillance domain.