Spark API 全集(1):Spark SQL Dataset & DataFrame API

简介

org.apache.spark.sql.Dataset是Spark SQL中核心的类,定义如下:

1
class Dataset[T] extends Serializable

DataFrame是Dataset[Row]的别名。

本文基于spark2.3.0.

下面是类方法简介。

类方法

Actions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
collect(): Array[T]
返回一个数组,包含Dataset所有行的数据。
注意:所有数据会被加载进driver进程的内存。

collectAsList(): List[T]
同上,但是返回Java list。

count(): Long
数据行数

describe(cols: String*): DataFrame
计算指定列的统计指标,包括count, mean, stddev, min, and max.

head(): T
返回第一行

head(n: Int): Array[T]
返回前N

first(): T
返回第一行,是head()的别名。

foreach(f: (T) ⇒ Unit): Unit
所有元素上应用f函数

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
所有元素分区上应用f函数

reduce(func: (T, T) ⇒ T): T
根据映射函数func,对RDD中的元素进行二元计算,返回计算结果。
注意:提供的函数应满足交换律及结合律,否则计算结果将是非确定的。

show(numRows: Int, truncate: Int, vertical: Boolean): Unit
表格形式打印出数据。numRows:显示的行数,truncate:裁剪字符串类型值到指定长度,vertical:垂直打印。

show(numRows: Int, truncate: Int): Unit
show(numRows: Int, truncate: Boolean): Unit
show(truncate: Boolean): Unit
numRows=20 truncate=20

show(numRows: Int): Unit
truncate=20

show(): Unit
numRows=20 truncate=20

summary(statistics: String*): DataFrame
计算数据集statistics指定的指标,可指定 count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
如未指定则会计算全部。

take(n: Int): Array[T]
获取前n行

takeAsList(n: Int): List[T]
获取前n行保存为list

toLocalIterator(): Iterator[T]
返回一个所有行的迭代器
The iterator will consume as much memory as the largest partition in this Dataset.


基本函数(Basic Dataset functions)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
as[U](implicit arg0: Encoder[U]): Dataset[U]
将数据映射成指定类型U,返回新的Dataset

persist(newLevel: StorageLevel): Dataset.this.type
缓存数据,可设置缓存级别。

persist(): Dataset.this.type
同cache方法

cache(): Dataset.this.type
缓存数据,MEMORY_AND_DISK模式。
注意:RDD的cache函数默认是MEMORY_ONLY

checkpoint(eager: Boolean): Dataset[T]
返回一个checkpointed的DatasetDataset的逻辑执行计划将被截断。

checkpoint(): Dataset[T]
同上,eager=true.

columns: Array[String]
数组形式返回所有列名。

dtypes: Array[(String, String)]
数组形式返回所有列名及类型。

createGlobalTempView(viewName: String): Unit
创建全局临时视图(view),生命周期与Spark应用一致。
可以跨session访问。e.g. SELECT * FROM global_temp.view1.

createOrReplaceGlobalTempView(viewName: String): Unit
同上,已存在则替换。

createTempView(viewName: String): Unit
创建本地临时视图(view),仅当前SparkSession可访问。
注意:不跟任何库绑定,不能用db1.view1这样的形式访问。

createOrReplaceTempView(viewName: String): Unit
同上,已存在则替换。

explain(): Unit
打印物理执行计划
另有:queryExecution变量,完整执行计划。

explain(extended: Boolean): Unit
打印物理+逻辑执行计划

hint(name: String, parameters: Any*): Dataset[T]
当前dataset指定hint。//todo
e.g. df1.join(df2.hint("broadcast"))

