Kafka Share Consumer¶
Apache Kafka has long been the gold standard for high-throughput, low-latency data streaming. Historically, Kafka consumption used a per-partition model, where each topic partition was assigned to exactly one consumer in a group.
With Kafka 4.0 and KIP-932, Kafka introduced Share Consumer(Queues). This new paradigm brings per-message semantics to Kafka and behaves more like a traditional message queue. A share group allows multiple consumers to pull messages from the same topic concurrently, regardless of the originating partition. The feature was experimental in Kafka 4.0 and reached General Availability in Kafka 4.2.
Understanding the architectural shift is crucial for choosing the right tool for your use case:
| Feature | Traditional Consumer Group | Share Group |
|---|---|---|
| Processing Granularity | Per-partition (Exclusive access) | Per-message (Competing consumers) |
| Concurrency Limit | Limited by the number of partitions | Virtually unlimited; limited only by consumer resources |
| Message Ordering | Guaranteed within a single partition | No strict ordering guarantees across the group |
| Delivery State | Managed via offsets (Log-based) | Managed per message (Locking/Ack-based) |
| Scalability | To scale, you must add more partitions | You can add consumers without modifying the topic |
| Use Case | Event sourcing, stream processing, ordered logs | Work queues, competing consumers, slow task processing |
A Share Consumer group is a good fit when you have a high volume of independent messages, variable processing time, or want to avoid head-of-line blocking where one slow message stalls an entire partition.
Spring for Kafka 4.0 has added first-class Share Consumer support, providing the familiar @KafkaListener programming model for these share groups.
Annotation-Driven Listeners¶
Generate a Spring Boot 4 project via Spring Initializr with these dependencies:
- Spring for Apache Kafka
- Testcontainers
- Lombok
Then define a ShareConsumerFactory and a dedicated ShareKafkaListenerContainerFactory as follows.
@Configuration
@Slf4j
class ShareConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
String bootstrapServers;
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
log.debug("Get bootstrap servers from properties:{}", bootstrapServers);
Map<String, Object> props = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG, DEMO_GROUP_NAME,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
//ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"
);
DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(props);
factory.addListener(new ShareConsumerFactory.Listener<>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
log.debug("consumer added id:{}", id);
}
@Override
public void consumerRemoved(@Nullable String id, ShareConsumer<String, String> consumer) {
log.debug("consumer removed id:{}", id);
}
});
return factory;
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
This configuration defines a specialized ShareConsumerFactory and a ShareKafkaListenerContainerFactory that uses it. It also registers a lifecycle listener to log when consumers join or leave the share group.
With the infrastructure in place, define a @KafkaListener that uses this container factory to participate in the share group:
@Component
@Slf4j
public class GreetingListener {
public Map<String, Long> counter = new ConcurrentHashMap<>();
@KafkaListener(
topics = DEMO_TOPIC_NAME,
containerFactory = "shareKafkaListenerContainerFactory",
groupId = DEMO_GROUP_NAME
)
public void onMessage(ConsumerRecord<String, String> record) {
log.debug("received record: {} at {}", record, LocalDateTime.now());
counter.compute(record.value(), (s, v) -> v == null ? 1 : v + 1);
}
public Long getWordCount(String word) {
return this.counter.get(word);
}
}
Create a test to verify the share consumer behavior against a running Kafka instance in Testcontainers:
@Testcontainers
@SpringBootTest
@Slf4j
class DemoApplicationTests {
// Kafka 4.2 enabled share consumer by default
@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("apache/kafka:latest"))
.withEnv("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1");
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private GreetingListener listener;
@SneakyThrows
@Test
public void testSendMessage() {
Thread.sleep(Duration.ofSeconds(5));
List.of("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog")
.forEach(word -> kafkaTemplate.send(DemoApplication.DEMO_TOPIC_NAME, word)
.thenAccept(s -> log.debug("sent message: {}", s)));
Awaitility.waitAtMost(Duration.ofMillis(30_000))
.untilAsserted(() -> assertThat(this.listener.getWordCount("the")).isEqualTo(2));
}
}
[!NOTE] I encountered some issues when running the test, so I added the
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTORenvironment variable and a delay before sending messages to ensure the share consumer is fully initialized and ready to consume. Check the original discussion on StackOverflow for more details: Kafka Share Consumer issue.
Grab the example code from GitHub, then run the test to verify the share consumer is working and the word count updates correctly.
Programmatic Listeners¶
Like the traditional consumer model, you can create programmatic listeners with ShareKafkaMessageListenerContainer. This gives finer control over consumer lifecycle and message processing.
@Configuration
@Slf4j
class ShareConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
String bootstrapServers;
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
log.debug("Get bootstrap servers from properties:{}", bootstrapServers);
Map<String, Object> props = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
// ConsumerConfig.GROUP_ID_CONFIG, DEMO_GROUP_NAME, // set in the consumer side
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
);
DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(props);
factory.addListener(new ShareConsumerFactory.Listener<>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
log.debug("consumer added id:{}", id);
}
@Override
public void consumerRemoved(@Nullable String id, ShareConsumer<String, String> consumer) {
log.debug("consumer removed id:{}", id);
}
});
return factory;
}
@Bean
public ShareKafkaMessageListenerContainer<String, String> shareKafkaMessageListenerContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties(DEMO_TOPIC_NAME);
containerProps.setGroupId(DEMO_GROUP_NAME);
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new GreetingListener());
// container.setConcurrency(10);
return container;
}
}
See the example project on GitHub for the full implementation, then inspect the test and GreetingListener class to confirm the programmatic listener behavior.
Explicit Acknowledgement¶
By default, a Share consumer uses implicit acknowledgement: messages are considered acknowledged as soon as they are delivered. To control acknowledgement timing, enable explicit acknowledgement mode.
To enable explicit acknowledgement, set the ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG property to explicit in the consumer configuration:
@Configuration
@Slf4j
class ShareConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
String bootstrapServers;
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = Map.of(
...
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"
);
return new DefaultShareConsumerFactory<>(props);
}
...
}
On the consumer side, you can then use the ShareAcknowledgment object to acknowledge messages explicitly:
@KafkaListener(
topics = DEMO_TOPIC_NAME,
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = DEMO_GROUP_NAME
)
public void onMessage(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
log.debug("received record: {} at {}", record, LocalDateTime.now());
counter.compute(record.value(), (s, v) -> {
if (v == null) {
ack.acknowledge();
return 1L;
} else {
ack.reject(); // reject when the word is already tapped.
return v;
}
}
);
}
In this example, the consumer acknowledges a message only on first occurrence. If the word has already been processed, the listener rejects it and the broker will not retry it.
This explicit acknowledgement mode allows you to implement more complex processing logic and error handling strategies, giving you greater control over the message processing lifecycle in a Share consumer group.
Check the example project from GitHub and explore the test code. You can run the test to see how explicit acknowledgement works in the Share consumer model.