Spark SQL

Reference

SQLConf Source

// These classes define Spark SQL's configuration properties
org.apache.spark.sql.internal.SQLConf
org.apache.spark.sql.internal.StaticSQLConf
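These entries can be read, and non-static ones overridden per session, through `SparkSession.conf`. A minimal sketch (assuming a local Spark 2.x/3.x session):

```scala
import org.apache.spark.sql.SparkSession

object ConfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // Read a runtime property defined in SQLConf (default is "200")
    println(spark.conf.get("spark.sql.shuffle.partitions"))

    // Runtime properties (SQLConf) can be changed per session; static ones
    // (StaticSQLConf) must be set before the SparkSession is built
    spark.conf.set("spark.sql.shuffle.partitions", "50")
  }
}
```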

SparkSQL Functions

The Spark SQL function types are structured as follows:

Aggregate-like Functions

Aggregate Functions

-- Examples
SELECT  bool_and(col) AS result
FROM    (
          VALUES (false),
                 (false),
                 (NULL)
        ) AS tab(col);
-- Aggregate functions ignore NULL inputs, so the result here is false

Window Functions
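Window functions compute a value over a frame of rows related to the current row without collapsing those rows into a single output row. A minimal sketch (the `emp` table and its columns are hypothetical):

```sql
SELECT  name,
        dept,
        salary,
        rank() OVER (PARTITION BY dept ORDER BY salary DESC) AS salary_rank
FROM    emp;
```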


User Defined Functions

UDF

There are two ways to register a Spark SQL UDF: `sparkSession.udf.register()` and `org.apache.spark.sql.functions.udf()`.

object Test {

  case class Cat(name: String, age: Integer, sex: String)

  // Test dataset
  val testDataset = Seq(
    Cat(null, null, null),
    Cat("喵", 1, "母"),
    Cat("嗷", 3, "公"),
    Cat("喵", 2, "母"),
    Cat("嗷", 1, "公")
  )

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val catDF = sparkSession.sparkContext.makeRDD(testDataset).toDF

    /**
     * Spark's built-in `concat(cols: Column*)` returns `null` as soon as any
     * input column is `null`. Sometimes that is not what we want, so we
     * implement our own `myConcat` to solve the problem.
     * Note: Spark UDFs do not support varargs (`cols: String*`), but the
     * same effect can be achieved as shown below.
     */
    val myConcatFunc = (cols: Seq[Any], sep: String) => cols.filterNot(_ == null).mkString(sep)

    // Using the register() method
    // A UDF registered this way is only visible inside `selectExpr`; it is
    // not visible to the `DataFrame API`
    sparkSession.udf.register("myConcat", myConcatFunc)
    catDF.selectExpr("myConcat(array(name, age, sex), '-') as concat").show()

    // Using udf()
    // DataFrame API
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.array
    import org.apache.spark.sql.functions.lit
    val myConcat = udf(myConcatFunc)
    val seq = lit("-")
    catDF.select(myConcat(array("name", "age", "sex"), seq).alias("concat")).show()
  }
}

UDAF
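Since Spark 3.0, the recommended way to write a UDAF is a typed `Aggregator` wrapped with `functions.udaf`. A minimal sketch, assuming Spark 3.x (`MyAvg` and the registered name `my_avg` are chosen for illustration):

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Typed average aggregator: input Long, buffer (sum, count), output Double
object MyAvg extends Aggregator[Long, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: Long): (Long, Long) = (b._1 + a, b._2 + 1)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Long, Long)): Double = r._1.toDouble / r._2
  def bufferEncoder: Encoder[(Long, Long)] =
    Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object UdafDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    // Register for use in SQL / selectExpr
    spark.udf.register("my_avg", functions.udaf(MyAvg, Encoders.scalaLong))
    spark.sql("SELECT my_avg(col) FROM VALUES (1L), (2L), (3L) AS tab(col)").show()
  }
}
```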

SparkSQL Join Operations

MySQL join diagram

SparkSQL DataFrame Common Operators

A DataFrame is simply a Dataset whose element type is `Row`, i.e. `type DataFrame = Dataset[Row]`.
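The practical difference is when field access is checked. A minimal sketch (`Cat` is just an illustrative case class):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DfVsDs {
  case class Cat(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val ds: Dataset[Cat] = Seq(Cat("A", 1), Cat("B", 2)).toDS() // typed
    val df: DataFrame = ds.toDF()                               // Dataset[Row]

    ds.map(_.age + 1)                               // checked at compile time
    df.map((row: Row) => row.getAs[Int]("age") + 1) // resolved at runtime
  }
}
```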

SQL

Lateral View Explode

Explode Map

SELECT  account_id AS volc_account_id,
        product_code AS pm_product_code,
        MIN(`created_time`) AS stage_time
FROM    (
            SELECT  account_id,
                    temp.product_code,
                    temp.tag_value,
                    row_number() OVER (
                        PARTITION BY
                                account_id,
                                temp.product_code,
                                temp.tag_value
                        ORDER BY
                                tag.created_time DESC
                    ) AS rn,
                    tag.created_time
            FROM    xxx.dim_account_tag_df AS tag
            LATERAL VIEW
                    EXPLODE(json_to_map(tag.tag_value)) temp AS product_code,
                    tag_value
            WHERE   tag.date = '${date}'
            AND     tag.tag_key = 'xxx'
        )
WHERE   rn = 1
AND     tag_value = 1
GROUP BY
        account_id,
        product_code
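`json_to_map` above appears to be a user-defined function. The built-in `str_to_map` demonstrates the same `LATERAL VIEW EXPLODE` pattern on a map column (inline data for illustration only):

```sql
SELECT  id,
        kv.product_code,
        kv.tag_value
FROM    (SELECT 1 AS id, 'a:1,b:2' AS tags) AS t
LATERAL VIEW
        EXPLODE(str_to_map(tags, ',', ':')) kv AS product_code, tag_value;
```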
