Schema Registry with Python
If this needs to be done in Python, the python-confluent-kafka library from Confluent, the company founded by Kafka's original developers, is a good fit.
First, the python-confluent-kafka library must be installed. On Windows this fails because a dependency on librdkafka cannot be resolved. Officially, confluent_kafka also supports only macOS and Linux.
If we opt for Debian, python-confluent-kafka can be installed directly from the Debian repository. The following command is sufficient:
apt install python-confluent-kafka
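To confirm that the installation is visible to the Python interpreter in use, a quick check along the following lines may help. This is only a sketch: it assumes nothing beyond the module name confluent_kafka and prints a notice instead of failing if the module is missing.

```python
import importlib.util

# find_spec returns None when the module cannot be located for this interpreter.
installed = importlib.util.find_spec("confluent_kafka") is not None

if installed:
    import confluent_kafka
    # version() and libversion() each return a (version string, version int) tuple.
    print("confluent_kafka:", confluent_kafka.version()[0])
    print("librdkafka:", confluent_kafka.libversion()[0])
else:
    print("confluent_kafka is not installed for this Python interpreter")
```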
In the Python script, we must first import the required libraries:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
After that, we can load the Avro schema,
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
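The contents of test.avsc are not shown in this article. As a rough illustration, a schema for a CSV file with string-only columns might look like the following sketch. The record name, namespace and field names (id, name, city) are purely hypothetical placeholders.

```python
import json

# Hypothetical schema: record and field names are placeholders,
# not taken from the original test.avsc.
example_schema = {
    "type": "record",
    "name": "TestRecord",
    "namespace": "example.avro",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "city", "type": "string"},
    ],
}

# Serialize to the JSON text that would be stored in an .avsc file.
avsc_text = json.dumps(example_schema, indent=2)
print(avsc_text)
```

Each field name would have to match a column header in the CSV file.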
and configure the Avro Producer:
AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}
The second entry specifies the address of the schema registry, so that the schema can be registered there later.
With this configuration, we can create our Producer:
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
Now we are ready to open the CSV file:
with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
Each row is now a dictionary of the form {'column header': 'cell value'}.
We can pass this directly to the AvroProducer:
        avroProducer.produce(topic='mein_topic', value=row)
    avroProducer.flush()
These calls write the contents of each row to Kafka in Avro format and register the schema in the schema registry. The subject name used for the value schema is always <TOPIC_NAME>-value, i.e. "mein_topic-value" here.
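To check afterwards whether the schema has actually arrived, the registry can be queried over its REST interface. The following is a minimal sketch that assumes the value schema is registered under the subject <topic>-value (the default naming AvroProducer uses for message values) and that the registry is reachable without authentication. The helper names latest_schema_url and fetch_latest_schema are made up for this example.

```python
import json
import urllib.parse
import urllib.request


def latest_schema_url(registry_url, topic):
    """Build the REST URL for the latest value schema of a topic."""
    # Assumption: value schemas live under the subject "<topic>-value".
    subject = urllib.parse.quote(f"{topic}-value", safe="")
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"


def fetch_latest_schema(registry_url, topic):
    """Query the registry; needs network access to a running schema registry."""
    with urllib.request.urlopen(latest_schema_url(registry_url, topic)) as response:
        return json.load(response)


url = latest_schema_url("http://schema-registry.meinregistryserver.com:80", "mein_topic")
print(url)
```

Only the URL is built here. Calling fetch_latest_schema with the same arguments would send the actual request.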
The data is also validated against the schema in this process. If the Avro schema passed in value_schema does not match the data in the CSV file, a corresponding error is raised.
Unfortunately, this also means that the script only works if every field in the schema is of type string, because csv.DictReader returns all values as strings.
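The restriction can be illustrated without a Kafka connection. The sketch below uses a hypothetical CSV sample and a hypothetical schema (all names are placeholders) to show that csv.DictReader yields only strings, and it lists the schema fields that would therefore fail. The helper non_string_fields is an assumption of this example and only handles plain type names, not unions or nested types.

```python
import csv
import io

# Hypothetical CSV content and schema; all names are placeholders.
sample_csv = "id;name\n1;Alice\n2;Bob\n"
schema = {
    "type": "record",
    "name": "TestRecord",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
}

rows = list(csv.DictReader(io.StringIO(sample_csv), delimiter=";"))


def non_string_fields(schema):
    """Return the names of fields whose Avro type is not plain "string"."""
    return [f["name"] for f in schema["fields"] if f["type"] != "string"]


# The numeric-looking id column is still read as the string '1'.
print(dict(rows[0]))
print(non_string_fields(schema))
```

Here the id column is read as the string '1', while the schema expects an int, so this row would be rejected.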
The complete script looks like this:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)
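with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    # Send each CSV row to Kafka as an Avro-encoded message
    for row in reader:
        avroProducer.produce(topic='mein_topic', value=row)
    # Wait until all queued messages have been delivered
    avroProducer.flush()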
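To try out the CSV handling without a running Kafka cluster, the loop from the script above can be wrapped in a function that accepts any object with produce and flush methods. This is only a sketch: send_csv and RecordingProducer are hypothetical names introduced here, and the temporary CSV file contains placeholder data.

```python
import csv
import os
import tempfile


def send_csv(producer, csv_path, topic, delimiter=";"):
    """Send every CSV row to the topic and return the number of rows sent."""
    count = 0
    with open(csv_path, newline="") as file:
        for row in csv.DictReader(file, delimiter=delimiter):
            producer.produce(topic=topic, value=row)
            count += 1
    # Flush once at the end instead of after every message.
    producer.flush()
    return count


class RecordingProducer:
    """Stand-in for AvroProducer that records messages instead of sending them."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def produce(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed = True


# Write a small placeholder CSV file and run it through the stand-in producer.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("id;name\n1;Alice\n2;Bob\n")

fake = RecordingProducer()
sent_count = send_csv(fake, tmp.name, "mein_topic")
os.remove(tmp.name)
print(sent_count, fake.sent[0])
```

In real use, the avroProducer object from the script would be passed in place of RecordingProducer.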
Part 2 deals with conversion of the data into other data types.