TIL #2: Kafka poison pill message and CommitFailedException


Yesterday I was working with a team that was facing an issue with their Kafka-related code. Their Kafka consumer was failing with the following exception:

[] ERROR [2022-11-22 08:32:52,853] com.abc.MyKakfaConsumer: Exception while processing events
! java.lang.NullPointerException: Cannot invoke "org.apache.kafka.common.header.Header.value()" because the return value of "org.apache.kafka.common.header.Headers.lastHeader(String)" is null
! at com.abc.MyKakfaConsumer.run(MyKakfaConsumer.java:83)
! at java.base/java.lang.Thread.run(Thread.java:833)

The consumer code looked like this:

public void run() {
        try {
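            // Register the same handler instance that tracks offsets below,
            // so that rebalance callbacks see the offsets we record.
            kafkaConsumer.subscribe(List.of(this.topic), rebalanceHandler);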
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<byte[], byte[]> record : records) {

                    String eventId = new String(record.headers().lastHeader("ID").value());

                    rebalanceHandler.put(record.topic(), record.partition(), record.offset());
                    Event event;
                    try {
                        event = this.objectMapper.readValue(record.value(), Event.class);
                    } catch (IOException e) {
                        kafkaConsumer.commitSync(rebalanceHandler.getCurrentOffsets());
                        continue;
                    }
                    try {
                        String key = new String(record.key());
                        eventProcess.process(key, event);
                        kafkaConsumer.commitAsync(rebalanceHandler.getCurrentOffsets(), null);
                    } catch (Exception exception) {
                        logger.warn("Exception while processing event [eventId={}, eventType={}]",
                                eventId,
                                event.eventType(),
                                exception);
                    }
                }
            }
        } catch (WakeupException ignore) {
            //ignored because we are closing. This is the recommended way.
            logger.warn("Shutting down [topic={}]", topic);
        } catch (Exception exception) {
            logger.error("Exception while processing events", exception);
        } finally {
            try {
                kafkaConsumer.commitSync(rebalanceHandler.getCurrentOffsets());
            } finally {
                kafkaConsumer.close();
            }
        }
}

Things to note in the code snippet shown above:

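  1. We use Consumer<byte[], byte[]> so that we deserialize JSON messages to our Event class in the consumer code rather than in a Kafka deserializer. This helps us handle poison pill messages, because a deserialization failure surfaces inside our own try/catch.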

A poison pill (in the context of Kafka) is a record that has been produced to a Kafka topic and always fails when consumed, no matter how many times it is attempted.

  2. We commit Kafka offsets at multiple points to cover both success and failure scenarios.
  3. If a message fails to process, we process it again. We have made processing idempotent by using eventId as the idempotency key.

It is expected that events are published with the eventId. In our case this is the correlation id that is set at the API gateway level.
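
To illustrate the idempotency point, here is a minimal sketch of deduplicating on eventId. IdempotentProcessor and its in-memory set are hypothetical names used only for illustration. Our actual eventProcess implementation is not shown in this post, and a real service would more likely keep the seen IDs in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: runs a handler at most once per eventId.
final class IdempotentProcessor {
    // Assumption: an in-memory set is enough for this illustration.
    private final Set<String> seenEventIds = ConcurrentHashMap.newKeySet();

    /** Returns true if the handler ran, false if eventId was already processed. */
    boolean processOnce(String eventId, Runnable handler) {
        if (!seenEventIds.add(eventId)) {
            return false; // duplicate delivery, nothing to do
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a redelivery can retry the event.
            seenEventIds.remove(eventId);
            throw e;
        }
    }
}
```

With this shape, a redelivered message with the same eventId becomes a no-op, which is what makes reprocessing after a failure safe.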

The exception we were facing was in the line

String eventId = new String(record.headers().lastHeader("ID").value());

There was a bug in the producer that caused it to not set the ID header.

We had to fix the producer so that it sets the ID header in all cases, but the consumer also needed a fix, because this message is a poison pill. Since the message was never processed successfully, the consumer would keep picking it up and failing on it, which left it stuck in a failure loop.

The fix to handle this poison pill message is simple. We just had to try/catch the header code as shown below.

for (ConsumerRecord<byte[], byte[]> record : records) {
    rebalanceHandler.put(record.topic(), record.partition(), record.offset());
    String eventId;
    try {
        eventId = new String(record.headers().lastHeader("ID").value());
    } catch (Exception e) {
        // Missing or unreadable ID header: treat the record as a poison pill,
        // commit its offset so it is not consumed again, and move on.
        kafkaConsumer.commitSync(rebalanceHandler.getCurrentOffsets());
        continue;
    }
    // rest removed for brevity
}
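
Catching Exception works, but it can also hide unrelated failures. An alternative is to check for the missing header explicitly. The sketch below is only an illustration: HeaderValues is a hypothetical helper, not part of Kafka, and it assumes header values are UTF-8 text. It takes the raw bytes so that it stays independent of the Kafka client classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper for decoding an optional header value.
final class HeaderValues {
    private HeaderValues() {}

    /**
     * Decodes a raw header value as UTF-8 (an assumption about the producer).
     * Returns an empty Optional when the header or its value is absent.
     */
    static Optional<String> decode(byte[] rawValue) {
        return Optional.ofNullable(rawValue)
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the consumer loop you would store the result of record.headers().lastHeader("ID") in a variable, pass its value() (or null when the header itself is null) to HeaderValues.decode, and commit and skip the record when the result is empty.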

We merged the change, and within a few minutes Argo CD had deployed our latest code to our Kubernetes cluster.

We tailed the logs and saw messages getting processed. We were happy.

After a few minutes we started seeing exceptions in our logs, and our consumer crashed again 😦

! org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
! at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1309)
! at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1120)
! at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1086)
! at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
! at com.abc.MyKakfaConsumer.run(MyKakfaConsumer.java:106)
! at java.base/java.lang.Thread.run(Thread.java:833)

This exception does not tell us much on its own. But a few lines above it in the logs, I found the reason for this behavior.

[] WARN  [2022-11-22 16:59:13,854] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-group-1, groupId=group1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

This exception occurred because the Kafka consumer was unable to process the records returned by a poll within max.poll.interval.ms. When the consumer came back up, it polled for the backlog of messages and started to process them. By default a Kafka consumer fetches at most 500 records per poll (max.poll.records) and is expected to call poll() again within 5 minutes (max.poll.interval.ms). Since processing a batch took us more than 5 minutes, the consumer was kicked out of the group, the next offset commit failed with CommitFailedException, and the consumer crashed.
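
To make the arithmetic concrete, here is a small sketch of the average time budget per record that these two settings imply. PollBudget is a hypothetical helper written for this post, and the calculation is a back-of-the-envelope estimate that assumes every poll returns a full batch.

```java
import java.time.Duration;

// Hypothetical helper: average processing time available per record
// before the consumer exceeds max.poll.interval.ms.
final class PollBudget {
    private PollBudget() {}

    static Duration perRecord(Duration maxPollInterval, int maxPollRecords) {
        return maxPollInterval.dividedBy(maxPollRecords);
    }
}
```

With the defaults, PollBudget.perRecord(Duration.ofMinutes(5), 500) comes to 600 ms per record. With a batch of 100 records the same interval allows 3 seconds per record.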

The solution that requires no code change is to either increase max.poll.interval.ms or reduce max.poll.records. We are also looking at ways to improve the performance of the code so that messages are processed faster. For now, we went with reducing max.poll.records to 100.
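
As a rough sketch, the setting can be applied to the properties that are passed to the KafkaConsumer constructor. ConsumerTuning is a hypothetical helper, and how our service actually loads its configuration is not shown here. The property names are the standard Kafka consumer settings. They are written as plain strings to keep the example free of dependencies; the Kafka client also exposes them as constants on ConsumerConfig.

```java
import java.util.Properties;

// Hypothetical helper: copies the base consumer properties and lowers the batch size.
final class ConsumerTuning {
    private ConsumerTuning() {}

    static Properties withSmallerBatches(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Fewer records per poll() leaves more time per record
        // within max.poll.interval.ms (default 300000 ms).
        props.setProperty("max.poll.records", "100");
        // Alternative: give each batch more time instead.
        // props.setProperty("max.poll.interval.ms", "600000");
        return props;
    }
}
```

Copying the base properties instead of mutating them keeps the change local to the consumer that needs it.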
