
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个缓冲双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
工作流程:
① batch.size:只有数据积累达到batch.size,sender才会发送数据。默认是16k
② linger.ms:如果数据未达到batch.size的默认大小,通过linger.ms设置等待的时间
0:生产者发送过来的数据,不需要等数据落盘应答
1:生产者发送过来的数据,Leader收到数据后应答
-1(all):生产者发送过来的数据,Leader和ISR(即所有的Follower)队列里面的所有节点收齐数据后应答。-1和all等价
同步:效率低,需要等待才能进行下一步操作
异步:效率高,不需要等待,可以同时进行多步操作
同步发送:必须等待上一批外部的数据发送完成,才能继续发送下一批数据
异步发送:将外部的数据一批一批的放入缓冲双端队列中,不需要等待
1.普通异步发送案例说明:在IDEA创建工程,创建Kafka 生产者,采用异步的方式发送到 Kafka Broker
(1)导入依赖
org.apache.kafka kafka-clients 3.0.0
(2)启动集群及消费者
(3)创建不带回调函数的API代码
package com.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
//说明kafka集群需要启动
//在hadoop103上启动消费者,消费数据
//bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first1 --from-beginning
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//第一个参数为生产者的主题名,第二个生产者生产的数据value,该方法还有其他配置选项
kafkaProducer.send(new ProducerRecord("first1","kafka"));
}
// 3.关闭资源
kafkaProducer.close();
}
}
2.带回调函数的异步发送
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败
(1)(2)同上
(3)创建带回调函数的API代码
package com.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) {
//说明kafka集群需要启动
//在hadoop103上启动消费者,消费数据
//bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first1 --from-beginning
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//第一个参数为生产者的主题名,第二个生产者生产的数据value, new Callback()创建回调函数
kafkaProducer.send(new ProducerRecord("first1", "kafka" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("主题为:" + recordMetadata.topic() + " 分区为:" + recordMetadata.partition());
}
}
});
}
// 3.关闭资源
kafkaProducer.close();
}
}
三、同步发送 API
在send方法后面加一个调用get()方法即可
package com.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//第一个参数为生产者的主题名,第二个生产者生产的数据value
kafkaProducer.send(new ProducerRecord("first1", "kafka" + i)).get();
}
// 3.关闭资源
kafkaProducer.close();
}
}
四、生产者分区
1.分区的好处
通过实现 ProducerRecord() 类的构造方法来确定分区
(1)指明partition的情况下,直接将指明的值作为partition值;
例如:partition=0,所有数据写入分区0
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch(默认16k)已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)
方式一:直接指明
注意:进行分区时,需要查看主题topic的分区数,如果是只有一个分区,那么只能指定的分区数为0
package com.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions04 {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//new Callback()创建回调函数
kafkaProducer.send(new ProducerRecord("first3",1 ,"","kafka" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("主题为:" + recordMetadata.topic() + " 分区为:" + recordMetadata.partition());
}
}
});
}
// 3.关闭资源
kafkaProducer.close();
}
}
方式二:通过key的hash值与topic的partition数进行取余得到partition值
package com.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions04 {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//new Callback()创建回调函数,以a的hash值与topic的partition数进行取余得到分区数partition
kafkaProducer.send(new ProducerRecord("first3","a","kafka" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("主题为:" + recordMetadata.topic() + " 分区为:" + recordMetadata.partition());
}
}
});
}
// 3.关闭资源
kafkaProducer.close();
}
}
方式三:既没有指定分区也没指定key
package com.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions04 {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//new Callback()创建回调函数
kafkaProducer.send(new ProducerRecord("first3","kafka" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("主题为:" + recordMetadata.topic() + " 分区为:" + recordMetadata.partition());
}
}
});
}
// 3.关闭资源
kafkaProducer.close();
}
}
五、自定义分区器
首先定义一个myPartition类实现 Partitioner 接口,重写其中的方法
案例说明:如果数据包含kafka放入到1号分区,不包含放入2号分区(注意:主题的分区要有1,2号分区)
可以通过下面命令查看主题的详细信息:
bin/kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic first3
myPatition类
package com.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class myPartitions05_1 implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取数据
String value1 = value.toString();
int Partition1;
if (value1.contains("kafka")){
Partition1 = 1;
}else {
Partition1 = 2;
}
return Partition1;
}
public void close() {
}
public void configure(Map map) {
}
}
CustomProducerCallbackPartitions05_2 类
package com.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions05_2 {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.producer.myPartitions05_1");
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//new Callback()创建回调函数
kafkaProducer.send(new ProducerRecord("first3","kafka1111" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("主题为:" + recordMetadata.topic() + " 分区为:" + recordMetadata.partition());
}
}
});
}
// 3.关闭资源
kafkaProducer.close();
}
}