Zum Hauptinhalt springen

How-to: CSV nach Kafka mit Python und confluent_kafka (Teil 1)

Auch in modernen Umgebungen ist CSV noch ein häufig anzutreffendes Austauschformat, da viele bestehende Systeme mit moderneren Alternativen nicht umgehen können. Zur Weiterverarbeitung in einer Big-Data-Umgebung sind jedoch andere Formate besser geeignet. Im Zusammenhang mit Kafka ist das vor allem Avro. Avro bietet ein platzsparendes Datenformat mit vielen Features, in dem das Datenschema ebenfalls mit übertragen wird. Um die Handhabung zu verbessern, kann zudem das Schema in einem Schema-Repository registriert werden.

Schema-Repository mit Python

Will man das mit Python erledigen, so bietet sich die Bibliothek python-confluent-kafka der Kafka-Entwickler Confluent an.

Zunächst muss die Bibliothek python-confluent-kafka installiert werden. Dies schlägt unter Windows fehl, da eine Abhängigkeit nach librdkafka nicht aufgelöst werden kann. Offiziell unterstützt confluent_kafka auch nur macOS und Linux.
 

Entscheiden wir uns für Debian, kann python-confluent-kafka einfach aus dem Debian-Repository installiert werden. Dafür reicht ein:

apt install python-confluent-kafka


Im Python-Skript müssen wir zunächst die benötigten Bibliotheken importieren:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv


Danach können wir das Avro-Schema laden

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')


und den Avro-Producer konfigurieren:

AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
                                       'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
                                     }


Der zweite Eintrag dient dazu, die Adresse der Schema-Registry bekanntzumachen, so dass später das Schema registriert werden kann.


Mit dieser Konfiguration können wir jetzt unseren Producer erzeugen:

avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)


Jetzt sind wir so weit, dass wir die CSV-Datei öffnen können:

with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:

row enthält nun ein Dictionary der Form {‚Headername‘: ‚Spalteninhalt‘}


Dieses können wir direkt dem Avro-Producer übergeben:

       avroProducer.produce(topic='mein_topic', value=row)
       avroProducer.flush()


Der Befehl schreibt den Inhalt von row im Avro-Format nach Kafka und registriert das Schema im Repository. Der verwendete Schemaname ist dabei immer <TOPIC_NAME>-data, in diesem Fall also „mein_topic-data“.

Das Schema wird dabei auch überprüft. Sollte also das in value_schema übergebene Avro-Schema nicht zu den Daten im verwendeten CSV passen, so wird ein entsprechender Fehler produziert.

Das heißt aber leider auch, dass dieses Skript nur funktioniert, wenn alle Spalten des Schemas aus Strings bestehen.


Das komplette Skript sieht wie folgt aus:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
                                       'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
                                     }
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        avroProducer.produce(topic=topic, value=row)
        avroProducer.flush()

 

Das Konvertieren der Daten in andere Datentypen behandele ich im 2. Teil.