
Create a Maven project.
Add the Spark-related dependencies by putting the following in pom.xml:
<packaging>jar</packaging>

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.8</spark.version>
    <!-- Scala binary version used in Spark artifact IDs; must match scala.version (2.11 for Scala 2.11.8) -->
    <spark.artifact.version>2.11</spark.artifact.version>
    <hadoop.version>2.7.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${spark.artifact.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.5.4</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
        </plugin>
    </plugins>
</build>
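All four tasks below read d:/Tools/emp.csv and assume the classic emp-table column order (empno, ename, job, mgr, hiredate, sal, comm, deptno), so that sal is field 5, comm is field 6, and deptno is field 7 (0-based). The rows below are illustrative sample data in that assumed format, not part of the original file:

7369,SMITH,CLERK,7902,1980-12-17,800,,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
7782,CLARK,MANAGER,7839,1981-06-09,2450,,10
7839,KING,PRESIDENT,,1981-11-17,5000,,10
7934,MILLER,CLERK,7782,1982-01-23,1300,,10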
[Task 1] Sum the base salary (sal) per department. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
object CountSalary {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(CountSalary.getClass.getName).setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt // field 5: base salary (sal)
        val deptNo = strings(7).toInt // field 7: department number (deptno)
        (deptNo, salary)
      })
      .reduceByKey(_ + _) // sum salaries within each department
      .collect()
      .foreach(println)
    // stop the SparkContext
    sc.stop()
  }
}
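With the sample rows shown above, this prints one (deptNo, totalSalary) pair per department, namely (10,8750), (20,800), and (30,2850), though reduceByKey makes no guarantee about output order without an explicit sort.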
[Task 2] Sum base salary plus bonus (comm) per department, counting an empty bonus field as 0. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
object CountBonusAndSalary {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(CountBonusAndSalary.getClass.getName).setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt // field 5: base salary (sal)
        val deptNo = strings(7).toInt // field 7: department number (deptno)
        // field 6: bonus (comm); empty when the employee has none
        val bonus = if (strings(6).trim.nonEmpty) strings(6).trim.toInt else 0
        (deptNo, salary + bonus)
      })
      .reduceByKey(_ + _) // total pay (salary + bonus) per department
      .collect()
      .foreach(println)
    // stop the SparkContext
    sc.stop()
  }
}
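The empty-string check above handles employees with no bonus. A slightly more defensive variant, offered as a sketch rather than the original author's code, uses scala.util.Try so that a non-numeric comm field also falls back to 0:

import scala.util.Try

// parse the bonus field, defaulting to 0 when it is empty or not a number
val bonus = Try(strings(6).trim.toInt).getOrElse(0)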
[Task 3] Compute the same per-department totals as Task 2, sorted in ascending order of department number. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
object CountBonusAndSalaryByAsc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(CountBonusAndSalaryByAsc.getClass.getName).setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt // field 5: base salary (sal)
        val deptNo = strings(7).toInt // field 7: department number (deptno)
        // field 6: bonus (comm); empty when the employee has none
        val bonus = if (strings(6).trim.nonEmpty) strings(6).trim.toInt else 0
        (deptNo, salary + bonus)
      })
      .reduceByKey(_ + _) // total pay (salary + bonus) per department
      .sortByKey(ascending = true) // sort ascending by department number
      .collect()
      .foreach(println)
    // stop the SparkContext
    sc.stop()
  }
}
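Note that in the RDD API the ascending flag of sortByKey defaults to true, so .sortByKey() alone produces the same ascending order, while passing false sorts the pairs in descending order of department number:

.sortByKey(ascending = false) // descending by department number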
[Task 4] Compute the per-department totals, sorted in descending order of total pay. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
object CountTotalByAsc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(CountTotalByAsc.getClass.getName).setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt // field 5: base salary (sal)
        val deptNo = strings(7).toInt // field 7: department number (deptno)
        // field 6: bonus (comm); empty when the employee has none
        val bonus = if (strings(6).trim.nonEmpty) strings(6).trim.toInt else 0
        (deptNo, salary + bonus)
      })
      .reduceByKey(_ + _) // total pay (salary + bonus) per department
      .sortBy(_._2, ascending = false) // sort descending by total pay
      .collect()
      .foreach(println)
    // stop the SparkContext
    sc.stop()
  }
}
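If only the few highest-paying departments are needed, the full sort plus collect can be replaced with RDD.top, which returns the n largest elements under a given ordering. A minimal sketch, where totals stands for the (deptNo, salary + bonus) RDD produced by reduceByKey above and n = 3 is an arbitrary illustrative choice:

// take the 3 departments with the highest total pay, without sorting the whole RDD
val top3 = totals.top(3)(Ordering.by[(Int, Int), Int](_._2))
top3.foreach(println)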