
Connecting to topics in an Apache Kafka® cluster

  • Get an SSL certificate
  • Sample connection strings

You can connect to Managed Service for Apache Kafka® cluster hosts:

  • Over the internet, if you configured public access for the cluster when creating it. Connections to such clusters must use SSL.
  • From Yandex.Cloud virtual machines located in the same cloud network. If the cluster isn't publicly accessible, connections from such VMs don't require SSL.

You can connect to the Apache Kafka® cluster both with encryption (SASL_SSL, port 9091) and without it (SASL_PLAINTEXT, port 9092).
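The port and protocol pairing above can be captured in a few lines; a minimal sketch using kafka-python-style setting names (the helper function and its defaults are illustrative, not part of the service):

```python
def kafka_client_settings(broker_fqdn, username, password, use_ssl=True,
                          ca_file="/usr/local/share/ca-certificates/Yandex/YandexCA.crt"):
    """Build common client settings for a Managed Service for Apache Kafka® cluster.

    SASL_SSL on port 9091 when use_ssl is True, SASL_PLAINTEXT on port 9092 otherwise.
    Keys follow kafka-python keyword arguments; other clients take the same values.
    """
    settings = {
        "bootstrap_servers": f"{broker_fqdn}:{9091 if use_ssl else 9092}",
        "security_protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl_mechanism": "SCRAM-SHA-512",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }
    if use_ssl:
        # CA certificate; see "Get an SSL certificate" below
        settings["ssl_cafile"] = ca_file
    return settings
```

The resulting dict can be unpacked into KafkaProducer(**settings) or KafkaConsumer(topic, **settings), as in the Python examples later in this article.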

To connect to an Apache Kafka® cluster:

  1. Create accounts for clients (producers and consumers) with access to the necessary topics.
  2. Connect the clients to the cluster:
    • Producers using the Kafka Producer API.
    • Consumers using the Kafka Consumer API.

There are ready-made Apache Kafka® API implementations for most popular programming languages. See code examples for connecting to a cluster in Sample connection strings.

Get an SSL certificate

To use an encrypted SSL connection, you need to get an SSL certificate:

sudo mkdir -p /usr/local/share/ca-certificates/Yandex && \
sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" -O /usr/local/share/ca-certificates/Yandex/YandexCA.crt && \
sudo chmod 655 /usr/local/share/ca-certificates/Yandex/YandexCA.crt
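To verify that the downloaded file is a loadable PEM certificate, you can run a quick check with Python's standard ssl module (a sanity check only; the helper name is ours):

```python
import ssl


def check_ca_file(path):
    """Return True if the file at path loads as a PEM CA bundle.

    Raises ssl.SSLError if the file does not contain a valid certificate.
    """
    ctx = ssl.create_default_context()
    ctx.load_verify_locations(cafile=path)  # raises on invalid or missing PEM data
    return len(ctx.get_ca_certs()) > 0


# Example: check_ca_file("/usr/local/share/ca-certificates/Yandex/YandexCA.crt")
```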

Sample connection strings

Examples were tested in the following environment:

  • Virtual machine in Yandex.Cloud running Ubuntu 20.04 LTS.
  • Bash: 5.0.16.
  • Python: 3.8.2; pip3: 20.0.2.
  • OpenJDK: 11.0.8; Maven: 3.6.3.
  • Go: 1.13.8.

Before connecting to cluster hosts over an SSL connection, prepare a certificate. In the examples below, it is assumed that the YandexCA.crt certificate is located in the /usr/local/share/ca-certificates/Yandex/ folder.

To view an example of the command with the host FQDN filled in, open the cluster page in the management console and click Connect.

Connection examples are provided for Bash, Python, Java, and Go, each with and without SSL.

Bash

To connect to an Apache Kafka® cluster from the command line, use kafkacat, an open-source utility that can act as a universal data producer or consumer. For more information, see the kafkacat documentation.

Before connecting, install the dependencies:

sudo apt update && sudo apt install -y kafkacat

To send a message to a topic, run the command:

echo "test message" | kafkacat -P \
    -b <broker FQDN>:9092 \
    -t <topic name> \
    -k key \
    -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<producer username> \
    -X sasl.password=<producer password> -Z

To get messages from a topic, run the command:

kafkacat -C \
    -b <broker FQDN>:9092 \
    -t <topic name> \
    -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<consumer username> \
    -X sasl.password=<consumer password> -Z -K:

Bash (SSL)

kafkacat is used here as well; the commands below connect over an encrypted SSL connection on port 9091.

Before connecting, install the dependencies:

sudo apt update && sudo apt install -y kafkacat

To send a message to a topic, run the command:

echo "test message" | kafkacat -P  \
    -b <broker FQDN>:9091 \
    -t <topic name> \
    -k key \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<producer username> \
    -X sasl.password=<producer password> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z   

To get messages from a topic, run the command:

kafkacat -C  \
    -b <broker FQDN>:9091 \
    -t <topic name> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<consumer username> \
    -X sasl.password=<consumer password> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:   

Python

Before connecting, install the dependencies:

sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c

Example code for delivering a message to a topic:

producer.py

from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers='<FQDN of the broker host>:9092',
  security_protocol="SASL_PLAINTEXT",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<producer password>',
  sasl_plain_username='<producer username>')

producer.send('<topic name>', b'test message', b'key')
producer.flush()
producer.close()

Code example for getting messages from a topic:

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('<topic name>',
  bootstrap_servers='<FQDN of the broker>:9092',
  security_protocol="SASL_PLAINTEXT",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<consumer password>',
  sasl_plain_username='<consumer name>')

print("ready")

for msg in consumer:
  print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))

Running applications:

python3 producer.py
python3 consumer.py

Python (SSL)

Before connecting, install the dependencies:

sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c

Example code for delivering a message to a topic:

producer.py

from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers='<FQDN of the broker host>:9091',
  security_protocol="SASL_SSL",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<producer password>',
  sasl_plain_username='<producer username>',
  ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexCA.crt")

producer.send('<topic name>', b'test message', b'key')
producer.flush()
producer.close()

Code example for getting messages from a topic:

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('<topic name>',
  bootstrap_servers='<FQDN of the broker>:9091',
  security_protocol="SASL_SSL",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<consumer password>',
  sasl_plain_username='<consumer name>',
  ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexCA.crt")

print("ready")

for msg in consumer:
  print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))

Running applications:

python3 producer.py
python3 consumer.py

Java

Before connecting:

  1. Install the dependencies:

    sudo apt update && sudo apt install -y default-jdk maven
    
  2. Create a folder for the Maven project:

    cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
    
  3. Create a configuration file for Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Up-to-date versions of dependencies for Maven:

    • kafka-clients.
    • jackson-databind.
    • slf4j-simple.
  4. Copy pom.xml to the directories of the producer application and consumer application:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    

Example code for delivering messages to a topic:

producer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {

  public static void main(String[] args) {

    int MSG_COUNT = 5;

    String HOST = "<FQDN of the broker>:9092";
    String TOPIC = "<topic name>";
    String USER = "<producer name>";
    String PASS = "<producer password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String KEY = "key";

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
     for (int i = 1; i <= MSG_COUNT; i++){
       producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
       System.out.println("Test message " + i);
      }
     producer.flush();
     producer.close();
    } catch (Exception ex) {
        System.out.println(ex);
        producer.close();
    }
  }
}

Code example for getting messages from a topic:

consumer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {

  public static void main(String[] args) {

    String HOST = "<FQDN of the broker>:9092";
    String TOPIC = "<topic name>";
    String USER = "<consumer name>";
    String PASS = "<consumer password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String GROUP = "demo";

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", GROUP);
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
      }
    }
  }
}

Building applications:

cd ~/project/producer && mvn clean package && \
cd ~/project/consumer && mvn clean package 

Running applications:

java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar

Java (SSL)

Before connecting:

  1. Install the dependencies:

    sudo apt update && sudo apt install -y default-jdk maven
    
  2. Add the SSL certificate to the Java trusted certificate store (Java KeyStore) so that the Apache Kafka® driver can use it for secure connections to the cluster hosts. Set a password with the -storepass parameter for additional store protection:

    cd /etc/security && \
    sudo keytool -importcert -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
           -keystore ssl -storepass <certificate store password> \
           --noprompt
    
  3. Create a folder for the Maven project:

    cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
    
  4. Create a configuration file for Maven (pom.xml) with the same contents as in the Java example above.
  5. Copy pom.xml to the directories of the producer application and consumer application:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    

Example code for delivering messages to a topic:

producer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {

  public static void main(String[] args) {

    int MSG_COUNT = 5;

    String HOST = "<FQDN of the broker>:9091";
    String TOPIC = "<topic name>";
    String USER = "<producer name>";
    String PASS = "<producer password>";
    String TS_FILE = "/etc/security/ssl";
    String TS_PASS = "<certificate store password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String KEY = "key";

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASS);

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
     for (int i = 1; i <= MSG_COUNT; i++){
       producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
       System.out.println("Test message " + i);
      }
     producer.flush();
     producer.close();
    } catch (Exception ex) {
        System.out.println(ex);
        producer.close();
    }
  }
}

Code example for getting messages from a topic:

consumer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {

  public static void main(String[] args) {

    String HOST = "<FQDN of the broker>:9091";
    String TOPIC = "<topic name>";
    String USER = "<consumer name>";
    String PASS = "<consumer password>";
    String TS_FILE = "/etc/security/ssl"; 
    String TS_PASS = "<certificate store password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String GROUP = "demo";

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", GROUP);
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASS);

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
      }
    }
  }
}

Building applications:

cd ~/project/producer && mvn clean package && \
cd ~/project/consumer && mvn clean package 

Running applications:

java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar

Go

Before connecting:

  1. Install the dependencies:

    sudo apt update && sudo apt install -y golang git && \
    go get github.com/Shopify/sarama && \
    go get github.com/xdg/scram
    
  2. Create a directory for the project:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Create the scram.go file with the code for running SCRAM. This code is the same for the producer application and consumer application:

    scram.go
    package main
    
    import (
    	"crypto/sha256"
    	"crypto/sha512"
    	"hash"
    
    	"github.com/xdg/scram"
    )
    
    var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
    
    type XDGSCRAMClient struct {
    	*scram.Client
    	*scram.ClientConversation
    	scram.HashGeneratorFcn
    }
    
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    	if err != nil {
    		return err
    	}
    	x.ClientConversation = x.Client.NewConversation()
    	return nil
    }
    
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    	response, err = x.ClientConversation.Step(challenge)
    	return
    }
    
    func (x *XDGSCRAMClient) Done() bool {
    	return x.ClientConversation.Done()
    }
    
  4. Copy scram.go to the directory of the producer application and the consumer application:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    

Example code for delivering a message to a topic:

producer/main.go

package main

import (
      "fmt"
      "os"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN of the broker host>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<producer username>"
      conf.Net.SASL.Password = "<producer password>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Couldn't create producer: ", err.Error())
              os.Exit(1)
      }
      publish("test message", syncProducer)

}

func publish(message string, producer sarama.SyncProducer) {
  // publish sync
  msg := &sarama.ProducerMessage {
      Topic: "<topic name>",
      Value: sarama.StringEncoder(message),
  }
  p, o, err := producer.SendMessage(msg)
  if err != nil {
      fmt.Println("Error publish: ", err.Error())
  }

  fmt.Println("Partition: ", p)
  fmt.Println("Offset: ", o)
}

Code example for getting messages from a topic:

consumer/main.go

package main

import (
      "fmt"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN of the broker host>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User =  "<consumer name>"
      conf.Net.SASL.Password = "<consumer password>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Couldn't create consumer: ", err.Error())
              os.Exit(1)
      }

      defer func() {
              if err := master.Close(); err != nil {
                      panic(err)
              }
      }()

      topic := "<topic name>"

      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
              panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count how many messages were processed
      msgCount := 0

      // Channel used to signal completion
      doneCh := make(chan struct{})
      go func() {
              for {
                      select {
                      case err := <-consumer.Errors():
                              fmt.Println(err)
                      case msg := <-consumer.Messages():
                              msgCount++
                              fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                      case <-signals:
                              fmt.Println("Interrupt is detected")
                              doneCh <- struct{}{}
                      }
              }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
}

Building applications:

cd ~/go-project/producer && go build && \
cd ~/go-project/consumer && go build 

Running applications:

~/go-project/producer/producer
~/go-project/consumer/consumer

Go (SSL)

Before connecting:

  1. Install the dependencies:

    sudo apt update && sudo apt install -y golang git && \
    go get github.com/Shopify/sarama && \
    go get github.com/xdg/scram
    
  2. Create a directory for the project:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Create the scram.go file with the code for running SCRAM, identical to the one in the Go example above. This code is the same for the producer and consumer applications.
  4. Copy scram.go to the directory of the producer application and the consumer application:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    

Example code for delivering a message to a topic:

producer/main.go

package main

import (
      "fmt"
      "crypto/tls"
      "crypto/x509"
      "io/ioutil"
      "os"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN of the broker host>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<producer username>"
      conf.Net.SASL.Password = "<producer password>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
              fmt.Println("Couldn't load cert: ", err.Error())
              os.Exit(1)
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
        InsecureSkipVerify: false,
        RootCAs: certs,
      }

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Couldn't create producer: ", err.Error())
              os.Exit(1)
      }
      defer func() {
              if err := syncProducer.Close(); err != nil {
                      fmt.Println("Couldn't close producer: ", err.Error())
              }
      }()
      publish("test message", syncProducer)

}

func publish(message string, producer sarama.SyncProducer) {
  // publish sync
  msg := &sarama.ProducerMessage {
      Topic: "<topic name>",
      Value: sarama.StringEncoder(message),
  }
  p, o, err := producer.SendMessage(msg)
  if err != nil {
      fmt.Println("Error publish: ", err.Error())
  }

  fmt.Println("Partition: ", p)
  fmt.Println("Offset: ", o)
}

Example code for getting messages from a topic:

consumer/main.go

package main

import (
      "fmt"
      "crypto/tls"
      "crypto/x509"
      "io/ioutil"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN of the broker host>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User = "<consumer username>"
      conf.Net.SASL.Password = "<consumer password>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
              fmt.Println("Couldn't load cert: ", err.Error())
              os.Exit(1)
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
              InsecureSkipVerify: false,
              RootCAs: certs,
      }

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Couldn't create consumer: ", err.Error())
              os.Exit(1)
      }

      defer func() {
              if err := master.Close(); err != nil {
                      panic(err)
              }
      }()

      topic := "<topic name>"

      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
              panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count how many messages have been processed
      msgCount := 0

      // Channel used to signal that processing is finished
      doneCh := make(chan struct{})
      go func() {
              for {
                      select {
                      case err := <-consumer.Errors():
                              fmt.Println(err)
                      case msg := <-consumer.Messages():
                              msgCount++
                              fmt.Println("Received message", string(msg.Key), string(msg.Value))
                      case <-signals:
                              fmt.Println("Interrupt detected")
                              doneCh <- struct{}{}
                              return
                      }
              }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
}

Building applications:

cd ~/go-project/producer && go build && \
cd ~/go-project/consumer && go build 

Running applications:

~/go-project/producer/producer
~/go-project/consumer/consumer

First, launch the consumer application, which will continuously read new messages from the topic. Then launch the producer application, which will send a test message to the Apache Kafka® topic. The consumer application will display the messages it receives from the topic.
