
Pull模式说明
不同于 Flume 直接将数据推送到 Spark Streaming 中,第二种模式通过以下条 件运行一个正常的 Flume sink。Flume 将数据推送到 sink 中,并且数据保持 buffered 状态。Spark Streaming 使用一个可靠的 Flume 接收器和转换器从 sink 拉取数据。 只要当数据被接收并且被 Spark Streaming 备份后,转换器才运行成功。 这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要 为 Flume 配置一个正常的 sink。
# 定义agent的三大组件 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定义source a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/training/nginx/logs/flumeLogs a1.sources.r1.fileHeader=true # 定义sink a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.channel=c1 a1.sinks.k1.hostname=niit01 a1.sinks.k1.port=1314 # 定义channel a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity= 100 # 定义三者关系 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
程序功能说明:
该程序从SparkSink中拉取Flume中的数据:
一、查看下原始数据;
二、将数据简单处理,完成数据的统计,如求各个部门员工工资
编写程序,代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FlumeDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeDemo")
val streamingContext = new StreamingContext(sparkConf, Seconds(2))
// niit01为虚拟机主机名 val flumeEvent: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(streamingContext, "niit01", 1314)
// 查看接收到的原始数据
val flumeDStream: DStream[String] = flumeEvent.map(e => {
println(new String(e.event.getBody.array()))
new String(e.event.getBody.array())
})
// 分词,组合
val mapDStream: DStream[(Int,Int)] = flumeDStream.map(x => {
val strings = x.trim.split(",")
val salary = strings(5).toInt
val deptNo = strings(7).toInt
(deptNo,salary)
})
// 实现统计
val resultDStream: DStream[(Int, Int)] = mapDStream.reduceByKey(_ + _)
// 打印结果
resultDStream.print()
// 启动实时计算
streamingContext.start()
// 等待计算结束
streamingContext.awaitTermination()
}
}
实验成功关键:上传如下几个jar到Flume的安装路径下的lib中:
先启动Flume,命令如下:
flume-ng agent -n a1 -f conf/a3.conf -Dflume.root.logger=INFO,console
启动成功后,如下所示:
运行上述编写的程序
将文件emp.csv(如没有,则先上传到虚拟机中)复制到Fume的source所对应的目录下(如/training/nginx/logs/flumeLogs),如下所示:
cp /root/emp.csv /training/nginx/logs/flumeLogs/
查看程序是否读取到数据并完成数据的处理,结果如下图所示: