
How To Produce A Tombstone Avro Record In Kafka Using Python?

My sink properties:

{ 'name': 'jdbc-oracle', 'config': { 'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector', 'tasks.max': '1', 'topics': 'orders',

Solution 1:

I assume you want to produce an Avro message, so you need to serialise your messages properly. I'll be using the confluent-kafka-python library, so if you don't already have it installed, just run

pip install confluent-kafka[avro]

And here's an example AvroProducer that sends an Avro message with a null value (a tombstone):

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "type":"record",
   "name":"myrecord",
   "fields":[
      {
         "name":"id",
         "type":[
            "null",
            "int"
         ],
         "default":null
      },
      {
         "name":"product",
         "type":[
            "null",
            "string"
         ],
         "default":null
      },
      {
         "name":"quantity",
         "type":[
            "null",
            "int"
         ],
         "default":null
      },
      {
         "name":"price",
         "type":[
            "null",
            "int"
         ],
         "default":null
      }
   ]
}
"""

key_schema_str = """
{
   "type":"record",
   "name":"key_schema",
   "fields":[
      {
         "name":"id",
         "type":"int"
      }
   ]
}
"""


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    #value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}


    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    # Producing only the key, with no value, writes a null payload, i.e. a tombstone
    avroProducer.produce(topic='orders', key=key)
    avroProducer.flush()
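If you want to double-check that the record really went out with an empty payload, a plain consumer is enough, since a tombstone's value is literally null and needs no Avro deserialisation. Here's a minimal sketch, assuming the same broker address as above and a throwaway consumer group:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': '10.0.0.0:9092',
    'group.id': 'tombstone-check',   # throwaway group id, just for this check
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    # A tombstone carries a key but no value
    print('key bytes:', msg.key())
    print('is tombstone:', msg.value() is None)

consumer.close()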

Solution 2:

To be able to set an Avro field to null, you need to declare null as one of the possible types of that field in the schema.

Take a look at this example from the Avro documentation:

{"type":"record","name":"yourRecord","fields":[{"name":"empId","type":"long"},// mandatory field{"name":"empName","type":["null","string"]}// optional field ]}

Here empName is declared with a union type of null and string, which allows the empName field to be set to null.
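For completeness, here's a small sketch of how such a nullable field can be used from Python with confluent-kafka, reusing the broker and Schema Registry addresses from Solution 1 and a made-up employees topic; because empName is a ["null", "string"] union, None is a valid value for it:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

employee_schema = avro.loads("""
{
   "type": "record",
   "name": "yourRecord",
   "fields": [
      {"name": "empId", "type": "long"},
      {"name": "empName", "type": ["null", "string"]}
   ]
}
""")

producer = AvroProducer({
    'bootstrap.servers': '10.0.0.0:9092',
    'schema.registry.url': 'http://10.0.0.0:8081'
}, default_value_schema=employee_schema)

# empName may be None because its type is the union ["null", "string"]
producer.produce(topic='employees', value={'empId': 1, 'empName': None})
producer.flush()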
