
前言:
Spark 可以通过 Python、Java 或 Scala 来使用 。
Spark 本身是用 Scala 写的,运行在 Java 虚拟机(JVM)上。要在电脑或集群上运行Spark,要做的准备工作只是安装 Java 6 或者更新的版本。如果希望使用 Python 接口,你还需要一个 Python 解释器(2.6 以上版本)。Spark 尚不支持 Python 32。
这里本人所作演示全部基于Java 8,如果仅仅实在单机上学习Spark,只需要添加Spark的Maven依赖即可。
初始化Spark,创建SparkContext对象
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
//创建一个配置信息的对象,其中setMaster参数是 集群URL,告诉Spark如何连接到集群上面。
//我这里使用local作为参数,默认是讲Spark运行在单机单线程上面。
//第二个参数是应用名,方便连接上集群后在集群上找到我们的应用
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
//使用配置对象创建对应的SparkContext对象sc,有了sc就能创建RDD了。
JavaSparkContext sc = new JavaSparkContext(conf);
RDD介绍(重要)
RDD是Spark 对数据的核心抽象——弹性分布式数据集。RDD 其实就是分布式的元素集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。而在这一切背后,Spark 会自动将RDD 中的数据分发到集群上,并将操作并行化执行。Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。
创建RDD
用户可以使用两种方法创建 RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程
序中的对象集合(比如 list 和 set)。
1、读取程序中存在的一个集合
JavaRDDlines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
2、从外部存储中读取数据
JavaRDDlines = sc.textFile("/path/to/README.md");
3、读取一个外部数据集
另外再讲。
RDD操作
RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的 RDD 的操作,比如 map() 和 filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first()。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。
1、转化操作
RDD 的转化操作是返回新 RDD 的操作。转化出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作 RDD 中的一个元素。不过并不是所有的转化操作都是这样的。
2、行动操作
行动操作是第二种类型的 RDD 操作,它们会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。
惰性求值
惰性求值意味着当我们对 RDD 调用转化操作(例如调用 map())时,操作不会立即执行。相反,Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读取到RDD 的操作也同样是惰性的。因此,当我们调用sc.textFile() 时,数据并没有读取进来,而是在必要时才会读取。和转化操作一样的是,读取数据的操作也有可能会多次执行。虽然转化操作是惰性求值的,但还是可以随时通过运行一个行动操作来强制Spark 执行 RDD 的转化操作,比如使用 count()。这是一种对你所写的程序进行部分测试的简单方法。Spark 使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。在类似Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少 MapReduce 的周期数。而在 Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。
RDD常用操作方法
1、转换操作
举个例子:
使用Java 中的 flatMap() 将行数据切分为一个个单词,并返回一个新的RDD。
JavaRDDlines = sc.parallelize(Arrays.asList("hello world", "hi")); JavaRDD words = lines.flatMap(new FlatMapFunction () { public Iterable call(String line) { return Arrays.asList(line.split(" ")).iterator(); } }); words.first(); // 返回第一个单词"hello"
2、行动操作
举个例子:
创建一个新的RDD,存储的值为原来每个元素的平方数。这里result中存储的是:
1,4,9,16,25
// 构造数据源 Listdata = Arrays.asList(1, 2, 3, 4, 5); //并行化创建rdd JavaRDD rdd = sc.parallelize(data); //map中接口第一个泛型作为call方法参数,第二个泛型作为call方法返回值 JavaRDD result= rdd.map(new Function () { public Integer call(Integer x) throws Exception { return x*x; } });