Reference
SQLConf Source Code
// These classes define Spark SQL's configuration properties
org.apache.spark.sql.internal.SQLConf
org.apache.spark.sql.internal.StaticSQLConf
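Both classes only define the config entries; at runtime they are accessed through SparkSession.conf. A minimal sketch (the two config keys are real Spark keys, everything else is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  // entries defined in StaticSQLConf are fixed once the session is created
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .getOrCreate()

// entries defined in SQLConf can be changed per session at runtime
spark.conf.set("spark.sql.shuffle.partitions", "400")
println(spark.conf.get("spark.sql.shuffle.partitions"))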
SparkSQL Functions
The function types in Spark SQL are organized as follows:
- Built-in Functions
- Scalar Functions: ordinary functions that operate on each row; they compute over some columns of a single row and return one value as a new column, so the number of rows in the table does not change.
- Aggregate-like Functions
- Aggregate Functions: operate on a group of rows, aggregating a column of that group into a single value; after aggregation the result table usually has fewer rows, e.g.
select max(age) from person group by sex
- Window Functions: differ from aggregate functions. With an aggregate function, every row in a group participates in the computation and each group produces exactly one result row. A window function instead only bounds a window of rows: every row inside the window is computed (possibly using the other rows of the window), and the number of result rows does not shrink. For example, if a window contains 5 rows, the result still contains 5 rows.
- UDFs (User-Defined Functions)
Aggregate-like Functions
Aggregate Functions
-- Examples
SELECT bool_and(col) AS result -- false: NULL values are ignored by the aggregate
FROM (
VALUES (false),
(false),
(NULL)
) AS tab(col);
Window Functions
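A minimal DataFrame API sketch: row_number over a window keeps every input row and only adds a rank column. Here df is assumed to be any DataFrame with sex and age columns (for example the cat dataset used in the UDF section below).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// one window per sex, rows inside each window ordered by age descending
val w = Window.partitionBy("sex").orderBy(col("age").desc)

// every input row is kept; each row only gains its rank within the window
df.withColumn("rank_in_sex", row_number().over(w)).show()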
User Defined Functions
UDF
There are two ways to register a Spark SQL UDF: sparkSession.udf.register() and org.apache.spark.sql.functions.udf().
object Test {

  case class Cat(name: String, age: Integer, sex: String)

  // test dataset
  val testDataset = Seq(
    Cat(null, null, null),
    Cat("喵", 1, "母"),
    Cat("嗷", 3, "公"),
    Cat("喵", 2, "母"),
    Cat("嗷", 1, "公")
  )

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val catDF = sparkSession.sparkContext.makeRDD(testDataset).toDF

    /**
     * Spark's built-in `concat(cols: Column*)` returns `null` as soon as any column is
     * `null` (`concat_ws` skips nulls, but only accepts string columns). Sometimes that
     * is not what we want, so we implement our own `myConcat` that drops nulls instead.
     * Note: a Spark UDF cannot take varargs (`cols: String*`), but the same effect can
     * be achieved by passing an array column, as below.
     */
    val myConcatFunc = (cols: Seq[Any], sep: String) => cols.filterNot(_ == null).mkString(sep)

    // Option 1: register(). A UDF registered this way can only be referenced by name
    // in SQL / `selectExpr`; it is not visible to the DataFrame API.
    sparkSession.udf.register("myConcat", myConcatFunc)
    catDF.selectExpr("myConcat(array(name, age, sex), '-') as concat").show()

    // Option 2: udf(). Usable directly from the DataFrame API.
    import org.apache.spark.sql.functions.{array, lit, udf}
    val myConcat = udf(myConcatFunc)
    val sep = lit("-")
    catDF.select(myConcat(array("name", "age", "sex"), sep).alias("concat")).show()
  }
}
UDAF
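For custom aggregates, Spark 3.x recommends the typed Aggregator API registered through functions.udaf. Below is a minimal sketch that averages ages per sex, assuming the sparkSession and catDF from the UDF example above; the names MyAvgAge and my_avg_age are made up for illustration, and NULL ages are filtered out before aggregating.

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf
import org.apache.spark.sql.{Encoder, Encoders}

// buffer = (sum, count)
object MyAvgAge extends Aggregator[Long, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: Long): (Long, Long) = (b._1 + a, b._2 + 1)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(b: (Long, Long)): Double = if (b._2 == 0) 0.0 else b._1.toDouble / b._2
  def bufferEncoder: Encoder[(Long, Long)] = Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// register it by name so it can be used from SQL / selectExpr, just like a scalar UDF
sparkSession.udf.register("my_avg_age", udaf(MyAvgAge))
catDF.createOrReplaceTempView("cat")
sparkSession.sql(
  "SELECT sex, my_avg_age(CAST(age AS BIGINT)) AS avg_age FROM cat WHERE age IS NOT NULL GROUP BY sex"
).show()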
SparkSQL Join Operations
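As a quick reference, a sketch of the join types exposed by the DataFrame API, using hypothetical DataFrames users(id, name) and orders(user_id, amount):

val inner = users.join(orders, users("id") === orders("user_id"))               // inner join (default)
val left  = users.join(orders, users("id") === orders("user_id"), "left_outer") // keep all users
val semi  = users.join(orders, users("id") === orders("user_id"), "left_semi")  // users that have orders
val anti  = users.join(orders, users("id") === orders("user_id"), "left_anti")  // users without any orders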
SparkSQL DataFrame Common Operators
A DataFrame is a Dataset whose element type is Row, i.e.
type DataFrame = Dataset[Row]
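The common operators all return a new DataFrame and can be chained; a minimal sketch, reusing the catDF built in the UDF section above:

import org.apache.spark.sql.functions.{avg, col, count, lit}

catDF.printSchema()                                           // inspect the schema
catDF.select("name", "age").show()                            // projection
catDF.filter(col("age") > 1).show()                           // a.k.a. where
catDF.groupBy("sex").agg(avg("age"), count(lit(1))).show()    // aggregation
catDF.orderBy(col("age").desc).show()                         // sort
catDF.withColumn("age_plus_one", col("age") + 1).show()       // derived column
catDF.na.drop().show()                                        // drop rows containing nulls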
SQL
Lateral View Explode
Explode Map
SELECT account_id AS volc_account_id,
product_code AS pm_product_code,
MIN(`created_time`) AS stage_time
FROM (
SELECT account_id,
temp.product_code,
temp.tag_value,
row_number() OVER (
PARTITION BY
account_id,
temp.product_code,
temp.tag_value
ORDER BY
tag.created_time DESC
) AS rn,
tag.created_time
FROM xxx.dim_account_tag_df AS tag
LATERAL VIEW
EXPLODE(json_to_map(tag.tag_value)) temp AS product_code,
tag_value
WHERE tag.date = '${date}'
AND tag.tag_key = 'xxx'
)
WHERE rn = 1
AND tag_value = 1
GROUP BY
account_id,
product_code
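json_to_map above appears to be a platform-specific UDF; with plain Spark the same map-explode can be sketched in the DataFrame API roughly as follows (tagDF stands for the dim_account_tag_df rows, and tag_value is assumed to be a flat JSON object of string pairs):

import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{MapType, StringType}

val exploded = tagDF
  // parse the JSON string into a map<string,string>
  .withColumn("tag_map", from_json(col("tag_value"), MapType(StringType, StringType)))
  // explode the map into one (product_code, tag_value) row per entry
  .select(col("account_id"), explode(col("tag_map")).as(Seq("product_code", "tag_value")))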
Common
Union Values
INSERT OVERWRITE TABLE db.table PARTITION (date = '${date}')
SELECT id,
sla_condition_str,
sla_id,
lower_condition,
lower_target,
upper_condition,
upper_target,
compensate_rate,
version,
created_time,
updated_time,
status,
is_deleted
FROM db.tablename
WHERE date = '${date}'
UNION
VALUES
(
1000001, -- id
'[0.99,1.0]', -- condition_str
1000001, -- sla_id
'', -- lower_condition
'', --lower_target
'', -- upper_condition
'', -- upper_target
'', -- compensate_rate
NULL, -- version
NULL, -- created_time
NULL, -- updated_time
0, -- status
'0' -- is_deleted
)
<=> vs =
The difference between `<=>` and `=` in Spark SQL matters mostly in WHERE and JOIN ON conditions: when both operands are NULL, `=` evaluates to NULL, so the rows fail to join and data is silently lost, whereas the null-safe operator `<=>` returns true and the match is kept.
SELECT a <=> b, -- true
a = b -- NULL
FROM (
SELECT NULL AS a,
NULL AS b
)
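The same null-safe comparison is available in the DataFrame API as the `<=>` column operator (eqNullSafe); a sketch with hypothetical DataFrames left and right keyed on k:

// rows whose keys are both NULL still match with <=>, but would be dropped with ===
val joined = left.join(right, left("k") <=> right("k"), "inner")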
Error-Rate Point Backfilling
WITH
-- TLS Kafka does not guarantee exactly-once delivery; fully deduplicating here gives idempotence-like semantics
distinct_log AS (
SELECT DISTINCT *
FROM eps_volc_sla.ods_sla_raw_http_log_from_tls_hourly
WHERE date = '${date}'
AND hour = '${hour}'
AND sli_key IN (
-- TOS
'tos_http_request_success_rate',
-- TLS
'tls_request_err_rate'
)
),
-- Backfill logic: the hourly job backfills points hour by hour
template AS (
SELECT account_id,
extra,
sli_key,
CAST((UNIX_TIMESTAMP('${date}${hour}', 'yyyyMMddHH') + i * (period * 60)) AS BIGINT) AS sli_time
FROM (
SELECT account_id,
extra,
sli_key,
period
FROM distinct_log
GROUP BY
account_id,
extra,
sli_key,
period
)
LATERAL VIEW
-- e.g. with one data point per minute (period = 1), an hour needs 60 points => SPLIT(SPACE(59), ' ')
-- both TOS and TLS currently backfill on a 1-hour basis
POSEXPLODE(SPLIT(SPACE(60 / period - 1), ' ')) seq AS i,
x
),
-- indicator computation
indicator AS (
SELECT (`time` - `time` % (60 * period)) AS sli_time,
UNIX_TIMESTAMP() AS event_time,
sli_key,
extra,
account_id,
CASE WHEN sli_key = 'tos_http_request_success_rate' THEN
-- TOS request success rate
1 - CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
WHEN sli_key IN ('tls_request_err_rate') THEN
-- TLS request error rate
CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
END AS sli_value
FROM distinct_log
GROUP BY
account_id,
extra,
sli_key,
period,
(`time` - `time` % (60 * period))
)
INSERT OVERWRITE TABLE eps_volc_sla.dwd_sli_event_log_tls_hourly PARTITION (date = '${date}', hour = '${hour}')
SELECT template.sli_time,
indicator.event_time,
template.sli_key,
template.extra,
template.account_id AS vol_account_id,
CASE WHEN template.sli_key = 'tos_http_request_success_rate' THEN
-- TOS success rate; default to 1 when there is no data point
COALESCE(indicator.sli_value, 1)
WHEN template.sli_key IN ('tls_request_err_rate') THEN
-- TLS error rate; default to 0 when there is no data point
COALESCE(indicator.sli_value, 0)
END AS sli_value
FROM template
LEFT JOIN
indicator
ON template.account_id = indicator.account_id
AND template.extra = indicator.extra
AND template.sli_key = indicator.sli_key
AND template.sli_time = indicator.sli_time
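The row-generation trick in the template CTE can be sanity-checked in isolation: SPACE(n - 1) followed by SPLIT on a single space yields n elements, and POSEXPLODE then emits positions 0 .. n-1. A quick check from Scala (assuming the same sparkSession):

// 59 spaces split on ' ' give 60 empty strings; posexplode outputs pos = 0..59
sparkSession.sql("SELECT posexplode(split(space(59), ' '))").show(5)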