inputFiles: Array[String]
返回组成Dataset的输入文件(Returns a best-effort snapshot of the files that compose this Dataset

isLocal: Boolean
collect和take是否可以本地执行,不需要executor.

localCheckpoint(eager: Boolean): Dataset[T]
执行本地Checkpoint,返回新dataset。

localCheckpoint(): Dataset[T]
eager=true

printSchema(): Unit
打印schema结构

rdd: RDD[T]
dataset内部的RDD

schema: StructType
schema

storageLevel: StorageLevel
当前存储等级,没有被persist则是StorageLevel.NONE

toDF(): DataFrame
toDF(colNames: String*): DataFrame
转为DataFrame,也可以将RDD转为DataFrame

unpersist(): Dataset.this.type
unpersist(blocking: Boolean): Dataset.this.type
删除缓存,blocking表示是否等所有blocks删除后才返回,删除期间阻塞。

write: DataFrameWriter[T]
DataFrameWriter,非流式数据写接口。

writeStream: DataStreamWriter[T]
DataStreamWriter,流式数据写接口。

流式函数(streaming)

1
2
3
4
5
6
7
8
isStreaming: Boolean
是否流式数据

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
Defines an event time watermark for this Dataset.
//TODO


强类型转换(Typed transformations)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
alias(alias: Symbol): Dataset[T]
alias(alias: String): Dataset[T]
as(alias: Symbol): Dataset[T]
as(alias: String): Dataset[T]
Dataset一个别名

coalesce(numPartitions: Int): Dataset[T]
分区合并(只能减少分区)

distinct(): Dataset[T]
dropDuplicates的别名

dropDuplicates(col1: String, cols: String*): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(): Dataset[T]
根据指定字段,对数据去重。

except(other: Dataset[T]): Dataset[T]
去除other中也有的行。同EXCEPT DISTINCT in SQL
//TODO

filter(func: (T) ⇒ Boolean): Dataset[T]
filter(conditionExpr: String): Dataset[T]
filter(condition: Column): Dataset[T]
根据条件过滤行
e.g.
peopleDs.filter("age > 15")
peopleDs.filter($"age" > 15)

flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
第一步和map一样,最后将所有的输出合并。

groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
现根据func函数生成key,然后按key分组。

intersect(other: Dataset[T]): Dataset[T]
求两个dataset的交集,等同于INTERSECT in SQL.

joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
inner equi-join两个dataset

joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
joinType可选:inner, cross, outer, full, full_outer, left, left_outer, right, right_outer

limit(n: Int): Dataset[T]
返回前n行,与head的区别是,head是一个action,会马上返回结果数组。

map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
在每一个元素应用func函数,返回包含结果集的dataset。

mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
在每一个分区应用func函数,返回包含结果集的dataset。

orderBy(sortExprs: Column*): Dataset[T]
orderBy(sortCol: String, sortCols: String*): Dataset[T]
sort的别名

sort(sortExprs: Column*): Dataset[T]
sort(sortCol: String, sortCols: String*): Dataset[T]
按指定列排序,默认asc。
e.g. ds.sort($"col1", $"col2".desc)

sortWithinPartitions(sortExprs: Column*): Dataset[T]
sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
分区内排序,同"SORT BY" in SQL (Hive QL).

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
按权重随机分割数据

repartition(partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int): Dataset[T]
按指定表达式,分区数,重新分区(hash),同"DISTRIBUTE BY" in SQL
默认分区数为spark.sql.shuffle.partitions

repartitionByRange(partitionExprs: Column*): Dataset[T]
repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
按指定表达式,分区数,重新分区,采用Range partition方式,按键范围分区。
分区默认排序方式为ascending nulls first,分区内数据未排序。

sample(withReplacement: Boolean, fraction: Double): Dataset[T]
sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
sample(fraction: Double): Dataset[T]
sample(fraction: Double, seed: Long): Dataset[T]
随机取样本数据
withReplacement:Sample with replacement or not.
fraction:Fraction of rows to generate, range [0.0, 1.0].
seed:Seed for sampling.

select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
根据列/表达式获取列数据

transform[U](t: (Dataset[T]) ⇒ Dataset[U]): Dataset[U]
应用t函数转换Dataset

union(other: Dataset[T]): Dataset[T]
等于UNION ALL in SQL
注意是按列位置合并:
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 4| 5| 6|
// +----+----+----+

unionByName(other: Dataset[T]): Dataset[T]
同union方法,但是按列名合并:
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 6| 4| 5|
// +----+----+----+

where(conditionExpr: String): Dataset[T]
where(condition: Column): Dataset[T]
filter的别名



弱类型转换(Untyped transformations)

返回类型为DataFrame而不是Dataset。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
在整个dataset进行聚合。
ds.agg(...) 是 ds.groupBy().agg(...) 的简写。
e.g.
ds.agg(max($"age"), avg($"salary"))
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.agg("age" -> "max", "salary" -> "avg")


apply(colName: String): Column
col(colName: String): Column
colRegex(colName: String): Column
返回指定列。

crossJoin(right: Dataset[_]): DataFrame
cross join。

cube(col1: String, cols: String*): RelationalGroupedDataset
cube(cols: Column*): RelationalGroupedDataset
使用指定列创建多维cube。
//TODO

drop(col: Column): DataFrame
drop(colNames: String*): DataFrame
drop(colName: String): DataFrame
剪掉指定字段。

groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupBy(cols: Column*): RelationalGroupedDataset
按指定列分组

join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_]): DataFrame
与另一个DataFrame join。
joinExprs:$"df1Key" === $"df2Key"
usingColumn:Seq("user_id", "user_name")
joinType:Default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.


na: DataFrameNaFunctions
DataFrameNaFunctions

stat: DataFrameStatFunctions
DataFrameStatFunctions

rollup(col1: String, cols: String*): RelationalGroupedDataset
rollup(cols: Column*): RelationalGroupedDataset
使用指定列进行rollup聚合。//TODO

select(col: String, cols: String*): DataFrame
select(cols: Column*): DataFrame
selectExpr(exprs: String*): DataFrame
选取指定列、SQL表达式。

withColumn(colName: String, col: Column): DataFrame
新增或替换一列。

withColumnRenamed(existingName: String, newName: String): DataFrame
将指定列更名。

未分组(Ungrouped)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

queryExecution: QueryExecution
执行计划

sparkSession: SparkSession
创建该dataset的SparkSession

sqlContext: SQLContext
dataset的SQLContext

toJSON: Dataset[String]
每行数据转成JSON字符串。

toString(): String
Any的toString

参考