
peopleDF = spark.read.option("header","true").csv("people.csv")
peopleDF['age']
Column
peopleDF.age
Column
peopleDF.select(peopleDF.age).show()
+---+
|age|
+---+
| 52|
| 32|
| 28|
3、示例:列引用(Scala)
val peopleDF = spark.read.option("header","true").csv("people.csv")
peopleDF("age")
org.apache.spark.sql.Column = age
$"age"
org.apache.spark.sql.ColumnName = age
peopleDF.select(peopleDF("age")).show
+---+
|age|
+---+
| 52|
| 32|
4、列表达式
peopleDF.select("lastName", peopleDF.age * 10).show()
+--------+----------+
|lastName|(age * 10)|
+--------+----------+
| Hopper| 520|
| Turing| 320|
peopleDF.where(peopleDF.firstName.startswith("A")).show()
+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|94020| Turing| Alan| 32|
|94020|Lovelace| Ada| 28|
+-----+--------+---------+---+
6、示例:列表达式(Scala)
peopleDF.select($"lastName", $"age" * 10).show
+--------+----------+
|lastName|(age * 10)|
+--------+----------+
| Hopper| 520|
| Turing| 320|
peopleDF.where(peopleDF("firstName").startsWith("A")).show
+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|94020| Turing| Alan| 32|
|94020|Lovelace| Ada| 28|
+-----+--------+---------+---+
7、列别名(1)
peopleDF.select("lastName",(peopleDF.age * 10).alias("age_10")).show()
+--------+------+
|lastName|age_10|
+--------+------+
| Hopper| 520|
| Turing| 320|
peopleDF.select($"lastName",($"age" * 10).alias("age_10")).show
+--------+------+
|lastName|age_10|
+--------+------+
| Hopper| 520|
| Turing| 320|
二、Queries的分组(Group)和聚合(Aggregation)
1、Queries的聚合(Aggregation)
peopleDF.groupBy("pcode").count().show()
+-----+-----+
|pcode|count|
+-----+-----+
|94020| 2|
|87501| 1|
|02134| 2|
+-----+-----+
2、转换(Transformation)中的groupBy
python:
from pyspark.sql.functions import stddev
peopleDF.groupBy("pcode").agg(stddev("age")).show()
+-----+------------------+
|pcode| stddev_samp(age)|
+-----+------------------+
|94020|0.7071067811865476|
|87501| NaN|
|02134|2.1213203435596424|
+-----+------------------+
三、Joining Dataframes
1、Joining Dataframes
Scala:
val peopleDF = spark.read.option("header","true").csv("people-no-pcode.csv")
val pcodesDF = spark.read.option("header","true").csv("pcodes.csv")
python:
peopleDF.join(pcodesDF, "pcode").show() +-----+--------+---------+---+---------+-----+ |pcode|lastName|firstName|age| city|state| +-----+--------+---------+---+---------+-----+ |02134| Hopper| Grace| 52| Boston| MA| |94020|Lovelace| Ada| 28|Palo Alto| CA| |87501| Babbage| Charles| 49| Santa Fe| NM| |02134| Wirth| Niklaus| 48| Boston| MA| +-----+--------+---------+---+---------+-----+3、示例:一个Left Outer Join
python:
peopleDF.join(pcodesDF, "pcode", "left_outer").show() +-----+--------+---------+---+---------+-----+ |pcode|lastName|firstName|age| city|state| +-----+--------+---------+---+---------+-----+ |02134| Hopper| Grace| 52| Boston| MA| | null| Turing| Alan| 32| null| null| |94020|Lovelace| Ada| 28|Palo Alto| CA| |87501| Babbage| Charles| 49| Santa Fe| NM| |02134| Wirth| Niklaus| 48| Boston| MA| +-----+--------+---------+---+---------+-----+
Scala:
peopleDF.join(pcodesDF,peopleDF("pcode") === pcodesDF("pcode"),"left_outer").show
+-----+--------+---------+---+---------+-----+
|pcode|lastName|firstName|age| city|state|
+-----+--------+---------+---+---------+-----+
|02134| Hopper| Grace| 52| Boston| MA|
| null| Turing| Alan| 32| null| null|
|94020|Lovelace| Ada| 28|Palo Alto| CA|
|87501| Babbage| Charles| 49| Santa Fe| NM|
|02134| Wirth| Niklaus| 48| Boston| MA|
+-----+--------+---------+---+---------+-----+
4、示例:对名称不同的列进行连接
Scala:
peopleDF.join(zcodesDF, $"pcode" === $"zip").show +-----+--------+---------+---+-----+---------+-----+ |pcode|lastName|firstName|age| zip| city|state| +-----+--------+---------+---+-----+---------+-----+ |02134| Hopper| Grace| 52|02134| Boston| MA| |94020|Lovelace| Ada| 28|94020|Palo Alto| CA| |87501| Babbage| Charles| 49|87501| Santa Fe| NM| |02134| Wirth| Niklaus| 48|02134| Boston| MA| +-----+--------+---------+---+-----+---------+-----+
python:
peopleDF.join(zcodesDF,peopleDF.pcode == zcodesDF.zip).show() +-----+--------+---------+---+-----+---------+-----+ |pcode|lastName|firstName|age| zip| city|state| +-----+--------+---------+---+-----+---------+-----+ |02134| Hopper| Grace| 52|02134| Boston| MA| |94020|Lovelace| Ada| 28|94020|Palo Alto| CA| |87501| Babbage| Charles| 49|87501| Santa Fe| NM| |02134| Wirth| Niklaus| 48|02134| Boston| MA| +-----+--------+---------+---+-----+---------+-----+四、基本要点
1、可选:查看Column类(在Python模块pyspark中)的API文档。sql和Scala包org.apache.spark.sql)。注意各种可用的选项。
2、在终端中启动Spark shell(如果还没有运行)。
3、基于Hive devsh,新建一个名为accountsDF的Dataframe。accounts表。
4、尝试使用select进行一个简单的查询,使用两种列引用语法。
pyspark> accountsDF.select(accountsDF["first_name"]).show()
pyspark> accountsDF.select(accountsDF.first_name).show()
scala> accountsDF.select(accountsDF("first_name")).show
scala> accountsDF.select($"first_name").show
5、要研究列表达式,请基于accountsDF Dataframe中的first_name列创建一个要使用的列对象。
pyspark> fnCol = accountsDF.first_name
scala> val fnCol = accountsDF("first_name")
6、注意,对象类型是Column。要查看可用的方法和属性,请使用制表符补全—即输入fnCol。其次是选项卡。
7、当您对现有列执行操作时,将创建New Column对象。在上面创建的fnCol对象上使用相等操作符,根据一个列表达式创建一个新的Column对象,该列表达式标识名为Lucy的用户。
pyspark> lucyCol = (fnCol == "Lucy")
scala> val lucyCol = (fnCol === "Lucy")
8、在选择语句中使用lucyCol列表达式。因为lucyCol是基于布尔表达式的,所以列值将是true或false,具体取决于first_name列的值。确认以true标识名为Lucy的用户。
pyspark> accountsDF.select(accountsDF.first_name,accountsDF.last_name,lucyCol).show()
scala> accountsDF.select($"first_name",$"last_name",lucyCol).show
9、where操作需要一个基于布尔的列表达式。在where转换中使用lucyCol列表达式,并在生成的Dataframe中查看数据。确认数据中只有名为Lucy的用户。
> accountsDF.where(lucyCol).show(5)
10、列表达式不需要赋值给变量。尝试不使用lucyCol变量的相同查询。
pyspark> accountsDF.where(fnCol == "Lucy").show(5)
scala> accountsDF.where(fnCol === "Lucy").show(5)
11、列表达式不限于上面的操作。它们可以用于任何可以使用简单列的转换,例如选择。尝试选择城市和州列,以及phone_number列的前三个字符(在美国,电话号码的前三个数字被称为地区代码)。在phone_number列上使用substr操作符提取区域代码。
pyspark> accountsDF.select("city","state",accountsDF.phone_number.substr(1,3)).show(5)
scala> accountsDF.select($"city",$"state",$"phone_number".substr(1,3)).show(5)
12、注意,在最后一步中,查询返回的值是正确的,但是列名是子字符串(phone_number, 1, 3),它很长,很难处理。重复相同的查询,使用别名操作符将该列重命名为area_code。
pyspark> accountsDF.select("city","state",accountsDF.phone_number.substr(1,3).alias("area_code")).show(5)
scala> accountsDF.select($"city",$"state",$"phone_number".substr(1,3).alias("area_code")).show(5)
13、执行一个查询,结果是只包含first_name和last_name列的Dataframe,并且只包含名和姓都以相同的两个字母开头的用户。(例如,用户Roberta Roget将被包括在内,因为她的姓和名都以“Ro”开头。)
2、按名称分组和计数14、使用带有count的groupBy查询accountsDF Dataframe,以找出共享每个姓氏的总人数。(注意计数聚合转换返回一个Dataframe,不像计数Dataframe操作,它返回一个单一的值给驱动程序。)
pyspark> accountsDF.groupBy("last_name").count().show(5)
scala> accountsDF.groupBy("last_name").count.show(5)
15、还可以按多列分组。再次查询accountsDF,这一次计算拥有相同姓和名的人数。
pyspark> accountsDF.groupBy("last_name","first_name").count().show(5)
scala> accountsDF.groupBy("last_name","first_name").count.show(5)
3、通过邮政编码将帐户数据与蜂窝塔连接
16、在本节中,您将将一直使用的帐户数据与位于base_stations中的蜂窝基站位置数据连接起来。镶木地板文件。首先检查模式和数据的一些记录。在单独的终端窗口(不是运行Spark shell的终端窗口)中使用parquet-tools命令。
$ parquet-tools schema $DEVDATA/base_stations.parquet
$ parquet-tools head $DEVDATA/base_stations.parquet
17、上传数据文件到HDFS。
$ hdfs dfs -put $DEVDATA/base_stations.parquet /devsh_loudacre/
18、在Spark shell中,使用基站数据创建一个名为baseDF的新Dataframe。检查baseDF模式和数据,确保它与Parquet文件中的数据匹配。
scala> val baseDF = spark.read.parquet("/devsh_loudacre/base_stations.parquet")
19、一些账户持有人居住在有基站的邮政编码地区。加入baseDF和accountsDF以找到这些用户。对于每个用户,包括他们的帐户ID、名、姓,以及他们的邮政编码中基站的ID和位置数据(纬度和经度)。
pyspark> accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF, baseDF.zip == accountsDF.zipcode).show()
scala> accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF,$"zip" === $"zipcode").show()
4、计数活动设备
20、accountdevice CSV数据集包含所有帐户使用的所有设备的列表。数据集中的每一行包括行ID、账户ID、设备类型的ID、设备被激活的日期和具体的设备ID。CSV数据文件在$DEVDATA/accountdevice目录下。查看数据集中的数据,然后将该目录及其内容上传到HDFS目录/ devsh_loudacre/accountdevice。
21、基于accountdevice数据文件创建Dataframe。
22、使用帐户设备数据和之前在本练习中创建的Dataframes来查找所有活动帐户(即尚未关闭的帐户)中每个设备模型的总数。新的Dataframe应该从最常用的模型到最不常用的模型进行排序。将数据保存为Parquet文件,保存在/devsh_loudacre/top_devices目录下,列如下:
提示: