Reference
Window Functions
row_number
-- Lead conversion stats: for each conversion field, count the distinct leads (sf_id) whose
-- latest modification of that field inside the time range set a non-empty value.
select  field_name,
        count(distinct sf_id) as num
from    (
            select  sf_id,
                    field_name,
                    before_value,
                    after_value,
                    modified_time,
                    -- row_number() does not depend on the window frame, so the `rows between`
                    -- clause in `w` is not required for it
                    row_number() over w as row_id
            from    eps_clickhouse.app_cloud_lead_modification_history_rt
            where   field_name in (
                                   'converted_opportunity_id', -- opportunity
                                   'converted_account_id', -- account
                                   'converted_contact_id' -- contact
                    )
            -- inclusive range on both ends
            and     modified_time between 1666000000 and 1666181576
            and     after_value is not null
            window  w as (
                partition by
                    sf_id, field_name
                -- newest first, so row_id = 1 is the latest change per (sf_id, field_name)
                order by
                    modified_time desc
                rows    between unbounded preceding and unbounded following
            )
        )
where   row_id = 1
and     after_value != '' -- non-empty value means an opportunity/account/contact was converted
group by
    field_name;
first_value
-- Effective-lead changes: per lead, compare the before_value of its earliest
-- lead_follow_up_status change in the time range with the after_value of its latest change.
-- NOTE(review): if before_value/after_value are Nullable, a NULL makes both IN and NOT IN
-- evaluate to NULL, so such leads are counted by neither column — confirm this is intended.
select  count(
            distinct if(
                -- earliest before_value is NOT an effective-lead status
                first_before_value not in (
                                           'Transfer Account', -- transferred to account
                                           'Transfer Contact', -- transferred to contact
                                           'Transfer Opportunities', -- transferred to opportunity
                                           'General Lead', -- general lead
                                           'Lead Nurture', -- lead nurture
                                           'Insufficient Budget' -- insufficient budget
                )
                -- latest after_value IS an effective-lead status
                and last_after_value in (
                                         'Transfer Account', -- transferred to account
                                         'Transfer Contact', -- transferred to contact
                                         'Transfer Opportunities', -- transferred to opportunity
                                         'General Lead', -- general lead
                                         'Lead Nurture', -- lead nurture
                                         'Insufficient Budget' -- insufficient budget
                ),
                sf_id,
                null
            )
        ) as effect_lead_increment, -- effective leads gained
        count(
            distinct if(
                -- earliest before_value IS an effective-lead status
                first_before_value in (
                                       'Transfer Account', -- transferred to account
                                       'Transfer Contact', -- transferred to contact
                                       'Transfer Opportunities', -- transferred to opportunity
                                       'General Lead', -- general lead
                                       'Lead Nurture', -- lead nurture
                                       'Insufficient Budget' -- insufficient budget
                )
                -- latest after_value is NOT an effective-lead status
                and last_after_value not in (
                                             'Transfer Account', -- transferred to account
                                             'Transfer Contact', -- transferred to contact
                                             'Transfer Opportunities', -- transferred to opportunity
                                             'General Lead', -- general lead
                                             'Lead Nurture', -- lead nurture
                                             'Insufficient Budget' -- insufficient budget
                ),
                sf_id,
                null
            )
        ) as effect_lead_reduce -- effective leads lost
from    (
            select  sf_id,
                    -- first_value() already works with the default frame, but last_value() needs the
                    -- explicit `rows between unbounded preceding and unbounded following` frame:
                    -- the default frame ends at the current row, so it would return the current value.
                    first_value(before_value) over w as first_before_value, -- before_value of the earliest change
                    last_value(after_value) over w as last_after_value -- after_value of the latest change
            from    eps_clickhouse.app_cloud_lead_modification_history_rt
            where   field_name in ('lead_follow_up_status')
            and     modified_time between 1666000000 and 1666181576
            window  w as (
                partition by
                    sf_id
                order by
                    modified_time asc
                rows    between unbounded preceding and unbounded following
            )
        );
SQL Optimization
ClickHouse Join 查询模型原理
优化场景 - 分页查询
-- Baseline pagination: 30s+ per query (it sorts the whole result set and the grouped
-- aggregation computes dozens of columns).
-- `limit offset, count`: rows 11-20 (page size 10), the same page the optimized variants fetch.
select  accounting_period,
        owner_id,
        sum(original_bill_amount) as original_bill_amount,
        ......
from    eps_data_bc.app_trade_bc_bill_charge_item_daily_rf as main
where   biz_period >= '2008-01-01 00:00:00'
......
group by
        accounting_period,
        owner_id
        ......
having  original_bill_amount != 0
order by
        accounting_period desc,
        ......
limit   10, 10
SETTINGS enable_optimize_predicate_expression = 0,
         prefer_localhost_replica = 0;
JOIN 优化
优化思路
- 通过 Join 子查询减少数据扫描数量,降低磁盘 IO
- 将 Shuffle Join 改成 Local Join,数据本地计算,避免数据 Shuffle 带来的高额网络 IO 开销
通过 distributed_product_mode 参数控制分布式表 IN/JOIN 子查询的执行方式(可用于开启分布式 Join),适用于分布式表的 IN/JOIN 子查询。使用该参数时需要注意:
- 仅适用于 IN/JOIN 的子查询语句
- 仅适用于主 FROM 表是包含 1 个以上分片的分布式表的情况
- 子查询涉及的表也需要是包含 1 个以上分片的分布式表(否则没有效果)
- 不能用于 table-valued 远程函数(Not used for a table-valued remote function.)
可选参数值
- deny — 默认值,禁止使用两个分布式表的 IN/JOIN 子查询(抛出 `Double-distributed in/JOIN subqueries is denied` 异常)
- local — 将子查询中的分布式数据库和表替换成远程分片的本地数据库和表,只在每个远程节点本地进行 IN/JOIN 计算(Normal IN/JOIN),没有数据 Shuffle,网络 IO 开销很小。对应 Colocate Join:需要分片键保证参与 JOIN 的数据都分布在同一个 Shard 节点上,否则会得出错误的结果。例如涉及 JOIN 的几张表都按 bill_owner_id 作为分片键存储,且 IN/JOIN 条件包含 bill_owner_id 字段,就能保证相关联的两条数据在同一个节点上,可以进行本地计算
- global — 将 IN/JOIN 替换成 `GLOBAL IN` 或 `GLOBAL JOIN`,对应 Broadcast Join:会把子查询的结果广播到各个节点执行,Shuffle 有很大的网络 IO 开销
- allow— 允许使用这些类型的子查询
Colocate Join
-- Option 1 (colocate join): fast for shallow pages (~2s), slow for deep pages.
select  main.accounting_period,
        main.owner_id,
        sum(main.original_bill_amount) as original_bill_amount,
        ......
from    eps_data_bc.app_trade_bc_bill_charge_item_daily_rf as main
join    (
            -- keys of the groups that can appear on the requested page
            select  accounting_period,
                    owner_id,
                    ......
            from    eps_data_bc.app_trade_bc_bill_charge_item_daily_rf
            where   biz_period >= '2008-01-01 00:00:00'
            ......
            group by
                    accounting_period,
                    owner_id
                    ......
            having  sum(original_bill_amount) != 0
            order by
                    accounting_period desc,
                    ......
            -- To fetch rows 11-20, each node returns its local rows 1-20 so the initiator can still
            -- sort globally. The offset is always 0 (similar to ES pagination), so the amount of data
            -- sorted here grows with the page number.
            limit   0, 20
        ) as dim
on      main.accounting_period = dim.accounting_period
and     main.owner_id = dim.owner_id
and     ......
-- Same row filter as the subquery; without it, rows of the matched groups that fall outside
-- the biz_period range would also be summed.
where   main.biz_period >= '2008-01-01 00:00:00'
......
group by
        main.accounting_period,
        main.owner_id
        ......
order by
        main.accounting_period desc,
        ......
limit   10, 10
SETTINGS
        -- 1. Only applied for IN and JOIN subqueries.
        -- 2. Only if the FROM section uses a distributed table containing more than one shard.
        -- 'local' joins and aggregates on each node only (no global join), so rows that join with
        -- each other must be stored on the same shard (the join keys must include the sharding key).
        distributed_product_mode = 'local',
        prefer_localhost_replica = 0;
