Now that Apache Kafka 4.2.0 has been released, Queues for Kafka is ready for production use. I thought I'd take the opportunity to share some of the thinking behind Queues for Kafka.

I've spent many years developing enterprise messaging and streaming technologies, most recently Apache Kafka. I've also worked with many customers trying to solve their business problems using these technologies. This puts me in a unique position to look at how people are using Kafka and the ways in which it can be evolved to meet their needs better.

Kafka remains at its heart an event-streaming platform, but in practice it is increasingly used as a general-purpose platform for data in motion.

To be truly general-purpose, the platform needs to support a broad spread of workloads, including queues, publish/subscribe and event streams. Queues for Kafka broadens the appeal of Kafka. Kafka's ordered, partitioned model for data is a great foundation, even if not every kind of consumer wants to process the data as a stream. Now it is much more straightforward for applications that benefit from a queuing style of consumption to work with Kafka.

What is Queues for Kafka?

Queues for Kafka introduces a new kind of group, called a share group, to enable cooperative consumption from Kafka topics. There's still no such thing as a Kafka queue, because all of the data remains on topics, but when the data from a topic is consumed using a share group, the behaviour is just like a queue. Hence, Queues for Kafka.

The consumers in a share group, known as share consumers, work together cooperatively in a way that will be familiar to users of traditional message queues. You can write applications using the new share consumer API to consume messages from Kafka topics using a share group.

Share groups are an alternative to consumer groups, not a replacement. In fact, there is also a third kind of group, the streams group, used by Kafka Streams. You choose which kind of group to use based on the consumption behaviour you want, and in each case the application code is quite different.

Figure: A share group compared with a consumer group

A share group differs from a consumer group in how it handles partitions. In a share group, it is as if every share consumer is assigned all partitions to share with all of the other consumers, and the consumers are then given disjoint sets of messages to consume. Partition assignment does actually happen, but it's done behind the scenes and its purpose is workload distribution, not partition ownership. Partitions are dynamically given to and taken away from individual share consumers. The effect is like a traditional message queue with multiple receiving applications.

You can scale up the number of share consumers without having to worry about exceeding the number of partitions, whereas in a consumer group any consumers beyond the number of partitions sit idle. There are still partitions, of course, and you can scale the system by adding partitions to spread the load across the brokers, but the relationship between consumers and partitions is much looser.

You also avoid head-of-line blocking. Because there can be multiple consumers on a partition, a record that takes a long time to process doesn't delay the delivery of later records.
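The head-of-line point can be made concrete with a toy, in-memory sketch of one partition shared by several consumers. It is not how Kafka implements share groups, and the names (`SharedPartition`, `acquire`, `accept`) are hypothetical. It shows only the key property: while consumer A holds a slow record, consumer B can still acquire and accept the later records from the same partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of one partition shared by several consumers (hypothetical names,
// not Kafka's implementation). Each offset is available, acquired or accepted.
class SharedPartition {
    enum State { AVAILABLE, ACQUIRED, ACCEPTED }

    private final State[] states;
    private final String[] holders;

    SharedPartition(int records) {
        states = new State[records];
        holders = new String[records];
        Arrays.fill(states, State.AVAILABLE);
    }

    // Give up to max available offsets to this consumer, lowest offsets first.
    List<Integer> acquire(String consumer, int max) {
        List<Integer> batch = new ArrayList<>();
        for (int offset = 0; offset < states.length && batch.size() < max; offset++) {
            if (states[offset] == State.AVAILABLE) {
                states[offset] = State.ACQUIRED;
                holders[offset] = consumer;
                batch.add(offset);
            }
        }
        return batch;
    }

    // Only the consumer currently holding a record may accept it.
    void accept(String consumer, int offset) {
        if (states[offset] != State.ACQUIRED || !consumer.equals(holders[offset])) {
            throw new IllegalStateException("offset " + offset + " is not held by " + consumer);
        }
        states[offset] = State.ACCEPTED;
    }

    State state(int offset) {
        return states[offset];
    }
}

class HeadOfLineDemo {
    public static void main(String[] args) {
        SharedPartition partition = new SharedPartition(4);
        // Consumer A takes offset 0 and is slow to process it.
        List<Integer> slow = partition.acquire("A", 1);
        // Consumer B carries on with the later records of the same partition.
        List<Integer> fast = partition.acquire("B", 10);
        for (int offset : fast) {
            partition.accept("B", offset);
        }
        System.out.println("A holds " + slow + ", B accepted " + fast);
    }
}
```

In a consumer group, by contrast, the partition would belong to a single consumer, and records 1 to 3 would wait behind record 0.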

The main thing you're giving up by choosing a share group over a consumer group is ordering.

When using a share group, Apache Kafka keeps track of which messages have been delivered and how many times. Each message goes through a sequence of states.

Figure: Message delivery states for a share group

When a share consumer fetches messages, it can receive available messages from any partition of the topics it has subscribed to. The messages are acquired, which means they are locked for delivery to that consumer for a limited duration (30 seconds by default). The share consumer must process the messages within this time, or else they are released automatically. This ensures that a failed or unresponsive consumer cannot block delivery to the other share consumers. When a share consumer has received a message, it acknowledges it in one of the following ways:

  • The consumer accepts a message that it processed successfully
  • The consumer releases the message to make it available for another delivery attempt
  • The consumer rejects the message because it is unprocessable, so it is not made available for another delivery attempt
  • The consumer renews the message, which extends the time it has to process it

Kafka counts the number of delivery attempts. When a message reaches the maximum number of delivery attempts (5 by default), no more attempts are made and the message is archived. This means that a so-called poison message, which cannot be processed, is handled automatically instead of being redelivered forever. In the future, it will be possible to put undeliverable messages onto a dead-letter queue.
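The delivery states and acknowledgement types can be summarised as a small state machine. The sketch below models a single record using the defaults quoted above (a 30-second lock and 5 delivery attempts). It takes the current time as a parameter so the behaviour is deterministic. The class and method names are hypothetical. In a real cluster the limits come from broker configuration such as `group.share.record.lock.duration.ms` and `group.share.delivery.count.limit`, and the client acknowledges records by passing an `AcknowledgeType` to `KafkaShareConsumer.acknowledge`.

```java
// Simplified model of one record's delivery state in a share group.
// Hypothetical names; the constants mirror the defaults described in the text.
class InFlightRecord {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static final long LOCK_DURATION_MS = 30_000;
    static final int DELIVERY_COUNT_LIMIT = 5;

    private State state = State.AVAILABLE;
    private int deliveryCount = 0;
    private long lockExpiresAtMs = 0;

    State state() { return state; }
    int deliveryCount() { return deliveryCount; }

    // A share consumer fetches the record: it is locked for LOCK_DURATION_MS.
    boolean acquire(long nowMs) {
        expireLockIfDue(nowMs);
        if (state != State.AVAILABLE) return false;
        state = State.ACQUIRED;
        deliveryCount++;
        lockExpiresAtMs = nowMs + LOCK_DURATION_MS;
        return true;
    }

    // Accept: processed successfully, never delivered again.
    void accept(long nowMs) { requireLock(nowMs); state = State.ACKNOWLEDGED; }

    // Reject: unprocessable, archived without further attempts.
    void reject(long nowMs) { requireLock(nowMs); state = State.ARCHIVED; }

    // Release: make the record available for another attempt (if any remain).
    void release(long nowMs) { requireLock(nowMs); makeAvailableOrArchive(); }

    // Renew: extend the lock so the consumer has more time to process.
    void renew(long nowMs) { requireLock(nowMs); lockExpiresAtMs = nowMs + LOCK_DURATION_MS; }

    // An expired lock has the same effect as an explicit release.
    private void expireLockIfDue(long nowMs) {
        if (state == State.ACQUIRED && nowMs >= lockExpiresAtMs) makeAvailableOrArchive();
    }

    private void requireLock(long nowMs) {
        expireLockIfDue(nowMs);
        if (state != State.ACQUIRED) {
            throw new IllegalStateException("record is not acquired: " + state);
        }
    }

    // After the last permitted attempt the record is archived, not redelivered.
    private void makeAvailableOrArchive() {
        state = deliveryCount >= DELIVERY_COUNT_LIMIT ? State.ARCHIVED : State.AVAILABLE;
    }
}

class DeliveryDemo {
    public static void main(String[] args) {
        // A poison message: every consumer that receives it gives up and releases it.
        InFlightRecord poison = new InFlightRecord();
        long now = 0;
        while (poison.acquire(now)) {
            poison.release(now);
            now += 1_000;
        }
        System.out.println(poison.state() + " after " + poison.deliveryCount() + " attempts");
    }
}
```

Running `DeliveryDemo` shows the poison-message case. The record is released on every attempt and ends up archived after its fifth delivery instead of circulating indefinitely.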

What is a message queue anyway?

Let's think about the most basic messaging scenario, which is usually called point-to-point messaging.

Sending applications create messages and publish them to the messaging system which buffers them. Receiving applications typically consume messages in two stages, first receiving the data for processing, and then acknowledging successful receipt. Generally, any number of sending and receiving applications can communicate via the messaging system. In point-to-point messaging, each message is consumed and acknowledged by one receiving application. If a receiving application receives a message and then fails before it can acknowledge it, another receiving application can receive the message, process it and acknowledge it. Once acknowledged successfully, a message is never delivered to another receiving application.

Let's compare how you achieve point-to-point messaging with a traditional message queuing system and with Apache Kafka. I'll use RabbitMQ for illustration, but any message queuing system would do. The terms they use are often different, but you get a similar effect.

To implement this scenario in RabbitMQ, you create a direct exchange which routes messages to a queue whose binding key matches the messages' routing key. The sending applications then send to this exchange, and the receiving applications receive from the queue. The message broker distributes the messages to the receiving applications, ensuring that each message is consumed by only one application and, once acknowledged, is never delivered again from that queue. This is illustrated below left.

Figure: Point-to-point messaging (left) and fan-out exchange (right) in RabbitMQ

Of course, you can easily extend the scenario and go beyond a single queue. For example, you can change the exchange to a fanout exchange, which routes to multiple queues (see above right), or even filter based on routing keys.

To implement this scenario in Apache Kafka, you create a topic and a single share group. The sending applications then publish to this topic, and the receiving applications are members of the share group subscribed to the topic. The share group distributes the messages to the receiving applications, ensuring that each message is consumed by only one application and, once acknowledged, is never delivered again to that group. This is illustrated below left.

Figure: Point-to-point messaging (left) and multiple groups (right) in Apache Kafka

You can easily extend the scenario in Apache Kafka too. For example, you can subscribe another share group or a consumer group to the same topic (see above right), or even start a Kafka Connect sink connector on the topic. Each group keeps track of its own consumption, so the flexibility you get beyond a simple queue is clear.
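A few lines of Java can separate two ideas: distributing messages within one group, and sharing a topic between groups. In this simplified model (all names hypothetical), the topic is an append-only list and each group keeps its own record of what it has delivered, so a second group sees every message whatever the first group did. Consumers within a group simply take turns. That is a simplification: a real share group hands out batches of records. The sketch also ignores acknowledgement and redelivery.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// The topic is an append-only log shared by every group (hypothetical model).
class Topic {
    final List<String> log = new ArrayList<>();

    void publish(String message) { log.add(message); }
}

// Each group tracks its own consumption state, so groups never interfere.
class Group {
    private final Topic topic;
    private final Set<Integer> delivered = new HashSet<>();

    Group(Topic topic) { this.topic = topic; }

    // Return the next message this group has not yet handed out, or null.
    String next() {
        for (int offset = 0; offset < topic.log.size(); offset++) {
            if (delivered.add(offset)) return topic.log.get(offset);
        }
        return null;
    }
}

class GroupsDemo {
    // Consumers in one group take turns asking for messages; returns who got what.
    static Map<String, List<String>> drain(Group group, String... consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String c : consumers) received.put(c, new ArrayList<>());
        for (int turn = 0; ; turn++) {
            String consumer = consumers[turn % consumers.length];
            String message = group.next();
            if (message == null) return received;
            received.get(consumer).add(message);
        }
    }

    public static void main(String[] args) {
        Topic orders = new Topic();
        for (int i = 1; i <= 4; i++) orders.publish("order-" + i);
        // Within one group, each message goes to exactly one consumer.
        System.out.println(drain(new Group(orders), "worker-1", "worker-2"));
        // A second group independently receives every message on the topic.
        System.out.println(drain(new Group(orders), "auditor"));
    }
}
```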

What's different about Queues for Kafka?

Queues for Kafka rethinks some of the limitations of traditional message queuing systems, such as maximum queue depth.

If a queue has a maximum depth, when the number of unprocessed messages reaches that limit, sending applications cannot publish more messages. This means that a problem with receiving applications also becomes a problem for sending applications. A more resilient way to handle this in a production system is to accept the incoming messages but build monitoring and alerting on the size of the processing backlog. If something is wrong, the problem needs to be fixed promptly rather than spread to connected systems.
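Here is a sketch contrasting the two approaches. Both classes are hypothetical, and the depth limit and alert threshold of 3 are arbitrary. With the receiving applications stopped, the bounded queue starts refusing producers once it is full. The monitored backlog keeps accepting messages and reports that it is over its threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// A queue with a maximum depth: a consumer problem becomes a producer problem.
class BoundedQueue {
    private final Deque<String> messages = new ArrayDeque<>();
    private final int maxDepth;

    BoundedQueue(int maxDepth) { this.maxDepth = maxDepth; }

    // Returns false, refusing the producer, once the queue is full.
    boolean publish(String message) {
        if (messages.size() >= maxDepth) return false;
        messages.addLast(message);
        return true;
    }
}

// An unbounded backlog: always accept, but flag a backlog above the threshold.
class MonitoredBacklog {
    private final Deque<String> messages = new ArrayDeque<>();
    private final int alertThreshold;

    MonitoredBacklog(int alertThreshold) { this.alertThreshold = alertThreshold; }

    boolean publish(String message) {
        messages.addLast(message);
        return true;
    }

    String consume() { return messages.pollFirst(); }

    int depth() { return messages.size(); }

    // The alert clears by itself once consumers catch up.
    boolean alerting() { return messages.size() > alertThreshold; }
}

class BackpressureDemo {
    public static void main(String[] args) {
        BoundedQueue bounded = new BoundedQueue(3);
        MonitoredBacklog monitored = new MonitoredBacklog(3);
        int refused = 0;
        // The receiving applications are down, so nothing is consumed.
        for (int i = 0; i < 5; i++) {
            if (!bounded.publish("m" + i)) refused++;
            monitored.publish("m" + i);
        }
        System.out.println("bounded queue refused " + refused + " messages");
        System.out.println("backlog " + monitored.depth() + ", alerting=" + monitored.alerting());
    }
}
```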

Assuming that queues are usually shallow can also shape the design of a messaging system. In production Kafka workloads, it's not uncommon for the processing backlog to be millions of messages, which can be just a few seconds of data. Provided the backlog does not keep growing over time, this is entirely manageable, as long as the system was designed with it in mind.

Share groups do not have a maximum queue depth, but they do keep track of the lag, which is the number of messages still to be delivered. By monitoring the lag, you can scale the number of consumers in the share group, adding processing power to cope with fluctuations in the message rate.
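One way to picture scaling on lag is a hypothetical sizing rule: run enough share consumers to keep up with the incoming rate and also drain the current lag within a target time. The per-consumer throughput, target drain time and bounds are assumptions you would measure or choose yourself. Kafka tracks the lag but does not apply a rule like this for you. Because share consumers are not limited by the partition count, the upper bound here is an operational choice rather than the number of partitions.

```java
// Hypothetical sizing rule for a share group (not part of Kafka):
// consumers = ceil((incoming rate + lag / target drain time) / per-consumer rate),
// clamped to [min, max].
class LagScaler {
    static int desiredConsumers(long lag, double incomingPerSecond,
                                double perConsumerPerSecond, double targetDrainSeconds,
                                int min, int max) {
        // Rate needed to keep up with new messages and also clear the backlog in time.
        double requiredRate = incomingPerSecond + lag / targetDrainSeconds;
        int needed = (int) Math.ceil(requiredRate / perConsumerPerSecond);
        return Math.max(min, Math.min(max, needed));
    }

    public static void main(String[] args) {
        // 1.2 million messages of lag, 4,000 msg/s arriving,
        // 1,000 msg/s per consumer, drain within 5 minutes.
        System.out.println(desiredConsumers(1_200_000, 4_000, 1_000, 300, 1, 50));
    }
}
```

With the numbers in `main`, the rule asks for (4,000 + 1,200,000 / 300) / 1,000 = 8 consumers. Once the lag is gone it falls back to just enough consumers for the incoming rate.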

Queues for Kafka copes well with extremely large backlogs, which makes the system resilient to temporary interruptions of service, such as when you're performing maintenance. It is also optimised for receiving and acknowledging messages in batches.

Queues for Kafka doesn't immediately delete a message from storage when it has been processed. Consumer group and share group members subscribe to topics, and Kafka maintains metadata to keep track of their consumption. When a message is acknowledged as consumed by a share group, an atomic update is made to the metadata which ensures that the message is not delivered again, but the message is not actually deleted from the topic at this point. Messages are deleted based on log retention in the usual Kafka way.
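Here is a sketch of how consumption metadata and storage stay separate. The classes are hypothetical and keep everything in memory. Acknowledging a record only updates the group's metadata. Records leave the log only when time-based retention removes them, analogous to Kafka's `retention.ms`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory sketch (hypothetical names): the log is trimmed only by time-based
// retention, independently of what any group has acknowledged.
class RetainedLog {
    static final class Entry {
        final long offset;
        final long timestampMs;
        final String value;

        Entry(long offset, long timestampMs, String value) {
            this.offset = offset;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private long nextOffset = 0;

    long append(String value, long timestampMs) {
        entries.add(new Entry(nextOffset, timestampMs, value));
        return nextOffset++;
    }

    // Remove records older than the retention period, whether consumed or not.
    void applyRetention(long nowMs, long retentionMs) {
        entries.removeIf(e -> e.timestampMs < nowMs - retentionMs);
    }

    int size() { return entries.size(); }

    long startOffset() { return entries.isEmpty() ? nextOffset : entries.get(0).offset; }
}

// A group's consumption metadata; acknowledging touches only this, never the log.
class ShareGroupMetadata {
    private final Set<Long> acknowledged = new HashSet<>();

    void acknowledge(long offset) { acknowledged.add(offset); }

    boolean isAcknowledged(long offset) { return acknowledged.contains(offset); }
}

class RetentionDemo {
    public static void main(String[] args) {
        RetainedLog log = new RetainedLog();
        ShareGroupMetadata group = new ShareGroupMetadata();
        long first = log.append("a", 0);
        log.append("b", 60_000);
        group.acknowledge(first);
        System.out.println("after ack: " + log.size() + " records");
        // At t = 100 s with 50 s retention, only records older than t = 50 s go.
        log.applyRetention(100_000, 50_000);
        System.out.println("after retention: " + log.size()
                + " records, start offset " + log.startOffset());
    }
}
```

After the acknowledgement, both records are still in the log. Only the retention pass removes the older one, moving the log's start offset forward.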

These points really just illustrate why the name of the feature includes the word Kafka. It's Kafka under the hood, intelligently enhanced to support a new kind of consumption.

Can I replace my existing queuing system?

Many users are close to being able to consolidate their messaging workloads onto Kafka, but still have a handful of applications on their earlier queuing system because those applications can't naturally use consumer groups. Now that Apache Kafka has queuing semantics, it should be possible to move some of these applications onto Kafka too. The application code will need to be changed to use the Kafka share consumer API, but that's all. For these users, who are prepared to make some application changes to achieve consolidation, yes, I expect they will be able to replace their existing systems.

Is Queues for Kafka trying to be compatible with existing queuing systems? Not at all. You might think that implementing JMS would make it possible to move existing applications onto Kafka unchanged, but actually it would not be that simple. It's already very tricky to move non-trivial applications between JMS implementations. Knowing this, it doesn't seem like a valuable use of time to create yet another JMS variant.

One specific class of application is especially sticky: applications that use two-phase commit to coordinate a queuing system with a relational database. This does let you move data between the queuing system and the database atomically. However, with exactly-once support in Kafka Connect and a great choice of connectors to databases, the need to do this in an application is diminished. Similarly, some applications repeatedly receive a message, process it, write to a database, and then send a response message, all within a distributed transaction. In the Kafka world, that's a stream processing application, and it would usually be built using a stream processing system such as Apache Flink.

What next?

It's been a lot of work by a large number of people to get to this point, but we are not finished. We've started work on dead-letter queues and are thinking about what comes next.

Here's a great video which explains the concepts of Queues for Kafka. To learn more about share groups, the share consumer and managing share groups, see the Apache Kafka documentation. And of course, you can download Apache Kafka and give it a try.