
kafka起初是有linkedIn公司采用scala语言开发的一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
三大主要角色:
kafka体系架构:若干个producer,若干broker,若干consumer,以及一个zookeeper集群。
由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK,同时因为kafka依赖zookeeper,所以需要还需安装zookeeper。
wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz tar -xzf kafka_2.11-2.4.1.tgz cd kafka_2.11-2.4.1
#broker.id属性在kafka集群中必须要是唯一 broker.id=0 #kafka部署的机器ip和提供服务的端口号 listeners=PLAINTEXT://ip:9092 #kafka的消息存储文件 log.dir=/usr/local/data/kafka-logs #kafka连接zookeeper的地址 zookeeper.connect=ip:2181
# 启动kafka,运行日志在logs目录的server.log文件里 bin/kafka-server-start.sh -daemon config/server.properties #-daemon后台启动,不会打印日志到控制台 或者用 bin/kafka-server-start.sh config/server.properties & jps #查看是否启动 # 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树 bin/zkCli.sh ls / #查看zk的根目录kafka相关节点 ls /brokers/ids #查看kafka节点 # 停止kafka bin/kafka-server-stop.sh基本使用
bin/kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 1 --topic test001 Created topic test001.#成功标志
–replication-factor:制定副本因子
–create :创建主题的动作指令
–topic:所创建主题的名称
–zookeeper:所要连接的zookeeper服务地址。
bin/kafka-topics.sh --list --zookeeper ip:2181
bin/kafka-topics.sh --delete --topic test --zookeeper ip:2181
bin/kafka-console-producer.sh --broker-list ip:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --whitelist "test|test-2"
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup-2 --topic test