Kafka est un système de messagerie distribué, originellement développé chez LinkedIn, et maintenu au sein de la fondation Apache depuis 2012. Son adoption n’a cessé de croître pour en faire un quasi de-facto standard dans les pipelines de traitement de données actuels. Bien plus qu’un simple concurrent des outils conçus autour des standards JMS ou AMQP, Kafka a pour ambition de devenir la plateforme centralisée de stockage et d’échange de toutes les toutes les données émises par une entreprise en temps réel.
En 2009 chez LinkedIn, l’équipe en charge de la mise en place de l’adoption d’Hadoop (Jay Kreps, Neha Narkhede, Jun Rao…) s’est vue confrontée à des problèmes quasi insurmontables d’intégration des données provenant des différents systèmes en place au sein de l’infrastructure de l’entreprise.
La grande diversité des technologies utilisées, la fragmentation des données, le couplage fort entre les systèmes induit par les connexions point à point entre ceux-ci, se sont avérés être des freins importants (voir des écueils) pour la création de produits innovants.
Ce constat a conduit cette équipe à concevoir un outil capable de centraliser les flux de données provenant des différents systèmes d’une entreprise et devant posséder les caractéristiques suivantes :
Les outils traditionnels de brokers de messages d’entreprise (RabbitMQ/ActiveMQ) ne se sont pas avérés adaptés à la création de cette plateforme.
Parmi les problèmes rencontrés dans la création d’un tel système avec RabbitMQ/ActiveMQ :
C’est ce qui a motivé la création d’un outil ad hoc : Kafka.
Apache Kafka est une plateforme de streaming événementielle distribuée capable de gérer des trillions d'événements par jour. Initialement conçue comme une file d'attente de messagerie, Kafka est basée sur une abstraction d'un journal de validations distribué. Depuis sa création et sa mise en open source par LinkedIn en 2011, Kafka a rapidement évolué de file d'attente de messagerie à une plateforme de streaming à part entière.
Fondé par les développeurs originaux d'Apache Kafka, Confluent développe la distribution la plus complète de Kafka avec la plateforme Confluent. La plateforme Confluent améliore Kafka avec des fonctionnalités communautaires et commerciales supplémentaires conçues pour améliorer l'expérience de streaming des opérateurs et des développeurs en production, à très grande échelle.
En son centre se trouve l'humble et immuable journal des évènements, et de là vous pouvez vous y abonner et publier des données dans un nombre illimité de systèmes ou d’applications en temps réel.
Contrairement aux files d’attente de messagerie, Kafka est un système distribué hautement évolutif, tolérant aux défaillances, lui permettant d’être déployé pour des applications comme gérer les passagers et attribuer les chauffeurs Uber, fournir des données analytiques en temps réel ainsi que de la maintenance préventive pour la maison intelligente de British Gas et effectuer de nombreux services en temps réel sur l’ensemble de LinkedIn. Ce degré de performance unique est donc parfait pour évoluer d’une application à une utilisation dans l’ensemble de l'entreprise.
Une abstraction d’un journal d'évènements distribué généralement dans des bases de données distribuées, Apache Kafka offre un stockage durable.
Kafka peut agir comme une « source de réalités », en étant en mesure de distribuer les données entre plusieurs nœuds pour un déploiement à haute disponibilité dans un centre de données unique ou entre plusieurs zones de disponibilité.
Une plateforme de streaming événementielle ne serait pas complète sans la possibilité de manipuler ces données au fur et à mesure qu'elles arrivent.
L'API Streams d'Apache Kafka est une bibliothèque puissante et légère qui autorise un traitement à la volée, vous permettant de regrouper, de créer des paramètres de fenêtrage, d'effectuer des jointures au sein d'un flux, et bien plus encore. Peut-être mieux encore, elle est conçue comme une application Java au-dessus de Kafka, gardant votre flux de travail intact sans aucun cluster supplémentaire à entretenir.
Kafka a proprement parlé est composé de Brokers et de Zookeeper. Les brokers servent de pivot entre les différents services. Ils sauvegardent les données qui transitent, tout en assurant une redondance des données afin d’avoir une forte tolérance aux pannes. Zookeeper, quant à lui, assure la gestion de la configuration distribuée, ainsi que la coordination des brokers et le suivi de l’état des services gravitants autour de Kafka.
Parmi ces services il existe deux grandes catégories d’application interagissant avec Kafka :
Kafka fournit des APIs complémentaires afin de mettre en œuvre des services dérivés :
Le log est l’abstraction de base au cœur du système Kafka.
Il ne s’agit pas ici de logs comme on l’entend lorsque l’on parle des logs d’un serveur Apache ou d’une application Web, mais plutôt d’une structure de données abstraite ayant les caractéristiques suivantes :
Jusqu’ici nous avons vu que les échanges reposaient de manière macroscopique sur une notion de topic. Plus concrètement, un topic correspond à une unité logique contenant des messages d’une même catégorie. Un message est composé d’une clé optionnelle et d’un contenu sous forme de tableau de bytes. L’utilisation de ce tableau permet de laisser la liberté sur le choix du format d’échange. Les plus utilisés étant Json et surtout Avro.
Au plus bas niveau, Kafka repose sur une structure de donné appelée Log pour stocker les topics sur disque. Un Log est un tableau de messages immuables ordonnés selon leur publication et ayant chacun un offset unique. Lors de la publication d’un message par un producer, chaque nouveau message est ajouté à la fin du tableau. Les messages sont répartis sur les partitions en se basant sur une clé.
Afin de limiter la contention lors des actions d’écriture et de lecture, un topic est décomposé en partitions assurant un maximum de parallélisme. Le nombre maximum de consommateur agissant en simultané est directement corrélé au maximum de partitions configurées.
De plus pour assurer la forte disponibilité de la plateforme, ces mêmes partitions sont copiées sur les différents brokers en se basant sur le nombre de réplication configuré. Si un broker tombe la plateforme peut continuer à fonctionner sans perturbation notable.
Ce fonctionnement des topics permet d’obtenir un système robuste dont la performance reste constante même lorsque l’on augmente le volume des données, alors même que les données sont sauvegardées sur disque. Kafka gagne ici aussi un grand avantage par rapport à des systèmes en mémoire. La persistance lui permet de traiter les données à la fois en temps réel et en mode batch.
Au niveau des consommateurs, Kafka repose sur un système à deux niveaux. Tout d’abord, chaque partition peut être lue par chaque Consumer group. Puis, les partitions sont réparties au sein d’un même groupe. Il est ainsi possible de lire plusieurs fois la même donnée pour des usages différents en créant plusieurs Consumer groups. Si les traitements des Consumer s’avèrent prendre un temps conséquent, il suffit de déployer une instance supplémentaire dans le groupe pour augmenter les performances.
Pour accèder au cli de Zookeeper :
$ /usr/hdp/current/zookeeper-client/bin/zkCli.sh -server $(hostname -f):2181 [zk: $(hostname -f):2181(CONNECTED) 1] [zk: $(hostname -f):2181(CONNECTED) 2] ls /
Extraire une proposition de ré-assignement de Topics Kafka :
/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $(hostname -f):2181 --topics-to-move-json-file topics-rassignement.json --broker-list "1010,1011" --generate
Le fichier générer, topics-rassignement.json, contiendra les topics avec les brokers donnée (–broker-list) sur lesquelles les partitions seront “transférer”
Execution du réassignement des topics suivants le fichier donnée (topics-rassignement.json) :
/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $(hostname -f):2181 --reassignment-json-file topics-rassignement.json --execute
Sortie générer à reprendre !
Check du reassignement des partitions :
/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $(hostname -f):2181 --reassignment-json-file topics-rassignement.json --verify
Lister les topics :
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $(hostname -f):2181 --list
Créer des topics :
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $(hostname -f):2181 --create --topic topic_TOTO --partition 1 --replication-factor 1
Utiliser un topic - Producer :
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list broker01.oowy.lan:6667 --topic topic_TOTO --security-protocol PLAINTEXTSASL
Attention:
Le port a utiliser est le 6667
Utiliser un topic - Consumer :
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $(hostname -f):2181 --topic topic_TOTO --security-protocol PLAINTEXTSASL --from-beginning
Supression d'un topic :
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $(hostname -f):2181 --delete --topic topic_TOTO
Description d'un topic :
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $(hostname -f):2181 --describe --topic topic_TOTO
Purge d'un topic :
Dans un premier temps, il faut modifier le temps de rétention à 100ms
/usr/hdp/current/kafka-broker/bin/kafka-configs.sh --zookeeper $(hostname -f):2181 --alter --entity-name topic_TOTO --entity-type topics --add-config retention.ms=100
Vérifier que les modifications ont bien été prises en compte :
/usr/hdp/current/kafka-broker/bin/kafka-configs.sh --zookeeper $(hostname -f):2181 --entity-name topic_TOTO --entity-type topics --describe
Note:
Il faut attendre 5 minutes pour que les modifications soient prises en compte et que le topic soit purgé
Remettre la configuration en état :
/usr/hdp/current/kafka-broker/bin/kafka-configs.sh --zookeeper $(hostname -f):2181 --alter --entity-name topic_TOTO --entity-type topics --delete-config retention.ms
Puis re-vérifier que les modifications soient bien prises en compte.
KAFKA OFFSET
Consume topic __consumer_offsets :
$ echo "exclude.internal.topics=false" > /tmp/consumer.config $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --zookeeper $(hostname -f):2181 --topic __consumer_offsets --security-protocol PLAINTEXTSASL --from-beginning
Verify Kafka Log Index :
$ /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/kafka/kafka-logs/topic_TOTO-0/000000000000.log --verify-index-only > /tmp/dump.out $ cat /tmp/dump.out
Souscription à un groupe de consumer :
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $(hostname -f):2181 --topic topic_TOTO --security-protocol SASL_PLAINTEXT --consumer.config config.TOTO
Liste des groupes de consumer :
/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --zookeeper $(hostname -f):2181 --list
Check the offset of your consumers :
kafka-consumer-offset-checker --group consumer-group-TOTO --topic topic_TOTO --zookeeper $(hostname -f):2181