Reference
SQLConf 源码
// 定义了 SparkSQL 的配置属性
org.apache.spark.sql.internal.SQLConf
org.apache.spark.sql.internal.StaticSQLConf
SparkSQL Functions
SparkSQL 的函数类型结构如下:
- Built-in Functions
- Scalar Functions: 普通函数,作用于每行记录,对一行记录中的某些列进行计算,得出一个返回值作为一个新列,表的记录数不改变。
- Aggregate-like Functions
- Aggregate Functions: 作用于一组记录,对这一组的数据的列进行聚合计算得出一个值,做聚合后结果表的总记录数通常会减少,例如
select max(age) from person group by sex
- Window Functions: 窗口函数有别于聚合函数,聚合函数分组中的所有记录都会参与计算,最终每个分组得出一条结果记录。而窗口函数只是限定一个窗口范围,窗口内的每一条记录都会进行计算,计算的过程会涉及到窗口内的其他数据参与计算,并且得出的最终记录数不会减少。例如窗口内有5条记录,计算完的结果表依然还有5条记录。
- Aggregate Functions: 作用于一组记录,对这一组的数据的列进行聚合计算得出一个值,做聚合后结果表的总记录数通常会减少,例如
- UDFs (User-Defined Functions)
Aggregate-like Functions
Aggregate Functions
-- Examples
SELECT bool_and(col) AS result
FROM (
VALUES (false),
(false),
(NULL)
) AS tab(col);
Window Functions
User Defined Functions
UDF
注册 SparkSQL UDF 有两种方式sparkSession.udf.register()
和org.apache.spark.sql.function.udf()
object Test {
case class Cat(name: String, age: Integer, sex: String)
// 测试数据集
val testDataset = Seq(
Cat(null, null, null),
Cat("喵", 1, "母"),
Cat("嗷", 3, "公"),
Cat("喵", 2, "母"),
Cat("嗷", 1, "公")
)
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder().master("local").getOrCreate()
import sparkSession.implicits._
val catDF = sparkSession.sparkContext.makeRDD(testDataset).toDF
/**
* Spark 自带的`concat_ws(cols: Column*, sep: String)`函数,只要有一个Column
* 的值为`null`,`concat`的结果就会变为`null`。有时我们并不想这么做,那么我们实现
* 一个`myConcat`方法解决这个问题
* 注意:Spark UDF 不支持变长参数`cols: String*`,不过可以用下面的方式实现
*/
val myConcatFunc = (cols: Seq[Any], sep: String) => cols.filterNot(_ == null).mkString(sep)
// 使用 register() 方法
// 这种方式注册的 udf 方法,只能在`selectExpr`中可见,而对于`DataFrame API`是不可见的
sparkSession.udf.register("myConcat", myConcatFunc)
catDF.selectExpr("myConcat(array(name, age, sex), '-') as concat").show()
// 使用 udf()
// DataFrame API
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.array
import org.apache.spark.sql.functions.lit
val myConcat = udf(myConcatFunc)
val seq = lit("-")
catDF.select(myConcat(array("name", "age", "sex"), seq).alias("concat")).show()
}
}
UDAF
SQL 调优
JOIN ON 条件中避免使用 OR 条件,在 MySQL 中会导致索引失效,执行时间变长,改成两个查询再 UNION / UNION ALL + GROUP BY 去重
Spark SQL 执行顺序
# SQL 的语法如下
(8) SELECT
(1) FROM [left_table]
(3) <join_type> JOIN <right_table>
(2) ON <join_condition>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_field_list>
(6) WITH <cube|roll_up>
(7) HAVING <having_condition>
(10) ORDER BY <order_by_field_list>
# 按执行顺序排序后
(1) FROM [left_table]
(2) ON <join_condition>
(3) <join_type> JOIN <right_table>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_field_list>
(6) WITH <cube|roll_up>
(7) HAVING <having_condition>
(8) SELECT (9) DISTINCT <select_field_list>
(10) ORDER BY <order_by_field_list>
SparkSQL Join
SQL 的所有操作,可以分为简单操作(如过滤WHERE
、限制次数LIMIT
等)和聚合操作(GROUP BY
、JOIN
等),其中 JOIN 操作是最复杂、代价最大的操作类型。
Join Operations
Join 策略与原理
当前 SparkSQL 支持 5 种 JOIN 策略,其中前两者归根到底都属于Hash Join
,只不过在Hash Join
之前需要先 Shuffle 还是先 Broadcast。
Broadcast Hash Join
Shuffle Hash Join
Shuffle Sort Merge Join
Cartesian Product Join
: 笛卡尔乘积连接Broadcast Nested Loop Join
: 广播嵌套循环连接
影响 JOIN 操作效率的三大因素
Join type is equi-join or not
: 连接类型是否为等值连接。等值连接是一个在连接条件中只包含 equals 比较的连接,而非等值连接包含除 equals 以外的任何比较,例如<
,>
,>=
,<=
。由于非等值连接需要对不确定的值的范围进行比较,因而嵌套循环是必须的。因此,对于非等值连接,SparkSQL 只支持Broadcast Nested Loop Join
(广播嵌套循环连接)和Cartesian Product Join
(笛卡尔乘积连接)。而所有连接运算符都支持等值连接。SparkSQL 定义了ExtractEquiJoinKeys
模式,JoinSelection
(规划连接操作的核心对象)使用它来检查逻辑连接计划是否是等值连接。如果是等值连接,连接的元素将从逻辑计划中提取出来,包括连接类型、左键、右键、连接条件、左连接关系、右连接关系和连接提示。这些元素的信息构成了接下来连接规划过程的基础。Join strategy hint
: 连接策略提示。Size of Join relations
: 连接数据集的大小。
SparkSQL Join 谓词下推 PushPredicateThroughJoin
SparkSQL DataFrame Common Operators
DataFrame 是一种泛型类型为 Row 的 Dataset,即
type DataFrame = Dataset[Row]
SQL
Lateral View Explode
Explode Map
SELECT account_id AS volc_account_id,
product_code AS pm_product_code,
MIN(`created_time`) AS stage_time
FROM (
SELECT account_id,
temp.product_code,
temp.tag_value,
row_number() OVER (
PARTITION BY
account_id,
temp.product_code,
temp.tag_value
ORDER BY
tag.created_time DESC
) AS rn,
tag.created_time
FROM xxx.dim_account_tag_df AS tag
LATERAL VIEW
EXPLODE(json_to_map(tag.tag_value)) temp AS product_code,
tag_value
WHERE tag.date = '${date}'
AND tag.tag_key = 'xxx'
)
WHERE rn = 1
AND tag_value = 1
GROUP BY
account_id,
product_code
Common
Union Values
INSERT OVERWRITE TABLE db.table PARTITION (date = '${date}')
SELECT id,
sla_condition_str,
sla_id,
lower_condition,
lower_target,
upper_condition,
upper_target,
compensate_rate,
version,
created_time,
updated_time,
status,
is_deleted
FROM db.tablename
WHERE date = '${date}'
UNION
VALUES
(
1000001, -- id
'[0.99,1.0]', -- condition_str
1000001, -- sla_id
'', -- lower_condition
'', --lower_target
'', -- upper_condition
'', -- upper_target
'', -- compensate_rate
NULL, -- version
NULL, -- created_time
NULL, -- updated_time
0,
'0' --is_deleted
)
SparkSQL 中 <=>
和 =
号的区别,在 where 或者 on 条件中比较重要,当数据都为NULL
时使用=
会导致关联结果为NULL
而没有成功关联造成数据丢失
SELECT a <=> b, -- true
a = b -- NULL
FROM (
SELECT NULL AS a,
NULL AS b
)
错误率补点
WITH
-- TLS Kafka 不保证数据 Exactly once,这里完全去重是为了做类似幂等的处理
distinct_log AS(
SELECT DISTINCT *
FROM eps_volc_sla.ods_sla_raw_http_log_from_tls_hourly
WHERE date = '${date}'
AND hour = '${hour}'
AND sli_key IN (
-- TOS
'tos_http_request_success_rate',
-- TLS
'tls_request_err_rate'
)
),
-- 补点逻辑,小时级任务按小时补
template AS (
SELECT account_id,
extra,
sli_key,
CAST((UNIX_TIMESTAMP('${date}${hour}', 'yyyyMMddHH') + i * (period * 60)) AS BIGINT) AS sli_time
FROM (
SELECT account_id,
extra,
sli_key,
period
FROM distinct_log
GROUP BY
account_id,
extra,
sli_key,
period
)
LATERAL VIEW
-- 例如: 一分钟一个点(period=1),一个小时就需要构造 60 个点 => SPLIT(SPACE(59), ' ')
-- TOS 和 TLS 现在都是按照 1 小时补点
POSEXPLODE(SPLIT(SPACE(60 / period - 1), ' ')) seq AS i,
x
),
-- 指标计算过程
indicator AS (
SELECT (`time` - `time` % (60 * period)) AS sli_time,
UNIX_TIMESTAMP() AS event_time,
sli_key,
extra,
account_id,
CASE WHEN sli_key = 'tos_http_request_success_rate' THEN
-- TOS 请求成功率
1 - CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
WHEN sli_key IN ('tls_request_err_rate') THEN
-- TLS 请求错误率
CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
END AS sli_value
FROM distinct_log
GROUP BY
account_id,
extra,
sli_key,
period,
(`time` - `time` % (60 * period))
)
INSERT OVERWRITE TABLE eps_volc_sla.dwd_sli_event_log_tls_hourly PARTITION (date = '${date}', hour = '${hour}')
SELECT template.sli_time,
indicator.event_time,
template.sli_key,
template.extra,
template.account_id AS vol_account_id,
CASE WHEN template.sli_key = 'tos_http_request_success_rate' THEN
-- TOS 成功率,默认成功率为 1
COALESCE(indicator.sli_value, 1)
WHEN template.sli_key IN ('tls_request_err_rate') THEN
-- TLS 错误率,默认错误率为 0
COALESCE(indicator.sli_value, 0)
END AS sli_value
FROM template
LEFT JOIN
indicator
ON template.account_id = indicator.account_id
AND template.extra = indicator.extra
AND template.sli_key = indicator.sli_key
AND template.sli_time = indicator.sli_time
数据倾斜,打散大 KEY
-- 假设某个大客户 account_id = 100001 下有 1 亿条收入明细,现在要关联客户信息维表扩充一些客户属性
WITH dim_account_bucket AS(
SELECT account_id,
bucket_num,
bucket_no
FROM (
SELECT account_id, -- 账号 ID
(COUNT(1) / 1000000) AS bucket_num -- 根据账号拥有的收入明细数量决定要分桶的数据量,这里平均每 100 万条一个 bucket
-- 倾斜的收入明细表
FROM skewed_income_detail
GROUP BY
account_id
)
LATERAL VIEW
-- SPACE(bucket_num) 构造由 bucket_num 个空格拼接成的字符串
-- SPLIT(SPACE(bucket_num), ' ') 将 bucket_num 个字符串按照空格拆分,生成一个长度为 bucket_num + 1,值为空字符串的数组
-- POSEXPLODE(...) seq AS bucket_no, x; 将数组炸开并获取数组下标,其中 bucket_no 为从 0 开始的数组下标,x 为元素值
POSEXPLODE(SPLIT(SPACE(bucket_num), ' ')) seq AS bucket_no,
x
),
explode_account AS (
SELECT account.*,
dim_account_bucket.bucket_num,
dim_account_bucket.bucket_no
FROM account
LEFT JOIN
dim_account_bucket
ON account.id = dim_account_bucket.account_id
)
SELECT *
FROM skewed_income_detail
LEFT JOIN
explode_account
-- JOIN 的关联 KEY 从单一的 account_id 被打散成了,
-- account_id + MOD(skewed_income_detail.id, explode_account.bucket_num)
-- 这样一个 account_id 的收入明细就可以被拆分到多个节点上去并发执行了
ON skewed_income_detail.account_id = explode_account.account_id
AND MOD(skewed_income_detail.id, explode_account.bucket_num) = explode_account.bucket_no