package com.ycwl.basic.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ContainerProperties; import java.util.HashMap; import java.util.Map; @Configuration @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = false) public class KafkaConfig { @Value("${kafka.bootstrap-servers:100.64.0.12:39092}") private String bootstrapServers; @Value("${kafka.consumer.group-id:liuying-microservice}") private String consumerGroupId; @Value("${kafka.consumer.auto-offset-reset:earliest}") private String autoOffsetReset; @Value("${kafka.producer.acks:all}") private String acks; @Value("${kafka.producer.retries:3}") private Integer retries; @Value("${kafka.producer.batch-size:16384}") private Integer batchSize; @Value("${kafka.producer.linger-ms:1}") private Integer lingerMs; @Value("${kafka.producer.buffer-memory:33554432}") private Integer bufferMemory; @Value("${kafka.producer.enable-idempotence:true}") private boolean enableIdempotence; @Value("${kafka.producer.compression-type:snappy}") private String compressionType; @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.ACKS_CONFIG, acks); configProps.put(ProducerConfig.RETRIES_CONFIG, retries); configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence); configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConcurrentKafkaListenerContainerFactory manualCommitKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }