Le Blog Amazon Web Services

Persister des flux de données vers S3 en utilisant Amazon Kinesis Data Firehose et AWS Lambda

Le traitement des flux de données, aussi connu sous le nom de Stream Analytics, est très répandu dans les grandes entreprises. Cela est notamment dû à l’avènement de différentes technologies, plus faciles à utiliser. Par exemple Spark-Streaming connecté à un flux de données Amazon Kinesis est une solution classique qui permet de réaliser des traitements analytiques sur des données en temps réel.

Un sujet ne doit néanmoins pas être négligé : celui qui concerne la persistance de ces flux de données d’une manière fiable et durable, ainsi que la facilité avec laquelle on peut le faire. Cet article détaille une manière simple et efficace de persister les données dans Amazon S3 depuis des flux de données Amazon Kinesis en utilisant AWS Lambda et Amazon Kinesis Data Firehose, un service managé AWS.

Prenons un cas d’usage d’un de nos clients : Hearst Publishing est un groupe média qui produit des magazines tel que Cosmopolitan, Elle, Esquire, Seventeen, et Car and Driver, ainsi que des médias télévisés telle que A&E Networks et Esquire Network aux Etats-Unis. Hearst Publishing a besoin de collecter des données pertinentes provenant de plus de 200 sites digitaux en temps réel. Ces données sont importantes pour leur permettre de mieux comprendre comment leurs sites sont utilisés, quels sujets sont les plus en vogue en se basant sur le contenu, etc. L’exploitation de ces données, qu’il s’agisse de l’historique ou en temps réel, permet à Hearst de superviser son activité et devenir plus agile dans la gestion du contenu mis en ligne pour leurs clients, par la mise à disposition de ces analyses aux gestionnaires des contenus.

Description d’une première approche

Hearst Publishing a choisi, pour ce faire, une suite de composants pour implémenter leur traitement ETL du flux de données en temps réel : Amazon Kinesis Data Streams, Spark sur Amazon EMR et Amazon S3. Ils ont également réalisé qu’ils avaient besoin de persister les données brutes en parallèle de leur traitement sur EMR-Spark. En phase avec l’un des paradigme les plus important du Big Data qui consiste à “ne jamais sacrifier des données”, toutes les données qui arrivent en flux sont aussi persistées sur S3 dans le but de constituer un historique qui pourra, le cas échéant, être re-traité par une autre équipe qui voudra à son tour consommer ces données, ou les re-analyser en utilisant un schéma de traitement différent dans Spark. La librairie client Amazon Kinesis (KCL) ainsi que le connecteur Amazon Kinesis offrent une manière consistante et hautement configurable pour récupérer des données à partir de flux de streaming et pour les persister sur Amazon S3 :

  1. La librairie KCL dispose d’une fonctionnalité qui permet de sauvegarder l’état d’avancement concernant le traitement d’un flux de données. Quand une nouvelle application consomme pour la première fois un flux de données, il est possible de traiter d’une manière rétrospective tous les enregistrements disponibles dans le flux de données (TRIM-HORIZON) ou alors commencer à traiter à partir du premier enregistrement qui arrive dans le flux de données après le démarrage de l’application (LATEST),
  2. La librairie KCL s’intègre d’une manière très facile avec les connecteurs Amazon Kinesis,
  3. Les connecteurs fournissent des fonctionnalités qui permettent de simplifier la transformation, le lotissement, le filtrage et l’émission des enregistrements Amazon Kinesis vers Amazon S3. Les données peuvent être envoyées par lots, la taille des lots peut être définie en fixant des seuils : nombre d’enregistrements, temps passé depuis la dernière émission, volume des données.

Ces fonctionnalités font du connecteur KCL (KCL-C) un outil très puissant et populaire. Le client KCL-C tourne sur une instance EC2 ou sur une flotte d’instances et le déploiement peut être géré avec AWS CloudFormation et AutoScaling. La diagramme ci-dessous montre un exemple d’architecture incluant l’utilisation de la librairie KCL. (Dans ce qui suivra, il ne s’agira pas d’implémenter cette solution. Cette architecture sert avant tout à illustrer les composants, les interactions nécessaires pour pouvoir persister les données d’un flux de données, en parallèle de sont traitement) :

