Connecting to topics in an Apache Kafka® cluster
You can connect to Managed Service for Apache Kafka® cluster hosts:

- Over the internet, if you configured public access for the cluster when creating it. You can only connect to this type of cluster over SSL.
- From Yandex Cloud virtual machines located in the same cloud network. If the cluster is not publicly accessible, you do not need SSL to connect to it from such VMs.
You can connect to the Apache Kafka® cluster both with encryption (SASL_SSL, port 9091) and without it (SASL_PLAINTEXT, port 9092).
To connect to an Apache Kafka® cluster:

- Create users for clients (producers and consumers) with access to the required topics.
- Connect the clients to the cluster:
  - Producers, using the Kafka Producer API.
  - Consumers, using the Kafka Consumer API.

There are ready-made Apache Kafka® API implementations for most popular programming languages. See Sample connection strings for sample cluster connection code.
Configuring security groups
To connect to a cluster, its security groups must include rules that allow traffic to specific ports from specific IP addresses or other security groups.
Rule settings depend on the connection method you select:
For connections over the internet:

- Configure all security groups in the cluster to allow incoming traffic on port 9091 from any IP address. To do this, create the following rule for incoming traffic:

  - Port range: 9091
  - Protocol: TCP
  - Source: CIDR
  - CIDR blocks: 0.0.0.0/0

- To allow connections to Managed Schema Registry, add a rule for incoming traffic:

  - Port range: 443
  - Protocol: TCP
  - Source: CIDR
  - CIDR blocks: 0.0.0.0/0
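If you manage security groups with the Yandex Cloud CLI rather than the management console, a rule like this can also be added from the terminal. This is a minimal sketch, assuming the current yc CLI flag syntax; the group name kafka-cluster-sg is a placeholder:

# Allow incoming TCP traffic on port 9091 from any IPv4 address
# (kafka-cluster-sg is a placeholder for your security group name).
yc vpc security-group update-rules kafka-cluster-sg \
  --add-rule "direction=ingress,port=9091,protocol=tcp,v4-cidrs=[0.0.0.0/0]"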
For connections from a VM in Yandex Cloud:

- Configure all security groups in the cluster to allow incoming traffic on ports 9091 and 9092 from the security group where the VM is located. To do this, create the following rule for incoming traffic in these groups:

  - Port range: 9091-9092
  - Protocol: TCP
  - Source: Security group
  - Security group: If your cluster and VM are in the same security group, select Current (Self) as the value. Otherwise, specify the VM security group.
  To allow connections to Managed Schema Registry, add a rule for incoming traffic:

  - Port range: 443
  - Protocol: TCP
  - Source: CIDR
  - CIDR blocks: 0.0.0.0/0
- Configure the security group where the VM is located to enable connections to the VM and traffic between the VM and the cluster hosts.

  For example, you can set the following rules for a VM:
  - For incoming traffic:

    - Port range: 22
    - Protocol: TCP
    - Source: CIDR
    - CIDR blocks: 0.0.0.0/0

    This rule allows you to connect to the VM over SSH.
  - For outgoing traffic:

    - Port range: 0-65535
    - Protocol: Any
    - Destination: CIDR
    - CIDR blocks: 0.0.0.0/0

    This rule allows all outgoing traffic, which lets you both connect to the cluster and install the certificates and utilities the VM needs for the connection.
Note
You can set more detailed rules for security groups, such as allowing traffic only in specific subnets.

Security groups must be configured correctly for all subnets that will include cluster hosts. If the security group settings are incomplete or incorrect, you might lose access to the cluster.
For more information about security groups, see Security groups.
Getting an SSL certificate
To use an encrypted connection, get an SSL certificate. On Linux (Ubuntu), run:
mkdir -p /usr/local/share/ca-certificates/Yandex/ && \
wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
--output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
The certificate will be saved to the /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt file.

On Windows, run this command in PowerShell:

mkdir $HOME\.kafka; curl.exe -o $HOME\.kafka\YandexInternalRootCA.crt https://storage.yandexcloud.net/cloud-certs/CA.pem
The certificate is saved to the $HOME\.kafka\YandexInternalRootCA.crt file.
The resulting SSL certificate is also used when working with Managed Schema Registry.
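To check the downloaded certificate before wiring it into a client, you can, for example, open a test TLS session to a broker host with openssl. This is a minimal sketch; substitute your broker host FQDN, and note that the Linux certificate path is assumed:

# Check the TLS handshake against a broker host using the downloaded CA.
# A successful check ends with "Verify return code: 0 (ok)".
openssl s_client \
    -connect <broker_FQDN>:9091 \
    -CAfile /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
    < /dev/null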
Apache Kafka® host FQDN
To connect to a host, you need its fully qualified domain name (FQDN). You can obtain it in one of the following ways:
- In the management console, copy the command for connecting to the cluster. This command contains the broker host FQDN. To get the command, go to the cluster page and click Connect.
- Look up the FQDN in the management console:
  - Go to the cluster page.
  - Go to Hosts.
  - Copy the Host FQDN column value.
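If you use the Yandex Cloud CLI, you can also list host FQDNs from the terminal. A minimal sketch, assuming a configured yc CLI; <cluster_name> is a placeholder:

# List the cluster hosts together with their FQDNs.
yc managed-kafka cluster list-hosts <cluster_name>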
Before you connect from a Docker container
To connect to a Managed Service for Apache Kafka® cluster from a Docker container, add the following lines to the Dockerfile.

For connections without SSL:

RUN apt-get update && \
    apt-get install kafkacat --yes

For SSL connections, also download the certificate:

RUN apt-get update && \
    apt-get install wget kafkacat --yes && \
    mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
    wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
         --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
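For example, you could build and smoke-test such an image as follows. This is a sketch, not part of the official guide; the kafka-client image tag is a placeholder:

# Build the image and check that kafkacat is available inside it.
docker build -t kafka-client .
docker run --rm kafka-client kafkacat -V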
Sample connection strings
Examples were tested in the following environment:

- Yandex Cloud virtual machine running Ubuntu 20.04 LTS:
  - Bash: 5.0.16
  - Python: 3.8.2; pip3: 20.0.2
  - Node.js: 10.19.0; npm: 6.14.4
  - OpenJDK: 11.0.8; Maven: 3.6.3
  - Go: 1.13.8
  - mono-complete: 6.8.0.105
- Yandex Cloud virtual machine running Windows Server 2019 Datacenter:
  - Microsoft OpenJDK: 11.0.11
  - PowerShell: 5.1.17763.1490 Desktop

Before connecting to cluster hosts over SSL, get an SSL certificate. The examples below assume that the YandexInternalRootCA.crt certificate is located in:

- /usr/local/share/ca-certificates/Yandex/ on Ubuntu
- $HOME\.kafka\ on Windows

To view an example of the command with the host FQDN filled in, open the cluster page in the management console and click Connect.
Bash
To connect to an Apache Kafka® cluster from the command line, use kafkacat, an open-source application that can work as a universal data producer or consumer. For more information, see the kafkacat documentation.

Before connecting, install the dependencies:
sudo apt update && sudo apt install -y kafkacat
- Run this command to receive messages from a topic without SSL:

  kafkacat -C \
           -b <broker_FQDN>:9092 \
           -t <topic_name> \
           -X security.protocol=SASL_PLAINTEXT \
           -X sasl.mechanism=SCRAM-SHA-512 \
           -X sasl.username="<consumer_username>" \
           -X sasl.password="<consumer_password>" -Z
The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to the topic:

  echo "test message" | kafkacat -P \
         -b <broker_FQDN>:9092 \
         -t <topic_name> \
         -k key \
         -X security.protocol=SASL_PLAINTEXT \
         -X sasl.mechanism=SCRAM-SHA-512 \
         -X sasl.username="<producer_username>" \
         -X sasl.password="<producer_password>" -Z
- Run this command to receive messages from a topic over SSL:

  kafkacat -C \
           -b <broker_FQDN>:9091 \
           -t <topic_name> \
           -X security.protocol=SASL_SSL \
           -X sasl.mechanism=SCRAM-SHA-512 \
           -X sasl.username="<consumer_username>" \
           -X sasl.password="<consumer_password>" \
           -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to the topic over SSL:

  echo "test message" | kafkacat -P \
         -b <broker_FQDN>:9091 \
         -t <topic_name> \
         -k key \
         -X security.protocol=SASL_SSL \
         -X sasl.mechanism=SCRAM-SHA-512 \
         -X sasl.username="<producer_username>" \
         -X sasl.password="<producer_password>" \
         -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Learn how to get the FQDN of a broker host in this guide.
Make sure that the first terminal displays the message key:test message sent in the second terminal.
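If the message does not come through, it can help to first confirm that the client reaches the cluster at all. A minimal sketch using kafkacat's -L (metadata listing) mode over SSL; the credentials and FQDN are placeholders:

# List brokers and topics visible to this user; a successful listing
# confirms network access, SSL, and SASL credentials in one step.
kafkacat -L \
         -b <broker_FQDN>:9091 \
         -X security.protocol=SASL_SSL \
         -X sasl.mechanism=SCRAM-SHA-512 \
         -X sasl.username="<username>" \
         -X sasl.password="<password>" \
         -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt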
C#
Before connecting:
- Install the dependencies:

  sudo apt-get update && \
    sudo apt-get install -y apt-transport-https dotnet-sdk-6.0
- Create a directory for the project:

  cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
- Create a configuration file:

  App.csproj

  <Project Sdk="Microsoft.NET.Sdk">
    <PropertyGroup>
      <OutputType>Exe</OutputType>
      <TargetFramework>netcoreapp6.0</TargetFramework>
    </PropertyGroup>
    <ItemGroup>
      <PackageReference Include="Confluent.Kafka" Version="2.2.0" />
    </ItemGroup>
  </Project>
- Copy App.csproj to the directories of the producer and consumer applications:

  cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
- Example code for delivering messages to a topic:

  cs-project/producer/Program.cs

  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace App {
      class Program {
          public static void Main(string[] args) {
              int MSG_COUNT = 5;

              string HOST = "<FQDN_of_broker_host>:9092";
              string TOPIC = "<topic_name>";
              string USER = "<producer_username>";
              string PASS = "<producer_password>";

              var producerConfig = new ProducerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_PLAINTEXT"},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS}
                  }
              );

              var producer = new ProducerBuilder<string, string>(producerConfig).Build();

              for (int i = 0; i < MSG_COUNT; i++) {
                  producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                      (deliveryReport) => {
                          if (deliveryReport.Error.Code != ErrorCode.NoError) {
                              Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                          } else {
                              Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                          }
                      });
              }
              producer.Flush(TimeSpan.FromSeconds(10));
          }
      }
  }
- Code example for getting messages from a topic:

  cs-project/consumer/Program.cs

  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace CCloud {
      class Program {
          public static void Main(string[] args) {
              string HOST = "<FQDN_of_broker_host>:9092";
              string TOPIC = "<topic_name>";
              string USER = "<consumer_name>";
              string PASS = "<consumer_password>";

              var consumerConfig = new ConsumerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_PLAINTEXT"},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS},
                      {"group.id", "demo"}
                  }
              );

              var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
              consumer.Subscribe(TOPIC);

              try {
                  while (true) {
                      var cr = consumer.Consume();
                      Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                  }
              } catch (OperationCanceledException) {
                  // Ctrl-C was pressed.
              } finally {
                  consumer.Close();
              }
          }
      }
  }
- Building and launching applications:

  cd ~/cs-project/consumer && dotnet build && \
    dotnet run bin/Debug/netcoreapp6.0/App.dll

  cd ~/cs-project/producer && dotnet build && \
    dotnet run bin/Debug/netcoreapp6.0/App.dll
- Example code for delivering messages to a topic:

  cs-project/producer/Program.cs

  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace App {
      class Program {
          public static void Main(string[] args) {
              int MSG_COUNT = 5;

              string HOST = "<FQDN_of_broker_host>:9091";
              string TOPIC = "<topic_name>";
              string USER = "<producer_username>";
              string PASS = "<producer_password>";
              string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

              var producerConfig = new ProducerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_SSL"},
                      {"ssl.ca.location", CA_FILE},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS}
                  }
              );

              var producer = new ProducerBuilder<string, string>(producerConfig).Build();

              for (int i = 0; i < MSG_COUNT; i++) {
                  producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                      (deliveryReport) => {
                          if (deliveryReport.Error.Code != ErrorCode.NoError) {
                              Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                          } else {
                              Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                          }
                      });
              }
              producer.Flush(TimeSpan.FromSeconds(10));
          }
      }
  }
- Code example for getting messages from a topic:

  cs-project/consumer/Program.cs

  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace CCloud {
      class Program {
          public static void Main(string[] args) {
              string HOST = "<FQDN_of_broker_host>:9091";
              string TOPIC = "<topic_name>";
              string USER = "<consumer_name>";
              string PASS = "<consumer_password>";
              string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

              var consumerConfig = new ConsumerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_SSL"},
                      {"ssl.ca.location", CA_FILE},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS},
                      {"group.id", "demo"}
                  }
              );

              var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
              consumer.Subscribe(TOPIC);

              try {
                  while (true) {
                      var cr = consumer.Consume();
                      Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                  }
              } catch (OperationCanceledException) {
                  // Ctrl-C was pressed.
              } finally {
                  consumer.Close();
              }
          }
      }
  }
- Building and launching applications:

  cd ~/cs-project/consumer && dotnet build && \
    dotnet run bin/Debug/netcoreapp6.0/App.dll

  cd ~/cs-project/producer && dotnet build && \
    dotnet run bin/Debug/netcoreapp6.0/App.dll
Learn how to get the FQDN of a broker host in this guide.
First, launch the consumer application that will continuously read new messages from the topic. Then launch the producer application that will send one or more key:test message messages to the topic. The consumer application displays the messages sent to the topic.
Go
Before connecting:
- Install the dependencies:

  sudo apt update && sudo apt install -y golang git && \
    go get github.com/Shopify/sarama && \
    go get github.com/xdg/scram
- Create a directory for the project:

  cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
- Create the scram.go file with the code for running SCRAM. This code is the same for the producer and consumer applications:

  scram.go

  package main

  import (
      "crypto/sha256"
      "crypto/sha512"
      "hash"

      "github.com/xdg/scram"
  )

  var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
  var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

  type XDGSCRAMClient struct {
      *scram.Client
      *scram.ClientConversation
      scram.HashGeneratorFcn
  }

  func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
      x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
      if err != nil {
          return err
      }
      x.ClientConversation = x.Client.NewConversation()
      return nil
  }

  func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
      response, err = x.ClientConversation.Step(challenge)
      return
  }

  func (x *XDGSCRAMClient) Done() bool {
      return x.ClientConversation.Done()
  }
- Copy scram.go to the directories of the producer and consumer applications:

  cp scram.go producer/scram.go && cp scram.go consumer/scram.go
- Example code for delivering a message to a topic:

  producer/main.go

  package main

  import (
      "fmt"
      "os"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<FQDN_of_broker_host>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<producer_name>"
      conf.Net.SASL.Password = "<producer_password>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create producer: ", err.Error())
          os.Exit(0)
      }

      publish("test message", syncProducer)
  }

  func publish(message string, producer sarama.SyncProducer) {
      // Publish sync
      msg := &sarama.ProducerMessage{
          Topic: "<topic_name>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publish: ", err.Error())
      }
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
  }
- Code example for getting messages from a topic:

  consumer/main.go

  package main

  import (
      "fmt"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<FQDN_of_broker_host>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User = "<consumer_name>"
      conf.Net.SASL.Password = "<consumer_password>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create consumer: ", err.Error())
          os.Exit(1)
      }
      defer func() {
          if err := master.Close(); err != nil {
              panic(err)
          }
      }()

      topic := "<topic_name>"
      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
          panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count the number of processed messages
      msgCount := 0

      // Get signal to finish
      doneCh := make(chan struct{})
      go func() {
          for {
              select {
              case err := <-consumer.Errors():
                  fmt.Println(err)
              case msg := <-consumer.Messages():
                  msgCount++
                  fmt.Println("Received messages", string(msg.Key), string(msg.Value))
              case <-signals:
                  fmt.Println("Interrupt is detected")
                  doneCh <- struct{}{}
              }
          }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
  }
- Building applications:

  cd ~/go-project/producer && go build && \
    cd ~/go-project/consumer && go build
-
Running applications:
~/go-project/consumer/consumer
~/go-project/producer/producer
- Example code for delivering a message to a topic:

  producer/main.go

  package main

  import (
      "crypto/tls"
      "crypto/x509"
      "fmt"
      "io/ioutil"
      "os"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<FQDN_of_broker_host>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<producer_username>"
      conf.Net.SASL.Password = "<producer_password>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
          fmt.Println("Couldn't load cert: ", err.Error())
          // handle the error
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
          InsecureSkipVerify: true,
          RootCAs:            certs,
      }

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create producer: ", err.Error())
          os.Exit(0)
      }

      publish("test message", syncProducer)
  }

  func publish(message string, producer sarama.SyncProducer) {
      // Publish sync
      msg := &sarama.ProducerMessage{
          Topic: "<topic_name>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publish: ", err.Error())
      }
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
  }
- Code example for getting messages from a topic:

  consumer/main.go

  package main

  import (
      "crypto/tls"
      "crypto/x509"
      "fmt"
      "io/ioutil"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<FQDN_of_broker_host>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User = "<consumer_username>"
      conf.Net.SASL.Password = "<consumer_password>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
          fmt.Println("Couldn't load cert: ", err.Error())
          // Handle the error
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
          InsecureSkipVerify: true,
          RootCAs:            certs,
      }

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create consumer: ", err.Error())
          os.Exit(1)
      }
      defer func() {
          if err := master.Close(); err != nil {
              panic(err)
          }
      }()

      topic := "<topic_name>"
      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
          panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count the number of processed messages
      msgCount := 0

      // Get signal to finish
      doneCh := make(chan struct{})
      go func() {
          for {
              select {
              case err := <-consumer.Errors():
                  fmt.Println(err)
              case msg := <-consumer.Messages():
                  msgCount++
                  fmt.Println("Received messages", string(msg.Key), string(msg.Value))
              case <-signals:
                  fmt.Println("Interrupt is detected")
                  doneCh <- struct{}{}
              }
          }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
  }
- Building applications:

  cd ~/go-project/producer && go build && \
    cd ~/go-project/consumer && go build
-
Running applications:
~/go-project/consumer/consumer
~/go-project/producer/producer
Learn how to get the FQDN of a broker host in this guide.
First, launch the consumer application that will continuously read new messages from the topic. Then launch the producer application that will send one or more key:test message messages to the topic. The consumer application displays the messages sent to the topic.
Java
Before connecting:
- Install the dependencies:

  sudo apt update && sudo apt install --yes default-jdk maven
- Create a folder for the Maven project:

  cd ~/ && \
    mkdir --parents project/consumer/src/java/com/example project/producer/src/java/com/example && \
    cd ~/project
- Create a configuration file for Maven:

  pom.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.example</groupId>
      <artifactId>app</artifactId>
      <packaging>jar</packaging>
      <version>0.1.0</version>
      <properties>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
      <dependencies>
          <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-simple</artifactId>
              <version>1.7.30</version>
          </dependency>
          <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-databind</artifactId>
              <version>2.11.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.6.0</version>
          </dependency>
      </dependencies>
      <build>
          <finalName>${project.artifactId}-${project.version}</finalName>
          <sourceDirectory>src</sourceDirectory>
          <resources>
              <resource>
                  <directory>src</directory>
              </resource>
          </resources>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <executions>
                      <execution>
                          <goals>
                              <goal>attached</goal>
                          </goals>
                          <phase>package</phase>
                          <configuration>
                              <descriptorRefs>
                                  <descriptorRef>jar-with-dependencies</descriptorRef>
                              </descriptorRefs>
                              <archive>
                                  <manifest>
                                      <mainClass>com.example.App</mainClass>
                                  </manifest>
                              </archive>
                          </configuration>
                      </execution>
                  </executions>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-jar-plugin</artifactId>
                  <version>3.1.0</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.example.App</mainClass>
                          </manifest>
                      </archive>
                  </configuration>
              </plugin>
          </plugins>
      </build>
  </project>

  Refer to the relevant project pages in the Maven repository for up-to-date versions of the dependencies: slf4j-simple, jackson-databind, and kafka-clients.
- Copy pom.xml to the directories of the producer and consumer applications:

  cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
- Example code for delivering messages to a topic:

  producer/src/java/com/example/App.java

  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.apache.kafka.clients.producer.*;

  public class App {
      public static void main(String[] args) {
          int MSG_COUNT = 5;

          String HOST = "<broker_FQDN>:9092";
          String TOPIC = "<topic_name>";
          String USER = "<producer_username>";
          String PASS = "<producer_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";

          String serializer = StringSerializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_PLAINTEXT");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);

          Producer<String, String> producer = new KafkaProducer<>(props);

          try {
              for (int i = 1; i <= MSG_COUNT; i++) {
                  producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                  System.out.println("Test message " + i);
              }
              producer.flush();
              producer.close();
          } catch (Exception ex) {
              System.out.println(ex);
              producer.close();
          }
      }
  }
- Code example for getting messages from a topic:

  consumer/src/java/com/example/App.java

  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.clients.consumer.*;

  public class App {
      public static void main(String[] args) {
          String HOST = "<broker_FQDN>:9092";
          String TOPIC = "<topic_name>";
          String USER = "<consumer_name>";
          String PASS = "<consumer_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";

          String deserializer = StringDeserializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_PLAINTEXT");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);

          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record.key() + ":" + record.value());
              }
          }
      }
  }
- Building applications:

  cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
-
Running applications:
java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
- Go to the folder where the Java certificate store will be located:

  cd /etc/security
- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least 6 characters using the -storepass parameter for additional storage protection:

  sudo keytool -importcert \
               -alias YandexCA \
               -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
               -keystore ssl \
               -storepass <certificate_store_password> \
               --noprompt

  You can verify the import with the keytool -list check shown after these steps.
- Example code for delivering messages to a topic:

  producer/src/java/com/example/App.java

  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.apache.kafka.clients.producer.*;

  public class App {
      public static void main(String[] args) {
          int MSG_COUNT = 5;

          String HOST = "<broker_FQDN>:9091";
          String TOPIC = "<topic_name>";
          String USER = "<producer_username>";
          String PASS = "<producer_password>";
          String TS_FILE = "/etc/security/ssl";
          String TS_PASS = "<certificate_store_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";

          String serializer = StringSerializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
          props.put("ssl.truststore.location", TS_FILE);
          props.put("ssl.truststore.password", TS_PASS);

          Producer<String, String> producer = new KafkaProducer<>(props);

          try {
              for (int i = 1; i <= MSG_COUNT; i++) {
                  producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                  System.out.println("Test message " + i);
              }
              producer.flush();
              producer.close();
          } catch (Exception ex) {
              System.out.println(ex);
              producer.close();
          }
      }
  }
- Code example for getting messages from a topic:

  consumer/src/java/com/example/App.java

  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.clients.consumer.*;

  public class App {
      public static void main(String[] args) {
          String HOST = "<broker_FQDN>:9091";
          String TOPIC = "<topic_name>";
          String USER = "<consumer_name>";
          String PASS = "<consumer_password>";
          String TS_FILE = "/etc/security/ssl";
          String TS_PASS = "<certificate_store_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";

          String deserializer = StringDeserializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
          props.put("ssl.truststore.location", TS_FILE);
          props.put("ssl.truststore.password", TS_PASS);

          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record.key() + ":" + record.value());
              }
          }
      }
  }
- Building applications:

  cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
-
Running applications:
java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
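As referenced in the certificate import step above, you can confirm that the certificate actually landed in the Java Key Store. A minimal sketch, using the same store path and password as in that step:

# List the keystore contents; an entry with the YandexCA alias should appear.
sudo keytool -list -keystore /etc/security/ssl -storepass <certificate_store_password>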
Learn how to get the FQDN of a broker host in this guide.
First, launch the consumer application that will continuously read new messages from the topic. Then launch the producer application that will send one or more key:test message messages to the topic. The consumer application displays the messages sent to the topic.
Node.js
Before connecting, install the dependencies:
sudo apt update && sudo apt install -y nodejs npm && \
npm install node-rdkafka
- Example code for delivering messages to a topic:

  producer.js

  "use strict"
  const Kafka = require('node-rdkafka');

  const MSG_COUNT = 5;

  const HOST = "<broker_FQDN>:9092";
  const TOPIC = "<topic_name>";
  const USER = "<producer_username>";
  const PASS = "<producer_password>";

  const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512"
  });

  producer.connect();

  producer.on('ready', function() {
      try {
          for (let i = 0; i < MSG_COUNT; ++i) {
              producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
              console.log("Produced: test message");
          }
          producer.flush(10000, () => {
              producer.disconnect();
          });
      } catch (err) {
          console.error('Error');
          console.error(err);
      }
  });
- Code example for getting messages from a topic:

  consumer.js

  "use strict"
  const Kafka = require('node-rdkafka');

  const MSG_COUNT = 5;

  const HOST = "<broker_FQDN>:9092";
  const TOPIC = "<topic_name>";
  const USER = "<consumer_name>";
  const PASS = "<consumer_password>";

  const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
  });

  consumer.connect();

  consumer
      .on('ready', function() {
          consumer.subscribe([TOPIC]);
          consumer.consume();
      })
      .on('data', function(data) {
          console.log(data.key + ":" + data.value.toString());
      });

  process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
  });
-
Running applications:
node consumer.js
node producer.js
- Example code for delivering messages to a topic:

  producer.js

  "use strict"
  const Kafka = require('node-rdkafka');

  const MSG_COUNT = 5;

  const HOST = "<broker_FQDN>:9091";
  const TOPIC = "<topic_name>";
  const USER = "<producer_username>";
  const PASS = "<producer_password>";
  const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

  const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512"
  });

  producer.connect();

  producer.on('ready', function() {
      try {
          for (let i = 0; i < MSG_COUNT; ++i) {
              producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
              console.log("Produced: test message");
          }
          producer.flush(10000, () => {
              producer.disconnect();
          });
      } catch (err) {
          console.error('Error');
          console.error(err);
      }
  });
- Code example for getting messages from a topic:

  consumer.js

  "use strict"
  const Kafka = require('node-rdkafka');

  const MSG_COUNT = 5;

  const HOST = "<broker_FQDN>:9091";
  const TOPIC = "<topic_name>";
  const USER = "<consumer_name>";
  const PASS = "<consumer_password>";
  const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

  const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
  });

  consumer.connect();

  consumer
      .on('ready', function() {
          consumer.subscribe([TOPIC]);
          consumer.consume();
      })
      .on('data', function(data) {
          console.log(data.key + ":" + data.value.toString());
      });

  process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
  });
-
Running applications:
node consumer.js
node producer.js
Learn how to get the FQDN of a broker host in this guide.
First, launch the consumer application that will continuously read new messages from the topic. Then launch the producer application that will send one or more key:test message messages to the topic. The consumer application displays the messages sent to the topic.
PowerShell
Before connecting:

- Install the latest available version of Microsoft OpenJDK.
- Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.
- Unpack the archive.

  Tip

  Unpack the Apache Kafka® files to the root directory of the disk, for example, C:\kafka_2.12-2.6.0\.

  If the path to the executable and batch files of Apache Kafka® is too long, you will get the error The input line is too long when trying to run the files.
- Run this command to receive messages from a topic:

  <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
      --bootstrap-server <broker_FQDN>:9092 `
      --topic <topic_name> `
      --property print.key=true `
      --property key.separator=":" `
      --consumer-property security.protocol=SASL_PLAINTEXT `
      --consumer-property sasl.mechanism=SCRAM-SHA-512 `
      --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<consumer_username>' password='<consumer_password>';"
The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to the topic:

  echo "key:test message" | <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
      --bootstrap-server <broker_FQDN>:9092 `
      --topic <topic_name> `
      --property parse.key=true `
      --property key.separator=":" `
      --producer-property acks=all `
      --producer-property security.protocol=SASL_PLAINTEXT `
      --producer-property sasl.mechanism=SCRAM-SHA-512 `
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<producer_username>' password='<producer_password>';"
- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set the password using the -storepass parameter for additional storage protection:

  keytool.exe -importcert -alias YandexCA `
      --file $HOME\.kafka\YandexInternalRootCA.crt `
      --keystore $HOME\.kafka\ssl `
      --storepass <certificate_store_password> `
      --noprompt
- Run this command to receive messages from a topic:

  <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
      --bootstrap-server <broker_FQDN>:9091 `
      --topic <topic_name> `
      --property print.key=true `
      --property key.separator=":" `
      --consumer-property security.protocol=SASL_SSL `
      --consumer-property sasl.mechanism=SCRAM-SHA-512 `
      --consumer-property ssl.truststore.location=$HOME\.kafka\ssl `
      --consumer-property ssl.truststore.password=<certificate_store_password> `
      --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<consumer_username>' password='<consumer_password>';"
The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to the topic:

  echo "key:test message" | <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
      --bootstrap-server <broker_FQDN>:9091 `
      --topic <topic_name> `
      --property parse.key=true `
      --property key.separator=":" `
      --producer-property acks=all `
      --producer-property security.protocol=SASL_SSL `
      --producer-property sasl.mechanism=SCRAM-SHA-512 `
      --producer-property ssl.truststore.location=$HOME\.kafka\ssl `
      --producer-property ssl.truststore.password=<certificate_store_password> `
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<producer_username>' password='<producer_password>';"
Learn how to get the FQDN of a broker host in this guide.
Make sure that the first terminal displays the message key:test message sent in the second terminal.
Python (kafka-python)
Before connecting, install the dependencies:
sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c
- Example code for delivering a message to a topic:

  producer.py

  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='<FQDN_of_broker_host>:9092',
      security_protocol="SASL_PLAINTEXT",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<producer_name>',
      sasl_plain_password='<producer_password>')

  producer.send('<topic_name>', b'test message', b'key')
  producer.flush()
  producer.close()
- Code example for getting messages from a topic:

  consumer.py

  from kafka import KafkaConsumer

  consumer = KafkaConsumer(
      '<topic_name>',
      bootstrap_servers='<broker_FQDN>:9092',
      security_protocol="SASL_PLAINTEXT",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<consumer_name>',
      sasl_plain_password='<consumer_password>')

  print("ready")

  for msg in consumer:
      print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
- Running applications:

  python3 consumer.py

  python3 producer.py
- Example code for delivering a message to a topic:

  producer.py

  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='<FQDN_of_broker_host>:9091',
      security_protocol="SASL_SSL",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<producer_name>',
      sasl_plain_password='<producer_password>',
      ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")

  producer.send('<topic_name>', b'test message', b'key')
  producer.flush()
  producer.close()
- Code example for getting messages from a topic:

  consumer.py

  from kafka import KafkaConsumer

  consumer = KafkaConsumer(
      '<topic_name>',
      bootstrap_servers='<broker_FQDN>:9091',
      security_protocol="SASL_SSL",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<consumer_username>',
      sasl_plain_password='<consumer_password>',
      ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")

  print("ready")

  for msg in consumer:
      print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
-
Running applications:
python3 consumer.py
python3 producer.py
Learn how to get the FQDN of a broker host in this guide.
First, launch the consumer application that will continuously read new messages from the topic. Then launch the producer application that will send one or more key:test message messages to the topic. The consumer application displays the messages sent to the topic.
Python (confluent-kafka)
Before connecting, install the dependencies:
pip install confluent_kafka
- Example code for delivering a message to a topic:

  producer.py

  from confluent_kafka import Producer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<FQDN_of_broker_host>:9092',
      'security.protocol': 'SASL_PLAINTEXT',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<producer_username>',
      'sasl.password': '<producer_password>',
      'error_cb': error_callback,
  }

  p = Producer(params)
  p.produce('<topic_name>', 'some payload1')
  p.flush(10)
- Code example for getting messages from a topic:

  consumer.py

  from confluent_kafka import Consumer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<FQDN_of_broker_host>:9092',
      'security.protocol': 'SASL_PLAINTEXT',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<consumer_name>',
      'sasl.password': '<consumer_password>',
      'group.id': 'test-consumer1',
      'auto.offset.reset': 'earliest',
      'enable.auto.commit': False,
      'error_cb': error_callback,
      'debug': 'all',
  }

  c = Consumer(params)
  c.subscribe(['<topic_name>'])

  while True:
      msg = c.poll(timeout=3.0)
      if msg:
          val = msg.value().decode()
          print(val)
- Running applications:

  python3 consumer.py

  python3 producer.py
- Example code for delivering a message to a topic:

  producer.py

  from confluent_kafka import Producer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<FQDN_of_broker_host>:9091',
      'security.protocol': 'SASL_SSL',
      'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<producer_username>',
      'sasl.password': '<producer_password>',
      'error_cb': error_callback,
  }

  p = Producer(params)
  p.produce('<topic_name>', 'some payload1')
  p.flush(10)
- Code example for getting messages from a topic:

  consumer.py

  from confluent_kafka import Consumer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<FQDN_of_broker_host>:9091',
      'security.protocol': 'SASL_SSL',
      'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<consumer_name>',
      'sasl.password': '<consumer_password>',
      'group.id': 'test-consumer1',
      'auto.offset.reset': 'earliest',
      'enable.auto.commit': False,
      'error_cb': error_callback,
      'debug': 'all',
  }

  c = Consumer(params)
  c.subscribe(['<topic_name>'])

  while True:
      msg = c.poll(timeout=3.0)
      if msg:
          val = msg.value().decode()
          print(val)
-
Running applications:
python3 consumer.py
python3 producer.py
Learn how to get the FQDN of a broker host in this guide.
First, launch the consumer application that will continuously read new messages from the topic. Then launch the producer application that will send one or more key:test message messages to the topic. The consumer application displays the messages sent to the topic.