Blog de Amazon Web Services (AWS)

Análisis de datos en tiempo real con AWS

Por Horacio Ferro, Arquitecto de Soluciones, AWS México

 

En el mundo actual dominado por los dispositivos digitales, la cantidad y velocidad con la que nuestros clientes producen y reciben datos se han incrementado sustancialmente. A demás de la recolección y almacenamiento para su análisis posterior, trabajar con los datos en tiempo real es una ventaja competitiva muy importante.

La toma de decisiones estratégicas requiere identificar cambios en variables: financieras (acciones, divisas, derivados, commodities, etc.); meteorológicas (temperatura, precipitación, viento, presión, etc.); operativas (tráfico, producción, suministros, personal. etc.); entre otras, en tiempo real permite incrementar la eficiencia y minimizar el impacto del negocio.

En este post crearemos una aplicación para cubrir dicho objetivo utilizando los siguientes servicios y kits de desarrollo:

  • AWS Cloud Development Kit (CDK) – Con este kit de desarrollo crearemos infraestructura como código habilitando su creación y mantenimiento de una forma sencilla, así como, acelerar el despliegue de nuestros micro servicios y sitio web estático.
  • Amazon Simple Storage Service – Este servicio lo utilizaremos como repositorio de datos, toda la información recibida será almacenada aquí, y también lo utilizaremos como host de nuestra aplicación web.
  • Amazon Kinesis – Aquí crearemos dos flujos de datos, utilizando un Kinesis Data Stream tendremos acceso a la información con una latencia extremadamente baja y utilizando Kinesis Firehose podemos almacenar esta información en un Bucket de S3.
  • AWS Lambda – En este servicio desplegaremos nuestros micro servicios de ingesta de datos y consulta de información.
  • Amazon DynamoDB – Crearemos una tabla dentro de DynamoDB para almacenar la información y poder tener acceso a la misma a través de un microservicio, utilizamos DynamoDB al proveer latencias en milisegundos, inclusive microsegundos si habilitamos DynamoDB Accelerator.
  • Amazon CloudFront – Utilizaremos la red de distribución de contenido para distribuir nuestra aplicación web y proveer al usuario final de una mejor experiencia de navegación.
  • Amazon CloudWatch – Utilizaremos este servicio para recopilar las bitácoras (logs) de nuestra aplicación y poderlas consultar.
  • Amazon Cognito – Aquí controlaremos el acceso a nuestra aplicación web, utilizando los beneficios adicionales como lo son el auto registro y recuperación de contraseñas.
  • AWS Identity and Access Management – Aquí definiremos los roles que pueden utilizar nuestros microservicios y los servicios como Kinesis para establecer un control de acceso controlado a nuestros recursos.
  • Amazon API Gateway – Utilizaremos a API Gateway como fachada de nuestro microservicio de consulta, utilizando un authorizador (Authorizer) lo conectaremos con Cognito para proveer seguridad a nuestro micro servicio.
  • AWS Amplify – Este kit de desarrollo lo utilizaremos para realizar la integración de Cognito con API Gateway.

La siguiente arquitectura la utilizaremos para construir nuestra aplicación:

 

 

Esta arquitectura es 100% serverless, lo que nos permitirá evitar la administración de infraestructura, reducir la responsabilidad de seguridad y nos permitirá tener un costo bajo consumo sin capacidad ociosa.

El código completo de esta aplicación puede ser descargado de este repositorio.

En este ejemplo utilizaremos TypeScript con CDK para definir la infraestructura, como requisitos se debe contar con la Línea de Comandos de AWS (AWS CLI) y tener instalado AWS Cloud Development Kit. NOTA: No olvidar configurar la AWS CLI y realizar el bootstrap de CDK.

Después de ejecutar el comando:

cdk init app --language=typescript

Deberíamos contar con un proyecto estructurado de la siguiente manera:

- real-time-analytics-poc
  - bin
    real-time-analytics-poc.ts
  - lib
    real-time-analytics-poc-stack.ts
  - test
  .gitignore
  .npmignore
  cdk.json
  jest.config.js
  package.json
  README.md
  tsconfig.json

Lo primero que haremos es agregar los paquetes requeridos a package.json en la sección de dependencias, como se muestra a continuación:

"dependencies": {
  "@aws-cdk/core": "1.73.0",
  "@aws-cdk/aws-apigateway": "1.73.0",
  "@aws-cdk/aws-cloudfront": "1.73.0",
  "@aws-cdk/aws-cognito": "1.73.0",
  "@aws-cdk/aws-dynamodb": "1.73.0",
  "@aws-cdk/aws-iam": "1.73.0",
  "@aws-cdk/aws-kinesis": "1.73.0",
  "@aws-cdk/aws-kinesisfirehose": "1.73.0",
  "@aws-cdk/aws-lambda": "1.73.0",
  "@aws-cdk/aws-lambda-event-sources": "1.73.0",
  "@aws-cdk/aws-logs": "1.73.0",
  "@aws-cdk/aws-s3": "1.73.0",
  "@aws-cdk/aws-s3-deployment": "1.73.0",
  "source-map-support": "^0.5.16"
}

En esta aplicación necesitaremos dos stacks de CDK, uno para la infraestructura de ingestión de datos y consulta de los mismos, y otro para el sitio web.

Empezaremos agregando el nuevo stack creando un archivo llamado real-time-analytics-web-stack.ts dentro del directorio lib. Y modificando el archivo real-time-analytics-poc.ts en el directorio bin.

Archivo real-time-analytics-web-stack.ts

import * from cdk '@aws-cdk/core';

export class RealTimeAnalyticsWebStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: String, props?: cdk.StackProps) {
    super(scope, id, props);

  }
}

Archivo real-time-analytics-poc.ts

#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from '@aws-cdk/core';
import { RealTimeAnalyticsPocStack } from '../lib/real-time-analytics-poc-stack';
import { RealTimeAnalyticsWebStack } from "../lib/real-time-analytics-web-stack";
const app = new cdk.App();
new RealTimeAnalyticsPocStack(app, 'RealTimeAnalyticsPocStack');
new RealTimeAnalyticsWebStack(app, 'RealTimeAnalyticsWebStack');

Una vez realizadas estas modificaciones nuestra aplicación cuenta con dos stacks.

A continuación definiremos la infraestructura necesaria para la ingesta y consulta de datos, así como la seguridad y servicios adicionales necesarios dentro del stack 

RealTimeAnalyticsPocStack localizado en el archivo real-time-analytics-poc-stack.ts, esto lo agregaremos en el cuerpo del objeto:

export class RealTimeAnalyticsPocStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);
    
    // La infraestructura la definimos aquí.
  }
}

Primero definiremos un LogGroup para enviar ahí nuestros logs, definiremos que sea destruido cuando el stack se elimine y estableceremos una retención de logs de cinco días:

    const logGroup = new logs.LogGroup(this, 'poc-log-group', {
      logGroupName: '/realtime/analytics/',
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      retention: logs.RetentionDays.FIVE_DAYS
    });

Ahora definiremos la configuración de Cognito que nos ayudará a controlar el acceso a nuestro sitio web, para esto necesitamos un pool de usuarios, un cliente para aplicaciones y un pool de identidades:

    const userPool = new cognito.UserPool(this, 'analytics-user-pool', {
      accountRecovery: cognito.AccountRecovery.PHONE_AND_EMAIL, // Métodos por los cuales se puede recuperar la contraseña
      passwordPolicy: { // Política del password
        minLength: 8,
        requireDigits: true,
        requireLowercase: true,
        requireUppercase: true,
        tempPasswordValidity: cdk.Duration.days(7) // Duración de contraseñas generadas por Cognito.
      },
      selfSignUpEnabled: true, // Bandera para habilitar el auto servicio de inscripción
      signInAliases: { // Aquí establecemos que puede usar el usuario para iniciar sesión.
        email: true,
        username: true
      }
    });

    const appClient = userPool.addClient('api-client', {
      userPoolClientName: 'AnalyticsAPI' // Nombre del cliente.
    });

    const identityPool = new cognito.CfnIdentityPool(this, 'analytics-identity-pool', {
      allowUnauthenticatedIdentities: false, // Deshabilitamos las entidades no autenticadas.
      cognitoIdentityProviders: [{
        clientId: appClient.userPoolClientId, // Se hace la relación con el cliente que usaremos en Amplify.
        providerName: userPool.userPoolProviderName // Pool de usuarios.
      }]
    });

Ahora crearemos un Bucket de S3 con acceso privado para depositar los mensajes de Kinesis Data Streams usando Kinesis Firehose:

    const rawDataBucket = new s3.Bucket(this, 'raw-data-bucket', {
      accessControl: s3.BucketAccessControl.PRIVATE,
      removalPolicy: cdk.RemovalPolicy.DESTROY
    });

Ahora crearemos una tabla de DynamoDB para poder consultarla a través de nuestra aplicación web; la cual configuraremos con lectura y escritura bajo demanda, una llave primaria de con nombre “instrument” de tipo numerica y de igual forma será destruida al eliminar el stack:

    const analyticsTable = new ddb.Table(this, 'analytics-table', {
      billingMode: ddb.BillingMode.PAY_PER_REQUEST,
      partitionKey: {
        name: 'instrument',
        type: ddb.AttributeType.NUMBER
      },
      removalPolicy: cdk.RemovalPolicy.DESTROY
    });

Ahora crearemos el Kinesis Data Stream con una retención de 2 días y 10 shards (esta configuración debe ajustarse dependiendo la carga de datos a recibir):

    const dataStream = new kinesis.Stream(this, 'real-time-stream', {
      retentionPeriod: cdk.Duration.days(2),
      shardCount: 10
    });

Ahora configuraremos un Kinesis Firehose para enviar toda la información recibida, normalmente esto debe hacerse a la capa RAW o datos crudos de un lago de datos, esta configuración requiere de un flujo de datos para los logs, un rol que utilizará el servicio para tener acceso a los recursos y el stream per se:

    const firehoseLogStream = new logs.LogStream(this, 'kinesis-firehose-log', {
      logGroup: logGroup, // Se asocia al grupo creado en un inicio.
      logStreamName: 'firehose-stream',
      removalPolicy: cdk.RemovalPolicy.DESTROY // Elimina el stream al borrar el stack.
    });

    const firehoseRole = new iam.Role(this, 'firehose-role', {
      assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
      description: 'Role used by Kinesis Firehose to have access to the S3 bucket.',
      inlinePolicies: {
        's3-policy': new iam.PolicyDocument({ // Creamos una política en línea de S3.
          statements: [
            new iam.PolicyStatement({
              actions: [ // Solo colocamos los permisos mínimos necesarios para usar S3.
                's3:AbortMultipartUpload',
                's3:GetBucketLocation',
                's3:GetObject',
                's3:ListBucket',
                's3:ListBucketMultipartUploads',
                's3:PutObject'
              ],
              effect: iam.Effect.ALLOW,
              resources: [ // Hacemos referencia la bucket inicial.
                rawDataBucket.bucketArn,
                rawDataBucket.bucketArn + '/*'
              ]
            })
          ]
        }),
        'kinesis-policy': new iam.PolicyDocument({ // Política para usar el Kinesis Data Stream.
          statements: [
            new iam.PolicyStatement({
              actions: [
                'kinesis:DescribeStream',
                'kinesis:GetShardIterator',
                'kinesis:GetRecords',
                'kinesis:ListShards'
              ],
              effect: iam.Effect.ALLOW,
              resources: [
                dataStream.streamArn // Stream previamente creado.
              ]
            })
          ]
        }),
        'cloudwatch': new iam.PolicyDocument({ // Política para logs, permisos mínimos.
          statements: [
            new iam.PolicyStatement({
              actions: [
                'logs:PutLogEvents'
              ],
              effect: iam.Effect.ALLOW,
              resources: [
                logGroup.logGroupArn + ':log-stream:' + firehoseLogStream.logStreamName
              ]
            })
          ]
        })
      }
    });

    const firehoseStream = new kfh.CfnDeliveryStream(this, 'analytics-firehose-stream', {
      deliveryStreamName: 'analytics-ingestion-fh',
      deliveryStreamType: 'KinesisStreamAsSource', // Establecemos el tipo de origen.
      kinesisStreamSourceConfiguration: {
        kinesisStreamArn: dataStream.streamArn, // Stream de kinesis.
        roleArn: firehoseRole.roleArn // Rol con permisos mínimos necesarios.
      },
      s3DestinationConfiguration: { // Bucket de datos crudos.
        bucketArn: rawDataBucket.bucketArn,
        bufferingHints: {
          intervalInSeconds: 60,
          sizeInMBs: 100
        },
        cloudWatchLoggingOptions: { // Grupo y flujo de logs.
          logGroupName: logGroup.logGroupName,
          logStreamName: firehoseLogStream.logStreamName
        },
        compressionFormat: 'Snappy', // Compresión.
        encryptionConfiguration: {
          noEncryptionConfig: 'NoEncryption' // Sin encripción.
        },
        errorOutputPrefix: 'failed-data/', // Prefijo de errores.
        prefix: 'ingested-data/', // Prefijo donde se deposita la información.
        roleArn: firehoseRole.roleArn // Rol con permisos mínimos.
      }
    });

Ahora crearemos el microservicio de ingesta de datos, el cual desplegaremos en una lambda y necesitará de un rol para poder tener acceso a los diferentes servicios con los que interactuará:

    const ingestLambdaRole = new iam.Role(this, 'ingest-lambda-role', {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      description: 'Execution role for the data ingestion Lambda.',
      inlinePolicies: {
        'dynamodb': new iam.PolicyDocument({ // Política para acceso a DynamoDB.
          statements: [
            new iam.PolicyStatement({
              actions: [
                'dynamodb:UpdateItem' // El micro servicio solo require actualizar items.
              ],
              effect: iam.Effect.ALLOW,
              resources: [
                analyticsTable.tableArn // Tabla creada en el stack.
              ]
            })
          ]
        }),
        'kinesis-policy': new iam.PolicyDocument({ // Política para acceso a Kinesis Data Streams.
          statements: [
            new iam.PolicyStatement({
              actions: [ // Permisos necesarios para consumir registros.
                'kinesis:DescribeStream',
                'kinesis:GetShardIterator',
                'kinesis:GetRecords',
                'kinesis:ListShards'
              ],
              effect: iam.Effect.ALLOW,
              resources: [
                dataStream.streamArn // Stream creado previamente.
              ]
            })
          ]
        })
      },
      managedPolicies: [ // Política administrada con los permisos más básicos para ejecutar lambdas.
        iam.ManagedPolicy.fromManagedPolicyArn(this, 'aws-lambda-basic',
          'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')
      ]
    });

    const ingestLambda = new lambda.Function(this, 'data-ingestion-lambda', {
      code: new lambda.AssetCode('functions/ingest_data'), // Definimos que el código de la lambda se encuentra en el sub directorio functions/ingest_data
      environment: {
        'ANALYTICS_TABLE': analyticsTable.tableName // Exponemos el nombre de la tabla de DynamoDB a traves de variables de ambiente.
      },
      events: [ // Definimos los eventos que disparan la lambda, en este caso registros nuevos en Kinesis.
        new lev.KinesisEventSource(dataStream, {
          // Estos parámetros deberán ser configurados de acorde a la cantidad de registros esperados.
          batchSize: 100,
          bisectBatchOnError: true,
          parallelizationFactor: 2,
          retryAttempts: 2,
          startingPosition: lambda.StartingPosition.LATEST
        })
      ],
      handler: 'app.lambda_handler',
      role: ingestLambdaRole, // Rol previamente creado.
      runtime: lambda.Runtime.PYTHON_3_8
    });

Configuremos la función de consulta:

    const queryLambdaRole = new iam.Role(this, 'query-lambda-role', {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      description: 'Execution role for the query lambda.',
      inlinePolicies: { // Política para permisos de DynamoDB.
        'dynamodb': new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: [ // Solo necesitamos realizar scans.
                'dynamodb:Scan'
              ],
              effect: iam.Effect.ALLOW,
              resources: [
                analyticsTable.tableArn // Tabla creada.
              ]
            })
          ]
        })
      },
      managedPolicies: [ // Política con permisos mínimos.
        iam.ManagedPolicy.fromManagedPolicyArn(this, 'aws-lambda-query-basic',
          'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')
      ]
    });

    const queryLambda = new lambda.Function(this, 'api-lambda', {
      code: new lambda.AssetCode('functions/query_data'),
      environment: {
        'ANALYTICS_TABLE': analyticsTable.tableName
      },
      handler: 'app.lambda_handler',
      role: queryLambdaRole,
      runtime: lambda.Runtime.PYTHON_3_8
    });

Para poder consumir este micro servicio utilizaremos API Gateway para exponerlo como una API REST.

    const analyticsApi = new api.RestApi(this, 'analytics-api', { // Creamos la API.
      description: 'Analytics query API.',
      defaultCorsPreflightOptions: { // Agregamos CORS en la API.
        allowCredentials: true,
        allowHeaders: ['*'],
        allowMethods: ['GET'],
        allowOrigins: ['*'],
        statusCode: 200
      }
    });

    // Se crea el recurso donde vivirá la API de datos.
    const dataResource = analyticsApi.root.addResource('data');

    // Definimos el método GET sobre el recurso y se hace la integración con la lambda.
    const getDataMethod = dataResource.addMethod('GET',
      new api.LambdaIntegration(queryLambda, {
        proxy: true
      }));

    // Se crea el autorizador de cognito para evitar llamadas no autenticadas a la API.
    const apiAuthorizer = new api.CfnAuthorizer(this, 'analytics-authorizer', {
      identitySource: 'method.request.header.Authorization',
      identityValidationExpression: 'Bearer (.*)',
      name: 'AnalyticsAuthorizer-' + cdk.Aws.ACCOUNT_ID,
      providerArns: [userPool.userPoolArn],
      restApiId: analyticsApi.restApiId,
      type: 'COGNITO_USER_POOLS'
    });

Para facilitar la configuración de la aplicación Web creamos los siguientes outputs para obtener la información de Cognito y APIs:

    new cdk.CfnOutput(this, 'identity-pool-id', {
      description: 'IdentityPoolId',
      value: identityPool.ref
    });

    new cdk.CfnOutput(this, 'region', {
      description: 'Cognito Region',
      value: cdk.Aws.REGION
    });

    new cdk.CfnOutput(this, 'user-pool-id', {
      description: 'UserPoolId',
      value: userPool.userPoolId
    });

    new cdk.CfnOutput(this, 'user-pool-web-client-id', {
      description: 'UserPoolWebClientId',
      value: appClient.userPoolClientId
    });

    new cdk.CfnOutput(this, 'api-endpoint', {
      description: 'API Gateway URL',
      value: analyticsApi.url
    });

Ahora construiremos el stack para la aplicación web:

    // Creamos el bucket de S3 para el sitio Web y configuramos la página de inicio.
    const webAppBucket = new s3.Bucket(this, 'web-app-bucket', {
      websiteIndexDocument: 'index.html'
    });

    // Generamos el despliegue del sitio web señalando la ruta de los fuentes.
    const deployment = new s3d.BucketDeployment(this, 'web-deployment', {
      sources: [s3d.Source.asset('web/dashboard/dist')],
      destinationBucket: webAppBucket
    });

    // Creamos un Origin Access Identity para dar permisos de lectura a CloudFront sobre el bucket de S3.
    const cloudFrontOia = new cf.OriginAccessIdentity(this, 'web-site-oia', {
      comment: 'OIA for real time analytics.'
    });

    // Creamos la distribución del sitio web en CloudFront y asignamos la OAI.
    const webDistribution = new cf.CloudFrontWebDistribution(this, 'web-cf-distribution', {
      originConfigs: [{
        behaviors: [{
          isDefaultBehavior: true
        }],
        s3OriginSource: {
          originAccessIdentity: cloudFrontOia,
          s3BucketSource: webAppBucket
        }
      }]
    });

    // Creamos una política para acceso en el bucket de S3 con la OAI.
    const cfPolicy = new iam.PolicyStatement();
    cfPolicy.addActions('s3:GetBucket*', 's3:GetObject*', 's3:List*');
    cfPolicy.addResources(webAppBucket.bucketArn, webAppBucket.bucketArn + '/*')
    cfPolicy.addCanonicalUserPrincipal(cloudFrontOia.cloudFrontOriginAccessIdentityS3CanonicalUserId)
    webAppBucket.addToResourcePolicy(cfPolicy)

    // URL de CloudFront.
    new cdk.CfnOutput(this, 'cloud-front-url', {
      description: 'Cloud Front URL',
      value: webDistribution.distributionDomainName
    });

Utilizando el comando de npm run build podemos verificar que el código este correcto, ej:

npm run build
> real-time-analytics-poc@0.1.0 build /home/horacio/projects/serverless-realtime-analytics
> tsc

Para poder realizar el despliegue de la infraestructura es necesario crear el código fuente de los micro servicios.

Microservicio de ingesta que deberá ser colocado en functions/ingest_data:

import base64
import boto3
import os
from botocore.exceptions import ClientError
from decimal import *

TABLE_NAME = os.getenv('ANALYTICS_TABLE')
DYNAMODB_CLIENT = boto3.client('dynamodb')


def lambda_handler(event, context):
    """
    Lambda handler, process the records in a Kinesis Stream and updates the DynamoDB table.

    :param event: Received by the lambda.
    :param context: Execution context.
    :return: Ok string.
    """
    for record in event['Records']:
        data = str(base64.b64decode(record['kinesis']['data'])).replace("'", '').replace('\\n', '').replace('"', '')
        print(data)
        item = extract_data(data)
        key = {'instrument': {'N': str(item['instrument'])}}
        del item['instrument']
        exp, values = create_expression_values(item)
        try:
            outcome = DYNAMODB_CLIENT.update_item(
                TableName=TABLE_NAME,
                Key=key,
                UpdateExpression=exp,
                ExpressionAttributeValues=values
            )
            print(outcome)
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                print("Ignoring message out of sync: ")
                print(e.response['Error'])
            else:
                raise e
    return "Ok"


def extract_data(data: str):
    """
    Extracts the data from the String, splits the fields by | and then gets the name before the = and value.

    :param data: The string to parse.
    :return: Dictionary with the parsed data.
    """
    fields = data.split('|')
    item = dict()
    for field in fields:
        if len(field) > 3:
            name_value = field.split('=')
            name = name_value[0]
            value = name_value[1]
            if name.startswith('price_'):
                item[name] = Decimal(str(int(value) / 100))
            elif name == 'instrument' or name == 'reception':
                item[name] = int(value)
            elif name == 'sequence':
                item['seq'] = int(value)
            elif name == 'type':
                item['t'] = int(value)
            elif name == 'level':
                item['l'] = int(value)
            else:
                item[name] = value
    return item


def create_expression_values(item: dict):
    """
    Creates the update expression using the provided dictionary.

    :param item: Dictionary of the data to use for the expression.
    :return: String with the expression.
    """
    expression = 'SET '
    values = dict()
    for name in item:
        expression += '{} = :{}, '.format(name, name)
        values[':{}'.format(name)] = {'S' if type(item[name]) is str else 'N': str(item[name])}
    return expression[0:-2], values

Microservicio de consulta que deberá ser colocado en functions/query_data:

import boto3
import json
import os
from decimal import Decimal

TABLE_NAME = os.getenv('ANALYTICS_TABLE')
DYNAMODB_CLIENT = boto3.client('dynamodb')


def lambda_handler(event, context):
    """
    Simple handler, scans the DynamoDB to retrieve all the items.

    :param event: Received from the API Gateway.
    :param context: Execution context.
    :return: The available items in DynamoDB.
    """
    response = DYNAMODB_CLIENT.scan(
        TableName=TABLE_NAME,
        Select='ALL_ATTRIBUTES',
        ConsistentRead=True
    )
    print(response['Items'])
    items = list()
    for item in response['Items']:
        new_item = dict()
        for key in item:
            if str(key).startswith('price_'):
                new_item[key] = float(item[key]['N'])
            elif key == 'instrument':
                new_item[key] = int(item[key]['N'])
            else:
                new_item[key] = item[key]
        items.append(new_item)
    return {
        'statusCode': 200,
        'headers': {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Credentials': 'true',
            'Access-Control-Allow-Methods': 'GET',
            'Access-Control-Allow-Headers': '*',
            'Content-Type': 'application/json'
        },
        'body': json.dumps(items)
    }

