AWS Big Data Blog
Integrating IoT Events into Your Analytic Platform
Veronika Megler, Ph.D., is a Senior Consultant with AWS Professional Services
“We have a fleet of vehicles, with GPS and a bunch of other sensors,” said Bob, the VP at a delivery company. “Today they send their update ‘breadcrumbs’ to another IoT service. We’re planning to have them send their breadcrumbs to AWS IoT instead; that’ll let us remove the third-party service and integrate the breadcrumbs into our AWS infrastructure. Then, we’d like to store the breadcrumbs, do both near-realtime and batch analytics on them, and see the current locations on a map. How can we do that?”
This post shows you how.
Overall architecture
AWS IoT is a managed cloud platform that lets connected devices communicate easily and securely. It also makes it easy to interact with cloud applications and other devices. With AWS IoT, you can filter, transform, and act upon device data on the fly, based on business rules that you define. You can update your rules to implement new device and application features at any time.
AWS IoT makes it easy to integrate and control your devices from other AWS services for even more powerful IoT applications. In particular, IoT provides tight integration with AWS Lambda, Amazon Kinesis, Amazon S3, Amazon Machine Learning, Amazon DynamoDB, Amazon CloudWatch, and Amazon OpenSearch Service.
In this post, you’ll explore two of these integrations: Amazon S3 and Amazon Kinesis Firehose. The architecture you’ll design is shown in the figure below. You’ll set up your breadcrumbs to flow into S3 via two paths: directly, and also via Firehose. From there, you’ll access them in Amazon EMR using Apache Hive, and show your vehicles on a map using Hue. You’ll also use CloudWatch Logs for debugging purposes only.
AWS IoT: some basics
In IoT, each defined sensor or device – in this case, each vehicle – is called a “thing”. Each thing that communicates with the IoT service is defined in IoT’s Device Registry.
You can create a definition in the device registry by using the “aws iot –create-thing” AWS CLI command, by using the AWS Management Console, or by using one of the SDKs. (Note: if you’re using the AWS CLI, make sure you have version 1.10.24 or higher).
You give your thing a name, and you can define up to three attributes. These attributes are useful in grouping or locating specific devices; common choices for these attributes are the model number, serial number, or owner. The attributes are part of the metadata about the device itself, separate from its operational characteristics (including the sensors and their values). After you have defined a thing, you can connect your device securely to it using one of the supported protocols: HTTP, HTTP/2, or MQTT.
Topics
Sensor readings and updates flow from the thing to the AWS IoT gateway. The thing may be only intermittently connected. However, IoT allows for asynchronous communication between IoT and the device, via a pub/sub approach. It does this by creating and managing an always-available “device shadow”. As part of the shadow, IoT creates a set of topics with predefined names for each thing and publishes events to the topics. The topics most relevant to us are:
$aws/things/<thing-name>/shadow/update $aws/things/<thing-name>/shadow/update/documents
When connected, the device reports its status by publishing an event to the “update” topic. For each change, IoT also publishes an event to the “documents” topic, containing both the previous and current state.
You can communicate with the thing shadow (and therefore with the thing itself) by subscribing and publishing to topics. You can send “desired states” to its device shadow (that is, by sending a message to the appropriate topic), whether or not the thing is currently connected. The next time the thing connects, it reads its shadow and, hopefully, acts to synchronize its state with your desires.
The events themselves are ephemeral and are not stored in AWS IoT (other than the latest state). However, IoT has a built-in rules engine, which provides message processing and integration with other AWS services. In the rules engine, you use an SQL-like language to select data from message payloads, process the data, and send the data to other services.
Device shadows
The device shadow is a JSON document used by the IoT service to store and retrieve the current state information that it has been sent – so in this case it contains the breadcrumb information. You can see the current state of the thing by selecting the thing in the AWS IoT console; the matching CLI command is “aws iot get-thing-shadow –thing-name <name>”. Here is the shadow content for one vehicle:
{ "reported": { "thingName": "iotfleet_thc_121", "location": { "lat": 45.53404355284516, "lon": -122.788986789 }, "address": "10895, Northwest Lost Park Drive, Cedar Mill, Washington County, Oregon, 97229, United States of America", "speed": "45", "heading": "S", "ignition": "1" } }
The shadow state also includes a version number. Each time the shadow is updated, the version number is automatically increased. In your downstream app, you can use the version number to keep track of the order of changes or which changes you’ve processed.
Case study
Because of IoT’s pub/sub design, you can imitate a set of devices by creating the messages that those devices would send, and publishing them onto the relevant topics.
For this post, you’ll use an event generator that creates a set of vehicles in the IoT device registry. It then generates a set of update breadcrumbs, modeled on the real breadcrumbs of a set of vehicles.
First, you’ll define a rule that selects specific messages from IoT. Next, you’ll set up actions that explore different ways to write the IoT events to S3, so you can compare the results in two different ways:
- Directly from IoT to S3, using two different filename approaches
- From IoT to Firehose to S3.
Finally, you’ll start an EMR cluster with Hive and Hue to analyze and display the data.
Before diving in to the scenario, some words about using IoT topics for testing, and debugging.
Using IoT topics
A simple approach to understanding the shadows, the message flows and their content as you start developing an IoT application is to use the tools on the IoT console.
- Open the IoT console and choose MQTT Client.
- Under MQTT Client Actions, choose Device Gateway Connection, enter a client ID (e.g., “12345”), and choose Connect.
- Choose Subscribe to Topic, enter the wildcard # for the subscription topic (includes all topics), and choose Subscribe.
You’ll get a new browser tab telling you that you don’t have any messages on this subscription.
- In the IoT console, follow the instructions in Create a Device in the Thing Registry to create a new thing, called “testthing”.
- Select testthing, choose View Thing, and review its details.
- Under Update Shadow, enter the following:
{“reported”:{“location”:{“lat”:45,”lon”:122}}}
- Choose Update Shadow, Detail to see a shadow state, status, and version. (You may need to choose refresh, next to Last Update.)
Back under your MQTT Client, the # tab shows the name of each topic that received a message, along with the message contents. In this case, the “$aws/things/testthing/update/accepted” topic shows the payload sent, and the “$aws/things/testthing/update/documents” topic shows a JSON document with the previous state (here, null for no state), the current state and some metadata.
Debugging your integrations
One of the most useful resources for debugging your IoT integration with other services is CloudWatch Logs. Follow the instructions at Setting Up CloudWatch Logs to create an IAM role for logging and a trust policy for the IoT service endpoint (iot.amazonaws.com), and to register the role with IoT. For now, use logLevel=“DEBUG”.
The CloudWatch log entries appear under the log group name of “AWSIoTLogs”. Each thing has a separate log, under its name.
There’s also a log that begins with your account number, followed by a long alphanumeric string; here you’ll find most of the entries for debugging integration issues. You’ll see a log entry for a rule being matched (an EVENT:MatchingRuleFound entry). The “traceid” value allows you to match the various entries for each event. For example:
016-05-20 00:28:53.689 TRACEID:7a4a43c7-1a8c-4533-93ae-d11cfb4a88ac PRINCIPALID:123456789101/AIDAAIDAAIDAAIDAAID/AIDAAIDAZAIDAAIDAAIDA [INFO] EVENT:MatchingRuleFound TOPICNAME:$aws/things/iotfleet_thc_121/shadow/update/documents CLIENTID:n/a MESSAGE:Matching rule found: iotfleetrule
The actions specified for that rule appear as a separate message later in the log.
Rules, rules, rules
Now start to refine which events you’d like to capture and store. IoT uses a simplified SQL syntax that provides you with a small set of functions, and lets you access the topic name and the event content.
In the IoT console, under Resources, create a new rule. Give it a name (iotfleetrule) and a description (“Get all updates for the vehicle fleet”). When defining a rule, you specify:
- The fields you want (Attribute, aka SELECT)
- The topics you’re interested in (Topic Filter, aka FROM)
- (Optionally) The event characteristics to which the rule should apply (Condition, aka WHERE)
For this app, you want to:
- Keep all the information from the original event
- Add the thing name and the topic name (because the thing name, as defined in the device registry, will not be in the event itself)
- Limit the entries to only the ‘documents’ topic; this gives you both the “before” and “after” content for the update
- Limit the things to only those whose name start with ‘iotfleet_’
Here’s the rule that achieves that:
Attribute: topic(3) as thingnm, topic() as thetopic, * Topic Filter: $aws/things/+/shadow/update/documents Condition: startswith(topic(3), 'iotfleet_')
This rule outputs the entire event contents (*). You could have selected only specific fields from the event, using the IoT-provided JSON extensions. You’ve added two other fields into the output: the whole topic name (“topic()”), and the thing name, extracted from the topic (“topic(3)”). The topic filter specifies that you want all things (the ‘+’ wildcard in the “<thing-name>” section of the topic name), but you only want the “update/documents” topics. You then limit which specific things you’re interested in to those that start with the given naming convention.
Now, add the actions (here, three of them) that you wish to take for the events that match this rule.
Writing IoT events directly to S3
Choose Store the message in a file and store in the cloud (S3). Choosing this action is the shortest path for getting events written from IoT to S3. Using this action, IoT writes each event to a single file in S3.
One of the advantages of this method is that you can specify the file naming convention to use for the individual files. One option is to use a rule such as that suggested in the S3 Action subsection with a prefix:
/iot/iotfleet/s3t/${topic()}/${timestamp()}
Each event for the topic is written into a separate file, with the filename made up of the prefix, topic name, and timestamp. For example:
<your-bucket>/iot/iotfleet/s3t/$aws/things/testthing/shadow/update/documents/1464121638971
However, a number of the partitions in this name are redundant (‘/$aws/things/’, ‘/shadow/update/documents/’), and others are inconvenient (‘$aws’ is interpreted by Hive as a request for the value of a variable called aws to be provided at runtime).
On the console, make the following selections:
- Choose the S3 bucket to which to write the events. For simplicity, choose a bucket in US-West-2.
- Create the pattern to use for the S3 key name. Use the following:
iot/iotfleet/s3/${topic(3)}/${timestamp()}
- Choose a role that allows IoT to write to S3; or, allow IoT to create an appropriate role for you.
This pattern generates file names like the following:
<your-bucket>/iot/iotfleet/s3/testthing/1464128482223
Now create a second S3 action for the same rule, using the following pattern:
iot/iotfleet/s3p/thingname=${topic(3)}/${timestamp()}
This pattern allows you to create a Hive table partitioned on thingname.
For event rates significantly higher than those generated by your sample application, consider the likely data usage. S3 performance considerations dictate a naming convention with differentiation earlier in the key if you are generating many S3 requests. Performing GETs of many small files may not be optimal for your target big data application, and may affect its performance. In addition, each S3 PUT, GET or LIST event incurs a small charge, so this approach may not be the most cost-effective choice.
Writing IoT events to S3 via Firehose
Now add another (your third) action: Send the message to a real-time data stream that stores to a bucket (Kinesis Firehose). Here, make three selections: the Firehose delivery stream name, a separator, and the role:
- If you choose Create a New Resource, you’ll be taken to the Firehose Create Delivery Stream page. Here, you can define your delivery stream: give it a name (e.g., “iotfleet”); select an S3 bucket, and provide a prefix (e.g., “iot/iotfleet/firehose/”; and make sure you include the “/” at the end of the prefix name). Keep the other defaults, but choose an IAM role: either an existing role, or allow IAM to create a new role and associated policy with the right privileges. This role allows Firehose to write to S3.
- Choose n (newline) for your separator. This adds a newline at the end of each event, so that your Firehose file contains a line per event. Without the separator, all events are written to a single line in the file, making them uninterpretable by many downstream tools.
- Choose a role name that lets IoT send events to Firehose, or create a new role.
Events written to Firehose are batched based on the Firehose parameters, and each batch is written out as a single file. You can add a key prefix to the beginning of the output file names but the remainder of the file naming convention is set; they are created with the predefined Firehose naming convention:
YYYY/MM/DD/HH/< DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString
For example:
<your-bucket>/iot/iotfleet/firehose/2016/05/24/21/iotfleet-3-2016-05-24-21-05-41-6afed8b7-863b-4890-b0f7-e62824afd035
Each event is represented as a single line in the output file. Each file that Firehose outputs contains the events that matched the rule within a similar time period; thus, a file may contain events generated by a variety of things.
It is now particularly important to have terms in the event that allow the event to be matched to the source thing. If each thing is easily recognized from the events it sends, that’s easy. If not, or if the identifier in the event is not easily relatable to the thing name in the device registry (which is how you communicate back to the device), adding the topic or thing name to the event in your rule (as you did above) makes the output much more usable.
You can use Firehose parameters to choose the batch size (S3 buffer size, in MB) and frequency (Firehose buffer interval, in seconds). You can trade off latency (as a new file is not written until either the time or size limit is reached) with the efficiency of larger files.
Generate some events
Now create a fleet of vehicles in IoT and move them around, generating a set of breadcrumbs with their current location and their new speed and heading as they move.
Run the fleet-and-event generator (https://aws-bigdata-blog.s3.amazonaws.com/artifacts/Integrating_IoT_Events/iotfleet.py) on a system with AWS credentials configured. The generator is written in Python 2.7, and uses Boto3 to send events to IoT using the configured credentials. It requires updated versions of boto3 (to ensure the latest IoT command support) and geopy, e.g.,:
pip install boto3 --upgrade pip install geopy
Run the generator without parameters for descriptions of the parameters. For your sample, execute:
python iotfleet.py --new --vehicles 15 --move 20 --deletefleet
To check for correct operation, look at the log files created by the generator (by default, in the local directory) for success or errors, the CloudWatch logs to see whether IoT rules are being matched, and the S3 buckets that you’ve configured for the output files. For Firehose, check the Monitoring tab for your delivery stream; with some delay (give it a few minutes) in the displays, you should see activity in the IncomingRecords and then DeliveryToS3 charts.
Add the files to the Hive metastore
If you’ve been following along, you now have several different sets of files being created in S3, using different file naming conventions:
<your-bucket>/iot/iotfleet/firehose/2016/ …
<your-bucket>/iot/iotfleet/s3/<thingname>/…
<your-bucket>/iot/iotfleet/s3p/thingname=<thingname>/…
Now, add the files to the Hive metadata store in EMR, and set it up to automatically update the metadata store on a regular basis with new files.
To do so, start an EMR V5.0 cluster with Hadoop, Hive, and Hue. Ensure the cluster is in the same region as your S3 bucket. Under Advanced Options, choose add steps. Add the following two steps:
- Step type: Hive Program:
- Script s3 location: s3://aws-bigdata-blog/artifacts/Integrating_IoT_Events/iotfleet-hive.sql
- Input & Output S3 locations: (leave blank)
- Arguments: -hiveconf bucket=<your-bucket>
- Action on Failure: Terminate
- Step type: Custom Jar:
- JAR location: s3://elasticmapreduce/libs/script-runner/script-runner.jar
- Arguments: s3://aws-bigdata-blog/artifacts/Integrating_IoT_Events/iotfleet-hive-update.sh
- Action on Failure: Continue
If you are not running your cluster in US-East-1, you will need to copy iotfleet-hive.sql and iotfleet-hive-update.sh into your own S3 bucket and adjust the paths above accordingly.
What do these two steps do?
Step 1: Defining the tables in Hive
The first step (iotfleet-hive.sql) defines three tables, one for each of the file naming conventions. All the files contain JSON, so it adds a JSON SerDe to Hive. Because in each scenario the events are stored in many files at the bottom of a hierarchical naming structure, all files in all subdirectories of the key prefix must be recognized by Hive. To do this, the script sets the following Hive options:
SET mapred.input.dir.recursive=true;
add jar s3://us-west-2.elasticmapreduce/samples/hive-ads/libs/jsonserde.jar;
Then, the script creates the table definitions:
create external table fleet_fh ( breadcrumbs string)
location 's3://<your-bucket>/iot/iotfleet/firehose/';
create external table fleet_s3 ( breadcrumbs string)
location 's3://<your-bucket>/iot/iotfleet/s3/';
create external table fleet_s3p ( breadcrumbs string)
PARTITIONED BY (thingname string)
location 's3://<your-bucket>/iot/iotfleet/s3p/';
Each JSON SerDe is a little different in its capabilities and restrictions, and some experimenting is often needed to get good results. In each table, each row is defined as a single string, for later parsing into JSON. With this approach, each event can be read and parsed in different ways using string functions if desired. Note that in the third definition, for table fleet_s3p, the “thingname=” added into the S3 filenames allows Hive to interpret it as a Hive partitioned table.
Now, you’re ready to set up views over the tables. Here is the CREATE statement for the first table. The view parses the JSON using json_tuple, then uses ‘LATERAL VIEW’ to further decompose each nested JSON entry:
CREATE VIEW fleetinfo_fh AS
SELECT thingname, topic, latitude, longitude, location, ignition, speed, from_unixtime(int(bctime)) as updtime, INPUT__FILE__NAME as filenm
FROM fleet_fh ac
LATERAL VIEW json_tuple(ac.breadcrumbs, 'thingnm', 'thetopic', 'state','metadata') s
AS thingname, topic, state, metadata
LATERAL VIEW json_tuple(s.state,'reported','desired') r as reported, desired LATERAL VIEW json_tuple(r.reported,'latitude','longitude', 'location', 'ignition','speed') l as latitude, longitude, location, ignition, speed LATERAL VIEW json_tuple(s.metadata,'reported') mr as reported LATERAL VIEW json_tuple(mr.reported, 'latitude') ml as lat LATERAL VIEW json_tuple(ml.lat, 'timestamp') mt as bctime ;
The other two views (creatively named fleetinfo_s3 and fleetinfo_s3p) are similar, with fleetinfo_s3p defined as a Hive partitioned table.
Step 2: Updating the table definitions
After the tables are defined, for partitioned tables, Hive must be told to read S3 and populate its metastore with the actual physical files that begin with those key prefixes. To do that, the shell script in the second step (hiveupdt.sh) executes a short Hive script that sets the same parameters as above and runs the following command:
msck repair table fleet_s3p;
This script runs the Hive script on a regular basis (currently every 60 seconds), to catalog the new files that have been added to S3.
Review your data in Hue
Now, you can analyze and visualize the data. For example, to show your fleet’s locations for the last ten minutes on a map (assuming you’ve generated events that recently), start up the Hive Editor in Hue. Issue the following query:
select thingname, cast(latitude as float) as lat, cast(longitude as float) as lon, iottime from fleetinfo_fh f where cast(iottime as int)> (unix_timestamp() - 600);
When the query returns results, choose the Chart tab. Choose a chart type of Marker Map. In the list fields, choose lat, lon, and thingname, and you should see the last 10 minutes of data displayed. Note that fleetinfo_fh’s results will lag by the time taken for Firehose to create its next batch (configured in Firehose with “S3 buffer interval (sec)”). You’ll need to reissue the query every few minutes to see your latest data.
To issue the same query against fleetinfo_s3 and fleetinfo_s3p, precede it by the same Hive set recursive parameter as above. It should give the same results. However, the performance characteristics of Hive across these 3 versions will differ; for example, with many individual files in s3, you may see timeouts in Hue. The partitioning approach reduces that risk in the case where your query can be limited to a subset of partitions (which the above query does not do). For production, partitioning will be needed for good query performance; for example, partitioning by (year, month, day, hour) for a large fleet or if there are frequent updates.
Conclusion
The story does not end here, and probably shouldn’t. When I showed him the dashboard, Bob said (predictably), “Great! Now I’d like to know which truck drivers are speeding, and how often – it could affect our insurance rates. And for the refrigerated trucks with temperature sensors, notify us when the temperature rises so we can take immediate action – it’ll reduce the spoilage rate. And… “ But these are topics for a future blog post.
Now that the event contents are easily accessible, it’s easy to write SQL to perform any desired transformation and store the results in your preferred data format, such as Parquet. Depending on your data volumes and analytic needs, you may wish to consolidate the smaller S3 files on a regular basis, such as nightly. Other suggestions can be found in the Using CombineInputFormat to Combat Hadoop’s Small Files Problem blog post. You can then have a lifecycle policy to delete the S3 files or archive them, if desired.
You’ve now seen how to interact with IoT, create events, and identify the subset of those events you’re interested in storing for later processing. You’ve seen two methods for storing your IoT events in S3, with a choice of data layouts. You’ve set up the Hive metastore to recognize the new event files as they are created, and set up views over the files so you can easily perform analysis on the contents. You’ve created a map that shows the locations of your vehicle fleet.
Now you can now analyze the data flowing in from your IoT events, using all the usual capabilities of your big data platforms. Your analysis programs can even send messages back to your things for them to act on. Enjoy!
If you have questions or suggestions, please comment below.
—————————–
Related
Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR