
Working with the managed schema registry


To use Managed Schema Registry with Managed Service for Apache Kafka®:

  1. Create the producer and consumer scripts on the VM.
  2. Check that Managed Schema Registry runs correctly.
  3. Delete the resources you created.

Before you begin

  1. Create a Managed Service for Apache Kafka® cluster with any suitable configuration. When creating a cluster, enable Schema registry and Public access.

    1. Create a topic named messages for exchanging messages between the producer and the consumer.
    2. Create a user named user and grant it the following permissions for the messages topic:
      • ACCESS_ROLE_CONSUMER,
      • ACCESS_ROLE_PRODUCER.
  2. In the network hosting the Managed Service for Apache Kafka® cluster, create a VM with Ubuntu 20.04 and a public IP address.

  3. To allow traffic between the Managed Service for Apache Kafka® cluster and the virtual machine, configure security groups.

Create producer and consumer scripts

These scripts send and receive messages in the messages topic as key:value pairs. In this example, the data format schemas are described in Avro format.

Note

Python scripts are provided for demonstration. You can prepare and send data format schemas and the data itself by creating a similar script in another language.

  1. Connect to the virtual machine over SSH.

  2. Install the necessary Python packages:

    sudo apt-get update && \
    sudo apt-get install -y python3-pip && \
    sudo pip3 install avro confluent_kafka
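
    To check that the packages were installed correctly, you can print the client library versions. This is a minimal sketch; version() and libversion() are standard helpers in the confluent_kafka package:

    #!/usr/bin/python3

    import confluent_kafka

    # Print the Python client version and the bundled librdkafka version.
    print("confluent_kafka:", confluent_kafka.version())
    print("librdkafka:", confluent_kafka.libversion())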
    
  3. To use an encrypted connection, install an SSL certificate:

    sudo mkdir -p /usr/share/ca-certificates && \
    sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
              -O /usr/share/ca-certificates/YandexCA.crt && \
    sudo chmod 655 /usr/share/ca-certificates/YandexCA.crt
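
    Optionally, verify the certificate and the security group settings by opening a test TLS connection to one of the broker hosts. This is a minimal sketch that uses only the Python standard library; the broker FQDN is a placeholder:

    #!/usr/bin/python3

    import socket
    import ssl

    # Placeholder: substitute the FQDN of any broker host in your cluster.
    host = "<FQDN of a broker host>"
    port = 9091

    # Validate the server certificate against the downloaded CA certificate.
    context = ssl.create_default_context(
        cafile="/usr/share/ca-certificates/YandexCA.crt"
    )

    with socket.create_connection((host, port), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=host) as tls:
            print("TLS connection established:", tls.version())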
    
  4. Create a Python script for the consumer.

    The script works as follows:

    1. Connects to the messages topic and to the schema registry.
    2. Continuously reads new messages sent to the messages topic.
    3. Upon receiving a message, requests the schemas it needs from the schema registry to parse it.
    4. Parses the binary data of the message according to the key and value schemas and prints the result.

    consumer.py

    #!/usr/bin/python3
    
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    
    # Connection settings: broker hosts, SASL_SSL authentication, and the
    # schema registry endpoint (registry credentials are inherited from SASL).
    c = AvroConsumer(
        {
            "bootstrap.servers": ','.join([
                "<FQDN of the 1st broker host>:9091",
                ...
                "<FQDN of the Nth broker host>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/share/ca-certificates/YandexCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<password of the user named user>",
            "schema.registry.url": "https://<FQDN or IP address of the Managed Schema Registry server>:443",
            "schema.registry.basic.auth.credentials.source": "SASL_INHERIT",
            "auto.offset.reset": "earliest"
        }
    )
    
    c.subscribe(["messages"])
    
    msg = None
    while True:
        try:
            # Wait up to 10 seconds for the next message.
            msg = c.poll(10)
    
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
    
        if msg is None:
            continue
    
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
    
        # Print the deserialized message value.
        print(msg.value())
    
    c.close()
    
  5. Create a Python script for the producer.

    The script works as follows:

    1. Connects to the schema registry and passes it the data format schemas for the key and the value.
    2. Generates a key and a value based on those schemas.
    3. Sends a message consisting of a key:value pair to the messages topic. The schema versions are added to the message automatically.

    producer.py

    #!/usr/bin/python3
    
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    
    
    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
    
    
    # Connection settings mirror the consumer's; the key and value schemas
    # are registered in the schema registry automatically on first use.
    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ','.join([
                "<FQDN of the 1st broker host>:9091",
                ...
                "<FQDN of the Nth broker host>:9091",
            ]),
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/share/ca-certificates/YandexCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<password of the user named user>",
            "on_delivery": delivery_report,
            "schema.registry.basic.auth.credentials.source": "SASL_INHERIT",
            "schema.registry.url": "https://<FQDN or IP address of the Managed Schema Registry server>:443"
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema
    )
    
    # Send a single message to the messages topic and wait for delivery.
    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
    

Check that Managed Schema Registry runs correctly

  1. Start the consumer:

    python3 ./consumer.py
    
  2. In a separate terminal, start the producer:

    python3 ./producer.py
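
    If delivery succeeds, the delivery_report callback in producer.py prints a report similar to the following (the partition number may differ):

    Message delivered to messages [0]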
    
  3. Make sure that the consumer receives and correctly interprets the data sent by the producer:

    {'name': 'Value'}
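
  4. Optionally, make sure that the producer registered both schemas by querying the registry's Confluent-compatible REST API. This is a minimal sketch, assuming the registry accepts the same user credentials over basic authentication and the same CA certificate as the brokers; it requires the requests package (sudo pip3 install requests). Under the default subject naming strategy, the subjects are named messages-key and messages-value:

    #!/usr/bin/python3

    import requests

    # The same registry endpoint and credentials as in the scripts above.
    registry_url = "https://<FQDN or IP address of the Managed Schema Registry server>:443"

    response = requests.get(
        registry_url + "/subjects",
        auth=("user", "<password of the user named user>"),
        verify="/usr/share/ca-certificates/YandexCA.crt",
    )
    response.raise_for_status()

    # Expected output: ['messages-key', 'messages-value']
    print(response.json())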
    

Delete the resources you created

If you no longer need the created resources, delete the VM instance and the Managed Service for Apache Kafka® cluster.

If you have reserved a public static IP address for the created VM, delete it.