Broadcast Join
-- Option 2 (broadcast join): measured at 6s-12s.
select  main.accounting_period,
        main.owner_id,
        sum(main.original_bill_amount) as original_bill_amount,
        ......
from    eps_data_bc.app_trade_bc_bill_charge_item_daily_rf as main
-- Writing `global join` here instead takes precedence over the distributed_product_mode setting.
join    (
            -- The subquery only has to identify each group uniquely (primary key or composite unique
            -- key). Fewer columns should be faster, so e.g. sipHash64(accounting_period, ...) as uid
            -- can be used instead (hash collisions are possible but very unlikely).
            select  accounting_period,
                    owner_id,
                    ......
            from    eps_data_bc.app_trade_bc_bill_charge_item_daily_rf
            where   biz_period >= '2008-01-01 00:00:00'
            ......
            group by
                    accounting_period,
                    owner_id
                    ......
            having  sum(original_bill_amount) != 0 -- this condition alone adds 2-4s
            order by
                    accounting_period desc,
                    ......
            limit   10, 10
                    -- Keep no_merge off here: the page must be cut from a globally sorted result.
                    -- With it on, every node sorts and paginates on its own and the subquery
                    -- returns one page per shard (50 rows were observed).
                    SETTINGS distributed_group_by_no_merge = 0
        ) as dim
        -- join keys match the group-by keys and include the sharding key owner_id
on      main.accounting_period = dim.accounting_period
and     main.owner_id = dim.owner_id
and     ......
where   main.biz_period >= '2008-01-01 00:00:00'
......
group by
        main.accounting_period,
        main.owner_id
        ......
order by
        main.accounting_period desc,
        ......
SETTINGS
        -- The distributed subquery must be `global`; otherwise each node computes its own
        -- `limit 10, 10` page and the pages differ.
        -- `global` — Replaces the IN/JOIN query with GLOBAL IN/GLOBAL JOIN
        -- Writing `global join` above has the same effect and takes precedence over this setting.
        distributed_product_mode = 'global',
        -- Groups never span shards (the sharding key is part of the group key), so aggregation
        -- states need not be merged on the initiator. Use 2 rather than 1: with 1 the initiator
        -- only proxies the shard results and the final ORDER BY is not applied across shards;
        -- 2 still applies ORDER BY / LIMIT on the initiator.
        distributed_group_by_no_merge = 2;
IN 子查询优化
优化思路:减少数据扫描数量,降低磁盘 IO 开销
-- Speed up deep pagination with an IN subquery on the sorting key: the subquery picks the
-- page using only filter/sort columns, and the outer query reads full rows for those uids.
select  id,
        toYYYYMM(accounting_period) as accounting_period_alias,
        bill_id,
        payer_id,
        ...
from    data_bc_lf.app_trade_bc_bill_rf
where   accounting_period = '2023-05-01'
and     bill_owner_id = 2100215562
and     uid in (
            -- uid is the sorting key (if it is the physical sorting key, IN gets an extra speedup)
            select  uid
            from    data_bc_lf.app_trade_bc_bill_rf
            where   accounting_period = '2023-05-01'
            and     bill_owner_id = 2100215562
            and     bill_type = 'normal'
            and     subject_no in ('3423', '2065')
            and     data_display_rule in ('2', '3')
            order by
                    expense_begin_time desc,
                    uid
            limit   1000, 10
        )
-- IN does not preserve the subquery's ordering, so re-apply it to return the page in order.
order by
        expense_begin_time desc,
        uid
SETTINGS enable_optimize_predicate_expression = 0,
         max_threads = 80,
         distributed_group_by_no_merge = 1,
         prefer_localhost_replica = 0
         -- Query template: append the setting below only when bill_owner_id is provided
         -- (presumably the sharding key, so all matching rows live on one shard and a local IN is correct):
         -- , distributed_product_mode = 'local'
