Integrating and Configuring Kafka in Spring Boot and Producing Messages with KafkaTemplate

The @Configuration class
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;

@Configuration
public class KafkaTemplateConfig {
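    // Custom property key; Spring Boot's own auto-configuration reads spring.kafka.bootstrap-servers instead
    @Value("${spring.kafka.servers}")
    private String bootstrap_servers_config;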
    @Value("${spring.kafka.producer.retries}")
    private String retries_config;
    @Value("${spring.kafka.producer.batch-size}")
    private String batch_size_config;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String buffer_memory_config;

    @Bean(name = "myKafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
        // Numeric settings are passed as strings; Kafka parses them when the producer is created
        configs.put(ProducerConfig.RETRIES_CONFIG, retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
        // Key and value serializers
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Partitioner: Kafka's built-in RoundRobinPartitioner replaces the default partitioner
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
        return new KafkaTemplate<>(producerFactory);
    }

}
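The configuration above selects RoundRobinPartitioner, which spreads records across a topic's partitions in turn instead of choosing a partition from the record key. The following is only a minimal sketch of that idea in plain Java, not Kafka's actual implementation; the class name RoundRobinSketch and its method nextPartition are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative round-robin partition chooser (hypothetical; not Kafka's source code). */
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    RoundRobinSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Returns the next partition index, cycling through 0..numPartitions-1. */
    int nextPartition() {
        // floorMod keeps the result non-negative even after the counter overflows
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch chooser = new RoundRobinSketch(3);
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            assigned.add(chooser.nextPartition());
        }
        System.out.println(assigned); // prints [0, 1, 2, 0, 1, 2, 0]
    }
}
```

Because the key plays no part in the choice, records with the same key may land on different partitions, so this setting suits workloads that value even distribution over per-key ordering.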
Message-sending utility class
import com.webray.information.notice.provider.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

@Component
@Slf4j
public class EventKafkaProducer {

    @Resource(name = "myKafkaTemplate")
    private KafkaTemplate<String, String> myKafkaTemplate;

    @Value("${spring.kafka.producer.topic}")
    private String topic;

    public void sendMsg(String message) {
        // Validate before logging or sending
        if (message == null) {
            log.error("message must not be null");
            return;
        }
        log.info("Sending Kafka message: {}", message);
        // ListenableFuture.addCallback applies to spring-kafka 2.x; in 3.x send() returns a CompletableFuture
        ListenableFuture<SendResult<String, String>> future = myKafkaTemplate.send(topic, message);
        future.addCallback(success -> log.info("EventKafkaProducer: message sent successfully"),
                ex -> log.error("EventKafkaProducer: failed to send message", ex));
    }
    public void sendMsg(Object vo) {
        sendMsg(JsonUtil.toJson(vo));
    }
 
}
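The producer class above attaches success and failure callbacks to the future returned by send(), so the calling thread is not blocked while the broker acknowledges the record. In spring-kafka 3.x, send() returns a CompletableFuture rather than a ListenableFuture, and the equivalent handling is usually written with whenComplete. The sketch below shows that pattern using only the JDK; fakeSend is a hypothetical stand-in for KafkaTemplate.send and does not talk to a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrates success/failure handling on a CompletableFuture (names are hypothetical). */
final class SendCallbackSketch {

    /** Hypothetical stand-in for an asynchronous send; fails when the payload is null. */
    static CompletableFuture<String> fakeSend(String topic, String message) {
        if (message == null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("message must not be null"));
        }
        return CompletableFuture.completedFuture(topic + ":" + message);
    }

    /** Attaches a completion callback and returns a description of the outcome. */
    static String sendAndDescribe(String topic, String message) {
        AtomicReference<String> outcome = new AtomicReference<>();
        // The future from fakeSend is already complete, so the callback runs immediately here
        fakeSend(topic, message).whenComplete((result, ex) -> {
            if (ex != null) {
                outcome.set("failed: " + ex.getMessage());
            } else {
                outcome.set("sent: " + result);
            }
        });
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("events", "hello"));
        System.out.println(sendAndDescribe("events", null));
    }
}
```

With a real broker the callback would run later on a producer thread, so code that needs the outcome should react inside the callback rather than read a shared variable right after the call.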
Sending messages: inject EventKafkaProducer
    @Autowired
    private EventKafkaProducer eventKafkaProducer;

Then send a message
    eventKafkaProducer.sendMsg("Hello, Kafka!");
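EventKafkaProducer declares two overloads, sendMsg(String) and sendMsg(Object). A String argument, like the one in the call above, binds to the String overload and is sent unchanged, while any other object goes through the Object overload and is converted to JSON first. The sketch below reproduces only that dispatch behavior in plain Java; OverloadSketch and its simplistic JSON formatting are hypothetical stand-ins for the real class and for JsonUtil.toJson.

```java
import java.util.ArrayList;
import java.util.List;

/** Demonstrates overload resolution between sendMsg(String) and sendMsg(Object). */
final class OverloadSketch {
    /** Records what would have been sent, in place of a real Kafka producer. */
    final List<String> sent = new ArrayList<>();

    void sendMsg(String message) {
        if (message == null) {
            return; // mirrors the null check in the producer class
        }
        sent.add(message);
    }

    void sendMsg(Object vo) {
        // Hypothetical stand-in for JsonUtil.toJson(vo); the argument is a String,
        // so this call resolves to sendMsg(String)
        sendMsg("{\"value\":\"" + vo + "\"}");
    }

    public static void main(String[] args) {
        OverloadSketch producer = new OverloadSketch();
        producer.sendMsg("plain text");        // String overload
        producer.sendMsg(Integer.valueOf(42)); // Object overload, then String overload
        System.out.println(producer.sent);
    }
}
```

A literal null argument also resolves to the String overload, because String is the more specific parameter type, so it is caught by the null check and never reaches the JSON conversion.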