Apache Kafka

Premiers pas avec Apache Kafka et Python

Premiers pas avec Apache Kafka et Python
Dans cette leçon, nous verrons comment utiliser Apache Kafka avec Python et créer un exemple d'application à l'aide du client Python pour Apache Kafka.

Pour terminer cette leçon, vous devez avoir une installation active de Kafka sur votre machine. Lisez Installer Apache Kafka sur Ubuntu pour savoir comment faire cela.

Installation du client Python pour Apache Kafka

Avant de pouvoir commencer à travailler avec Apache Kafka dans le programme Python, nous devons installer le client Python pour Apache Kafka. Cela peut être fait en utilisant pépin (Index du paquet Python). Voici une commande pour y parvenir :

pip3 installer kafka-python

Ce sera une installation rapide sur le terminal :

Installation du client Python Kafka à l'aide de PIP

Maintenant que nous avons une installation active pour Apache Kafka et que nous avons également installé le client Python Kafka, nous sommes prêts à commencer à coder.

Faire un producteur

La première chose à avoir pour publier des messages sur Kafka est une application productrice qui peut envoyer des messages à des sujets dans Kafka.

Notez que les producteurs Kafka sont des producteurs de messages asynchrones. Cela signifie que les opérations effectuées lors de la publication d'un message sur la partition Kafka Topic sont non bloquantes. Pour garder les choses simples, nous écrirons un éditeur JSON simple pour cette leçon.

Pour commencer, créez une instance pour le producteur Kafka :

de kafka import KafkaProducteur
importer json
importer pprint
producteur = KafkaProducteur(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.décharges(v).encoder('utf-8'))

L'attribut bootstrap_servers informe sur l'hôte et le port du serveur Kafka. L'attribut value_serializer sert uniquement à la sérialisation JSON des valeurs JSON rencontrées.

Pour jouer avec Kafka Producer, essayons d'imprimer les métriques liées au Producer et au cluster Kafka :

métriques = producteur.métrique()
pprint.pprint(métriques)

Nous allons voir ce qui suit maintenant :

Kafka Mterics

Maintenant, essayons enfin d'envoyer un message à la file d'attente Kafka. Un simple objet JSON sera un bon exemple :

producteur.envoyer('linuxhint', 'sujet': 'kafka')

le astuce linux est la partition de sujet sur laquelle l'objet JSON sera envoyé sur. Lorsque vous exécutez le script, vous n'obtiendrez aucune sortie car le message est simplement envoyé à la partition de sujet. Il est temps d'écrire un consommateur pour que nous puissions tester notre application.

Faire un consommateur

Nous sommes maintenant prêts à établir une nouvelle connexion en tant qu'application grand public et à recevoir les messages du sujet Kafka. Commencez par créer une nouvelle instance pour le consommateur :

de kafka import KafkaConsumer
de kafka importer TopicPartition
print('Etablissement de la connexion.')
consommateur = KafkaConsumer(bootstrap_servers='localhost:9092')

Maintenant, attribuez un sujet à cette connexion et une valeur de décalage possible.

print('Affectation du sujet.')
consommateur.assign([SujetPartition('linuxhint', 2)])

Enfin, nous sommes prêts à imprimer le message :

print('Recevoir le message.')
pour le message dans le consommateur :
print("OFFSET: " + str(message[0])+ "\t MSG: " + str(message))

Grâce à cela, nous obtiendrons une liste de tous les messages publiés sur la partition Kafka Consumer Topic. La sortie de ce programme sera :

Consommateur Kafka

Juste pour une référence rapide, voici le script complet du producteur :

de kafka import KafkaProducteur
importer json
importer pprint
producteur = KafkaProducteur(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.décharges(v).encoder('utf-8'))
producteur.envoyer('linuxhint', 'sujet': 'kafka')
# métrique = producteur.métrique()
# impression.pprint(métriques)

Et voici le programme Consumer complet que nous avons utilisé :

de kafka import KafkaConsumer
de kafka importer TopicPartition
print('Etablissement de la connexion.')
consommateur = KafkaConsumer(bootstrap_servers='localhost:9092')
print('Affectation du sujet.')
consommateur.assign([SujetPartition('linuxhint', 2)])
print('Recevoir le message.')
pour le message dans le consommateur :
print("OFFSET: " + str(message[0])+ "\t MSG: " + str(message))

Conclusion

Dans cette leçon, nous avons vu comment installer et commencer à utiliser Apache Kafka dans nos programmes Python. Nous avons montré à quel point il est facile d'effectuer des tâches simples liées à Kafka en Python avec la démonstration du client Kafka pour Python.

Meilleurs jeux à jouer avec le suivi des mains
Oculus Quest a récemment introduit l'idée géniale du suivi manuel sans contrôleurs. Avec un nombre toujours croissant de jeux et d'activités qui exécu...
Comment afficher la superposition OSD dans les applications et jeux Linux en plein écran
Jouer à des jeux en plein écran ou utiliser des applications en mode plein écran sans distraction peut vous couper des informations système pertinente...
Top 5 des cartes de capture de jeu
Nous avons tous vu et aimé des gameplays en streaming sur YouTube. PewDiePie, Jakesepticye et Markiplier ne sont que quelques-uns des meilleurs joueur...