Even in modern environments, CSV is still a frequently encountered exchange format because many existing systems cannot deal with more modern alternatives. However, other formats are better suited to further processing in a big-data environment. This applies, in particular, to Avro in conjunction with Kafka. Avro offers a space-saving data format with many features, in which the data schema is also transferred. To improve handling, the schema can also be registered in a related repository.
Schema Registry with Python
If this is to be done in Python, the confluent-kafka library from Confluent, the company behind Kafka, is the obvious choice.
First, the python-confluent-kafka library must be installed. On Windows this fails because a dependency on librdkafka cannot be resolved; officially, confluent_kafka only supports macOS and Linux anyway.
If we opt for Debian, python-confluent-kafka can be installed directly from the Debian repositories. The following is sufficient:
apt install python-confluent-kafka
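On other distributions, the library can also be installed from PyPI, where the package is called confluent-kafka; the optional avro extra pulls in the additional dependencies that the AvroProducer needs:

pip install "confluent-kafka[avro]"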
In the Python script, we must first import the required libraries:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
After that, we can load the Avro schema:
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
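The contents of test.avsc are not reproduced here. As a rough sketch, a minimal schema for a CSV file with two string columns could look like this (record and field names are purely hypothetical):

{
    "namespace": "example.avro",
    "type": "record",
    "name": "test",
    "doc": "Hypothetical example schema; the field names are placeholders.",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "city", "type": "string"}
    ]
}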
Next, we configure the AvroProducer:
AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}
The second entry specifies the address of the schema registry, where the schema will be registered later.
With this configuration, we can create our Producer:
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
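As a side note, produce() also accepts a delivery callback that reports whether each message actually reached the broker. A minimal sketch, not part of the original script:

def delivery_report(err, msg):
    # Called once per produced message, after delivery or failure.
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [partition {}]'.format(msg.topic(), msg.partition()))

It would then be passed along with each message, e.g. avroProducer.produce(topic='mein_topic', value=row, callback=delivery_report).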
Now we are ready to open the CSV file:
with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
"row" now contains a dictionary of the form {'Header name':'Column contents'} .
We can pass this directly to the AvroProducer:
avroProducer.produce(topic='mein_topic', value=row)
avroProducer.flush()
This call writes the contents of "row" in Avro format to Kafka and registers the schema in the repository. The subject under which the schema is registered is always <TOPIC_NAME>-value, i.e. "mein_topic-value" here.
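Whether the registration succeeded can be verified, for example, through the Schema Registry's REST interface, which lists all registered subjects:

curl http://schema-registry.meinregistryserver.com:80/subjects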
The schema is also validated in the process: if the Avro schema passed in value_schema does not match the data from the CSV file, a corresponding error is raised.
Unfortunately, this also means that the script only works if all fields in the schema are strings, because csv.DictReader returns every column value as a string.
The entire script appears as follows:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        avroProducer.produce(topic='mein_topic', value=row)
        avroProducer.flush()
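To check that the messages actually arrived, the data can be read back with the matching AvroConsumer, which fetches the schema from the registry and decodes each message back into a dictionary. A minimal sketch, assuming the same servers as above (the group.id is freely chosen):

from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
    'group.id': 'csv-check',          # hypothetical consumer group
    'auto.offset.reset': 'earliest',  # start at the beginning of the topic
})
consumer.subscribe(['mein_topic'])

msg = consumer.poll(10)  # wait up to 10 seconds for a message
if msg is not None and msg.error() is None:
    print(msg.value())   # the Avro payload, decoded back into a dict
consumer.close()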
Part 2 deals with conversion of the data into other data types.