The Internet of Things on AWS – Official Blog

Integrating IoT data with your data lake with new AWS IoT Analytics features

Post by Asim Kumar Sasmal, an AWS Senior Data Architect, and Vikas Panghal, an AWS Senior Product Manager

 

AWS IoT Analytics offers two new features to integrate IoT data ingested through AWS IoT Analytics with your data lake in your own AWS account: customer-managed Amazon S3 and dataset content delivery to Amazon S3.

Previously, your AWS IoT Analytics data could only be used with an Amazon S3 bucket managed by the AWS IoT Analytics service. This post describes a new end-to-end flexible, cost-effective, secure, and reliable solution to integrate IoT data with your data lake using these two recently released features.

Overview

AWS IoT Analytics is a fully managed IoT service that lets you filter, transform, and enrich real-time streaming IoT data. It stores data in a data store for analysis using a built-in SQL query engine and performs complex analytics such as machine learning inference.

The service provides out-of-the-box integration with Amazon QuickSight for data visualization, and offers advanced data-exploration capabilities through Jupyter notebooks hosted and managed by Amazon SageMaker. You can containerize and schedule the execution of these notebooks with just a few clicks.

Customers often ask how to bring IoT data ingested through AWS IoT Analytics together with their non-IoT data assets in their data lake on AWS, seamlessly and in near real time (~1–5 minutes). These data assets can include reference data such as ERP data, financial data, and third-party weather data.

The goals are as follows:

  • Build an open, flexible, and consistent data store downstream on S3 for all your data assets in the company.
  • Democratize access to users for self-serve analytics.
  • Offload the undifferentiated heavy lifting of high-velocity streaming data ingestion and processing to AWS IoT Analytics.

Solution

With the customer-managed Amazon S3 feature, you can create AWS IoT Analytics channels and data stores in an S3 bucket that you manage in your own AWS account. With the dataset content delivery to Amazon S3 feature, you can send AWS IoT Analytics dataset content results (materialized views of your AWS IoT Analytics data) to your S3 bucket in your AWS account.

As a result, you can now automatically create an AWS Glue catalog table containing the schema of your AWS IoT Analytics dataset content results and run queries with Amazon Athena. Because the dataset content results are saved in your S3 bucket, you can apply your own S3 permissions and manage them according to your governance policies.

The following architecture diagram illustrates the high-level, end-to-end solution that you build in this post.

Walkthrough

This step-by-step walkthrough consists of the following sections:

  1. Prerequisites
  2. Setting up the AWS IoT Analytics channel, pipeline, and data store
  3. Ingesting sample data into the AWS IoT Analytics channel
  4. Creating a dataset with dataset content delivery to S3
  5. Integrating IoT data with other data stored in your data lake on AWS

Prerequisites

For this use case, make sure that you have the following resources:

  1. An AWS account in an AWS Region where AWS IoT Analytics is available. This use case uses the US East (N. Virginia) Region, but you can choose any other AWS Region where AWS IoT Analytics is available.
  2. The AdministratorAccess policy granted to the IAM identity that you use (for production, we recommend restricting this further).
  3. The AWS CLI installed and configured for use with your AWS account (the quick check after this list confirms the configuration).
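
To verify that the AWS CLI is configured for the right account, a quick check (a convenience step, not part of the original walkthrough) is to ask AWS STS for the identity behind your configured credentials; the command returns the account ID and ARN that the CLI is currently using:

aws sts get-caller-identity --region 'us-east-1'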

This post provides an AWS CloudFormation template here to set up the required prerequisites. The template takes the following eight parameters with the default values shown. Replace aks with your own unique prefix to avoid conflicts with AWS global resources such as S3 bucket names, which must be unique across all AWS accounts. You can launch the template from the console or from the AWS CLI (a sample create-stack command follows the parameter list).

  • Parameter: DataLakeGoldIoTADatasetS3Bucket; Default Value: aks-datalake-gold-iota-dataset
  • Parameter: DataLakeRawIoTAChannelS3Bucket; Default Value: aks-datalake-raw-iota-channel
  • Parameter: DataLakeSilverIoTADatastoreS3Bucket; Default Value: aks-datalake-silver-iota-datastore
  • Parameter: DataLakeGoldS3Bucket; Default Value: aks-datalake-gold
  • Parameter: IoTAGlueDB; Default Value: aks_datalake_iota_glue_db
  • Parameter: IoTAGlueTable; Default Value: aks_device_telemetry_stats
  • Parameter: DataLakeGoldGlueDB; Default Value: aks_datalake_gold_glue_db
  • Parameter: DataLakeGoldDeviceSpecsGlueTable; Default Value: aks_device_specs
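
If you prefer the AWS CLI to the console for launching the template, a command along the following lines works. The stack name and template file name here are illustrative (substitute your own), only the bucket-name parameters are overridden (the remaining parameters keep their defaults, so remember to replace the aks prefix), and the --capabilities flag is required because the template creates a named IAM role:

aws cloudformation create-stack \
    --stack-name aks-iota-datalake-prereqs \
    --template-body file://iota-datalake-prereqs.yaml \
    --parameters \
        ParameterKey=DataLakeGoldIoTADatasetS3Bucket,ParameterValue=aks-datalake-gold-iota-dataset \
        ParameterKey=DataLakeRawIoTAChannelS3Bucket,ParameterValue=aks-datalake-raw-iota-channel \
        ParameterKey=DataLakeSilverIoTADatastoreS3Bucket,ParameterValue=aks-datalake-silver-iota-datastore \
        ParameterKey=DataLakeGoldS3Bucket,ParameterValue=aks-datalake-gold \
    --capabilities CAPABILITY_NAMED_IAM \
    --region 'us-east-1'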

The AWS CloudFormation template creates the following resources:

  • Four S3 buckets in your data lake, with the default encryption as AES-256 (SSE-S3):
    • aks-datalake-raw-iota-channel
    • aks-datalake-silver-iota-datastore
    • aks-datalake-gold-iota-dataset
    • aks-datalake-gold
  • A bucket policy for aks-datalake-silver-iota-datastore that grants the necessary permissions to AWS IoT Analytics (a rough sketch of such a policy follows this list).
  • An IAM role named iota_cmsb_role with trust relationships for AWS IoT, AWS Glue, and AWS IoT Analytics, and the necessary IAM permissions. This role allows the AWS IoT Core rules engine to publish MQTT messages to the AWS IoT Analytics channel, and allows AWS IoT Analytics to process the real-time streaming data and create datasets.
    • If you create the AWS IoT Analytics resources (channel, pipeline, data store, dataset) from the AWS Management Console, the console creates the IAM role to grant necessary permissions. However, in an actual production environment, set up your IAM role ahead of time, with minimal permissions per your security standards.
  • An AWS Glue database named aks_datalake_iota_glue_db to use for AWS IoT Analytics dataset creation. It stores the necessary schema/metadata of the dataset content written to S3 for Athena to query the data content.
  • An AWS Glue Data Catalog table named aks_device_telemetry_stats in aks_datalake_iota_glue_db for Athena to query the dataset content.
  • An AWS Glue database named aks_datalake_gold_glue_db to use in an Athena query. It integrates your AWS IoT Analytics dataset written to S3 with your existing device specifications data in your data lake.
  • An AWS Glue Data Catalog table named aks_device_specs that has the schema/metadata for the device specification file on S3 (your existing data lake).
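
The exact bucket policy is defined by the CloudFormation template; as a rough, illustrative sketch (not the template's verbatim policy), it grants the AWS IoT Analytics service principal read and write access to the data store bucket along these lines:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "IllustrativeIoTAnalyticsAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "iotanalytics.amazonaws.com"
            },
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::aks-datalake-silver-iota-datastore",
                "arn:aws:s3:::aks-datalake-silver-iota-datastore/*"
            ]
        }
    ]
}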

Copy and save the Amazon Resource Name (ARN) for the IAM role named iota_cmsb_role created by the AWS CloudFormation template.

Using the AWS CLI, copy the device specifications CSV file named device_specs.csv from here to the aks-datalake-gold bucket, as follows:

aws s3 cp device_specs.csv s3://aks-datalake-gold/aks_datalake_gold_glue_db/aks_device_specs/ --region 'us-east-1'

upload: ./device_specs.csv to s3://aks-datalake-gold/aks_datalake_gold_glue_db/aks_device_specs/device_specs.csv

Setting up the AWS IoT Analytics channel, pipeline, and data store

Create an AWS IoT Analytics channel named aks_datalake_raw_iota_channel with Customer-managed Amazon S3 bucket as the storage type. Replace the IAM role ARN with the one that you noted earlier and the aks prefix with your own prefix.

aws iotanalytics create-channel --cli-input-json file://mychannel.json --region 'us-east-1'

{
    "channelName": "aks_datalake_raw_iota_channel",
    "channelArn": "arn:aws:iotanalytics:us-east-1:841719529236:channel/aks_datalake_raw_iota_channel"
}

The file mychannel.json contains the following code:

{
    "channelName": "aks_datalake_raw_iota_channel",
    "channelStorage": {
        "customerManagedS3": {
            "bucket": "aks-datalake-raw-iota-channel",
            "keyPrefix": "myhome_raspberrypi/",
            "roleArn": "arn:aws:iam::xxxxxxxx:role/iota_cmsb_role"
        }
    }
}
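
Optionally, you can confirm that the channel was created with the expected customer-managed storage settings:

aws iotanalytics describe-channel --channel-name aks_datalake_raw_iota_channel --region 'us-east-1'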

Create an AWS IoT Core rule that sends messages to the channel that you created earlier. Replace the IAM role ARN with the one that you noted earlier and the aks prefix with your own prefix.

aws iot create-topic-rule --rule-name aks_iota_datalake_raw_iota_channel --topic-rule-payload file://rule.json --region 'us-east-1'

The file rule.json contains the following code:

{
    "sql": "SELECT * FROM 'iota/topic/myhome_raspberrypi'",
    "ruleDisabled": false,
    "awsIotSqlVersion": "2016-03-23",
    "actions": [
        {
            "iotAnalytics": {
                "channelName": "aks_datalake_raw_iota_channel",
                "roleArn": "arn:aws:iam::841719529236:role/iota_cmsb_role"
            }
        }
    ]
}
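
To double-check the rule and its AWS IoT Analytics action, you can retrieve it from AWS IoT Core:

aws iot get-topic-rule --rule-name aks_iota_datalake_raw_iota_channel --region 'us-east-1'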

Create an AWS IoT Analytics data store named aks_datalake_silver_iota_datastore with Customer-managed Amazon S3 bucket as the storage type. Replace the IAM role ARN with the one that you noted earlier and the aks prefix with your own prefix.

aws iotanalytics create-datastore --cli-input-json file://mydatastore.json --region 'us-east-1'

{
    "datastoreName": "aks_datalake_silver_iota_datastore",
    "datastoreArn": "arn:aws:iotanalytics:us-east-1:841719529236:datastore/aks_datalake_silver_iota_datastore"
}

The file mydatastore.json contains the following code:

{
    "datastoreName": "aks_datalake_silver_iota_datastore",
    "datastoreStorage": {
        "customerManagedS3": {
            "bucket": "aks-datalake-silver-iota-datastore",
            "keyPrefix": "myhome_raspberrypi/",
            "roleArn": "arn:aws:iam::841719529236:role/iota_cmsb_role"
        }
    }
}

Create an AWS IoT Analytics pipeline named aks_datalake_iota_pipeline with the pipeline source as aks_datalake_raw_iota_channel and the pipeline output as aks_datalake_silver_iota_datastore. Replace the aks prefix with your own prefix.

aws iotanalytics create-pipeline --cli-input-json file://mypipeline.json --region 'us-east-1'

{
    "pipelineName": "aks_datalake_iota_pipeline",
    "pipelineArn": "arn:aws:iotanalytics:us-east-1:841719529236:pipeline/aks_datalake_iota_pipeline"
}

The file mypipeline.json contains the following code:

{
    "pipelineName": "aks_datalake_iota_pipeline",
    "pipelineActivities": [
        {
            "channel": {
                "name": "mychannelactivity",
                "channelName": "aks_datalake_raw_iota_channel",
                "next": "mystoreactivity"
            }
        },
        {
            "datastore": {
                "name": "mystoreactivity",
                "datastoreName": "aks_datalake_silver_iota_datastore"
            }
        }
    ]
}
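
As with the channel and data store, you can verify the pipeline definition after it has been created:

aws iotanalytics describe-pipeline --pipeline-name aks_datalake_iota_pipeline --region 'us-east-1'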

Ingesting sample data into the AWS IoT Analytics channel

The following Unix shell script generates sample MQTT messages (replace the mqtttopic, region, and profile variables for your environment). If you have the AWS CLI installed and configured for your AWS demo environment, run the script from your laptop or from an Amazon EC2 instance. For this use case, send 1,000 messages (iterations=1000).

#!/bin/bash

mqtttopic='iota/topic/myhome_raspberrypi'   # topic that the AWS IoT Core rule subscribes to
iterations=1000                             # number of messages to publish
wait=5                                      # seconds to wait between messages
region='us-east-1'
profile='default'                           # AWS CLI profile to use

for (( i = 1; i <= $iterations; i++)) {

  CURRENT_TS=`date +%s`
  DEVICE="P0"$((1 + $RANDOM % 5))
  FLOW=$(( 60 + $RANDOM % 40 ))
  TEMP=$(( 15 + $RANDOM % 20 ))
  HUMIDITY=$(( 50 + $RANDOM % 40 ))
  VIBRATION=$(( 100 + $RANDOM % 40 ))

  # ~2% chance of throwing an anomalous temperature reading
  if [ $(($RANDOM % 100)) -gt 97 ]
  then
    echo "Temperature out of range"
    TEMP=$(($TEMP*6))
  fi

  echo "Publishing message $i/$ITERATIONS to IoT topic $mqtttopic:"
  echo "current_ts: $CURRENT_TS"
  echo "deviceid: $DEVICE"
  echo "flow: $FLOW"
  echo "temp: $TEMP"
  echo "humidity: $HUMIDITY"
  echo "vibration: $VIBRATION"

  aws iot-data publish --topic "$mqtttopic" --payload "{\"deviceid\":\"$DEVICE\",\"current_ts\":$CURRENT_TS,\"flow\":$FLOW,\"temp\":$TEMP,\"humidity\":$HUMIDITY,\"vibration\":$VIBRATION}" --profile "$profile" --region "$region"

  sleep $wait
}

After you start the shell script, you see output similar to the following:

Publishing message 1/1000 to IoT topic iota/topic/myhome_raspberrypi:
current_ts: 1559504319
deviceid: P03
flow: 92
temp: 29
humidity: 81
vibration: 127
Publishing message 2/1000 to IoT topic iota/topic/myhome_raspberrypi:
current_ts: 1559504324
deviceid: P01
flow: 67
temp: 21
humidity: 87
vibration: 134
……

The sample script ingests data into the AWS IoT Analytics channel aks_datalake_raw_iota_channel. At ingestion, AWS IoT Analytics partitions the data by day for both the channel and the data store to help with query performance. Go to the S3 buckets for your channel and data store, and confirm that you see data similar to the following screenshots.
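
If you prefer the AWS CLI to the S3 console, you can also list a few of the objects directly (the exact key layout under each prefix is managed by AWS IoT Analytics; remember to replace the aks prefix with your own):

aws s3 ls s3://aks-datalake-raw-iota-channel/myhome_raspberrypi/ --recursive | head

aws s3 ls s3://aks-datalake-silver-iota-datastore/myhome_raspberrypi/ --recursive | head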

In the AWS IoT Analytics console, choose Channels, and then choose aks_datalake_raw_iota_channel to monitor IncomingMessages, as shown in the following graph.

Creating a dataset with dataset content delivery to S3

Streaming data is now being ingested into the AWS IoT Analytics channel and, through the pipeline, into the data store. Create an AWS IoT Analytics dataset named aks_datalake_gold_iota_dataset with Customer-managed Amazon S3 bucket as the storage type.

Recently, AWS IoT Analytics released an enhancement that supports faster SQL dataset refresh intervals, so you can now refresh your SQL datasets as frequently as every minute. For this use case, schedule the dataset to refresh every 5 minutes, as shown in the following code. Replace the IAM role ARN with the one that you noted earlier and the aks prefix with your own prefix.

aws iotanalytics create-dataset --cli-input-json file://mydataset.json --region 'us-east-1'

{
    "datasetName": "aks_datalake_gold_iota_dataset",
    "datasetArn": "arn:aws:iotanalytics:us-east-1:841719529236:dataset/aks_datalake_gold_iota_dataset"
}

The file mydataset.json contains the following code:

{
    "datasetName": "aks_datalake_gold_iota_dataset",
    "actions": [
        {
            "actionName": "myaction",
            "queryAction": {
                "sqlQuery": "SELECT current_timestamp dtts, deviceid, avg(temp) avg_temp, avg(flow) avg_flow, avg(humidity)  avg_humidity, avg(vibration) avg_vibration FROM aks_datalake_silver_iota_datastore where   from_unixtime(cast(current_ts as double)) > current_timestamp - interval '5' minute group by deviceid"
            }
        }
    ],
    "contentDeliveryRules": [
        {
            "destination": {
                "s3DestinationConfiguration": {
                    "bucket": "aks-datalake-gold-iota-dataset",
                    "key": "aks_datalake_iota_glue_db/aks_device_telemetry_stats/!{iotanalytics:scheduleTime}_!{iotanalytics:versionId}.csv",
                    "glueConfiguration": {
                        "tableName": "aks_device_telemetry_stats",
                        "databaseName": "aks_datalake_iota_glue_db"
                    },
                    "roleArn": "arn:aws:iam::841719529236:role/iota_cmsb_role"
                }
            }
        }
    ],
    "triggers": [
        {
            "schedule": {
                "expression": "cron(0/5 * * * ? *)"
            }
        }
    ]
}

After the dataset is created, wait five minutes for the dataset to run as scheduled or run it one time manually from the AWS CLI as follows:

aws iotanalytics create-dataset-content --dataset-name "aks_datalake_gold_iota_dataset" --region 'us-east-1'

{
    "versionId": "02f6f531-ee49-4a8e-a43d-bf808e00a26b"
}

Wait for the content to be created by running the following command. Your dataset content is available on S3 when the state shows as SUCCEEDED.

aws iotanalytics get-dataset-content --dataset-name "aks_datalake_gold_iota_dataset" --region 'us-east-1'

{
    "entries": [
        {
            "dataURI": "https://<bucket-name>.s3.amazonaws.com/results/4e5e1fd7-0cea-46c6-ae5b-0fff951a33f4.csv?X-Amz-Security-Token=AgoGb3JpZ2luEKL%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCXVzLWVhc3QtMSKAAhMtFbDcxGLCP9k3zF1jBMD3jUY5lKG5DUQPIc6CE%2F5qxkchbjfo0qVnY4qjmH04n9bCXl%2F0kY8uKBM3h%2F8p9ZWlKGmZZ01eQd2tXmmjGjUZQ3uoJfO8nMrv4lpe%2BSNOyhKhLT8sjTN7jf5hRGRDV9zWmNra1Pp1d8bMWfLiMAwe4FNy%2Bncma9xuutMuhGUuClxcxD12Ez1YanrS1qcvkLBMR3AwJAOuEDiyYOjV8GTWTDjdn%2Bz1EperZjgRd4mbCwIOTSsJu4fVkoEPuP8DJKidcSI2IE7mIit5Tpl6O0RaZ0aDy6sBVFsHHygdhpvr9LgcqgJoff4Eefez%2FgBOJRYq8gIIdxABGgwyMjE5OTE5MjIyNDciDGs74tSk4qge1k9cvCrPAqw%2Fvgbh%2FUpvbAAVOZB%2Fh%2FfW0uXyUm%2FSqAjs9kptzfI8lXxGci6etkBeFZqLuSXUSEIiy0OScGS%2Bu3sPudxQL9WgoRj0NzgReJN6Mw7IGGdBgmKouAhb1WeNq2QavtZ1jpnxGZwzKg0rC9qC7p%2Fyklx%2BGbG7xpkWynWHVnW%2FFcV2dlWcmy61aKjIo%2B5OHi1mYoP8dNRFIIofE%2BjnSwP1PX9nWwgPwOeuU6hUxduitUTDSMq30e1oAfDJp6wh43PPSGodeXCEHOKohx0bFmQ54Ua51YhiXkT7DHrRF78oxuFVuinYcIceMczpLb9lXK%2Fs1TM5g2dYFdz9Nq17vWP8XLqTVEe6eYBLcQq80ZDe4YYKgml3rFV2a4NjV1ZMXxq9C5YjZLpKTFmslewRXK7uOoLPV5UaymokNrFUF0mLff1e0r9JXTtnyJDynwCLBSt5MIGr9OcF&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20190609T141945Z&X-Amz-SignedHeaders=host&X-Amz-Expires=7200&X-Amz-Credential=ASIATHL575JD6AI7UM53%2F20190609%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=66635d16dbe1631d6fea60113a27c801e8ae1a57d6d0ec58766b790b5f54f960"
        }
    ],
    "timestamp": 1560089957.29,
    "status": {
        "state": "SUCCEEDED"
    }
}

To see 10 rows from the AWS IoT Analytics dataset that you created in the previous step:

  1. In the Athena console, for Database, choose aks_datalake_iota_glue_db.
  2. For New query 1, run a query against the AWS Glue Data Catalog table named aks_device_telemetry_stats (a sample query follows these steps), and choose Run query.
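
A minimal preview query (illustrative, not from the original post) looks like the following:

-- Preview 10 rows of the dataset content delivered to S3
SELECT * FROM "aks_datalake_iota_glue_db"."aks_device_telemetry_stats" limit 10;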

Integrating IoT data with other data stored in your data lake on AWS

At the beginning of this post, you set up an AWS Glue catalog table named aks_device_specs in your data lake. This table points to the device specifications file stored on S3 (your existing data lake) and is available for querying using Athena.

To see the data from the aks_device_specs table:

  1. In the Athena console, for Database, select aks_datalake_gold_glue_db (which is your existing data lake).
  2. For New query 1, run a query against the AWS Glue Data Catalog table aks_device_specs (a sample query follows these steps), and choose Run query.
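
Again, a minimal preview query (illustrative) looks like the following:

-- Preview 10 rows of the device specifications table in the data lake
SELECT * FROM "aks_datalake_gold_glue_db"."aks_device_specs" limit 10;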

You can now integrate the IoT data in the aks_device_telemetry_stats table from AWS IoT Analytics with the aks_device_specs table in your existing data lake. Run the following Athena query to see how many times each device crossed the maximum temperature and humidity thresholds in the last two hours:

select
    a.deviceid,
    sum(case
            when cast(a.avg_temp as double) > b.max_temp_c
            then 1
            else 0
        end) count_max_temp_crossed,
    sum(case
            when cast(a.avg_humidity as double) > b.max_humidity
            then 1
            else 0
        end) count_max_humidity_crossed
from
    "aks_datalake_iota_glue_db"."aks_device_telemetry_stats" a
inner join
    "aks_datalake_gold_glue_db"."aks_device_specs" b
    on a.deviceid = b.deviceid
where
    date_parse(substr(dtts, 1, 19), '%Y-%m-%d %H:%i:%s') > current_timestamp - interval '2' hour
group by
    1
order by
    1;

The following table shows the results.

Now, instead of using the five-minute average from the aks_device_telemetry_stats data, do the same threshold check on the lowest grain data stored in the AWS IoT Analytics data store (aks_datalake_silver_iota_datastore). This check avoids losing the peaks and valleys of the actual readings from your devices.

You already have the lowest grain telemetry data, in near real time, in the AWS IoT Analytics data store S3 bucket named aks-datalake-silver-iota-datastore. Before you can query that data using Athena, you must set up an AWS Glue crawler that runs every 5 minutes (the shortest schedule frequency that you can set). The crawler keeps an AWS Glue catalog table named aks_datalake_silver_iota_datastore up to date with the telemetry data in near real time (within 5 minutes of data ingestion):

aws glue create-crawler --cli-input-json file://mycrawler.json --region us-east-1

The file mycrawler.json contains the following code:

{
    "Name": "aks_datalake_silver_iota_datastore_glue_crawler",
    "Role": "arn:aws:iam::841719529236:role/iota_cmsb_role",
    "DatabaseName": "aks_datalake_iota_glue_db",
    "Description": "",
    "Targets": {
        "S3Targets": [
            {
                "Path": "s3://aks-datalake-silver-iota-datastore/myhome_raspberrypi/datastore/aks_datalake_silver_iota_datastore"
            }
        ]
    },
    "Schedule": "cron(0/5 * * * ? *)"
}

After the AWS Glue crawler is successfully created, wait five minutes for the crawler to run as scheduled, or run it once manually from the AWS CLI using the following command:

aws glue start-crawler --name "aks_datalake_silver_iota_datastore_glue_crawler" --region us-east-1
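
One way to check whether the crawler has finished (a convenience check, not part of the original walkthrough) is to query its state; it reports RUNNING while it executes and returns to READY when it is done:

aws glue get-crawler --name "aks_datalake_silver_iota_datastore_glue_crawler" --query 'Crawler.State' --region us-east-1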

After the crawler run completes, run the following Athena query against the lowest grain data from the AWS IoT Analytics data store to see how many times each device crossed the maximum temperature and humidity thresholds in the last two hours:

select
    a.deviceid,
    sum(case
            when cast(a.temp as double) > b.max_temp_c
            then 1
            else 0
        end) count_max_temp_crossed,
    sum(case
            when cast(a.humidity as double) > b.max_humidity
            then 1
            else 0
        end) count_max_humidity_crossed
from
    "aks_datalake_iota_glue_db"."aks_datalake_silver_iota_datastore" a
inner join
    "aks_datalake_gold_glue_db"."aks_device_specs" b
    on a.deviceid = b.deviceid
where
    from_unixtime(current_ts) > current_timestamp - interval '2' hour
group by
    1
order by
    1;

The following table shows the results.

Notice that with the lowest grain data, the number of times the devices crossed the maximum temperature and humidity thresholds is much higher than with the five-minute averages. There are many other integration use cases; for example, you could join this data with third-party weather data stored in your data lake to see whether there is any correlation with weather changes.

You can also use Amazon QuickSight to visualize the last two hours of the lowest grain data from aks_datalake_silver_iota_datastore overlaid with the maximum temperature and humidity thresholds from aks_device_specs (the device-specification data in your data lake). Use the following sample Athena query (see the Amazon QuickSight documentation for how to create a QuickSight dataset from an Athena query):

select
    from_unixtime(current_ts) as timestamp,
    a.deviceid,
    a.temp,
    a.humidity,
    b.min_temp_c,
    b.max_temp_c,
    b.min_humidity,
    b.max_humidity
from
    "aks_datalake_iota_glue_db"."aks_datalake_silver_iota_datastore" a
inner join
    "aks_datalake_gold_glue_db"."aks_device_specs" b
    on a.deviceid = b.deviceid
where
    from_unixtime(current_ts) > current_timestamp - interval '2' hour

Summary

In this post, you learned how to use two new AWS IoT Analytics features to integrate IoT data with the rest of the data stored in your data lake on AWS. Try the customer-managed Amazon S3 and dataset content delivery to Amazon S3 features in your own projects.

Hopefully, you have found this post informative and the proposed solution helpful. As always, AWS welcomes feedback. Please submit comments or questions below.