Im ersten Teil dieses Blogs ging es darum, möglichst einfach eine CSV-Datei nach Avro zu serialisieren und das Ergebnis in Kafka abzulegen, wobei das Schema in der Schema-Registry registriert werden sollte.
Dazu haben wir die CSV-Datei mittels csv.DictReader geöffnet. Dadurch haben wir schon ein Dictionary vorliegen, das wir verwenden können, um unseren Datensatz manuell zusammenzubauen. Nehmen wir an, dass in Spalte 1 ein float, in Spalte 2 ein int und in Spalte 3 ein Stringwert steht:
Data_set = {„column_name_1“: float(headers[„column_name_1“]), “column_name_2”: int(headers[“column_name_2”]), “column_name_3”: headers[“column_name_3”]
Damit haben wir wieder einen funktionierenden Producer.
from confluent_kafka import avro from confluent_kafka.avro import AvroProducer import csv AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092', 'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80', } value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc') avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema) with open(/home/oliver/Dokumente/avro_daten/test.csv) as file: reader = csv.DictReader(file, delimiter=";") for row in reader: data_set = {„column_name_1“: float(row[„column_name_1“]), “column_name_2”: int(row[“column_name_2”]), “column_name_3”: row[“column_name_3”] avroProducer.produce(topic=topic, value=data_set) avroProducer.flush()
Leider muss dieses Skript für jedes einzelne Schema angepasst werden. Dabei haben wir alle notwendigen Informationen schon vorliegen.
Parsen wir also einfach das Schema, das liegt schließlich als JSON vor. Dazu verwenden wir die JSON-Bibliothek
import json
und parsen Feld für Feld:
data_type = dict() #Parse the schema to get the datatype of each column with open(schema_path) as json_file: json = json.load(json_file) for fields in json['fields']: if len(fields['type'][0]) > 1: data_type[fields['name']] = fields['type'][0] else: data_type[fields['name']] = fields['type']
Mittels len(fields['type'][0]) > 1 wird in dem Fall geprüft, ob ‚type‘ ein Array ist oder ein String. Das kommt daher, dass der Datentyp einer NOT-NULL-Spalte als String {"name":"column_01", "type": "string"}, der Datentyp einer NULL-Spalte aber als Liste angegeben wird: {"name":"column02", "type":["string", "null"]}
Jetzt können wir prüfen, welchen Datentyp eine Spalte hat, und entsprechend konvertieren. Dazu öffnen wir zunächst wieder wie gehabt unsere CSV-Datei:
with open(/home/oliver/Dokumente/avro_daten/test.csv) as file: reader = csv.DictReader(file, delimiter=";")
Mittels reader.filenames können wir uns eine Liste aller Headereinträge zurückgeben lassen.
headers = reader.fieldnames
Und gehen dann Spalte für Spalte vor:
for row in reader: for header in headers: if str(headers[header]).lower() = ‘double’: row[header] = float(row[header]) #gegebenenfalls weitere Datentypen prüfen …
Das Ergebnis können wir dann wieder an Kafka übergeben:
avroProducer.produce(topic=topic, value=row) avroProducer.flush()
Das komplette Skript sieht also wie folgt aus:
from confluent_kafka import avro from confluent_kafka.avro import AvroProducer import csv import json AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092', 'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80', } value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc') avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema) data_type = dict() #Parse the schema to get the datatype of each column with open(schema_path) as json_file: json = json.load(json_file) for fields in json['fields']: if len(fields['type'][0]) > 1: data_type[fields['name']] = fields['type'][0] else: data_type[fields['name']] = fields['type'] with open(/home/oliver/Dokumente/avro_daten/test.csv) as file: reader = csv.DictReader(file, delimiter=";") headers = reader.fieldnames for row in reader: for header in headers: if str(headers[header]).lower() = ‘double’: row[header] = float(row[header]) #gegebenenfalls weitere Datentypen prüfen … avroProducer.produce(topic=topic, value=data_set) avroProducer.flush()
Wie man sehen kann, ist das Python-Confluent-Kafka-Framework eine einfache und doch mächtige Möglichkeit, Daten nach Kafka zu serialisieren.