AWS Machine Learning Blog

Building a lawn monitor and weed detection solution with AWS machine learning and IoT services

April 2023 Update: Starting January 31, 2024, you will no longer be able to access AWS DeepLens through the AWS management console, manage DeepLens devices, or access any projects you have created. To learn more, refer to these frequently asked questions about AWS DeepLens end of life.
August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

For new home buyers, a common challenge is understanding how to manage their lawn effectively. Now imagine if you’re a farmer and have to do this for many acres of land. As a farmer, some of the challenges you’d typically face include the when (when is the right time to water), the where (where exactly to water or add fertilizer), and the how (how to handle weeds). A research study conducted by the Weed Science Society of America (WSSA) estimated that combined losses in corn and soybean crops due to uncontrolled weeds would total $43 billion annually. For more information, see WSSA Calculates Billions in Potential Economic Losses from Uncontrolled Weeds on the WSSA website.

What if you could use technology and the latest advancements in the field of computer vision and machine learning (ML) to solve this problem?

This is a complex problem to solve, and many enterprises are working on a solution. This post covers how to get started and build a solution using an AWS Starter Kit. The solution has two components:

  • Weed detection using image classification and AWS DeepLens
  • Near real-time monitoring of lawn conditions (soil moisture level, fertility level, and sunlight) using AWS IoT

Prerequisites

To implement this solution, you need an AWS account, an AWS DeepLens device, and, for the lawn monitoring component, a Raspberry Pi with Bluetooth-enabled soil sensors.

Detecting weeds using image classification

As herbicide resistance is becoming more and more common, weed control is gaining greater importance in agricultural domains. It’s important to detect weeds and take preventive actions early to enhance farm productivity. This is now possible with AWS DeepLens. AWS DeepLens is the world’s first deep learning enabled video camera for developers. With AWS DeepLens, you can get started in minutes with a fully programmable video camera, tutorials, code, and pre-trained models designed to expand deep learning skills. It also lets you learn and explore the latest artificial intelligence (AI) tools and technology for developing computer vision applications based on a deep learning model.

With AWS DeepLens, you can classify images as weeds or grass in real time. You can also connect AWS DeepLens to a microcontroller to trigger a spray to kill the weeds.

This post demonstrates how to detect weeds using the image classification algorithm in Amazon SageMaker. Amazon SageMaker is a fully managed ML service. With Amazon SageMaker, you can quickly and easily build and train ML models, and directly deploy them into a production-ready hosted environment. It provides an integrated Jupyter notebook for easy access to your data sources for exploration and analysis, so you don’t have to manage servers. This post demonstrates how to train a model in the fully managed and on-demand training infrastructure of Amazon SageMaker. In addition, it also demonstrates how to deploy the trained model on AWS DeepLens.

The following architecture diagram gives a high-level overview of the weed detection solution:

This solution uses the image classification algorithm in Amazon SageMaker in transfer learning mode to fine-tune a pre-trained model (trained on ImageNet data) to classify a new dataset. Transfer learning allows you to train deep networks with significantly less data than if you had to train a model from scratch. With transfer learning, you are essentially transferring the knowledge that a model has learned from a previous task to your current task. The idea is that the two tasks are not unrelated, so you can reuse the network parameters the model has learned through its extensive training without having to repeat that training yourself.

The architecture includes the following steps:

  1. Download the dataset of your images consisting of weeds and grass to your local computer. Organize the images into two distinct folders on your local computer.
  2. Create two .lst files using the RecordIO tool im2rec. One file is for the training portion of the dataset (80%). The other is for testing (20%). For more information, see Create a Dataset Using RecordIO on the MXNet website.
  3. Generate both .rec files from the .lst files and copy both .rec files (training and validation images) to an Amazon S3 bucket.
  4. Train your model using the Amazon SageMaker image classification algorithm in transfer learning mode.
  5. After you train the model, the training job uploads the model artifacts to your S3 bucket. You can then deploy the trained model on AWS DeepLens for real-time inference at the edge.

The following sections describe the detailed implementation of the solution.

Understanding the dataset

The Open Sprayer images dataset on the Kaggle website includes pictures of broad-leaved docks (weeds) and pictures of the land without broad-leaved docks (grass). The dataset comes with 1,306 images of weeds and 5,391 images of grass, with a typical size of about 256 pixels by 256 pixels. The dataset provides a recommended train/validate split. The following image shows a collection of weed and grass images.

Preparing the image dataset

This post demonstrates how to use the RecordIO file format for image classification using pipe mode. In pipe mode, your training job streams data directly from Amazon S3. With pipe mode, you reduce the size of the Amazon EBS volumes for your training instances. For more information, see Using pipe input mode for Amazon SageMaker algorithms. You can use the image classification algorithm with either file or pipe input modes. Even though pipe mode is recommended for large datasets, file mode is still useful for small files that fit in memory and where the algorithm has a large number of epochs.
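
Pipe mode is selected through the TrainingInputMode field of the CreateTrainingJob request. The following is a minimal sketch rather than the notebook’s full request; training_image stands in for the image classification container URI (retrieved with get_image_uri), and the remaining fields are filled in by later steps of this post:

training_params = {
    "AlgorithmSpecification": {
        "TrainingImage": training_image,  # image classification container URI from get_image_uri
        "TrainingInputMode": "Pipe"       # stream RecordIO data from S3; use "File" for small datasets
    }
    # RoleArn, OutputDataConfig, ResourceConfig, HyperParameters,
    # StoppingCondition, and InputDataConfig are added in later steps
}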

MXNet provides a tool called im2rec to create RecordIO files for your datasets. To use the tool, you provide listing files that describe the set of images. For more information about im2rec, see the im2rec GitHub repo.

To prepare your image dataset, complete the following steps using a local Python interpreter or through a Jupyter notebook on Amazon SageMaker. You execute these steps from the path where you’ve stored your image dataset.

  1. Generate listing files using im2rec.py. See the following code:
    python3 im2rec.py --list --recursive --train-ratio .75 --test-ratio .25 </path/to/folder>
  2. Use the im2rec utility to create the RecordIO files by entering the following code:
    python3 im2rec.py --num-thread 2 --resize 50 --quality 80 </path/to/folder>

    im2rec takes the following parameters:

    • list – Creates an image list by traversing the root folder and writes it to <prefix>.lst.
    • recursive – Recursively walks through subdirectories and assigns a unique label to images in each folder.
    • train-ratio – Ratio of images to use for training.
    • test-ratio – Ratio of images to use for testing.
    • num-thread – Number of threads to use for encoding. The greater this value, the faster the processing.
    • resize – Resize the shorter edge of an image to the new size; original images are packed by default.
    • quality – JPEG quality for encoding, 1–100; or PNG compression for encoding, 1–9 (default: 95).

For more information, see MXNet made simple: Image RecordIO with im2rec and Data Loading.

Implementing the code

To demonstrate each step of the solution, this post uses the Jupyter notebook from the GitHub repo.

Setting up your S3 bucket

Create an S3 bucket to upload the training and validation files in the RecordIO format as well as model artifacts. Use the default_bucket() session parameter from the SageMaker Python SDK. For instructions on creating an S3 bucket manually, see Creating a bucket.

import sagemaker

bucket = sagemaker.Session().default_bucket()

The RecordIO files serve as an input to the image classification algorithm. The training data should be inside a subdirectory called train and validation data should be inside a subdirectory called validation:

import boto3

def upload_to_s3(channel, file):
    # Upload the local file under the given channel prefix in the S3 bucket
    s3 = boto3.resource('s3')
    with open(file, "rb") as data:
        key = channel + '/' + file
        s3.Bucket(bucket).put_object(Key=key, Body=data)

s3_train_key = "image-classification-transfer-learning/train"
s3_validation_key = "image-classification-transfer-learning/validation"
s3_train = 's3://{}/{}/'.format(bucket, s3_train_key)
s3_validation = 's3://{}/{}/'.format(bucket, s3_validation_key)

upload_to_s3(s3_train_key, 'Data_train.rec')
upload_to_s3(s3_validation_key, 'Data_test.rec')

For more information, see the GitHub repo.

Training the image classification model using the Amazon SageMaker built-in algorithm

With the images available in Amazon S3, you are ready to train the model. The following hyperparameters are of particular interest:

  • Number of classes and training samples
  • Batch size, epochs, image size, and pre-trained model

For more information, see Image Classification Hyperparameters.

The Amazon SageMaker image classification algorithm requires you to train models on a GPU instance type, such as ml.p2.xlarge. Set the hyperparameters to the following values:

  • num_layers = 18. Number of layers (depth) for the network. This post uses 18, but you can use other values, such as 50 and 152.
  • image_shape = 3,224,224. Input image dimensions, num_channels, height, and width for the network. It should be no larger than the actual image size. The number of channels should be the same as the actual image. This post uses image dimensions of 3, 224, 224, which is similar to the ImageNet dataset.
  • num_training_samples = 4520. Number of training examples in the input dataset.
  • num_classes = 2. Number of output classes for the new dataset. This post uses 2 because there are two object categories: weeds and grass.
  • mini_batch_size = 128. Number of training samples used for each mini-batch.
  • epochs = 10. Number of training epochs.
  • learning_rate = 0.001. Learning rate for training.
  • top_k = 2. Reports the top-k accuracy during training. This parameter has to be greater than 1 because the top-1 training accuracy is the same as the regular training accuracy that has already been reported.
  • use_pretrained_model = 1. Set to 1 to use a pre-trained model for transfer learning.
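
These values map to the HyperParameters block of the CreateTrainingJob request. The following is a minimal sketch, assuming the same training_params request dictionary used elsewhere in this post; the built-in algorithm expects every value to be passed as a string:

training_params["HyperParameters"] = {
    "num_layers": "18",
    "image_shape": "3,224,224",
    "num_training_samples": "4520",
    "num_classes": "2",
    "mini_batch_size": "128",
    "epochs": "10",
    "learning_rate": "0.001",
    "top_k": "2",
    "use_pretrained_model": "1"
}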

After you upload the dataset to Amazon S3 and set the hyperparameters, you can start the training with the Amazon SageMaker CreateTrainingJob API. Because you’re using the RecordIO format for training, you specify both train and validation channels as values for the InputDataConfig parameter of the CreateTrainingJob request. You specify one RecordIO (.rec) file in the train channel and one RecordIO file in the validation channel. Set the content type for both channels to application/x-recordio. See the following code:

"InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_train,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_validation,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None"
        }
    ]
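
With the request assembled, the following is a minimal sketch of submitting the job and waiting for it to finish; it assumes training_params holds the full request (including the TrainingJobName, job_name) and that sage is a boto3 SageMaker client:

import boto3

sage = boto3.client('sagemaker')

# Submit the training job and block until it completes or fails
sage.create_training_job(**training_params)
sage.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)

status = sage.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print('Training job ended with status: ' + status)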

When the training job is complete, you should see the following message:

Training job ended with status: Completed

The output model is stored in the output path specified by training_params['OutputDataConfig']:

"OutputDataConfig": {
        "S3OutputPath": 's3://{}/{}/output'.format(bucket, job_name_prefix)

For the full code for model training, see the GitHub repo.

Deploying the model for real-time inference

You now want to use the model to perform inference. For this post, that means classifying images as weeds or grass.

This section involves the following steps:

  1. Create a model for the training output.
  2. Host the model for real-time inference by creating an inference endpoint. This consists of the following steps:
    1. Create a configuration that defines an endpoint.
    2. Use the configuration to create an inference endpoint.
    3. Perform inference on some input data using the endpoint.

Creating a model

Create a SageMaker Model from the training output using the following Python code:

model_name="deeplens-image-classification-model"
info = sage.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)

hosting_image = get_image_uri(boto3.Session().region_name, 'image-classification')

primary_container = {
    'Image': hosting_image,
    'ModelDataUrl': model_data,
}

create_model_response = sage.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

Real-time inference

You can now host the model with an endpoint and perform real-time inference.

Creating an endpoint configuration

Create an endpoint configuration that Amazon SageMaker hosting services use to deploy models. See the following code:

# timestamp is a time-based suffix (for example, generated with time.strftime)
# that keeps the configuration and endpoint names unique
endpoint_config_name = job_name_prefix + '-epc-' + timestamp
endpoint_config_response = sage.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

Creating an endpoint

Create the endpoint that serves up the model by specifying the name and configuration you defined previously. The result is an endpoint that you can validate and incorporate into production applications. This takes approximately 9–11 minutes to complete on an m4.xlarge instance. See the following code:

endpoint_name = job_name_prefix + '-ep-' + timestamp
print('Endpoint name: {}'.format(endpoint_name))

endpoint_params = {
    'EndpointName': endpoint_name,
    'EndpointConfigName': endpoint_config_name,
}
endpoint_response = sage.create_endpoint(**endpoint_params)

Wait for the endpoint to be in service and confirm its status with the following code:

response = sage.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print('EndpointStatus = {}'.format(status))
# wait until the endpoint is in service
sage.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
# print the final status of the endpoint
endpoint_response = sage.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response['EndpointStatus']
print('Endpoint creation ended with EndpointStatus = {}'.format(status))

if status != 'InService':
    raise Exception('Endpoint creation failed.')

You can confirm the endpoint configuration and status on the Endpoints tab in the Amazon SageMaker console.

You can now create a runtime object from which you can invoke the endpoint.

Performing inference

To validate the model, invoke the endpoint you created and generate classifications from the trained model. The following code uses a sample image of a weed as the test input:

runtime = boto3.Session().client(service_name='runtime.sagemaker')
file_name = 'rsz_photo-1.jpg'
# test image
from IPython.display import Image
Image(file_name)

import json
import numpy as np
with open(file_name, 'rb') as f:
    payload = f.read()
    payload = bytearray(payload)
response = runtime.invoke_endpoint(EndpointName=endpoint_name, 
                                   ContentType='application/x-image', 
                                   Body=payload)
result = response['Body'].read()
# result will be in json format and convert it to ndarray
result = json.loads(result)
# the result will output the probabilities for all classes
# find the class with maximum probability and print the class index
index = np.argmax(result)
object_categories = ['weed','grass']
print("Result: label - " + object_categories[index] + ", probability - " + str(result[index]))

Result: label - weed, probability - 0.9998767375946045

Running your model at the edge using AWS DeepLens

AWS DeepLens lets you experiment with deep learning at the edge, which gives you an easy way to deploy trained models and use Python code to come up with interesting applications. For your weed identifier, you could mount an AWS DeepLens device on a wire overlooking your lawn. The device feeds cropped images of detected weeds and crops to Amazon S3. It could even trigger a text to your mobile phone when it detects weeds.

An AWS DeepLens project consists of a trained model and an AWS Lambda function. The function uses AWS IoT Greengrass on AWS DeepLens to perform the following tasks:

  • Capture the image from a video stream
  • Perform an inference using that image against the deployed ML model
  • Provide the results to AWS IoT and the output video stream

AWS IoT Greengrass lets you execute Lambda functions locally, which reduces the complexity of developing embedded software. For more information, see Create and Publish an AWS DeepLens Inference Lambda Function.

When you use a custom image classification model produced by Amazon SageMaker, there is an additional step in your AWS DeepLens inference Lambda function. The inference function needs to call MXNet’s model optimizer before performing any inference using your model. To optimize and load the model, see the following code:

model_path = '/opt/awscam/artifacts/image-classification.xml'
error, model_path = mo.optimize(model_name, 224, 224, aux_inputs={'--epoch': 10})
model = awscam.Model(model_path, {'GPU': 1})

Performing model inference on AWS DeepLens

Model inference from your Lambda function is very similar to the previous steps for invoking a model using an Amazon SageMaker hosted endpoint. The following Python code finds weeds and grass in a frame that the AWS DeepLens video camera provides:

frame_resize = cv2.resize(frame, (512, 512))

# Run the image through the inference engine and parse the results using
# the parser API. Note that it is possible to get the output of doInference
# and do the parsing manually, but for an image classification model a
# simple API is provided.
model_type = 'classification'  # built-in image classification model
parsed_inference_results = model.parseResult(
                                 model_type,
                                 model.doInference(frame_resize))

For a complete inference Lambda function to use on AWS DeepLens with this image classification model, see the GitHub repo. The Lambda function uses the AWS IoT Greengrass SDK to publish text-based output to an IoT topic in JSON format, and you can view this output on the AWS IoT console.
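
The following is a simplified sketch of that publishing step, not the repo’s full function; the topic name is an illustrative placeholder, and the parsed results are assumed to follow the label/probability layout that parseResult returns for classification models:

import json
import greengrasssdk

# Greengrass client used to publish messages to AWS IoT from the device
client = greengrasssdk.client('iot-data')
iot_topic = 'deeplens/weed-detection'  # hypothetical topic name
object_categories = ['weed', 'grass']

def publish_top_prediction(parsed_inference_results, model_type):
    # Each entry is of the form {'label': class_index, 'prob': probability}
    top = max(parsed_inference_results[model_type], key=lambda r: r['prob'])
    message = {
        'label': object_categories[top['label']],
        'probability': top['prob']
    }
    client.publish(topic=iot_topic, payload=json.dumps(message))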

The following picture shows our sample setup:

Near real-time monitoring of the farm conditions

The right amount of water and nutrients are key to improving farm productivity. One of the ways you can achieve that is through real-time monitoring of the field, which helps you save costs on water and fertilizer consumption. The information collected from real-time monitoring can help you identify when and where to water or fertilize the soil.

To monitor the soil conditions, this post uses four soil sensors on a 2-foot-by-1-foot plant bed. These sensors detect the current moisture level and fertility of the soil, the light received, and the temperature of the surroundings. A Bluetooth-enabled Raspberry Pi collects this information; for an actual production deployment, you should use a longer-range protocol such as LoRa. The Raspberry Pi polls the sensors, collects the readings, and sends them to AWS IoT Core. You use an IoT rule to collect the data and forward it to Amazon Kinesis Data Analytics for real-time analysis. When the moisture level for a specific part of the farm falls below a threshold value, the system triggers an alert notification. You can also use this information to automate control of the sprinkler system.

The system also has built-in logic to poll the local weather conditions. You can use this information to decide when and if to turn the sprinklers on.

Architecture

The following diagram illustrates the architecture of this workflow.

The architecture includes the following high-level steps:

  • Raspberry Pi acts as an edge gateway device and collects information from soil moisture sensors via Bluetooth.
  • After authentication, Raspberry Pi subscribes to an IoT topic and starts publishing messages to it.
  • Using the IoT rule engine, data moves to Amazon Kinesis Data Firehose, which has two destinations: Amazon S3 for persistent storage and Kinesis Data Analytics for real-time processing.
  • Kinesis Data Analytics sends its output to a Lambda function, which triggers Amazon SNS alerts when the moisture level falls below the threshold value.
  • Finally, you use the data in Amazon S3 to build an Amazon QuickSight dashboard.

To prepare this architecture, you need a Raspberry Pi and soil moisture sensors.

Setting up your IoT device

To set up a Raspberry Pi to generate the data, complete the following steps:

  1. Configure the Raspberry Pi and connect it to AWS IoT. For more information, see Using the AWS IoT SDKs on a Raspberry Pi.
  2. Download the latest version of the AWS IoT Python SDK to the Raspberry Pi over SSH (for example, using PuTTY). You can use the pip3 command. See the following code:
    pip3 install AWSIoTPythonSDK

    Use Python 3.4 or higher because the btlewrap library used for the miflora sensors only works with Python 3.4 or above.

  3. Download the miflora sensor library to talk to sensors with the following code:
    pip3 install miflora

    The command also automatically downloads the btlewrap library required for miflora. For more information, see the GitHub repo.

  4. Download the farmbot.py script from GitHub and update the following parameters:
    • clientID – IoT and IoT shadow client ID (from step 1)
    • configureEndpoint – IoT and IoT shadow endpoint from your AWS Management Console
    • configureCredentials – IoT and IoT shadow credentials location on the Raspberry Pi (from step 1)
    • Sensors – Update the MAC address values of the sensors
  5. SSH into the Raspberry Pi and start the farmbot.py script. The script polls the soil moisture sensors configured in the previous step; a simplified sketch of such a polling script follows these steps. See the following code:
    python3 farmbot.py
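
The following is a simplified sketch of such a polling script, not the actual farmbot.py. The IoT endpoint, certificate paths, sensor MAC address, and topic name are placeholders to replace with your own values; the field names match those used by the analytics application later in this post:

import json
import time

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from btlewrap.bluepy import BluepyBackend
from miflora.miflora_poller import (MiFloraPoller, MI_BATTERY, MI_CONDUCTIVITY,
                                    MI_LIGHT, MI_MOISTURE, MI_TEMPERATURE)

# Connect to AWS IoT Core with the certificates created in step 1
client = AWSIoTMQTTClient('farmbot-sensor-1')
client.configureEndpoint('<your-iot-endpoint>.iot.<region>.amazonaws.com', 8883)
client.configureCredentials('<root-CA.pem>', '<private.pem.key>', '<certificate.pem.crt>')
client.connect()

# Poll one Mi Flora soil sensor over Bluetooth (replace with your sensor's MAC address)
poller = MiFloraPoller('C4:7C:8D:XX:XX:XX', BluepyBackend)

while True:
    reading = {
        'Name': 'sensor-1',
        'Moisture': poller.parameter_value(MI_MOISTURE),
        'Conductivity': poller.parameter_value(MI_CONDUCTIVITY),
        'Light': poller.parameter_value(MI_LIGHT),
        'Temperature': poller.parameter_value(MI_TEMPERATURE),
        'Battery': poller.parameter_value(MI_BATTERY),
        'DateTime': time.strftime('%Y-%m-%d %H:%M:%S')
    }
    # The IoT rule in the next section subscribes to the farmbot/# topic filter
    client.publish('farmbot/sensor-1', json.dumps(reading), 1)
    time.sleep(60)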

Creating a Firehose delivery stream with Lambda transformation

This post uses an Amazon Kinesis Data Firehose delivery stream to stream the data. You use the data from the Firehose delivery stream as input for Kinesis Data Analytics and also store it in Amazon S3 for building an Amazon QuickSight dashboard. To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. Create a Firehose delivery stream with the name IoT-data-Stream.
  3. For Source, keep the default Direct PUT or other sources and leave Server-side encryption disabled (the default).
  4. Choose Next.
  5. For Transform source records with AWS Lambda, select Enabled.
  6. Choose Create new.
  7. Use this Lambda function to add a newline character to the end of each record sent from the delivery stream to Amazon S3. For more information, see the GitHub repo.

This is required to create a visualization in Amazon QuickSight. See the following code:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
  
    /* Process the list of records and transform them */
    /* The following must be the schema of the returning record 
      Otherwise you will get processing-failed exceptions
      {recordId: <id>, result: 'Ok/Processing/Failed', data: <base64 encoded JSON string> } 
    */ 
    const output = event.records.map((record) => {
        /* Decode the record, append a newline, and re-encode it as base64 */
        const payload = Buffer.from(record.data, 'base64').toString('utf8');
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: Buffer.from(payload + '\n', 'utf8').toString('base64'),
        };
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};
  8. For Convert record format, select Disabled.
  9. Choose Next.
  10. For Destination, choose Amazon S3.
  11. Create a new S3 bucket named <your unique name>-kinesis.
  12. Keep all other fields at their default values.
  13. Choose Next.
  14. Under S3 buffer conditions, for Buffer interval, enter 60 seconds.
  15. For Buffer size, enter 1 MB.
  16. Choose Create an IAM role. Kinesis Data Firehose uses this role to access your bucket.
  17. Keep all other fields at their default values.
  18. Choose Next.
  19. Review all the fields and choose Create delivery stream.

Setting up AWS IoT to forward data to the delivery stream

To forward data to your delivery stream, complete the following steps:

  1. On the AWS IoT Core console, choose Act.
  2. Choose Create rule.
  3. Create a new AWS IoT rule with the following field values:
    • Name: IoT_to_Firehose
    • Attribute: *
    • Rule query statement: SELECT * FROM 'farmbot/#'
    • Add action: Send messages to an Amazon Kinesis Data Firehose stream (select IoT-data-Stream from the Stream name drop-down menu)
    • Separator: \n (newline)
  4. For the IAM role, choose Create new role.

The console creates an IAM role with the appropriate permissions for AWS IoT to access Kinesis Data Firehose. If you prefer to use an existing role, select the role from the drop-down menu and choose Update role. This adds the required permissions to your selected IAM role.
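
If you prefer to script this step, the following is a hedged sketch of creating the same rule with boto3; the role ARN is a placeholder for the role described above:

import boto3

iot = boto3.client('iot')
iot.create_topic_rule(
    ruleName='IoT_to_Firehose',
    topicRulePayload={
        'sql': "SELECT * FROM 'farmbot/#'",
        'ruleDisabled': False,
        'actions': [{
            'firehose': {
                'deliveryStreamName': 'IoT-data-Stream',
                'roleArn': 'arn:aws:iam::<account_id>:role/<iot_to_firehose_role>',  # placeholder
                'separator': '\n'
            }
        }]
    }
)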

Creating the destination for the analytics application

To create the destination for the analytics application, complete the following steps:

  1. On the Amazon SNS console, create a topic and subscribe to it via email or text. For more information, see step 1 and step 2 of Getting Started with Amazon SNS
  2. Copy the ARN for the SNS topic in a text file.
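
If you prefer to script this step, the following is a minimal sketch using boto3; the topic name and email address are placeholders:

import boto3

sns = boto3.client('sns')

# Create the topic and subscribe an email address to it
topic_arn = sns.create_topic(Name='lawn-moisture-alerts')['TopicArn']
sns.subscribe(TopicArn=topic_arn, Protocol='email', Endpoint='you@example.com')

print(topic_arn)  # record this ARN for the Lambda function you create later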

Creating an analytics application to process the data

To create your analytics application, complete the following steps:

  1. On the Amazon Kinesis console, choose Data Analytics.
  2. Choose Create application.
  3. For your application name, enter Farmbot-application.
  4. For your runtime, select SQL and choose Create application.
  5. For the source, choose Connect streaming data and select Choose source.
  6. Choose Kinesis Data Firehose delivery stream.
  7. Choose IoT-data-Stream.
  8. Leave Record pre-processing with AWS Lambda at its default setting (disabled).
  9. Under Access Permission, let the console create and update an IAM role to use with Kinesis Data Analytics.

If you already have an appropriate IAM role that Kinesis Data Analytics can assume, choose that role from the drop-down menu.

  10. Choose Discover Schema.
  11. Wait for the application to show results and then choose Save and continue.

If you have configured everything correctly, you see a table with the sensor names, parameters, and timestamp. See the following screenshot.

  12. For real-time analytics processing, choose Go to SQL editor.
  13. Enter SQL code that uses the following in-application streams:
  • SOURCE_SQL_STREAM_001 – Contains the sensor name and sensor parameters with values and timestamps from the incoming stream.
  • INTERMEDIATE_SQL_STREAM – Contains all records with a Moisture value of less than 25 and filters down to fewer parameters, which for this post are Moisture, Conductivity, and Name.
  • DESTINATION_SQL_HHR_STREAM – Performs aggregate functions over a 2-minute sliding window for a specified column. It detects whether a sensor is constantly reporting a low moisture level for 2 minutes.

See the following example code:

-- Create an output stream with seven columns, which is used to send IoT data to the destination
SELECT STREAM "Moisture","Temperature","Name","Conductivity","Light","Battery","DateTime" FROM "SOURCE_SQL_STREAM_001";


CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (Moisture INT, Conductivity INT, Name VARCHAR(16));
-- Create a pump to insert into the intermediate stream
CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"

-- Select all columns from source stream
SELECT STREAM "Moisture","Conductivity","Name"
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE "Moisture" < 25;


-- ** Aggregate (COUNT, AVG, etc.) + Sliding time window **
-- Performs function on the aggregate rows over a 2 minute sliding window for a specified column. 
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               

CREATE OR REPLACE STREAM "DESTINATION_SQL_HHR_STREAM" (Name VARCHAR(16), high_count INTEGER);
-- Create a pump which continuously selects from a source stream (SOURCE_SQL_STREAM_001)

-- performs an aggregate count that is grouped by columns ticker over a 2-minute sliding window
CREATE OR REPLACE PUMP "STREAM_PUMP_002" AS INSERT INTO "DESTINATION_SQL_HHR_STREAM"
-- COUNT|AVG|MAX|MIN|SUM|STDDEV_POP|STDDEV_SAMP|VAR_POP|VAR_SAMP)
SELECT STREAM *
FROM (
      SELECT STREAM 
             Name, 
             COUNT(*) OVER TWO_MINUTE_SLIDING_WINDOW AS high_count
      FROM "INTERMEDIATE_SQL_STREAM"
      WINDOW TWO_MINUTE_SLIDING_WINDOW AS (
        PARTITION BY Name
        RANGE INTERVAL '2' MINUTE PRECEDING)
) AS a
WHERE (high_count >3);

On the Real-time analytics tab, you can see the results of the SQL query. See the following screenshot.

Connecting the destination for the analytics application

To connect the destination for your analytics application, complete the following steps:

  1. For Destination, select AWS Lambda Function.
  2. Create a new Lambda function and choose the Lambda blueprint.
  3. In the search bar, enter SNS.
  4. Choose kinesis-analytics-output-sns.
  5. Enter the function name.
  6. Select JSON as the output format.
  7. For the execution role, select Create a new role with basic Lambda permissions.

The Lambda function processes the results of the application and sends the results to the SNS topic you created.

  8. Modify the following function code and add the topic ARN you recorded:
    import base64
    import json
    import boto3

    snsClient = boto3.client('sns')
    print('Loading function')

    def lambda_handler(event, context):
        print('Number of records {}.'.format(len(event['records'])))
        for record in event['records']:
            # Kinesis Data Analytics output records are base64 encoded, so decode them here
            payload = json.loads(base64.b64decode(record['data']))
            print(payload)
            response = snsClient.publish(
                TopicArn='arn:aws:sns:<region>:<account_id>:<topic_name>',
                Message='Based on current soil moisture level you need to water your lawn!',
                Subject='Lawn maintenance reminder',
                MessageStructure='string',
                MessageAttributes={
                    'String': {
                        'DataType': 'String',
                        'StringValue': 'New records have been processed.'
                    }
                }
            )
        return 'Successfully processed {} records.'.format(len(event['records']))
  9. Choose Create function.
  10. On the Kinesis Data Analytics console, under In-application stream, select Existing in-application stream.
  11. From the drop-down menu, choose DESTINATION_SQL_HHR_STREAM with JSON output format.
  12. Under Access permission, let the console create or update an IAM role to use with Kinesis Data Analytics.

If you already have an appropriate IAM role that Kinesis Data Analytics can assume, choose that role from the drop-down menu.

Connecting Amazon QuickSight for data visualization

To build an Amazon QuickSight dashboard, you need to ingest the JSON raw data from Kinesis Data Firehose. Complete the following steps:

  1. On the console, choose QuickSight.

If you are using Amazon QuickSight for the first time, you must create a new account.

  2. After you log in, choose New analysis.
  3. Choose New data set.
  4. From the available sources, choose S3.
  5. Enter a data source name.
  6. Select the option to upload a manifest file.

The manifest file provides the location of the S3 bucket and the format of data in Amazon S3. For more information, see Supported Formats for Amazon S3 Manifest Files. See the following example code:

{
    "fileLocations": [
        {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/data/<YEAR>/<MONTH>/<DATE>/<HOUR>/"]}
    ],
    "globalUploadSettings": {
        "format": "JSON"
    }
}

Amazon QuickSight imports and parses the data.

  7. To transform and format the ingested data, choose Edit/Preview data.
  8. After you are done formatting, choose Save and visualize data.

To build an analysis on the visualization screen, complete the following steps:

  1. For Visual type, choose Line Chart.
  2. Drag the DateTime field to the X-axis field well.
  3. Move Light to the Value field well.
  4. Move Name to the Color field well.

This builds a dashboard that shows the amount of light over a period of time for the four sensor locations.

  5. Change the DateTime aggregate option from DAY to MINUTE.
  6. Change Light from Sum to Average.
  7. Choose the format visual option to change the X-axis, Y-axis, or legend properties.

The following graph shows the average light over time. You can similarly build one for moisture, temperature, and soil fertility level.

Conclusion

This post showed how to use AWS DeepLens and the built-in image classification algorithm in Amazon SageMaker to distinguish weeds from grass based on a publicly available dataset, and how to build a near real-time lawn monitoring system using Amazon Kinesis and AWS IoT. You can also visualize the data using Amazon QuickSight. You can clone and extend this example for your own use cases.


About the Authors

Ravi Gupta is an Enterprise Solutions Architect at Amazon Web Services. He is a passionate technology enthusiast who enjoys working with customers and helping them build innovative solutions. His core areas of focus are IoT, analytics, and machine learning. In his spare time, Ravi enjoys spending time with his family and photography.

Shayon Sanyal is a Senior Data Architect (Data Lake) with Global Financial Services at Amazon Web Services. He enables AWS customers and partners to solve complex and challenging AI/ML, IoT, and Data Analytics problems and helps them realize their vision of a data-driven enterprise. In his spare time, Shayon enjoys traveling, running and hiking.