Zum Hauptinhalt springen

How-to: CSV nach Kafka mit Python und confluent_kafka (Teil 2)

Im ersten Teil dieses Blogs ging es darum, möglichst einfach eine CSV-Datei nach Avro zu serialisieren und das Ergebnis in Kafka abzulegen, wobei das Schema in der Schema-Registry registriert werden sollte.

Dazu haben wir die CSV-Datei mittels csv.DictReader geöffnet. Dadurch haben wir schon ein Dictionary vorliegen, das wir verwenden können, um unseren Datensatz manuell zusammenzubauen. Nehmen wir an, dass in Spalte 1 ein float, in Spalte 2 ein int und in Spalte 3 ein Stringwert steht:
 

Data_set = {„column_name_1“: float(headers[„column_name_1“]),
                     “column_name_2”: int(headers[“column_name_2”]),
                     “column_name_3”: headers[“column_name_3”]


Damit haben wir wieder einen funktionierenden Producer.
 

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
                                       'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
                                     }
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
    data_set = {„column_name_1“: float(row[„column_name_1“]),
                         “column_name_2”: int(row[“column_name_2”]),
                         “column_name_3”: row[“column_name_3”]

        avroProducer.produce(topic=topic, value=data_set)
        avroProducer.flush()


Leider muss dieses Skript für jedes einzelne Schema angepasst werden. Dabei haben wir alle notwendigen Informationen schon vorliegen.

Parsen wir also einfach das Schema, das liegt schließlich als JSON vor. Dazu verwenden wir die JSON-Bibliothek

import json


und parsen Feld für Feld:

data_type = dict()

#Parse the schema to get the datatype of each column
with open(schema_path) as json_file:
    json = json.load(json_file)
    for fields in json['fields']:
        if len(fields['type'][0]) > 1:
            data_type[fields['name']] = fields['type'][0]
        else:
            data_type[fields['name']] = fields['type']


Mittels len(fields['type'][0]) > 1 wird in dem Fall geprüft, ob ‚type‘ ein Array ist oder ein String. Das kommt daher, dass der Datentyp einer NOT-NULL-Spalte als String {"name":"column_01", "type": "string"}, der Datentyp einer NULL-Spalte aber als Liste angegeben wird: {"name":"column02", "type":["string", "null"]}


Jetzt können wir prüfen, welchen Datentyp eine Spalte hat, und entsprechend konvertieren. Dazu öffnen wir zunächst wieder wie gehabt unsere CSV-Datei:

with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
    reader = csv.DictReader(file, delimiter=";")


Mittels reader.filenames können wir uns eine Liste aller Headereinträge zurückgeben lassen.

headers = reader.fieldnames


Und gehen dann Spalte für Spalte vor:

for row in reader:
        for header in headers:
            if str(headers[header]).lower() = ‘double’:
                row[header] = float(row[header])
#gegebenenfalls weitere Datentypen prüfen …


Das Ergebnis können wir dann wieder an Kafka übergeben:

avroProducer.produce(topic=topic, value=row)
        avroProducer.flush()


Das komplette Skript sieht also wie folgt aus:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
import json

AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
                                       'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
                                     }
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

data_type = dict()
#Parse the schema to get the datatype of each column
with open(schema_path) as json_file:
    json = json.load(json_file)
    for fields in json['fields']:
        if len(fields['type'][0]) > 1:
            data_type[fields['name']] = fields['type'][0]
        else:
            data_type[fields['name']] = fields['type']

with open(/home/oliver/Dokumente/avro_daten/test.csv) as file:
    reader = csv.DictReader(file, delimiter=";")
    headers = reader.fieldnames
    for row in reader:
         for header in headers:
             if str(headers[header]).lower() = ‘double’:
                 row[header] = float(row[header])
             #gegebenenfalls weitere Datentypen prüfen …

        avroProducer.produce(topic=topic, value=data_set)
avroProducer.flush()

 

Wie man sehen kann, ist das Python-Confluent-Kafka-Framework eine einfache und doch mächtige Möglichkeit, Daten nach Kafka zu serialisieren.