Spark SQL

Reference

SQLConf Source Code

// Classes that define Spark SQL's configuration properties
org.apache.spark.sql.internal.SQLConf
org.apache.spark.sql.internal.StaticSQLConf
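
Most entries defined in `SQLConf` can be read and overridden at runtime through `spark.conf`; a minimal sketch (`spark.sql.shuffle.partitions` is one such entry):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
// SQLConf-defined properties are exposed via the runtime config.
// Entries in StaticSQLConf are fixed at session startup and cannot be set this way.
spark.conf.set("spark.sql.shuffle.partitions", "64")
println(spark.conf.get("spark.sql.shuffle.partitions")) // 64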

SparkSQL Functions

Spark SQL's function types are organized as follows:

Aggregate-like Functions

Aggregate Functions

-- Examples
SELECT  bool_and(col) AS result
FROM    (
          VALUES (false),
                 (false),
                 (NULL)
        ) AS tab(col);
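-- result: false (the aggregate ignores NULL inputs; any non-null false makes bool_and false)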

Window Functions
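
A minimal sketch of a window function through the DataFrame API: `row_number` over a partition, used to keep the top row per key (data and column names are invented for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// keep only the highest-value row in each key group
val df = Seq(("a", 1), ("a", 3), ("b", 2)).toDF("key", "value")
val byKey = Window.partitionBy("key").orderBy($"value".desc)
df.withColumn("rn", row_number().over(byKey))
  .where($"rn" === 1)
  .show()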


User Defined Functions

UDF

There are two ways to register a Spark SQL UDF: `sparkSession.udf.register()` and `org.apache.spark.sql.functions.udf()`:

object Test {

  case class Cat(name: String, age: Integer, sex: String)

  // Test dataset
  val testDataset = Seq(
    Cat(null, null, null),
    Cat("喵", 1, "母"),
    Cat("嗷", 3, "公"),
    Cat("喵", 2, "母"),
    Cat("嗷", 1, "公")
  )

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val catDF = sparkSession.sparkContext.makeRDD(testDataset).toDF

    /**
     * Spark's built-in `concat(exprs: Column*)` returns `null` as soon as any
     * input column is `null`. Sometimes that is not what we want, so we
     * implement a `myConcat` that filters out nulls before joining (the
     * built-in `concat_ws(sep: String, exprs: Column*)` also skips nulls,
     * taking the separator as its first argument).
     * Note: a Spark UDF cannot be declared with varargs (`cols: String*`),
     * but passing an `array` column, as below, works around that.
     */
    val myConcatFunc = (cols: Seq[Any], sep: String) => cols.filterNot(_ == null).mkString(sep)

    // Approach 1: register()
    // A UDF registered this way is referenced from SQL text (e.g. `selectExpr`
    // or `spark.sql`); it is not directly callable as a Scala function in the
    // `DataFrame API`
    sparkSession.udf.register("myConcat", myConcatFunc)
    catDF.selectExpr("myConcat(array(name, age, sex), '-') as concat").show()

    // Approach 2: udf()
    // returns a function object usable directly in the `DataFrame API`
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.array
    import org.apache.spark.sql.functions.lit
    val myConcat = udf(myConcatFunc)
    val sep = lit("-")
    catDF.select(myConcat(array("name", "age", "sex"), sep).alias("concat")).show()
  }
}

UDAF
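
A minimal sketch of a typed UDAF using the `Aggregator` API, which since Spark 3.0 replaces the deprecated `UserDefinedAggregateFunction`; `MyAvg` and its average logic are illustrative:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// running-sum buffer for an average
case class AvgBuffer(sum: Double, count: Long)

// IN = Double, BUF = AvgBuffer, OUT = Double
object MyAvg extends Aggregator[Double, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = AvgBuffer(b.sum + a, b.count + 1)
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  def finish(b: AvgBuffer): Double = if (b.count == 0L) 0.0 else b.sum / b.count
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// register for use in SQL, analogous to the register() call in the UDF example
sparkSession.udf.register("my_avg", udaf(MyAvg))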

SparkSQL Join Operations

MySQL join diagram
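
Spark exposes the same join types through the `joinType` argument of `Dataset.join`; a small sketch (tables invented, assuming the `import sparkSession.implicits._` from the UDF example):

val left  = Seq((1, "a"), (2, "b")).toDF("id", "l")
val right = Seq((2, "x"), (3, "y")).toDF("id", "r")

left.join(right, Seq("id"), "inner").show()      // id 2 only
left.join(right, Seq("id"), "left_outer").show() // ids 1 and 2; r is NULL for 1
left.join(right, Seq("id"), "full_outer").show() // ids 1, 2 and 3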

SparkSQL DataFrame Common Operators

A DataFrame is simply a Dataset whose element type is Row, i.e. `type DataFrame = Dataset[Row]`.
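
Because of this alias, an untyped `DataFrame` and a typed `Dataset` convert cheaply into each other; reusing `catDF` and the `Cat` case class from the UDF example above (with `sparkSession.implicits._` in scope):

import org.apache.spark.sql.{Dataset, Row}

val typed: Dataset[Cat] = catDF.as[Cat]   // typed view, resolved by column name
val untyped: Row = catDF.first()          // Row: access fields by position or name
println(untyped.getAs[String]("name"))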

SQL

Lateral View Explode

Explode Map
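
`LATERAL VIEW EXPLODE` over a map produces one row per key/value pair. In the query below, `json_to_map` (apparently an in-house UDF, not a Spark built-in) first parses the JSON string `tag_value` into a map, and `row_number` then keeps only the latest record per (account, product, value) combination: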

SELECT  account_id AS volc_account_id,
        product_code AS pm_product_code,
        MIN(`created_time`) AS stage_time
FROM    (
            SELECT  account_id,
                    temp.product_code,
                    temp.tag_value,
                    row_number() OVER (
                        PARTITION BY
                                account_id,
                                temp.product_code,
                                temp.tag_value
                        ORDER BY
                                tag.created_time DESC
                    ) AS rn,
                    tag.created_time
            FROM    xxx.dim_account_tag_df AS tag
            LATERAL VIEW
                    EXPLODE(json_to_map(tag.tag_value)) temp AS product_code,
                    tag_value
            WHERE   tag.date = '${date}'
            AND     tag.tag_key = 'xxx'
        )
WHERE   rn = 1
AND     tag_value = 1
GROUP BY
        account_id,
        product_code

Common

Union Values
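
`UNION` combined with a `VALUES` list appends hand-written literal rows to a query result. Note that `UNION` deduplicates; use `UNION ALL` if duplicates must be kept: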

INSERT OVERWRITE TABLE db.table PARTITION (date = '${date}')
SELECT  id,
        sla_condition_str,
        sla_id,
        lower_condition,
        lower_target,
        upper_condition,
        upper_target,
        compensate_rate,
        version,
        created_time,
        updated_time,
        status,
        is_deleted
FROM    db.tablename
WHERE   date = '${date}'
UNION
VALUES 
(
    1000001,      -- id
    '[0.99,1.0]', -- sla_condition_str
    1000001,      -- sla_id
    '',           -- lower_condition
    '',           -- lower_target
    '',           -- upper_condition
    '',           -- upper_target
    '',           -- compensate_rate
    NULL,         -- version
    NULL,         -- created_time
    NULL,         -- updated_time
    0,            -- status
    '0'           -- is_deleted
)

The difference between `<=>` and `=` in Spark SQL matters in `WHERE` and `ON` conditions: when both operands are `NULL`, `=` evaluates to `NULL`, so the condition never matches and rows are silently lost, while the null-safe operator `<=>` returns `true`.

SELECT  a <=> b, -- true
        a = b    -- NULL
FROM    (
            SELECT  NULL AS a,
                    NULL AS b
        )
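
In the DataFrame API, the same null-safe comparison is available as `Column.eqNullSafe`, or the `<=>` operator in Scala: `df("a") <=> df("b")`.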
