Schema-Repository mit Python
Will man das mit Python erledigen, so bietet sich die Bibliothek python-confluent-kafka der Kafka-Entwickler Confluent an.
Zunächst muss die Bibliothek python-confluent-kafka installiert werden. Dies schlägt unter Windows fehl, da eine Abhängigkeit nach librdkafka nicht aufgelöst werden kann. Offiziell unterstützt confluent_kafka auch nur macOS und Linux.
Entscheiden wir uns für Debian, kann python-confluent-kafka einfach aus dem Debian-Repository installiert werden. Dafür reicht ein:
apt install python-confluent-kafka
Im Python-Skript müssen wir zunächst die benötigten Bibliotheken importieren:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
Danach können wir das Avro-Schema laden
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
und den Avro-Producer konfigurieren:
AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}
Der zweite Eintrag dient dazu, die Adresse der Schema-Registry bekanntzumachen, so dass später das Schema registriert werden kann.
Mit dieser Konfiguration können wir jetzt unseren Producer erzeugen:
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
Jetzt sind wir so weit, dass wir die CSV-Datei öffnen können:
with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
reader = csv.DictReader(file, delimiter=";")
for row in reader:
row enthält nun ein Dictionary der Form {‚Headername‘: ‚Spalteninhalt‘}
Dieses können wir direkt dem Avro-Producer übergeben:
avroProducer.produce(topic='mein_topic', value=row)
avroProducer.flush()
Der Befehl schreibt den Inhalt von row im Avro-Format nach Kafka und registriert das Schema im Repository. Der verwendete Schemaname ist dabei immer <TOPIC_NAME>-data, in diesem Fall also „mein_topic-data“.
Das Schema wird dabei auch überprüft. Sollte also das in value_schema übergebene Avro-Schema nicht zu den Daten im verwendeten CSV passen, so wird ein entsprechender Fehler produziert.
Das heißt aber leider auch, dass dieses Skript nur funktioniert, wenn alle Spalten des Schemas aus Strings bestehen.
Das komplette Skript sieht wie folgt aus:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
reader = csv.DictReader(file, delimiter=";")
for row in reader:
avroProducer.produce(topic=topic, value=row)
avroProducer.flush()
Das Konvertieren der Daten in andere Datentypen behandele ich im 2. Teil.