Заходим в свой account в https://confluent.cloud/ и логинимся. Если у вас еще нет кластера — создаем его. В итоге должно получчиться что то такое:
Теперь когда есть кластер — можно попробовать к нему подключиться
Логинимся к confluent cloud из cli
confluent loginЗдесь потребуется ввести свой user/password который вы использовали при регистрации в confluent cloud
После того как появится надпись Logged in as "smilyk" for organization … можно посмотреть какие вообще environment & clusters у вас есть. Например — у меня 2 environment и нужно выбрать с каким именно окружением сейчас сы будем работать
confluent environment list
мне нужно использовать test — поэтому я назнааю его "главным"
confluent environment use env-dzx0odтут используется ID нужного мне окружения. Теперь, после того как окружение выбрано правильно, необходимо определить — какие кластеры у меня есть
confluent kafka cluster list
теперь можно подклчиться к кластеру (я уже знаю его id) но у меня пока еще нет сгенерированого ключа.
Генерируем ключ для доступа к кластеру.
Для этого переходим в confluent cloud и выбираем API Keys (не кластера а всего профиля)

Нажимаем кнопку Add и выбираем My Account. Дальше нужно выбрать для чего именно мы генерируем этот ключ. Так как мы работаем с кластером — то выбираем Kafka Cluster, выбираем environment & cluster, заполняем поле названия ключа и его описание и нажимаем кнопку скачать и сохранить. Полуаем свой клююч

Следующий этап — подключение к кластеру с использованием созданного ключа и его secret
Первое что нужно сделать — устанавливаем данный ключч как то, который по умолччанию будет использоваться для подклбчения к кластеру kafka-cluster
confluent api-key store {{API-KEY}} {{API-SECRET}} -- resource {{CLUSTER_ID}}Ключ и secret нужно взять из файла который был сохранен при создании ключа, а cluster id можно найти выполнив команду
confluent kafka cluster listПосле установки ключа как default key появится строка
Stored secret for API key "JAWZQJSL4QLL3EAZ"
Следующий шаг — использование ключа
confluent api-key use JAWZQJSL4QLL3EAZ - resource lkc-00v8d5получим азапись
Using API Key "JAWZQJSL4QLL3EAZ"
Теперь наконец можно создать новый topic
confluent kafka topic create cli-testи после получения записи что топик создан — посмотреть список всех топиков которые существуют в кластере
confluent kafka topic list
Запускаем producer
confluent kafka topic produce cli-testи отправляем сообщение. После этого запускаем консьюмер
confluent kafka topic consume -b cli-testи видим что сообщение успешно получено!
!!! -b в команде запуска консьюммера говорит ему о том, что устанавливается flag — from-beginnig. Т е consumer будет читать с первого сообщения в топике.
Если теперь зайти в Consluent cloud то так же можно увидеть что был добавлен топик и в нем появилисб записи!

Кроме использования producer cli можно еще использовать большее привычный и дающий больше возможностей kafka-console-producer и kafka-console-consumer
Для этого необходимо добавить на компьютер файл kafka-cli.properties для подключения
vi ~/confluent-ccl/kafka-cli.properties — добавляем
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{{ CLUSTER_API_KEY }}" password="{{ CLUSTER_API_SECRET }}";данные bootstrap-servers можно получить используя команду
confluent kafka cluster describeвы увидите таблицу, в которой поле Endpoint именно наш bootstrap.servers
После того как файл добавлен можно пользоваться consol "плюшками" Например — посмотреть список топиков
kafka-topics \
--bootstrap-server $BOOTSTRAP_SERVERS \
--list \
--command-config ~/confluent-ccl/kafka-cli.properties
или запустить producer
kafka-console-producer \
--broker-list $BOOTSTRAP_SERVERS \
--topic cli-test \
--producer.config ~/confluent-ccl/kafka-cli.propertiesили — consumer
kafka-console-consumer \
--bootstrap-server $BOOTSTRAP_SERVERS \
--from-beginning \
--topic cli-test \
--consumer.config ~/confluent-ccl/kafka-cli.properties