Avec l'utilisation continue et croissante de StreamSets comme composant d'intégration de données, nous avons commencé à formaliser certains modèles émergents qui nous ont permis de :
- Simplifier la maintenance
- Éviter les temps d'arrêt
- tirer parti de l'utilisation de Kafka, de HAproxy et de la réplication des pipelines pour faire évoluer les intégrations avec de gros volumes de données ou des transformations complexes.
Vous trouverez ci-dessous une liste de certains de nos modèles les plus utilisés.
Modèle logique d'intégration de données de haut niveau
En règle générale, nous divisons chaque intégration en 3 étapes qui peuvent contenir 3 pipelines ou plus.
Les avantages de cette approche sont les suivants :
- L'isolation des étapes permet de traiter les changements à une étape spécifique sans affecter les autres.
- Kafka in the middle garantit que si un parser ou un store est en panne, les données continuent d'être consommées et stockées dans Kafka pendant 7 jours.
- Kafka permet également un traitement parallèle avec des pipelines répliqués.
- Les journaux analysés poussés vers Kafka sont immédiatement accessibles par d'autres outils tels que Flink Streaming, KSQL ou des conteneurs d'apprentissage automatique sans avoir à répliquer les règles appliquées dans les pipelines Streamsets.
- Nous pouvons avoir des pipelines qui s'exécutent et consomment les logs dans des ordinateurs extérieurs à notre cluster et poussent les données vers Kafka. Par la suite, l'analyse et le stockage sont centralisés dans le cluster.
Étape 1 - Pipeline des consommateurs
Un pipeline StreamSets consomme des données brutes et les pousse dans un topic Kafka portant le nom projectname.raw.integration. Par exemple projectx.raw.weblogic
Cela nous permet de modifier, arrêter et redémarrer les pipelines Parse et Store sans perdre les données générées pendant les fenêtres de maintenance.
Ce pipeline n'est pas nécessaire pour les agents qui écrivent directement sur kafka.
Pour certains cas spécifiques, le même pipeline ou un autre écrit les données vers HDFS ou un stockage externe qui a besoin des données brutes. Principalement les sources requises pour les audits.
Étape 2 - Pipeline d'analyse et d'enrichissement
Un deuxième pipeline consomme les données brutes du sujet brut, analyse et enrichit le log et stocke les nouvelles données dans un nouveau sujet : projectname.parsed.integration.
Les transformations les plus courantes sont les suivantes :
- Analyser le modèle de journal en utilisant Log Parser.
- Supprimer les champs inutiles avec Field Remover.
- Renommer ou changer le cas d'un champ en utilisant Field Renamer.
- Générer de nouveaux champs en utilisant Expression Evaluators.
- Enrichissement en utilisant Redis Lookup ou Apache Solr comme magasins de valeurs clés. Par exemple :
- Ajouter un champ d'environnement (prod, stage, qa, dev) en utilisant le nom d'hôte comme clé.
- Ajout de détails sur l'entreprise en utilisant son numéro d'identification fiscale
- Recherche de géolocalisation IP
- Convertissez une date en UTC ou une chaîne de caractères en chiffres à l'aide du convertisseur de type de champ.
- Rejetez les enregistrements ou acheminez-les vers différents sujets Kafka à l'aide de Stream Selector.
- Aplatissez les champs avec Field Flattener.
- Quelques règles de gestion complexes écrites en Jython.
- Générer les champs année, mois, jour pour le partitionnement dans Kudu.
- Générer un identifiant unique (GUID / UUID).
- Analyse des champs de date.
Voici notre pipeline "2 - Apache Access - Parser" :
Les étapes :
1. Consomme les logs du sujet "brut".
2. Convertit le corps HTTP en Json
3. Pivote les champs du corps à la racine de l'enregistrement
4. Acheminer les erreurs d'apache et les accès d'apache vers des chemins différents.
1.1. Analyse de l'erreur apache
1.2. Ajouter la destination Kafka : sujet parsed.apache_error
2.1. Analyse de l'accès d'apache
2.2. Ajouter une destination Kafka : sujet parsed.apache_access
5. Aplatir les champs
6. Supprimer les en-têtes et les noms analysés
7. Supprimer les champs contenant des données brutes
8. Nettoyage des noms d'hôtes (minuscules et suppression du domaine)
9. Résoudre l'environnement en utilisant Redis
10. Ajouter un champ ID avec un UUID
11. Convertir les dates en ISO
12. Convertir les champs d'horodatage en long
13. Convertir les horodatages en millisecondes
14. Résoudre le référent d'accès apache
15. Pour les journaux d'accès apache, utiliser l'adresse IP pour identifier la géolocalisation
en utilisant Apache Solr
16. Écrire le log dans project_name.parsed.apache_access
ou nom_du_projet.parsed.apache_error
Étape 3 - Pipelines de stockage et de visualisation
Le troisième pipeline consomme les données analysées du sujet et les envoie à :
- Une table Kudu
- Un dossier HDFS
- Un stockage externe (en dehors de notre cluster) tel que les bases de données Oracle
- Un stockage NAS pour l'historique et la sauvegarde tel que EMC Isilon
- Apache Solr
- Courtiers Kafka externes (en dehors de notre cluster)
- Instadeq pour les tableaux de bord en direct et l'exploration des données
Parfois, nous utilisons un seul pipeline dans les StreamSets mais si les destinations ont des vitesses différentes ou si l'une d'entre elles présente plus d'erreurs qui provoquent des redémarrages du pipeline, nous utilisons différents pipelines pour chaque destination.
Nous conservons le numéro 3 pour tous les pipelines car ils consomment tous les logs des sujets analysés mais les envoient à des destinations différentes :
- 3- Accès Tomcat - Stockage Kudu
- 3- Accès Tomcat - Stockage HDFS
- 3- Accès Tomcat - Stockage Isilon à long terme
- 3- Accès Tomcat - Tableau de bord Instadeq
Scaling Pipelines
Lorsque nous avons un pipeline avec un volume de données élevé, nous le faisons évoluer avec :
- HAproxy agissant comme répartiteur de charge
- Réplication du pipeline StreamSets
- Partitionnement des sujets Kafka
Par exemple, chez deux clients, nous recevons les journaux d'accès à Tomcat par le biais de messages HTTP et nous avons la configuration suivante :
Équilibreur de charge HAproxy
Nous créons une nouvelle entrée dans notre serveur exécutant HAproxy : /etc/haproxy/haproxy.cfg
Nous recevons des requêtes HTTP sur le port 4005 et les transmettons à 3 pipelines StreamSets "1- Tomcat Access - Consumer" différents fonctionnant dans le cluster-host-01, cluster-host-02 et cluster-host-03.
Kafka : Partitionnement des sujets
Nous créons les sujets Kafka projectx.raw.tomcat_access et projectx.parsed.tomcat_access avec 3 partitions ou plus :
Streamsets : Réplication des pipelines
We replicate our pipeline to run in different cluster nodes, each one consuming from one of those partitions.
1. Accès à Tomcat - Consumer
- Trois instances de pipeline fonctionnant dans trois nœuds différents
- Consommez les journaux en utilisant un serveur HTTP Origin qui écoute sur le port 4005.
- Ecrire les logs bruts dans Kafka en utilisant une destination Kafka Producer.
- Sujet Kafka cible : projectx.raw.tomcat_access
2.Accès à Tomcat - Analyseur/Parser
- Trois instances de pipeline fonctionnant dans trois nœuds différents
- Consommer les logs bruts de Kafka en utilisant un Kafka Consumer Origin
- Sujet source : projectx.raw.tomcat_access
- Analyser, enrichir et filtrer les logs
- Les écrire dans Kafka en utilisant un Kafka Producer Destination
- Sujet cible : projectx.parsed.tomcat_access
3.Accès à Tomcat - Stockage
- Trois instances de pipeline fonctionnant dans trois nœuds différents
- Consommez les journaux d'accès Tomcat enrichis à partir de Kafka en utilisant un Kafka Consumer Origin.
- Sujet Kafka source : projectx.parsed.tomcat_access
- Les stocker dans une table Kudu en utilisant une Destination Kudu.
- Table Kudu cible : projectx.tomcat_access
3- Accès à Tomcat - Tableau de bord d'Instadeq
- Une seule instance de pipeline
- Consommez les logs d'accès Tomcat enrichis à partir des trois partitions Kafka en utilisant un Kafka Consumer Origin.
- Source Kafka Topic : projectx.parsed.tomcat_access
- Envoyez-les à Instadeq en utilisant les webhooks Instadeq
Exemples
- Logs Linux de RSysLog vers une table Kudu en utilisant StreamSets pour l'ingestion et la transformation de données
- Logs d'applications Java envoyés au cluster à l'aide de log4j ou logback, avec StreamSets pour l'ingestion et la transformation des données, KSQL pour l'analyse en continu, Kudu pour le stockage et Instadeq pour les tableaux de bord en direct.
- De Redmine à Instadeq Dashboard en utilisant des Streamsets : Intégration directe sans Kafka ni stockage intermédiaire