arrayJoin
通过 arrayJoin,一张表可以支持多种不同分析维度的动态聚合:存储时不需要把数据按维度预先展开(避免数据异常膨胀),查询时只针对当前使用的那一组维度动态展开即可。
-- If the arrayJoin calls expand DIFFERENT expressions, each one multiplies the rows, so
-- several arrayJoin calls produce a Cartesian product.
-- If the arrayJoin argument expressions are exactly the same, the expansion is done only
-- once and no Cartesian product is produced.
-- 
-- For example, the SQL below returns only two rows:
-- SELECT 
--   categories,
--   arrayJoin(categories_1) AS category_1,
--   arrayJoin(categories_1) AS category_2
--   ...
--
-- The SQL below produces 2*2=4 rows: it uses the same column, but the two argument
-- expressions are not textually identical:
-- SELECT  
--   categories,
--   arrayJoin(if(empty(categories_1), [''], categories_1)) AS category_1,
--   arrayJoin(if(empty(categories_1), ['{}'], categories_1)) AS category_2
--
-- The SQL below returns 2*2*2=8 rows: three different arrayJoin expressions, each over a
-- 2-element array built from the sample JSON.
SELECT  categories,
        arrayJoin(categories_1) AS category_1,
        arrayJoin(categories_2) AS category_2,
        arrayJoin(categories_3) AS category_3
FROM    (
            SELECT  categories,
                    -- 'label' of every element of categories
                    arrayMap(
                        json -> JSONExtractString(json, 'label'),
                        categories
                    ) AS categories_1,
                    -- 'children'.'label' of every element
                    arrayMap(
                        json -> JSONExtractString(json, 'children', 'label'),
                        categories
                    ) AS categories_2,
                    -- 'children'.'children'.'label' of every element
                    arrayMap(
                        json -> JSONExtractString(json, 'children', 'children', 'label'),
                        categories
                    ) AS categories_3
            FROM    (
                        -- categories: array of the raw JSON elements of 业务模块.field_value
                        -- taken from the inline sample document
                        SELECT  JSONExtractArrayRaw(
                                    JSONExtractRaw(
                                        '{
    "业务模块": {
        "field_key": "field_a03070",
        "field_value": [
            {
                "label": "平台基础",
                "value": "cjxocs11a",
                "children": {
                    "label": "基础设施",
                    "value": "cjxocs11a",
                    "children": {
                        "label": "IAM",
                        "value": "cjxocs11a"
                    }
                }
            },
            {
                "label": "账务财资",
                "value": "cjxocs11a",
                "children": {
                    "label": "数据平台",
                    "value": "cjxocs11a",
                    "children": {
                        "label": "数据工程",
                        "value": "cjxocs11a"
                    }
                }
            }
        ],
        "target_state": null,
        "field_type_key": "tree_multi_select",
        "field_alias": ""
    }
}',
                                        '业务模块'
                                    ),
                                    'field_value'
                                ) AS categories
                    )
        )
实际效果:下面的 SQL 对同一个 field_value 数组只展开一次,不产生笛卡尔积,只返回两条数据(每个元素一条)。
-- Expand 业务模块.field_value exactly once and read every category level from the same
-- element, so the result has one row per element (two rows for the sample data).
-- JSONExtractString returns the plain label text; JSONExtractRaw would return the JSON-encoded
-- value including the surrounding quotes.
SELECT  JSONExtractString(item, 'label') AS category_1,
        JSONExtractString(item, 'children', 'label') AS category_2,
        JSONExtractString(item, 'children', 'children', 'label') AS category_3
FROM    (
            -- one row per element of 业务模块.field_value (raw JSON object text)
            SELECT  arrayJoin(
                        JSONExtractArrayRaw(
                            JSONExtractRaw(origin_business, '业务模块'),
                            'field_value'
                        )
                    ) AS item
            FROM    (
                        SELECT  '{
    "业务模块": {
        "field_key": "field_a03070",
        "field_value": [
            {
                "label": "平台基础",
                "value": "cjxocs11a",
                "children": {
                    "label": "基础设施",
                    "value": "cjxocs11a",
                    "children": {
                        "label": "IAM",
                        "value": "cjxocs11a"
                    }
                }
            },
            {
                "label": "账务财资",
                "value": "cjxocs11a",
                "children": {
                    "label": "数据平台",
                    "value": "cjxocs11a",
                    "children": {
                        "label": "数据工程",
                        "value": "cjxocs11a"
                    }
                }
            }
        ],
        "target_state": null,
        "field_type_key": "tree_multi_select",
        "field_alias": ""
    }
}' AS origin_business
                    )
        )
GROUP BY
        category_1,
        category_2,
        category_3

 
            
            
 
                 
                
