Spark SQL

Reference

SQLConf Source Code

// These classes define the SparkSQL configuration properties
org.apache.spark.sql.internal.SQLConf
org.apache.spark.sql.internal.StaticSQLConf
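
A minimal sketch of how these entries surface at runtime (values here are arbitrary examples; spark-shell style, with spark: SparkSession already defined):

// Entries defined in SQLConf can be changed per session at runtime
spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.get("spark.sql.shuffle.partitions")   // "400"

// Entries defined in StaticSQLConf (e.g. spark.sql.warehouse.dir) are fixed once the
// session has started; they can be read here but only set on the SparkSession builder
spark.conf.get("spark.sql.warehouse.dir")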

SparkSQL Functions

The SparkSQL function types are organized as follows:

Aggregate-like Functions

Aggregate Functions

-- Examples
SELECT  bool_and(col) AS result
FROM    (
          VALUES (false),
                 (false),
                 (NULL)
        ) AS tab(col);

Window Functions
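
A minimal sketch of a window function (made-up data; spark-shell style, with spark predefined): a ranking and a running aggregate are evaluated OVER a window, so every input row keeps its own output row instead of being collapsed as in GROUP BY:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, sum}
import spark.implicits._

val orders = Seq(("a", 1), ("a", 3), ("b", 2)).toDF("category", "amount")

// Partition rows by category and order them by amount within each partition
val w = Window.partitionBy("category").orderBy("amount")

orders.select(
  $"category",
  $"amount",
  row_number().over(w).as("rn"),           // rank inside the partition
  sum("amount").over(w).as("running_sum")  // running total inside the partition
).show()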


User Defined Functions

UDF

There are two ways to register a SparkSQL UDF: sparkSession.udf.register() and org.apache.spark.sql.functions.udf().

object Test {

  case class Cat(name: String, age: Integer, sex: String)

  // Test dataset
  val testDataset = Seq(
    Cat(null, null, null),
    Cat("喵", 1, "母"),
    Cat("嗷", 3, "公"),
    Cat("喵", 2, "母"),
    Cat("嗷", 1, "公")
  )

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val catDF = sparkSession.sparkContext.makeRDD(testDataset).toDF

    /**
     * With Spark's built-in `concat(cols: Column*)`, the result becomes `null` as soon
     * as any one Column is `null`. Sometimes that is not what we want, so we implement
     * a `myConcat` method that joins only the non-null values with a separator.
     * Note: Spark UDFs do not support varargs (`cols: String*`), but the same effect
     * can be achieved by passing an array column, as shown below.
     */
    val myConcatFunc = (cols: Seq[Any], sep: String) => cols.filterNot(_ == null).mkString(sep)

    // Approach 1: register()
    // A UDF registered this way is only visible by name in SQL / `selectExpr`; it is not directly visible to the `DataFrame API`
    sparkSession.udf.register("myConcat", myConcatFunc)
    catDF.selectExpr("myConcat(array(name, age, sex), '-') as concat").show()

    // Approach 2: udf()
    // the returned function can be used directly with the `DataFrame API`
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.array
    import org.apache.spark.sql.functions.lit
    val myConcat = udf(myConcatFunc)
    val seq = lit("-")
    catDF.select(myConcat(array("name", "age", "sex"), seq).alias("concat")).show()
  }
}

UDAF
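
A minimal sketch of a user-defined aggregate (the my_avg aggregator below is a made-up example, not from the original notes): since Spark 3.0 the typed Aggregator API can be registered for SQL through functions.udaf, and the older UserDefinedAggregateFunction is deprecated. Spark-shell style, with spark predefined:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf
import spark.implicits._

// Intermediate buffer: running sum and count
case class AvgBuffer(sum: Double, count: Long)

// Aggregator[IN, BUF, OUT]: input Double, buffer AvgBuffer, output Double
object MyAvg extends Aggregator[Double, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = AvgBuffer(b.sum + a, b.count + 1)
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  def finish(b: AvgBuffer): Double = if (b.count == 0L) 0.0 else b.sum / b.count
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register the typed Aggregator as an untyped UDAF usable from SQL
spark.udf.register("my_avg", udaf(MyAvg))

Seq(1.0, 2.0, 4.0).toDF("age").createOrReplaceTempView("cats")
spark.sql("SELECT my_avg(age) AS avg_age FROM cats").show()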

SQL Tuning

Spark Sql 调优详解,何止性能翻倍!

Avoid OR conditions in a JOIN ON clause. In MySQL an OR in the join condition defeats index usage and increases execution time; rewrite it as two queries combined with UNION, or with UNION ALL plus GROUP BY for deduplication.
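
A hedged sketch of that rewrite (t1, t2 and their columns are hypothetical; spark-shell style). In Spark SQL the same rewrite also matters for another reason: an OR in the join condition makes the join non-equi, which rules out the hash-based join strategies described below, while the two UNION-ed branches are plain equi-joins:

import spark.implicits._

Seq((1, 10), (2, 20)).toDF("a", "b").createOrReplaceTempView("t1")
Seq((1, 30), (3, 20)).toDF("a", "b").createOrReplaceTempView("t2")

// Original form (OR in the join condition):
//   SELECT t1.a, t1.b FROM t1 JOIN t2 ON t1.a = t2.a OR t1.b = t2.b
// Rewritten form: two equi-joins, then UNION ALL + GROUP BY to deduplicate
spark.sql(
  """
    |SELECT  a, b
    |FROM    (
    |            SELECT  t1.a, t1.b FROM t1 JOIN t2 ON t1.a = t2.a
    |            UNION ALL
    |            SELECT  t1.a, t1.b FROM t1 JOIN t2 ON t1.b = t2.b
    |        )
    |GROUP BY a, b
    |""".stripMargin).show()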

Spark SQL Execution Order

# SQL clauses in the order they are written
(8) SELECT (9) DISTINCT <select_field_list>
(1) FROM [left_table]
(3) <join_type> JOIN <right_table>
(2) ON <join_condition>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_field_list>
(6) WITH <cube|roll_up>
(7) HAVING <having_condition>
(10) ORDER BY <order_by_field_list>

# The same clauses sorted by execution order
(1) FROM [left_table]
(2) ON <join_condition>
(3) <join_type> JOIN <right_table>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_field_list>
(6) WITH <cube|roll_up>
(7) HAVING <having_condition>
(8) SELECT (9) DISTINCT <select_field_list>
(10) ORDER BY <order_by_field_list>

SparkSQL Join

All SQL operations can be divided into simple operations (filtering with WHERE, limiting with LIMIT, and so on) and aggregation-style operations (GROUP BY, JOIN, and so on), of which JOIN is the most complex and most expensive type of operation.

Join Operations

MySQL Join diagram

Join Strategies and Principles

SparkSQL currently supports five JOIN strategies. The first two are both ultimately Hash Joins; they differ only in whether a Shuffle or a Broadcast happens before the Hash Join.

  1. Broadcast Hash Join
  2. Shuffle Hash Join
  3. Shuffle Sort Merge Join
  4. Cartesian Product Join
  5. Broadcast Nested Loop Join

Three Factors That Affect JOIN Efficiency

  1. Join type is equi-join or not: whether the join is an equi-join. An equi-join has a join condition that contains only equality comparisons, while a non-equi-join contains any other comparison, such as <, >, >=, <=. Because a non-equi-join has to compare against an indeterminate range of values, a nested loop is unavoidable; for non-equi-joins SparkSQL therefore only supports Broadcast Nested Loop Join and Cartesian Product Join, whereas every join operator supports equi-joins. SparkSQL defines the ExtractEquiJoinKeys pattern, which JoinSelection (the core object that plans join operations) uses to check whether a logical join plan is an equi-join. If it is, the join's elements are extracted from the logical plan: the join type, left keys, right keys, join condition, left relation, right relation, and join hints. This information forms the basis of the join planning that follows.
  2. Join strategy hint: a hint attached to the query that tells the planner which join strategy to prefer (see the sketch after this list).
  3. Size of Join relations: the size of the datasets being joined.
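
A brief sketch of join strategy hints (the fact / dim tables are made up; spark-shell style). Spark 3.x accepts BROADCAST, MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL, matching the strategy list above, both as SQL hints and through the Dataset API:

import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val fact = Seq((1, 100L), (2, 200L)).toDF("dim_id", "amount")
val dim  = Seq((1, "tos"), (2, "tls")).toDF("id", "name")
fact.createOrReplaceTempView("fact")
dim.createOrReplaceTempView("dim")

// SQL hint form: ask the planner to broadcast `dim`
spark.sql(
  """
    |SELECT /*+ BROADCAST(dim) */ fact.dim_id, dim.name, fact.amount
    |FROM   fact JOIN dim ON fact.dim_id = dim.id
    |""".stripMargin).explain()

// DataFrame form: functions.broadcast() or Dataset.hint()
fact.join(broadcast(dim), $"dim_id" === $"id").explain()
fact.join(dim.hint("shuffle_hash"), $"dim_id" === $"id").explain()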

Spark Join Selection

SparkSQL Join Predicate Pushdown: PushPredicateThroughJoin
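
A hedged illustration (tables made up; spark-shell style) of the kind of rewrite this optimizer rule enables: a filter written above an inner join that only references one side can be pushed below the join, so it is evaluated before the data is shuffled. Comparing the analyzed and optimized logical plans in the explain output shows the Filter moving below the Join:

import spark.implicits._

val orders   = Seq((1, 101), (2, 102)).toDF("id", "product_id")
val products = Seq((101, "online"), (102, "offline")).toDF("pid", "status")

// The filter is written after the join, but it only references the products side,
// so the optimizer can push it below the join and apply it before shuffling
val q = orders.join(products, $"product_id" === $"pid")
              .filter($"status" === "online")

q.explain(true) // prints parsed / analyzed / optimized logical plans and the physical plan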

SparkSQL DataFrame Common Operators

A DataFrame is a Dataset whose element type is Row, i.e. type DataFrame = Dataset[Row].
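
A minimal sketch of the common DataFrame operators (the cats data is made up; spark-shell style). Because a DataFrame is just a Dataset[Row], every call below returns another DataFrame and can be chained or mixed freely with SQL over a temp view:

import org.apache.spark.sql.functions.{avg, col, count}
import spark.implicits._

val cats = Seq(("miao", 1, "F"), ("ao", 3, "M"), ("miao", 2, "F")).toDF("name", "age", "sex")

cats.printSchema()                                        // inspect the schema
cats.select("name", "age").show()                         // projection
cats.filter(col("age") > 1).show()                        // a.k.a. where()
cats.groupBy("sex").agg(count("*"), avg("age")).show()    // aggregation
cats.withColumn("age_plus_one", col("age") + 1).show()    // derived column
cats.orderBy(col("age").desc).show()                      // sorting

// The same data through SQL
cats.createOrReplaceTempView("cats")
spark.sql("SELECT sex, COUNT(*) AS cnt FROM cats GROUP BY sex").show()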

SQL

Lateral View Explode

Explode Map
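
A minimal self-contained sketch of the pattern first (values made up; spark-shell style): EXPLODE over a map column emits one row per key/value pair, and LATERAL VIEW exposes the pair as two generated columns. The production query below applies the same pattern, with the map produced by json_to_map from the raw tag_value JSON string:

spark.sql(
  """
    |SELECT  account_id,
    |        t.product_code,
    |        t.tag_value
    |FROM    (
    |          VALUES (1, MAP('tos', '1', 'tls', '0'))
    |        ) AS tab(account_id, tags)
    |LATERAL VIEW
    |        EXPLODE(tags) t AS product_code, tag_value
    |""".stripMargin).show()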

SELECT  account_id AS volc_account_id,
        product_code AS pm_product_code,
        MIN(`created_time`) AS stage_time
FROM    (
            SELECT  account_id,
                    temp.product_code,
                    temp.tag_value,
                    row_number() OVER (
                        PARTITION BY
                                account_id,
                                temp.product_code,
                                temp.tag_value
                        ORDER BY
                                tag.created_time DESC
                    ) AS rn,
                    tag.created_time
            FROM    xxx.dim_account_tag_df AS tag
            LATERAL VIEW
                    EXPLODE(json_to_map(tag.tag_value)) temp AS product_code,
                    tag_value
            WHERE   tag.date = '${date}'
            AND     tag.tag_key = 'xxx'
        )
WHERE   rn = 1
AND     tag_value = 1
GROUP BY
        account_id,
        product_code

Common

Union Values

INSERT OVERWRITE TABLE db.table PARTITION (date = '${date}')
SELECT  id,
        sla_condition_str,
        sla_id,
        lower_condition,
        lower_target,
        upper_condition,
        upper_target,
        compensate_rate,
        version,
        created_time,
        updated_time,
        status,
        is_deleted
FROM    db.tablename
WHERE   date = '${date}'
UNION
VALUES 
(
    1000001, -- id
    '[0.99,1.0]', -- sla_condition_str
    1000001, -- sla_id
    '', -- lower_condition
    '', -- lower_target
    '', -- upper_condition
    '', -- upper_target
    '', -- compensate_rate
    NULL, -- version
    NULL, -- created_time
    NULL, -- updated_time
    0, -- status
    '0' -- is_deleted
)

The difference between <=> and = in SparkSQL matters in WHERE and ON conditions: when both sides are NULL, = evaluates to NULL, so the rows fail to join and data is silently dropped, while the null-safe operator <=> treats two NULLs as equal.

SELECT  a <=> b, -- true
        a = b    -- NULL
FROM    (
            SELECT  NULL AS a,
                    NULL AS b
        )

Backfilling Error-Rate Data Points

WITH
-- TLS Kafka does not guarantee exactly-once delivery; the full dedup here gives idempotent-like behavior
distinct_log AS(
    SELECT  DISTINCT *
    FROM    eps_volc_sla.ods_sla_raw_http_log_from_tls_hourly
    WHERE   date = '${date}'
    AND     hour = '${hour}'
    AND     sli_key IN (
                -- TOS
                'tos_http_request_success_rate',
                -- TLS
                'tls_request_err_rate'
            )
),
-- Backfill logic: the hourly job fills in one hour's worth of points
template AS (
    SELECT  account_id,
            extra,
            sli_key,
            CAST((UNIX_TIMESTAMP('${date}${hour}', 'yyyyMMddHH') + i * (period * 60)) AS BIGINT) AS sli_time
    FROM    (
                SELECT  account_id,
                        extra,
                        sli_key,
                        period
                FROM    distinct_log
                GROUP BY
                        account_id,
                        extra,
                        sli_key,
                        period
            )
    LATERAL VIEW
            -- e.g. one point per minute (period = 1) means 60 points per hour => SPLIT(SPACE(59), ' ')
            -- TOS and TLS are both currently backfilled on a 1-hour basis
            POSEXPLODE(SPLIT(SPACE(60 / period - 1), ' ')) seq AS i,
            x
),
-- Indicator computation
indicator AS (
    SELECT  (`time` - `time` % (60 * period)) AS sli_time,
            UNIX_TIMESTAMP() AS event_time,
            sli_key,
            extra,
            account_id,
            CASE WHEN sli_key = 'tos_http_request_success_rate' THEN
                 -- TOS request success rate
                 1 - CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
                 WHEN sli_key IN ('tls_request_err_rate') THEN
                 -- TLS request error rate
                 CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
            END AS sli_value
    FROM    distinct_log
    GROUP BY
            account_id,
            extra,
            sli_key,
            period,
            (`time` - `time` % (60 * period))
)
INSERT OVERWRITE TABLE eps_volc_sla.dwd_sli_event_log_tls_hourly PARTITION (date = '${date}', hour = '${hour}')
SELECT  template.sli_time,
        indicator.event_time,
        template.sli_key,
        template.extra,
        template.account_id AS vol_account_id,
        CASE WHEN template.sli_key = 'tos_http_request_success_rate' THEN
             -- TOS success rate; defaults to 1 when no data is present
             COALESCE(indicator.sli_value, 1)
             WHEN template.sli_key IN ('tls_request_err_rate') THEN
             -- TLS error rate; defaults to 0 when no data is present
             COALESCE(indicator.sli_value, 0)
        END AS sli_value
FROM    template
LEFT JOIN
        indicator
ON      template.account_id = indicator.account_id
AND     template.extra = indicator.extra
AND     template.sli_key = indicator.sli_key
AND     template.sli_time = indicator.sli_time

Data Skew: Breaking Up a Hot Key

-- Suppose one large customer (account_id = 100001) has 100 million income detail rows, and we need to join an account dimension table to enrich them with some customer attributes
WITH dim_account_bucket AS(
    SELECT  account_id,
            bucket_num,
            bucket_no
    FROM    (
                SELECT  account_id, -- account ID
                        -- bucket count derived from how many income detail rows the account has:
                        -- on average one bucket per 1,000,000 rows, kept as an integer >= 1 so MOD() below is well defined
                        CAST(GREATEST(CEIL(COUNT(1) / 1000000), 1) AS INT) AS bucket_num
                        -- the skewed income detail table
                FROM    skewed_income_detail
                GROUP BY
                        account_id
            )
    LATERAL VIEW
            -- SPACE(bucket_num) builds a string made of bucket_num spaces
            -- SPLIT(SPACE(bucket_num), ' ') splits that string on spaces, producing an array of bucket_num + 1 empty strings
            -- POSEXPLODE(...) seq AS bucket_no, x explodes the array together with its index: bucket_no is the 0-based index, x is the (empty string) element value
            POSEXPLODE(SPLIT(SPACE(bucket_num), ' ')) seq AS bucket_no,
            x
),
explode_account AS (
    SELECT  account.*,
            dim_account_bucket.bucket_num,
            dim_account_bucket.bucket_no
    FROM    account
    LEFT JOIN
            dim_account_bucket
    ON      account.id = dim_account_bucket.account_id
)
SELECT  *
FROM    skewed_income_detail
LEFT JOIN
        explode_account
        -- The join key is spread out from the single account_id into
        -- account_id + MOD(skewed_income_detail.id, explode_account.bucket_num),
        -- so one account's income details can be split across multiple nodes and processed in parallel
ON      skewed_income_detail.account_id = explode_account.account_id
AND     MOD(skewed_income_detail.id, explode_account.bucket_num) = explode_account.bucket_no
