概述
同一服务Kafka多数据源配置
一个微服务项目,业务需要,引入两套kafka集群的配置
1.xml文件加入新老Kafka集群配置信息
2.添加KafkaTemplateConfig bean文件
3.注解@Qualifier(“kafkaTemplateNew”)使用
@EnableKafka
@Configuration
public class KafkaTemplateConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka-new.bootstrap-servers}")
private String newBootstrapServers;
/**
* Producer Template 配置
*/
@Bean
@Primary
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Producer Template new
配置
*/
@Bean(name="kafkaTemplateNew")
public KafkaTemplate kafkaTemplateNew() {
return new KafkaTemplate<>(newProducerFactory());
}
/**
* Producer 工厂配置
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(true));
}
/**
* Producer new
工厂配置
*/
@Bean
public ProducerFactory<String, String> newProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(false));
}
/**
* Producer 参数配置
*/
public Map<String, Object> producerConfigs(boolean isPrimary) {
Map<String, Object> props = new HashMap<>();
// 指定多个kafka集群多个地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, isPrimary?bootstrapServers:newBootstrapServers);
// 重试次数,0为不启用重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 0);
//同步到副本, 默认为1
// acks=0 把消息发送到kafka就认为发送成功
// acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
// acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 生产者空间不足时,send()被阻塞的时间,默认60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
// 键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
// 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
return props;
}
}
@Autowired
@Qualifier("kafkaTemplateNew")
private KafkaTemplate kafkaTemplateNew;
@Autowired
private KafkaTemplate kafkaTemplate;
最后
以上就是奋斗唇膏为你收集整理的同一服务Kafka多数据源配置同一服务Kafka多数据源配置的全部内容,希望文章能够帮你解决同一服务Kafka多数据源配置同一服务Kafka多数据源配置所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复