Persistence Data Stream vers Amazon S3 - Solution avec KCL

Hearst Publishing a ensuite réévaluer son environnement AWS en souhaitant, tant que possible, privilégier l’utilisation de services managés. Avec une équipe de développement à taille réduite et une volonté de consacrer leurs efforts sur la partie Analyse des données par la Data Science, leur objectif était de s’affranchir des tâches de supervision des instances EC2. Ainsi la question suivante s’est posée : “comment faire pour continuer à profiter de la fiabilité de la KCL-C pour nos données, sans avoir a gérer d’instances EC2 ? AWS pourrait-il nous fournir un service qui permettrait d’accomplir cette tâche afin que nous puissions nous concentrer sur les sujets de data science ?”

Pour faire court, il s’agit d’un cas d’usage où Amazon Kinesis Data Firehose répond parfaitement. Étant donnés les besoins en terme de traitement, la fiabilité était critique, de même pour la possibilité d’agréger les données dans des fichiers plus volumineux avant de les persister sur S3. Le diagramme ci-dessous illustre un exemple d’architecture avec Amazon Kinesis Data Firehose :

Persistence Data Stream vers Amazon S3 - Solution avec Amazon Kinesis Data Firehose

Mise en place Amazon S3 et de Amazon Kinesis Data Streams

Il est nécessaire de créer un flux de données représentant les données brutes telles qu’elles arrivent, ainsi qu’un compartiment/bucket Amazon S3 ou les données vont être stockées. Pour plus d’information, référez-vous à ces liens pour Créer un flux de données et Créer un bucket.

Création du flux de diffusion Kinesis Firehose

Passons maintenant à l’étape de création du flux de diffusion Kinesis Firehose sur la Console AWS. Pour plus d’information, consultez la documentation Amazon Kinesis Data Firehose.

Commencez par choisir le nom de votre flux de diffusion, ainsi que la source. Dans notre cas, nous publierons des enregistrements dans un flux de données Kinesis Data Streams avant de les récupérer et persister avec Kinesis Data Firehose. Nous opterons donc pour “Flux de données Kinesis” comme source. Prenez soin de bien sélectionner le flux de données Kinesis créé à l’étape précédente depuis la liste déroulant.

Amazon Kinesis Firehose vers Amazon S3 - AWS Console - Etape 1

Notons que Amazon Kinesis Data Firehose nous propose optionnellement de :

  • Modifier les enregistrements en utilisant une fonction AWS Lambda,
  • Convertir les données stockées dans S3 vers un format colonne (ORC ou PARQUET) plus adaptée pour être requêté en SQL.

Ces fonctionnalités ne seront pas exploitées dans le cadre de cet article, nous choisirons donc de les désactiver.

Il conviendra par la suite définir la Amazon S3 comme destination, sélectionner le bucket S3, ainsi que le préfixe S3 :

Dans cet article, nous considérons que vous n’avez pas créé le rôle AWS IAM qui octroiera les accès nécessaires à Amazon Kinesis Data Firehose, il convient de le créer maintenant : Dans la section Autorisations, choisissez Créer ou mettez à jour le rôle IAM.

Pour information, la stratégie AWS IAM associée au rôle utilisé par Kinesis Firehose est la suivante :

{
    "Version": "2012-10-17",
    "Statement":
    [
        {
            "Sid": "StmtDemo1",
            "Effect": "Allow",
            "Action":
            ["s3:AbortMultipartUpload","s3:GetBucketLocation","s3:GetObject","s3:ListBucket","s3:ListBucketMultipartUploads","s3:PutObject"
            ],
            "Resource":
            ["arn:aws:s3:::*"]
        },
        {
            "Sid": "StmtDemo2",
            "Effect": "Allow",
            "Action":  
            ["kms:Decrypt","kms:Encrypt"],
            "Resource":  ["*"]
        }
    ]
}

Il conviendra de remplacer la variable YOURACCTID par l’identifiant de votre compte AWS

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "StmtDemo3",
      "Effect": "Allow",
      "Principal": {"Service": "firehose.amazonaws.com"},
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": “YOURACCTID">
        }
      }
    }
  ]
}

Pour finir, nous allons configurer le flux de diffusion Kinesis Firehose en indiquant les éléments de configuration détaillés plus haut. Dans le cadre de cet article, nous allons fixer le volume maximal de la zone tampon et la période maximale de lotissement respectivement à 2MB et 60 secondes.

NOTE : Dans le cadre de cet article, nous n’allons pas compresser ni chiffrer les données sur Amazon S3. votre implémentation pourrait éventuellement varier pour en optimizer les performances et les coûts.
Pour un déploiement en production le chiffrement des données est fortement recommandé

Pour résumer la configuration, nous avons :

  • Défini le nom du flux de diffusion Kinesis Firehose,
  • Défini le bucket S3 de destination,
  • Spécifié un préfixe S3,
  • Défini les seuils d’agrégation – Dans ce cas : au bout de 60 secondes ou 2MB (le premier seuil atteint déclenche la diffusion d’un lot de données vers S3)
  • Nous n’avons ni compressé, ni chiffré les données dans S3 (étape importante pour tout déploiement en production).

Vérifiez la configuration de votre flux de diffusion avant de lancer sa création:

 

Insertion d’enregistrement dans le flux de données et vérification des résultats

La seule étape manquante consiste à ajouter des données à notre flux et constater que notre bucket S3 se remplit correctement. Dans le projet Java disponible sur le repository Github du blog Big Data, une classe utilitaire permet de publier des données de simulation vers Kinesis Data Streams (com.amazonaws.proserv.PopulateKinesisData). Si vous l’exécutez depuis votre repository local, il convient de renseigner vos information d’authentification en ajoutant votre clé d’accès (access key) au fichier resources/AwsCredentials.properties. Si vous exécutez le programme depuis une instance EC2, il convient de s’assurer que le rôle IAM attaché à instance EC2 dispose bien des permissions nécessaires sur Kinesis Data streams
Après la publication de messages dans le flux de données et l’un des seuil franchi (2MB ou 60 secondes), vous verrez apparaître dans votre bucket S3 de destination les données avec comme clé le préfixe que vous avez défini, ainsi que l’année, le mois, le jour et l’heure correspondant au moment ou le fichier a été écrit par Kinesis Firehose (prefixe/yyyy/mm/dd/hr/*).

Conclusion

Dans cet article, nous avons montré comment persister d’une manière fiable les données d’un flux Kinesis Data Streams vers S3 en utilisant le service Kinesis Firehose. Kinesis Firehose permet de s’affranchir de la nécessité d’avoir a gérer des instances EC2 pour agréger des données venant de streams Kinesis et pour les stocker dans S3 et s’appuie sur l’une des pratiques les plus courante de la persistance des flux temps-réel de données :

  1. Agréger les données en définissant des seuil,
  2. Persister les données sur une solution de stockage durable (Amazon S3 dans ce cas précis).

Le cas d’usage de Hearst Publishing a permis de mettre en évidence une manière fiable de persister les données d’un flux Kinesis Data Streams dans Amazon S3 en parallèle du traitement du Stream. Cet exemple a également permis de mettre en évidence les services managés AWS. Enfin, la source de données étant Kinesis Data Streams, le flux de diffusion Kinesis Firehose permettant la persistance dans Amazon S3 pourrait s’exécuter en parallèle du traitement temps réel du flux de données sans aucun impact.

Article initialement publié par Derek Graeber, Consultant Big Data Analytics Senior pour AWS Professional Services. Mise à jour et traduit en Français par Nadir Djadi, Architecte de Solutions AWS au sein de l’équipe France passionné par les architectures de données/Analytics, LinkedIn.