栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot手动重置kafka的offset

大数据系统 更新时间:发布时间: 百科书网 趣学号

直接上代码

@Autowired
	private KafkaListenerEndpointRegistry kafkaRegistry;

	@Autowired
	private ConsumerFactory consumerFactory;

	
	@KafkaListener(id = "testlistenerId", topics ={ "testTopic" }, groupId = "testGroupId",clientIdPrefix = "testClientId")
	public void kafkaListener(String recored,@Header(KafkaHeaders.OFFSET) Integer offset,Acknowledgment acknowledgment)
	{
		System.out.println("1============================"+offset);
		System.out.println("2============================"+recored.length());
		System.out.println("3============================"+recored.substring(0,100));
		acknowledgment.acknowledge();
	}

	
	public void startReStat(long offset)
	{
		//1、获取需要重置offset的topic的监听器停止,因为一个topic的分区只能有一个客户端操作
		MessageListenerContainer messageListenerContainer = kafkaRegistry.getListenerContainer("testlistenerId");
		if (messageListenerContainer!= null && !messageListenerContainer.isContainerPaused())
		{
			messageListenerContainer.pause();
		}
		if (messageListenerContainer!= null && messageListenerContainer.isRunning())
		{
			messageListenerContainer.stop();
		}
		
		//2、创建一个新的客户端(消费者)并设置需要重置offset的topic的分区(可实际情况设置,我的分区是0)
		Consumer cunsumer = consumerFactory.createConsumer("testGroupId", null);
		cunsumer.assign(Arrays.asList(new TopicPartition("testTopic", 0)));
		
		//3、重置指定topic的offset,并同步提交,提交后消费者会自动退出
		cunsumer.seek(new TopicPartition("testTopic", 0), offset);
		cunsumer.commitSync();
		
		//4、重启启动监听器,原消费者继续消费kafka消息
		if (messageListenerContainer!= null && !messageListenerContainer.isRunning())
		{
			messageListenerContainer.start();
			messageListenerContainer.resume();
		}
		
	}

关键流程:

1、停止需要重置offset的topic的监听器,因为一个topic的分区只能有一个客户端操作

2、创建一个新的客户端(消费者)并设置需要重置offset的topic的分区

3、重置指定topic的offset,并同步提交,提交后消费者会自动退出

4、重启启动监听器,原消费者继续消费kafka消息

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/280906.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号