err handling

This commit is contained in:
2026-02-27 04:55:20 +01:00
parent 322a08f3dc
commit cdcdf97894

View File

@@ -9,7 +9,10 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.FixedBackOff;
import java.util.Map; import java.util.Map;
@@ -24,10 +27,13 @@ public class KafkaConsumerConfig {
@Bean @Bean
public ConsumerFactory<String, UserEvent> consumerFactory() { public ConsumerFactory<String, UserEvent> consumerFactory() {
JsonDeserializer<UserEvent> deserializer = new JsonDeserializer<>(UserEvent.class); JsonDeserializer<UserEvent> jsonDeserializer = new JsonDeserializer<>(UserEvent.class);
deserializer.setRemoveTypeHeaders(true); jsonDeserializer.setRemoveTypeHeaders(true);
deserializer.addTrustedPackages("*"); jsonDeserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(false); jsonDeserializer.setUseTypeMapperForKey(false);
ErrorHandlingDeserializer<UserEvent> errorHandlingDeserializer =
new ErrorHandlingDeserializer<>(jsonDeserializer);
return new DefaultKafkaConsumerFactory<>( return new DefaultKafkaConsumerFactory<>(
Map.of( Map.of(
@@ -35,10 +41,10 @@ public class KafkaConsumerConfig {
ConsumerConfig.GROUP_ID_CONFIG, groupId, ConsumerConfig.GROUP_ID_CONFIG, groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class
), ),
new StringDeserializer(), new StringDeserializer(),
deserializer errorHandlingDeserializer
); );
} }
@@ -47,6 +53,8 @@ public class KafkaConsumerConfig {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory = ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>(); new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(consumerFactory());
// Skip failed records after 0 retries (log and move on)
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L)));
return factory; return factory;
} }
} }