Заходим в свой account в https://confluent.cloud/ и логинимся. Если у вас еще нет кластера — создаем его. В итоге должно получчиться что то такое:

Теперь когда есть кластер — можно попробовать к нему подключиться

Логинимся к confluent cloud из cli

confluent login

Здесь потребуется ввести свой user/password который вы использовали при регистрации в confluent cloud

После того как появится надпись Logged in as "smilyk" for organization … можно посмотреть какие вообще environment & clusters у вас есть. Например — у меня 2 environment и нужно выбрать с каким именно окружением сейчас сы будем работать

confluent environment list
None

мне нужно использовать test — поэтому я назнааю его "главным"

confluent environment use env-dzx0od

тут используется ID нужного мне окружения. Теперь, после того как окружение выбрано правильно, необходимо определить — какие кластеры у меня есть

confluent kafka cluster list
None

теперь можно подклчиться к кластеру (я уже знаю его id) но у меня пока еще нет сгенерированого ключа.

Генерируем ключ для доступа к кластеру.

Для этого переходим в confluent cloud и выбираем API Keys (не кластера а всего профиля)

None

Нажимаем кнопку Add и выбираем My Account. Дальше нужно выбрать для чего именно мы генерируем этот ключ. Так как мы работаем с кластером — то выбираем Kafka Cluster, выбираем environment & cluster, заполняем поле названия ключа и его описание и нажимаем кнопку скачать и сохранить. Полуаем свой клююч

None

Следующий этап — подключение к кластеру с использованием созданного ключа и его secret

Первое что нужно сделать — устанавливаем данный ключч как то, который по умолччанию будет использоваться для подклбчения к кластеру kafka-cluster

confluent api-key store {{API-KEY}} {{API-SECRET}}  -- resource {{CLUSTER_ID}}

Ключ и secret нужно взять из файла который был сохранен при создании ключа, а cluster id можно найти выполнив команду

confluent kafka cluster list

После установки ключа как default key появится строка

Stored secret for API key "JAWZQJSL4QLL3EAZ"

Следующий шаг — использование ключа

confluent api-key use JAWZQJSL4QLL3EAZ - resource lkc-00v8d5

получим азапись

Using API Key "JAWZQJSL4QLL3EAZ"

Теперь наконец можно создать новый topic

confluent kafka topic create cli-test

и после получения записи что топик создан — посмотреть список всех топиков которые существуют в кластере

confluent kafka topic list
None

Запускаем producer

confluent kafka topic produce cli-test

и отправляем сообщение. После этого запускаем консьюмер

confluent kafka topic consume -b cli-test

и видим что сообщение успешно получено!

!!! -b в команде запуска консьюммера говорит ему о том, что устанавливается flag — from-beginnig. Т е consumer будет читать с первого сообщения в топике.

Если теперь зайти в Consluent cloud то так же можно увидеть что был добавлен топик и в нем появилисб записи!

None

Кроме использования producer cli можно еще использовать большее привычный и дающий больше возможностей kafka-console-producer и kafka-console-consumer

Для этого необходимо добавить на компьютер файл kafka-cli.properties для подключения

vi ~/confluent-ccl/kafka-cli.properties — добавляем

bootstrap.servers={{ BOOTSTRAP_SERVERS }}
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{{ CLUSTER_API_KEY }}" password="{{ CLUSTER_API_SECRET }}";

данные bootstrap-servers можно получить используя команду

confluent kafka cluster describe

вы увидите таблицу, в которой поле Endpoint именно наш bootstrap.servers

После того как файл добавлен можно пользоваться consol "плюшками" Например — посмотреть список топиков

kafka-topics \                         
    --bootstrap-server $BOOTSTRAP_SERVERS \
    --list \
    --command-config ~/confluent-ccl/kafka-cli.properties

или запустить producer

kafka-console-producer \
    --broker-list $BOOTSTRAP_SERVERS \
    --topic cli-test \
    --producer.config ~/confluent-ccl/kafka-cli.properties

или — consumer

kafka-console-consumer \
    --bootstrap-server $BOOTSTRAP_SERVERS \
    --from-beginning \
    --topic cli-test \                                    
    --consumer.config ~/confluent-ccl/kafka-cli.properties