Una vez creados los microservicios en su respectiva carpeta, es debe realizar la instalación de la infraestructura del primer stack ejecutando el comando “cdk deploy RealTimeAnalyticsPocStack“. Al terminar el despliegue obtendremos los valores necesarios para configurar la aplicación web:

En el archivo main.js de la aplicación (descargar el código desde este repositorio git), debemos modificar la configuración de Amplify que se muestran a continuación. Nota: En este ejemplo la región es us-east-1:

Amplify.configure({
  Auth: {
    identityPoolId: 'us-east-1:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
    identityPoolRegion: 'us-east-1',
    userPoolId: 'us-east-X_XXXXXXXXX',
    userPoolWebClientId: 'XXXXXXXXXXXXXXXXXXXXXXXXXX',
    region: 'us-east-1'
  },
  API: {
    endpoints: [
      {
        name: 'AnalyticsAPI',
        endpoint: 'https://XXXXXXXXXX.execute-api.us-east-1.amazonaws.com/prod'
      }
    ]
  }
})

Una vez modificadas esta configuración podemos proceder a desplegar nuestra aplicación web con el comando “cdk deploy RealTimeAnalyticsWebStack”.

Al finalizar el despliegue podemos navegar a la liga de salida y ver la página de inicio de sesión.

Para probar esta aplicación necesitaremos utilizar Kinesis Data Generator, el cual se puede obtener de la siguiente liga junto con sus instrucciones de uso: Kinesis Data Generator

Este es el template para generar los registros:

session={{date.now('YYYYMMDD')}}|sequence={{date.now('x')}}|reception={{date.now('x')}}|instrument={{random.number(9)}}|l={{random.number(20)}}|price_0={{random.number({"min":10000, "max":30000})}}|price_1={{random.number({"min":10000, "max":30000})}}|price_2={{random.number({"min":10000, "max":30000})}}|price_3={{random.number({"min":10000, "max":30000})}}|price_4={{random.number({"min":10000, "max":30000})}}|price_5={{random.number({"min":10000, "max":30000})}}|price_6={{random.number({"min":10000, "max":30000})}}|price_7={{random.number({"min":10000, "max":30000})}}|price_8={{random.number({"min":10000, "max":30000})}}|

Conclusion

Utilizando esta arquitectura es posible realizar análisis de información en tiempo semireal y análisis de información posterior, podemos hacer una combinación de ambos para generar modelos predictivos utilizando AI/ML y realizar detección de anomalías o incluso integrar Kinesis Data Analytics con el mismo fin.

 

Siguientes Pasos

Esta aplicación de ejemplo permite visualización de datos en tiempo semireal, las siguientes acciones es habilitar a los usuarios finales a poder tomar acciones con esta información.

 

Más información:

https://aws.amazon.com/cdk/

https://aws.amazon.com/s3/

https://aws.amazon.com/kinesis/

https://aws.amazon.com/lambda/

https://aws.amazon.com/dynamodb/

https://aws.amazon.com/cloudfront/

https://aws.amazon.com/cloudwatch/

https://aws.amazon.com/cognito/

https://aws.amazon.com/iam/

https://aws.amazon.com/apigateway/

https://aws.amazon.com/amplify/

https://aws.amazon.com/cli/

 


Sobre el autor:

Horacio Ferro es Arquitecto de Soluciones en AWS México.

 

 

 

 

Revisores:

Enrique Compañ es Arquitecto de Soluciones en AWS México.

 

 

 

 

Enrique Valladares  es Sr.Manager Arquitecto de Soluciones en AWS México.

 

 

 

 

 

Use los datos para impulsar el crecimiento empresarial. Logre una innovación constante con el volante de inercia de datos.