How-To: CSV to Kafka With Python and confluent_kafka (Part 1)

Even in modern environments, CSV is still a frequently encountered exchange format, because many existing systems cannot handle more modern alternatives. For further processing in a big-data environment, however, other formats are better suited. This applies in particular to Avro in conjunction with Kafka: Avro offers a compact data format with many features, in which the data schema is transferred along with the data. To simplify handling, the schema can additionally be registered in an associated schema registry.

Schema Registry with Python

If this is to be accomplished with Python, the python-confluent-kafka library from Confluent, the company behind Kafka, is the obvious choice.

First, the python-confluent-kafka library must be installed. Under Windows this fails, because the dependency on the native librdkafka library cannot be resolved; officially, confluent_kafka supports only OSX and Linux anyway.

If we opt for Debian, python-confluent-kafka can be installed straight from the Debian repository; the following is sufficient:

apt install python-confluent-kafka
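
On other Linux distributions or under OSX, the library can usually be obtained from PyPI instead, where it is published as confluent-kafka; recent wheels bundle librdkafka, though this depends on platform and version:

pip install confluent-kafka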


In the Python script, we must first import the required libraries:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv


After that, we can load the Avro schema:

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
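
For reference, the schema file contains ordinary Avro JSON. A minimal sketch of what test.avsc might look like, assuming the hypothetical CSV columns "name" and "city"; the equivalent schema could also be defined inline with avro.loads:

# Hypothetical inline equivalent of test.avsc: a record whose fields
# mirror the CSV header; every field is a string, because csv.DictReader
# always delivers strings.
value_schema = avro.loads("""
{
    "namespace": "example.avro",
    "type": "record",
    "name": "Test",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "city", "type": "string"}
    ]
}
""")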


Next, we configure the AvroProducer:

AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}


The second entry specifies the address of the schema registry, so that the schema can be registered there later.


With this configuration, we can create our Producer:

avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
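
As an aside, the AvroProducer can Avro-encode message keys in the same way. A minimal sketch, assuming a plain string key (the key schema here is hypothetical; default_key_schema belongs to the same API):

# Hypothetical key schema: a plain Avro string.
key_schema = avro.loads('{"type": "string"}')
avroProducer = AvroProducer(AvroProducerConf,
                            default_key_schema=key_schema,
                            default_value_schema=value_schema)

Records would then be produced with an additional key=... argument.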


Now we are ready to open the CSV file:

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:


"row" now contains a dictionary of the form {'Header name':'Column contents'} .

We can pass such a row dictionary directly to the AvroProducer:

        avroProducer.produce(topic='mein_topic', value=row)
        avroProducer.flush()


This writes the contents of "row" in Avro format to Kafka and registers the schema in the repository. The subject name used for registration is always <TOPIC_NAME>-value, i.e. "mein_topic-value" here.

The schema is also validated in the process: if the Avro schema passed as value_schema does not match the data in the CSV file, a corresponding error is raised.

Unfortunately, this also means that the script only works if all fields in the schema are strings, since csv.DictReader delivers every value as a string.

The entire script appears as follows:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

# Producer configuration: Kafka broker and schema registry addresses.
AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}

# Load the Avro schema and create the producer.
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

# Read the CSV and send each row to Kafka as an Avro record.
with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        avroProducer.produce(topic='mein_topic', value=row)

# Flush once at the end instead of per message, so that delivery
# happens in the background while the file is being read.
avroProducer.flush()
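
Since produce() only enqueues messages asynchronously, it can also be useful to attach a delivery callback that reports whether each record actually reached the broker. A minimal sketch building on the producer above (the callback keyword is passed through to the underlying Producer):

def delivery_report(err, msg):
    # Invoked once per message from within flush() or poll().
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [partition {}]'.format(msg.topic(), msg.partition()))

avroProducer.produce(topic='mein_topic', value=row, callback=delivery_report)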

Part 2 deals with conversion of the data into other data types.
