树形结构中的批量迭代(概念验证)
GitLab 中的群组层级结构以树形表示,其中根元素是顶级命名空间(namespace),子元素是子群组或新引入的 Namespaces::ProjectNamespace 记录。
该树结构通过 namespaces 表中的 parent_id 列实现。该列指向父级命名空间记录,顶级命名空间没有 parent_id。
gitlab-org 的部分层级结构示例:
flowchart TD
A("gitlab-org (9979)") --- B("quality (2750817)")
B --- C("engineering-productivity (16947798)")
B --- D("performance-testing (9453799)")
A --- F("charts (5032027)")
A --- E("ruby (14018648)")
高效遍历群组层级结构有多种潜在用途,尤其是在后台任务中,这些任务需要对群组层级执行查询,此时稳定安全的执行比快速运行更重要。批量迭代需要更多网络往返,但每个批次的性能特征相似。
几个典型场景:
- 对每个子群组执行操作。
- 对层级结构中的每个项目执行操作。
- 对层级结构中的每个议题(issue)执行操作。
问题陈述
群组层级结构可能变得非常庞大,导致单次查询无法及时加载全部数据,查询会因语句超时(statement timeout)而失败。
解决超大群组相关的可扩展性问题需要我们以不同格式(反规范化)存储相同数据。但如果无法加载群组层级结构,反规范化就无法实现。
一种反规范化技术是存储给定群组的所有后代群组 ID。这将加速需要加载群组及其子群组的查询。例如:
flowchart TD
A(1) --- B(2)
A --- C(3)
C --- D(4)
| GROUP_ID | DESCENDANT_GROUP_IDS |
|---|---|
| 1 | [2,3,4] |
| 2 | [] |
| 3 | [4] |
| 4 | [] |
通过此结构,确定所有子群组只需读取数据库中的一行,而非四行。对于包含 1000 个群组的层级结构,这可能带来巨大性能提升。
这种反规范化解决了层级结构读取问题,但我们仍需找到在表中持久化这些数据的方法。由于群组及其层级可能非常大,无法期望单次查询能完成工作。
SELECT id FROM namespaces WHERE traversal_ids && ARRAY[9970]上述查询对大型群组可能超时,因此我们需要分批处理数据。
在树形结构中实现批量逻辑是之前未探索过的领域,实现起来相当复杂。基于 EachBatch 或 find_in_batches 的方案无法工作,因为:
- 数据(群组 ID)在层级结构中未排序。
- 子群组中的群组不知道顶级群组 ID。
算法
批量查询通过递归 CTE SQL 查询实现,每个批次最多读取 N 行。由于树形结构,读取 N 行不一定意味着读取了 N 个群组 ID。如果树结构非最优,一个批次可能返回更少(但绝不会更多)的群组 ID。
该查询实现了深度优先树遍历逻辑,数据库扫描树的第一分支直到叶子节点。我们选择深度优先算法,因为当批次完成时,查询必须返回足够信息供下一批次(游标)使用。在 GitLab 中,我们限制树深度为 20,这意味着在最坏情况下,游标最多包含 19 个元素。
实现广度优先树遍历算法不切实际,因为一个群组可能有无限多的后代,导致游标变得巨大。
- 创建一个包含以下内容的初始行:
- 当前处理的群组 ID(顶级群组 ID)
- 两个数组(树深度和收集的 ID)
- 用于跟踪查询中行读取次数的计数器
- 递归处理该行,根据条件执行以下操作之一:
- 加载第一个子命名空间,如果不在叶子节点则更新当前处理的命名空间 ID(向下遍历分支)
- 如果当前深度还有剩余行,则加载下一个命名空间记录
- 向上跳一个节点,处理更高层级的行
- 继续处理直到读取数量达到
LIMIT(批次大小) - 找到最后处理的行(包含游标数据)和所有收集的记录 ID
WITH RECURSIVE result AS (
(
SELECT
9970 AS current_id, /* 当前处理的命名空间 ID */
ARRAY[9970]::int[] AS depth, /* 游标 */
ARRAY[9970]::int[] AS ids, /* 收集的 ID */
1::bigint AS reads,
'initialize' AS action
) UNION ALL
(
WITH cte AS ( /* 用于多次引用 result CTE 的技巧 */
select * FROM result
)
SELECT * FROM (
(
SELECT /* 向下遍历分支 */
namespaces.id,
cte.depth || namespaces.id,
cte.ids || namespaces.id,
cte.reads + 1,
'walkdown'
FROM namespaces, cte
WHERE
namespaces.parent_id = cte.current_id
ORDER BY namespaces.id ASC
LIMIT 1
) UNION ALL
(
SELECT /* 查找同一层级的下一个元素 */
namespaces.id,
cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id,
cte.ids || namespaces.id,
cte.reads + 1,
'next'
FROM namespaces, cte
WHERE
namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND
namespaces.id > cte.depth[array_length(cte.depth, 1)]
ORDER BY namespaces.id ASC
LIMIT 1
) UNION ALL
(
SELECT /* 完成当前层级时向上跳一个节点 */
cte.current_id,
cte.depth[:array_length(cte.depth, 1) - 1],
cte.ids,
cte.reads + 1,
'jump'
FROM cte
WHERE cte.depth <> ARRAY[]::int[]
LIMIT 1
)
) next_row LIMIT 1
)
)
SELECT current_id, depth, ids, action
FROM result current_id | depth | ids | action
------------+--------------+------------------------+------------
24 | {24} | {24} | initialize
25 | {24,25} | {24,25} | walkdown
26 | {24,26} | {24,25,26} | next
112 | {24,112} | {24,25,26,112} | next
113 | {24,113} | {24,25,26,112,113} | next
114 | {24,113,114} | {24,25,26,112,113,114} | walkdown
114 | {24,113} | {24,25,26,112,113,114} | jump
114 | {24} | {24,25,26,112,113,114} | jump
114 | {} | {24,25,26,112,113,114} | jump使用此查询查找群组层级中的所有命名空间 ID 可能比其他查询方法(如当前基于 traversal_ids 列的 self_and_descendants 实现)更慢。上述查询仅应在实现群组层级的批量迭代时使用。
Ruby 中的基础批量实现:
class NamespaceEachBatch
def initialize(namespace_id:, cursor: nil)
@namespace_id = namespace_id
@cursor = cursor || { current_id: namespace_id, depth: [namespace_id] }
end
def each_batch(of: 500)
current_cursor = cursor.dup
first_iteration = true
loop do
new_cursor, ids = load_batch(cursor: current_cursor, of: of, first_iteration: first_iteration)
first_iteration = false
current_cursor = new_cursor
yield ids
break if new_cursor[:depth].empty?
end
end
private
# 返回命名空间 ID 数组
def load_batch(cursor:, of:, first_iteration: false)
recursive_cte = Gitlab::SQL::RecursiveCTE.new(:result,
union_args: { remove_order: false, remove_duplicates: false })
ids = first_iteration ? namespace_id.to_s : ""
recursive_cte << Namespace.select(
Arel.sql(Integer(cursor.fetch(:current_id)).to_s).as('current_id'),
Arel.sql("ARRAY[#{cursor.fetch(:depth).join(',')}]::int[]").as('depth'),
Arel.sql("ARRAY[#{ids}]::int[]").as('ids'),
Arel.sql("1::bigint AS count")
).from('(VALUES (1)) AS does_not_matter').limit(1)
cte = Gitlab::SQL::CTE.new(:cte, Namespace.select('*').from('result'))
union_query = Namespace.with(cte.to_arel).from_union(
walk_down,
next_elements,
up_one_level,
remove_duplicates: false,
remove_order: false
).select('current_id', 'depth', 'ids', 'count').limit(1)
recursive_cte << union_query
scope = Namespace.with
.recursive(recursive_cte.to_arel)
.from(recursive_cte.alias_to(Namespace.arel_table))
.limit(of)
row = Namespace.from(scope.arel.as('namespaces')).order(count: :desc).limit(1).first
[
{ current_id: row[:current_id], depth: row[:depth] },
row[:ids]
]
end
attr_reader :namespace_id, :cursor
def walk_down
Namespace.select(
Arel.sql('namespaces.id').as('current_id'),
Arel.sql('cte.depth || namespaces.id').as('depth'),
Arel.sql('cte.ids || namespaces.id').as('ids'),
Arel.sql('cte.count + 1').as('count')
).from('cte, LATERAL (SELECT id FROM namespaces WHERE parent_id = cte.current_id ORDER BY id LIMIT 1) namespaces')
end
def next_elements
Namespace.select(
Arel.sql('namespaces.id').as('current_id'),
Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id').as('depth'),
Arel.sql('cte.ids || namespaces.id').as('ids'),
Arel.sql('cte.count + 1').as('count')
).from('cte, LATERAL (SELECT id FROM namespaces WHERE namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND namespaces.id > cte.depth[array_length(cte.depth, 1)] ORDER BY id LIMIT 1) namespaces')
end
def up_one_level
Namespace.select(
Arel.sql('cte.current_id').as('current_id'),
Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1]').as('depth'),
Arel.sql('cte.ids').as('ids'),
Arel.sql('cte.count + 1').as('count')
).from('cte')
.where('cte.depth <> ARRAY[]::int[]')
.limit(1)
end
end
iterator = NamespaceEachBatch.new(namespace_id: 9970)
all_ids = []
iterator.each_batch do |ids|
all_ids.concat(ids)
end
# 测试
puts all_ids.count
puts all_ids.sort == Namespace.where('traversal_ids && ARRAY[9970]').pluck(:id).sort示例批量查询:
SELECT
"namespaces".*
FROM ( WITH RECURSIVE "result" AS ((
SELECT
15847356 AS current_id,
ARRAY[9970,
12061481,
12128714,
12445111,
15847356]::int[] AS depth,
ARRAY[]::int[] AS ids,
1::bigint AS count
FROM (
VALUES (1)) AS does_not_matter
LIMIT 1)
UNION ALL ( WITH "cte" AS MATERIALIZED (
SELECT
*
FROM
result
)
SELECT
current_id,
depth,
ids,
count
FROM ((
SELECT
namespaces.id AS current_id,
cte.depth || namespaces.id AS depth,
cte.ids || namespaces.id AS ids,
cte.count + 1 AS count
FROM
cte,
LATERAL (
SELECT
id
FROM
namespaces
WHERE
parent_id = cte.current_id
ORDER BY
id
LIMIT 1
) namespaces
)
UNION ALL (
SELECT
namespaces.id AS current_id,
cte.depth[:array_length(
cte.depth, 1
) - 1] || namespaces.id AS depth,
cte.ids || namespaces.id AS ids,
cte.count + 1 AS count
FROM
cte,
LATERAL (
SELECT
id
FROM
namespaces
WHERE
namespaces.parent_id = cte.depth[array_length(
cte.depth, 1
) - 1]
AND namespaces.id > cte.depth[array_length(
cte.depth, 1
)]
ORDER BY
id
LIMIT 1
) namespaces
)
UNION ALL (
SELECT
cte.current_id AS current_id,
cte.depth[:array_length(
cte.depth, 1
) - 1] AS depth,
cte.ids AS ids,
cte.count + 1 AS count
FROM
cte
WHERE (
cte.depth <> ARRAY[]::int[]
)
LIMIT 1
)
) namespaces
LIMIT 1
))
SELECT
"namespaces".*
FROM
"result" AS "namespaces"
LIMIT 500) namespaces
ORDER BY
"count" DESC
LIMIT 1执行计划:
Limit (cost=16.36..16.36 rows=1 width=76) (actual time=436.963..436.970 rows=1 loops=1)
Buffers: shared hit=3721 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000
-> Sort (cost=16.36..16.39 rows=11 width=76) (actual time=436.961..436.968 rows=1 loops=1)
Sort Key: namespaces.count DESC
Sort Method: top-N heapsort Memory: 27kB
Buffers: shared hit=3721 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000
-> Limit (cost=15.98..16.20 rows=11 width=76) (actual time=0.005..436.394 rows=500 loops=1)
Buffers: shared hit=3718 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000
CTE result
-> Recursive Union (cost=0.00..15.98 rows=11 width=76) (actual time=0.003..432.924 rows=500 loops=1)
Buffers: shared hit=3718 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000
-> Limit (cost=0.00..0.01 rows=1 width=76) (actual time=0.002..0.003 rows=1 loops=1)
I/O Timings: read=0.000 write=0.000
-> Result (cost=0.00..0.01 rows=1 width=76) (actual time=0.001..0.002 rows=1 loops=1)
I/O Timings: read=0.000 write=0.000
-> Limit (cost=0.76..1.57 rows=1 width=76) (actual time=0.862..0.862 rows=1 loops=499)
Buffers: shared hit=3718 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000
CTE cte
-> WorkTable Scan on result (cost=0.00..0.20 rows=10 width=76) (actual time=0.000..0.000 rows=1 loops=499)
I/O Timings: read=0.000 write=0.000
-> Append (cost=0.56..17.57 rows=21 width=76) (actual time=0.862..0.862 rows=1 loops=499)
Buffers: shared hit=3718 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000
-> Nested Loop (cost=0.56..7.77 rows=10 width=76) (actual time=0.675..0.675 rows=0 loops=499)
Buffers: shared hit=1693 read=357 dirtied=1
I/O Timings: read=327.812 write=0.000
-> CTE Scan on cte (cost=0.00..0.20 rows=10 width=76) (actual time=0.001..0.001 rows=1 loops=499)
I/O Timings: read=0.000 write=0.000
-> Limit (cost=0.56..0.73 rows=1 width=4) (actual time=0.672..0.672 rows=0 loops=499)
Buffers: shared hit=1693 read=357 dirtied=1
I/O Timings: read=327.812 write=0.000
-> Index Only Scan using index_namespaces_on_parent_id_and_id on public.namespaces namespaces_1 (cost=0.56..5.33 rows=29 width=4) (actual time=0.671..0.671 rows=0 loops=499)
Index Cond: (namespaces_1.parent_id = cte.current_id)
Heap Fetches: 7
Buffers: shared hit=1693 read=357 dirtied=1
I/O Timings: read=327.812 write=0.000
-> Nested Loop (cost=0.57..9.45 rows=10 width=76) (actual time=0.208..0.208 rows=1 loops=442)
Buffers: shared hit=2025 read=66 dirtied=7
I/O Timings: read=84.778 write=0.000
-> CTE Scan on cte cte_1 (cost=0.00..0.20 rows=10 width=72) (actual time=0.000..0.000 rows=1 loops=442)
I/O Timings: read=0.000 write=0.000
-> Limit (cost=0.57..0.89 rows=1 width=4) (actual time=0.203..0.203 rows=1 loops=442)
Buffers: shared hit=2025 read=66 dirtied=7
I/O Timings: read=84.778 write=0.000
-> Index Only Scan using index_namespaces_on_parent_id_and_id on public.namespaces namespaces_2 (cost=0.57..3.77 rows=10 width=4) (actual time=0.201..0.201 rows=1 loops=442)
Index Cond: ((namespaces_2.parent_id = (cte_1.depth)[(array_length(cte_1.depth, 1) - 1)]) AND (namespaces_2.id > (cte_1.depth)[array_length(cte_1.depth, 1)]))
Heap Fetches: 35
Buffers: shared hit=2025 read=66 dirtied=6
I/O Timings: read=84.778 write=0.000
-> Limit (cost=0.00..0.03 rows=1 width=76) (actual time=0.003..0.003 rows=1 loops=59)
I/O Timings: read=0.000 write=0.000
-> CTE Scan on cte cte_2 (cost=0.00..0.29 rows=9 width=76) (actual time=0.002..0.002 rows=1 loops=59)
Filter: (cte_2.depth <> '{}'::integer[])
Rows Removed by Filter: 0
I/O Timings: read=0.000 write=0.000
-> CTE Scan on result namespaces (cost=0.00..0.22 rows=11 width=76) (actual time=0.005..436.240 rows=500 loops=1)
Buffers: shared hit=3718 read=423 dirtied=8
I/O Timings: read=412.590 write=0.000