From cdcdf97894258e764a7624ab5067d4f0b94d8b2f Mon Sep 17 00:00:00 2001 From: balex Date: Fri, 27 Feb 2026 04:55:20 +0100 Subject: [PATCH] err handling --- .../analytics/config/KafkaConsumerConfig.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/analytics-service/src/main/java/com/posthub/analytics/config/KafkaConsumerConfig.java b/analytics-service/src/main/java/com/posthub/analytics/config/KafkaConsumerConfig.java index ea5359c..bcf2f02 100644 --- a/analytics-service/src/main/java/com/posthub/analytics/config/KafkaConsumerConfig.java +++ b/analytics-service/src/main/java/com/posthub/analytics/config/KafkaConsumerConfig.java @@ -9,7 +9,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.util.backoff.FixedBackOff; import java.util.Map; @@ -24,10 +27,13 @@ public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { - JsonDeserializer deserializer = new JsonDeserializer<>(UserEvent.class); - deserializer.setRemoveTypeHeaders(true); - deserializer.addTrustedPackages("*"); - deserializer.setUseTypeMapperForKey(false); + JsonDeserializer jsonDeserializer = new JsonDeserializer<>(UserEvent.class); + jsonDeserializer.setRemoveTypeHeaders(true); + jsonDeserializer.addTrustedPackages("*"); + jsonDeserializer.setUseTypeMapperForKey(false); + + ErrorHandlingDeserializer errorHandlingDeserializer = + new ErrorHandlingDeserializer<>(jsonDeserializer); return new DefaultKafkaConsumerFactory<>( Map.of( @@ -35,10 +41,10 @@ public class KafkaConsumerConfig { ConsumerConfig.GROUP_ID_CONFIG, groupId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class ), new StringDeserializer(), - deserializer + errorHandlingDeserializer ); } @@ -47,6 +53,8 @@ public class KafkaConsumerConfig { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); + // Skip failed records after 0 retries (log and move on) + factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L))); return factory; } -} +} \ No newline at end of file