Spark SQL

Reference

SQLConf Source Code

// These classes define the Spark SQL configuration properties
org.apache.spark.sql.internal.SQLConf
org.apache.spark.sql.internal.StaticSQLConf
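
`SQLConf` holds the runtime-settable entries, while `StaticSQLConf` holds entries fixed at session startup. A minimal sketch of how each kind is typically set (the config keys shown are standard Spark ones):

import org.apache.spark.sql.SparkSession

object ConfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      // StaticSQLConf entries (e.g. spark.sql.warehouse.dir) can only be set before the session starts
      .config("spark.sql.warehouse.dir", "/tmp/warehouse")
      .getOrCreate()

    // regular SQLConf entries can also be changed at runtime
    spark.conf.set("spark.sql.shuffle.partitions", "200")
  }
}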

SparkSQL Functions

Spark SQL's function types are organized as follows:

Aggregate-like Functions

Aggregate Functions

-- Examples
SELECT  bool_and(col) AS result
FROM    (
          VALUES (false),
                 (false),
                 (NULL)
        ) AS tab(col);
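
`bool_and` returns true only if every non-NULL input is true; NULL inputs are ignored, so the query above returns `false`.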

Window Functions
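
A window function computes a value over a frame of rows related to the current row without collapsing them into one. A minimal DataFrame-API sketch (dataset and column names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, row_number}

object WindowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("a", 3), ("b", 2), ("b", 5)).toDF("key", "value")

    // rank rows within each key by value (descending), and attach the per-key average
    val byKey = Window.partitionBy("key").orderBy(col("value").desc)
    df.withColumn("rn", row_number().over(byKey))
      .withColumn("avg_in_key", avg("value").over(Window.partitionBy("key")))
      .show()
  }
}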


User Defined Functions

UDF

There are two ways to register a Spark SQL UDF: `sparkSession.udf.register()` and `org.apache.spark.sql.functions.udf()`.

object Test {

  case class Cat(name: String, age: Integer, sex: String)

  // Test dataset
  val testDataset = Seq(
    Cat(null, null, null),
    Cat("喵", 1, "母"),
    Cat("嗷", 3, "公"),
    Cat("喵", 2, "母"),
    Cat("嗷", 1, "公")
  )

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val catDF = sparkSession.sparkContext.makeRDD(testDataset).toDF

    /**
     * Spark's built-in `concat(cols: Column*)` returns `null` as soon as any of
     * its input columns is `null`. Sometimes that is not what we want, so we
     * implement a `myConcat` function that simply skips the nulls.
     * Note: a Spark UDF cannot take a vararg parameter (`cols: String*`), but
     * passing an array column as shown below achieves the same effect.
     */
    val myConcatFunc = (cols: Seq[Any], sep: String) => cols.filterNot(_ == null).mkString(sep)

    // Option 1: sparkSession.udf.register()
    // A UDF registered this way is only visible to SQL expressions such as `selectExpr`; it cannot be called directly from the DataFrame API
    sparkSession.udf.register("myConcat", myConcatFunc)
    catDF.selectExpr("myConcat(array(name, age, sex), '-') as concat").show()

    // Option 2: org.apache.spark.sql.functions.udf()
    // The resulting UDF can be used directly with the DataFrame API
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.array
    import org.apache.spark.sql.functions.lit
    val myConcat = udf(myConcatFunc)
    val seq = lit("-")
    catDF.select(myConcat(array("name", "age", "sex"), seq).alias("concat")).show()
  }
}

UDAF
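
Since Spark 3.x, the recommended way to write a UDAF is a typed `Aggregator` registered through `org.apache.spark.sql.functions.udaf`. A minimal sketch (an average aggregator; all names are illustrative):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// buffer = (sum, count)
object MyAvg extends Aggregator[Long, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: Long): (Long, Long) = (b._1 + a, b._2 + 1)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Long, Long)): Double = if (r._2 == 0) 0.0 else r._1.toDouble / r._2
  def bufferEncoder: Encoder[(Long, Long)] = Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object UdafDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.udf.register("my_avg", udaf(MyAvg)(Encoders.scalaLong))
    spark.sql("SELECT my_avg(col) AS avg FROM VALUES (1L), (2L), (6L) AS tab(col)").show()
  }
}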

SparkSQL Join Operations

(Figure: MySQL join types diagram)
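
Spark SQL supports the usual join types; a minimal DataFrame-API sketch (data is illustrative):

import org.apache.spark.sql.SparkSession

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val left  = Seq((1, "a"), (2, "b")).toDF("id", "l")
    val right = Seq((2, "x"), (3, "y")).toDF("id", "r")

    left.join(right, Seq("id"), "inner").show()      // matching rows only
    left.join(right, Seq("id"), "left_outer").show() // keep all left rows
    left.join(right, Seq("id"), "full_outer").show() // keep all rows from both sides
    left.join(right, Seq("id"), "left_semi").show()  // left rows that have a match
    left.join(right, Seq("id"), "left_anti").show()  // left rows that have no match
  }
}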

SparkSQL DataFrame Common Operators

A DataFrame is simply a Dataset whose element type is Row, i.e. `type DataFrame = Dataset[Row]`.
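
A few common operators, sketched on a small illustrative dataset:

import org.apache.spark.sql.{Dataset, SparkSession}

object OperatorDemo {
  case class Cat(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val ds = Seq(Cat("mi", 1), Cat("ao", 3)).toDS() // Dataset[Cat]
    val df = ds.toDF()                              // DataFrame = Dataset[Row]

    df.select("name", "age")
      .filter($"age" > 1)
      .groupBy("name")
      .count()
      .show()

    val typedAgain: Dataset[Cat] = df.as[Cat]       // back to a typed view
    typedAgain.show()
  }
}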

SQL

Lateral View Explode

Explode Map

SELECT  account_id AS volc_account_id,
        product_code AS pm_product_code,
        MIN(`created_time`) AS stage_time
FROM    (
            SELECT  account_id,
                    temp.product_code,
                    temp.tag_value,
                    row_number() OVER (
                        PARTITION BY
                                account_id,
                                temp.product_code,
                                temp.tag_value
                        ORDER BY
                                tag.created_time DESC
                    ) AS rn,
                    tag.created_time
            FROM    xxx.dim_account_tag_df AS tag
            LATERAL VIEW
                    EXPLODE(json_to_map(tag.tag_value)) temp AS product_code,
                    tag_value
            WHERE   tag.date = '${date}'
            AND     tag.tag_key = 'xxx'
        )
WHERE   rn = 1
AND     tag_value = 1
GROUP BY
        account_id,
        product_code
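
`EXPLODE` over a map value produces one row per entry, exposing the key and the value as two generated columns (aliased above as `product_code` and `tag_value`; `json_to_map` appears to be an in-house UDF rather than a Spark built-in). For reference, the same map explode through the DataFrame API, on a hypothetical MapType column `tags`:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodeMapDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val df = Seq((100, Map("tos" -> "1", "tls" -> "0"))).toDF("account_id", "tags")

    // one output row per map entry; the two aliases name the key and value columns
    df.select($"account_id", explode($"tags").as(Seq("product_code", "tag_value"))).show()
  }
}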

Common

Union Values

INSERT OVERWRITE TABLE db.table PARTITION (date = '${date}')
SELECT  id,
        sla_condition_str,
        sla_id,
        lower_condition,
        lower_target,
        upper_condition,
        upper_target,
        compensate_rate,
        version,
        created_time,
        updated_time,
        status,
        is_deleted
FROM    db.tablename
WHERE   date = '${date}'
UNION
VALUES 
(
    1000001, -- id
    '[0.99,1.0]', -- sla_condition_str
    1000001, -- sla_id
    '', -- lower_condition
    '', -- lower_target
    '', -- upper_condition
    '', -- upper_target
    '', -- compensate_rate
    NULL, -- version
    NULL, -- created_time
    NULL, -- updated_time
    0, -- status
    '0' -- is_deleted
)
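
Note that `UNION` (i.e. `UNION DISTINCT`) deduplicates the combined result, which also guards against inserting the literal row twice; if duplicates are impossible or acceptable, `UNION ALL` avoids the extra shuffle.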

In Spark SQL the difference between `<=>` and `=` matters in WHERE and ON conditions: when both sides are NULL, `=` evaluates to NULL, so the condition never matches, the join silently drops rows, and data is lost; the null-safe operator `<=>` returns true instead.

SELECT  a <=> b, -- true
        a = b    -- NULL
FROM    (
            SELECT  NULL AS a,
                    NULL AS b
        )

Backfilling Error-Rate Data Points

WITH
-- TLS Kafka does not guarantee exactly-once delivery; fully deduplicating here gives idempotent-like behavior
distinct_log AS (
    SELECT  DISTINCT *
    FROM    eps_volc_sla.ods_sla_raw_http_log_from_tls_hourly
    WHERE   date = '${date}'
    AND     hour = '${hour}'
    AND     sli_key IN (
                -- TOS
                'tos_http_request_success_rate',
                -- TLS
                'tls_request_err_rate'
            )
),
-- Backfill logic: the hourly job fills in points hour by hour
template AS (
    SELECT  account_id,
            extra,
            sli_key,
            CAST((UNIX_TIMESTAMP('${date}${hour}', 'yyyyMMddHH') + i * (period * 60)) AS BIGINT) AS sli_time
    FROM    (
                SELECT  account_id,
                        extra,
                        sli_key,
                        period
                FROM    distinct_log
                GROUP BY
                        account_id,
                        extra,
                        sli_key,
                        period
            )
    LATERAL VIEW
            -- e.g. one data point per minute (period = 1) means an hour needs 60 points => SPLIT(SPACE(59), ' ')
            -- both TOS and TLS currently backfill on a 1-hour basis
            POSEXPLODE(SPLIT(SPACE(60 / period - 1), ' ')) seq AS i,
            x
),
-- Indicator computation
indicator AS (
    SELECT  (`time` - `time` % (60 * period)) AS sli_time,
            UNIX_TIMESTAMP() AS event_time,
            sli_key,
            extra,
            account_id,
            CASE WHEN sli_key = 'tos_http_request_success_rate' THEN
                 -- TOS request success rate
                 1 - CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
                 WHEN sli_key IN ('tls_request_err_rate') THEN
                 -- TLS request error rate
                 CAST(COUNT(IF(val >= 500 AND val < 600, 1, NULL)) AS DOUBLE) / COUNT(val)
            END AS sli_value
    FROM    distinct_log
    GROUP BY
            account_id,
            extra,
            sli_key,
            period,
            (`time` - `time` % (60 * period))
)
INSERT OVERWRITE TABLE eps_volc_sla.dwd_sli_event_log_tls_hourly PARTITION (date = '${date}', hour = '${hour}')
SELECT  template.sli_time,
        indicator.event_time,
        template.sli_key,
        template.extra,
        template.account_id AS volc_account_id,
        CASE WHEN template.sli_key = 'tos_http_request_success_rate' THEN
             -- TOS success rate; default to 1 when there is no data
             COALESCE(indicator.sli_value, 1)
             WHEN template.sli_key IN ('tls_request_err_rate') THEN
             -- TLS error rate; default to 0 when there is no data
             COALESCE(indicator.sli_value, 0)
        END AS sli_value
FROM    template
LEFT JOIN
        indicator
ON      template.account_id = indicator.account_id
AND     template.extra = indicator.extra
AND     template.sli_key = indicator.sli_key
AND     template.sli_time = indicator.sli_time
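
The `SPLIT(SPACE(n - 1), ' ')` trick in the template CTE yields an array of n empty strings, and `POSEXPLODE` turns it into the row indices 0 .. n-1; each index is then scaled by `period * 60` seconds and added to the hour's start timestamp, enumerating every expected data point. The final LEFT JOIN keeps each expected point and falls back to the default SLI value (1 for the success rate, 0 for the error rate) whenever the raw log produced no row for it.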
