GitLab EventStore
背景
单体 GitLab 项目正在变得越来越大,同时也有更多的领域被定义出来。 因此,这些领域由于时间耦合而变得相互纠缠。
一个典型的例子是 PostReceive worker,
其中很多事情跨越了多个领域。如果新的行为需要响应新提交的推送,
那么我们就会在 PostReceive 或其子组件(例如 Git::ProcessRefChangesService)中的某个地方添加代码。
这种类型的架构:
- 违反了单一职责原则。
- 增加了在不熟悉的代码库中添加代码的风险。你可能不知道其中的细微差别, 这可能会引入 bug 或性能下降。
- 违反了领域边界。在特定的命名空间内(例如
Git::),我们突然看到 来自其他领域的类(如Ci::或MergeRequests::)。
什么是 EventStore?
Gitlab:EventStore 是一个基于现有 Sidekiq worker 和可观测性构建的基础 pub-sub 系统。
我们使用这个系统在建模领域时采用事件驱动的方法,同时保持最小耦合。
这基本上保留了现有的 Sidekiq worker 以执行异步工作,但反转了依赖关系。
EventStore 示例
当创建 CI pipeline 时,我们会更新任何与 pipeline ref 匹配的 merge request 的 head pipeline。
然后 merge request 可以显示最新 pipeline 的状态。
不使用 EventStore
我们修改 Ci::CreatePipelineService 并添加逻辑(如 if 语句)来检查 pipeline 是否被创建。
然后我们安排一个 worker 来为 MergeRequests:: 领域运行一些副作用。
这种风格违反了开闭原则, 并且不必要地添加了来自其他领域的副作用逻辑,增加了耦合:
graph LR
subgraph ci[CI]
cp[CreatePipelineService]
end
subgraph mr[MergeRequests]
upw[UpdateHeadPipelineWorker]
end
subgraph no[Namespaces::Onboarding]
pow[PipelinesOnboardedWorker]
end
cp -- perform_async --> upw
cp -- perform_async --> pow
使用 EventStore
Ci::CreatePipelineService 发布一个事件 Ci::PipelineCreatedEvent,其责任到此为止。
MergeRequests:: 领域可以使用一个 worker MergeRequests::UpdateHeadPipelineWorker 来订阅这个事件,所以:
- 副作用被异步调度,不会影响发出领域事件的主业务事务。
- 可以在不修改主业务事务的情况下添加更多副作用。
- 我们可以清楚地看到涉及的领域及其所有权。
- 我们可以识别系统中发生的事件,因为它们被明确声明。
使用 Gitlab::EventStore 时,订阅者(Sidekiq worker)和领域事件的架构之间仍然存在耦合。
这种耦合程度远小于将主事务(Ci::CreatePipelineService)耦合到:
- 多个订阅者。
- 多种调用订阅者的方式(包括条件调用)。
- 多种传递参数的方式。
graph LR
subgraph ci[CI]
cp[CreatePipelineService]
cp -- publish --> e[PipelineCreateEvent]
end
subgraph mr[MergeRequests]
upw[UpdateHeadPipelineWorker]
end
subgraph no[Namespaces::Onboarding]
pow[PipelinesOnboardedWorker]
end
upw -. subscribe .-> e
pow -. subscribe .-> e
每个订阅者本身就是一个 Sidekiq worker,可以指定与其负责的工作类型相关的任何属性。
例如,一个订阅者可以定义 urgency: high,而另一个不太重要的订阅者可以设置 urgency: low。
EventStore 只是一个抽象层,允许我们实现依赖倒置。这有助于 将业务事务与副作用(通常在其他领域执行)分离。
当事件被发布时,EventStore 在每个订阅的 worker 上调用 perform_async,
将事件信息作为参数传递。这基本上在每个订阅者的队列上调度了一个 Sidekiq job。
这意味着订阅者的工作方式没有其他变化,因为它们只是 Sidekiq worker。 例如:如果某个 worker(订阅者)执行 job 失败,该 job 会被放回 Sidekiq 重试。
EventStore 优势
- 如果副作用很重要,可以通过更改 worker 权重来设置订阅者(Sidekiq worker)运行得更快。
- 自动强制副作用异步运行。 这使得其他领域可以安全地订阅事件,而不会影响主业务事务的性能。
EventStore 劣势
EventStore基于 Sidekiq 构建。 虽然 Sidekiq worker 支持重试和指数退避, 但有时 worker 会超过重试限制,导致 Sidekiq job 丢失。 此外,作为事件处理和灾难恢复的一部分,Sidekiq job 可能会被丢弃。 尽管许多重要的 GitLab 功能依赖于 Sidekiq 的持久性假设,但对于某些关键的数据完整性功能来说,这可能不可接受。 如果你需要确保工作最终完成,你可能需要在 Postgres 中实现一个队列机制, 其中 job 由 Sidekiq cron worker 拾取。你可以在::LooseForeignKeys::CleanupWorker和::BatchedGitRefUpdates::ProjectCleanupWorker中看到这种方法的示例。 通常会创建一个分区表,然后插入数据,稍后由 cron worker 处理, 并在完成某些工作后在数据库中标记为processed。 还有在 Redis 中实现可靠队列的策略,如在::Elastic::ProcessBookkeepingService中使用的策略。 如果你在代码库中引入新的队列模式,建议尽早寻求维护者的建议。- 或者,如果逻辑需要作为主业务事务的一部分处理,而不是副作用,
考虑不使用
EventStore。 - Sidekiq worker 默认没有限制,但如果存在耗尽共享资源的风险, 你应该考虑配置并发限制。
定义事件
Event 对象表示在限界上下文中发生的领域事件。
生产者可以通过发布事件来通知其他限界上下文发生了某些事情,以便它们能够对此做出反应。
事件应该命名为 <domain_object><action>Event,其中 action 使用过去时态,
例如 ReviewerAddedEvent 而不是 AddReviewerEvent。
当限界上下文很明显时,可以省略 domain_object,
例如 MergeRequest::ApprovedEvent 而不是 MergeRequest::MergeRequestApprovedEvent。
良好事件的指导原则
事件就像 API 或 UI 一样是公共接口。与你的产品和设计团队协作, 确保新事件能够满足订阅者的需求。尽可能让新事件遵循以下原则:
- 语义化:事件应该描述限界上下文中发生的事情,而不是订阅者的预期操作。
- 具体化:事件应该被狭义定义,而不必过于精确。这最小化了 订阅者必须执行的事件过滤量,以及它们需要订阅的唯一事件数量。 考虑使用属性来传达附加信息。
- 作用域化:事件应该限定在其限界上下文中。避免发布关于不被你的限界上下文包含的领域对象的事件。
示例
| 原则 | 良好 | 不良 |
|---|---|---|
| 语义化 | MergeRequest::ApprovedEvent |
MergeRequest::NotifyAuthorEvent |
| 具体化 | MergeRequest::ReviewerAddedEvent |
• MergeRequest::ChangedEvent • MergeRequest::CodeownerAddedAsReviewerEvent |
| 作用域化 | MergeRequest::CreatedEvent |
Project::MergeRequestCreatedEvent |
创建事件架构
在 app/events/<namespace>/ 下定义新的事件类,名称应代表过去发生的事情:
class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
def schema
{
'type' => 'object',
'required' => ['pipeline_id'],
'properties' => {
'pipeline_id' => { 'type' => 'integer' },
'ref' => { 'type' => 'string' }
}
}
end
end架构必须是有效的JSON schema,
由 JSONSchemer gem 进行验证。
验证在初始化事件对象时立即进行,以确保发布者遵循与订阅者的约定。
你应该尽可能使用可选属性,这样架构变更需要的发布更少。
但是,required 属性可用于事件主题的唯一标识符。例如:
pipeline_id可以是Ci::PipelineCreatedEvent的必需属性。project_id可以是Projects::ProjectDeletedEvent的必需属性。
只发布订阅者需要的属性,不要针对特定订阅者调整负载。 负载应该完整地表示事件,不应包含松散相关的属性。例如:
Ci::PipelineCreatedEvent.new(data: {
pipeline_id: pipeline.id,
# 除非所有订阅者都需要 merge request ID,
# 否则这些数据可以由订阅者获取。
merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})发布更多属性的事件可以为订阅者提供他们最初需要的数据。 否则订阅者必须从数据库中获取额外的数据。 然而,这可能导致架构的持续变化,并可能添加不代表单一事实来源的属性。 最好将此技术用作性能优化。例如:当一个事件有许多订阅者, 它们都从数据库中重新获取相同的数据时。
更新事件
架构或事件名称的变更需要多次发布。当新版本正在部署时:
- 现有的发布者可以使用旧版本发布事件。
- 现有的订阅者可以使用旧版本消费事件。
- 事件作为 job 参数持久化在 Sidekiq 队列中,因此在部署期间我们可能有 2 个版本的架构。
由于架构变更最终影响 Sidekiq 参数,请参考我们的 Sidekiq 风格指南, 了解多次发布的注意事项。
重命名事件
- 发布 1:引入新事件并准备订阅者。
- 使用新名称引入事件的副本(你可以让旧事件继承新事件)。
- 如果订阅者 worker 知道事件名称,确保它们也能处理新事件。
- 发布 2:将新事件路由到订阅者。
- 更改发布者使用新事件。
- 将所有使用旧事件的订阅更改为使用新事件。
- 删除旧事件类。
添加属性
- 发布 1:
- 将新属性添加为可选(不是
required)。 - 更新订阅者,使其可以消费有和无新属性的事件。
- 将新属性添加为可选(不是
- 发布 2:
- 更改发布者提供新属性
- 发布 3:(如果属性应该是
required):- 更改架构和订阅者代码,使其始终期望该属性。
删除属性
- 发布 1:
- 如果属性是
required,将其设为可选。 - 更新订阅者,使其不总是期望该属性。
- 如果属性是
- 发布 2:
- 从事件发布中删除该属性。
- 从订阅者中删除处理该属性的代码。
其他变更
对于其他变更,如重命名属性,使用相同的步骤:
- 删除旧属性
- 添加新属性
发布事件
从前面的示例发布事件:
Gitlab::EventStore.publish(
Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id })
)事件应该尽可能从相关的 Service 类中分发。存在一些例外,
我们可能允许模型发布事件,如在状态机转换中。
例如,我们可以发布 Ci::BuildFinishedEvent 并让其他领域异步响应,
而不是调度运行一系列副作用的 Ci::BuildFinishedWorker。
ActiveRecord 回调对于表示领域事件来说太底层了。它们更多地表示
数据库记录的更改。可能有合理的情况,但我们应该将这些视为例外。
创建订阅者
订阅者是一个包含 Gitlab::EventStore::Subscriber 模块的 Sidekiq worker。
该模块负责 perform 方法,并通过 handle_event 方法提供了更好的抽象来安全地处理事件。例如:
module MergeRequests
class UpdateHeadPipelineWorker
include Gitlab::EventStore::Subscriber
def handle_event(event)
Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
# ...
end
end
end
end将订阅者注册到事件
要在 lib/gitlab/event_store.rb 中将 worker 订阅到特定事件,
在 Gitlab::EventStore.configure! 方法中添加如下一行:
为了确保与金丝雀部署的兼容性, 在注册订阅时,Sidekiq worker 必须在之前的部署中引入,或者我们必须使用功能标志。
module Gitlab
module EventStore
def self.configure!(store)
# ...
store.subscribe ::Sbom::ProcessTransferEventsWorker, to: ::Projects::ProjectTransferedEvent,
if: ->(event) do
actor = ::Project.actor_from_id(event.data[:project_id])
Feature.enabled?(:sync_project_archival_status_to_sbom_occurrences, actor)
end
# ...
end
end
end仅在 EE 代码库中定义的 worker 可以通过在 ee/lib/ee/gitlab/event_store.rb 中声明订阅
以相同方式订阅事件。
订阅在 Rails 应用加载时存储在内存中并立即冻结。 无法在运行时修改订阅。
条件分发事件
订阅可以指定接受事件的条件:
store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
to: ::Ci::PipelineCreatedEvent,
if: -> (event) { event.data[:merge_request_id].present? }这告诉事件存储器,如果条件满足,将 Ci::PipelineCreatedEvent 分发给订阅者。
这种技术可以避免在订阅者只对事件的一小部分感兴趣时调度 Sidekiq job。
使用条件分发时,它必须只包含廉价的条件,因为它们在每次发布给定事件时都会同步执行。
对于复杂条件,最好订阅所有事件,然后在订阅者 worker 的 handle_event 方法中处理逻辑。
延迟分发事件
订阅可以指定接收事件的延迟:
store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
to: ::Ci::PipelineCreatedEvent,
delay: 1.minutedelay 参数将事件的分发切换为使用订阅者 Sidekiq worker 的 perform_in 方法,
而不是 perform_async。
这种技术在发布大量事件时很有用,可以利用 Sidekiq 的去重功能。
发布事件组
在某些场景中,我们在单个业务事务中发布多个同类型事件。
这通过为每个事件调用 job 给 Sidekiq 带来了额外负载。
在这种情况下,我们可以通过调用 Gitlab::EventStore.publish_group 来发布事件组。
该方法接受一个相似类型的事件数组。默认情况下,订阅者 worker 接收最多 10 个事件的一组,
但可以通过在创建订阅时定义 group_size 参数来配置。
发布的事件数量根据配置的 group_size 分批分发到订阅者。
如果组数超过 100,我们会在每个组上调度 10 秒的延迟,以减少对 Sidekiq 的负载。
store.subscribe ::Security::RefreshProjectPoliciesWorker,
to: ::ProjectAuthorizations::AuthorizationsChangedEvent,
delay: 1.minute,
group_size: 25订阅者 worker 中的 handle_event 方法会被组中的每个事件调用。
移除订阅者
由于 Gitlab::EventStore 由 Sidekiq 支持,我们遵循相同的指南来
移除 Sidekiq worker,从以下步骤开始:
- 移除订阅以移除任何将 job 入队的代码
- 使订阅者 worker 变为空操作。为此,我们需要从 worker 中移除
Gitlab::EventStore::Subscriber模块。
测试
测试发布者
发布者的责任是确保事件被正确发布。
为了测试事件是否被正确发布,我们可以使用 RSpec 匹配器 :publish_event:
it 'publishes a ProjectDeleted event with project id and namespace id' do
expected_data = { project_id: project.id, namespace_id: project.namespace_id }
# 匹配器验证当块被调用时,块发布预期的事件和数据。
expect { destroy_project(project, user, {}) }
.to publish_event(Projects::ProjectDeletedEvent)
.with(expected_data)
end也可以在 :publish_event 匹配器内组合匹配器。
当我们想要断言事件是用某种类型的值创建的,但我们事先不知道该值时,这很有用。
一个例子是在创建新记录后发布事件。
it 'publishes a ProjectCreatedEvent with project id and namespace id' do
# project ID 只会在 `create_project` 在 expect 块中调用时生成。
expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }
expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
.to publish_event(Projects::ProjectCreatedEvent)
.with(expected_data)
end当你发布多个事件时,也可以检查未发布的事件。
it 'publishes a ProjectCreatedEvent with project id and namespace id' do
# project ID 在 `create_project` 在 `expect` 块中调用时生成。
expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }
expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
.to publish_event(Projects::ProjectCreatedEvent)
.with(expected_data)
.and not_publish_event(Projects::ProjectDeletedEvent)
end测试订阅者
订阅者必须确保发布的事件能够被正确消费。为此 我们添加了 helper 和共享示例来标准化测试订阅者的方式:
RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
let(:pipeline_created_event) { Ci::PipelineCreatedEvent.new(data: ({ pipeline_id: pipeline.id })) }
# 这个共享示例确保事件被发布并由当前订阅者(`described_class`)正确处理。
# 它还确保 worker 是幂等的。
it_behaves_like 'subscribes to event' do
let(:event) { pipeline_created_event }
end
# 这个共享示例确保发布的事件被忽略。这对于
# 条件分发测试可能很有用。
it_behaves_like 'ignores the published event' do
let(:event) { pipeline_created_event }
end
it 'does something' do
# 这个 helper 直接执行 `perform` 确保 `handle_event` 被正确调用。
consume_event(subscriber: described_class, event: pipeline_created_event)
# 运行断言
end
end最佳实践
- 保持 CE & EE 分离和兼容性:
- 在事件始终发生的相同代码中定义事件类并发布事件(CE 或 EE)。
- 如果事件是由 CE 功能引起的,事件类必须在 CE 中定义和发布。 同样,如果事件是由 EE 功能引起的,事件类必须在 EE 中定义和发布。
- 在依赖功能存在的相同代码中定义依赖事件的订阅者(CE 或 EE)。
- 你可以有在 CE 中发布的事件(例如
Projects::ProjectCreatedEvent)和 在 EE 中定义的依赖于此事件的订阅者(例如Security::SyncSecurityPolicyWorker)。
- 你可以有在 CE 中发布的事件(例如
- 在事件始终发生的相同代码中定义事件类并发布事件(CE 或 EE)。
- 在同一个限界上下文(顶级 Ruby 命名空间)中定义事件类并发布事件。
- 给定的限界上下文应该只发布与其自身上下文相关的事件。
- 评估订阅事件时的信号/噪声比。你在订阅者中处理了多少事件 vs 忽略了多少? 如果你只对事件的一小部分感兴趣,考虑使用条件分发。 在执行同步检查与条件分发或调度可能冗余的 worker 之间取得平衡。