Help us learn about your current experience with the documentation. Take the survey.

GitLab EventStore

背景

单体 GitLab 项目正在变得越来越大,同时也有更多的领域被定义出来。 因此,这些领域由于时间耦合而变得相互纠缠。

一个典型的例子是 PostReceive worker, 其中很多事情跨越了多个领域。如果新的行为需要响应新提交的推送, 那么我们就会在 PostReceive 或其子组件(例如 Git::ProcessRefChangesService)中的某个地方添加代码。

这种类型的架构:

  • 违反了单一职责原则。
  • 增加了在不熟悉的代码库中添加代码的风险。你可能不知道其中的细微差别, 这可能会引入 bug 或性能下降。
  • 违反了领域边界。在特定的命名空间内(例如 Git::),我们突然看到 来自其他领域的类(如 Ci::MergeRequests::)。

什么是 EventStore?

Gitlab:EventStore 是一个基于现有 Sidekiq worker 和可观测性构建的基础 pub-sub 系统。 我们使用这个系统在建模领域时采用事件驱动的方法,同时保持最小耦合。

这基本上保留了现有的 Sidekiq worker 以执行异步工作,但反转了依赖关系。

EventStore 示例

当创建 CI pipeline 时,我们会更新任何与 pipeline ref 匹配的 merge request 的 head pipeline。 然后 merge request 可以显示最新 pipeline 的状态。

不使用 EventStore

我们修改 Ci::CreatePipelineService 并添加逻辑(如 if 语句)来检查 pipeline 是否被创建。 然后我们安排一个 worker 来为 MergeRequests:: 领域运行一些副作用。

这种风格违反了开闭原则, 并且不必要地添加了来自其他领域的副作用逻辑,增加了耦合:

graph LR
  subgraph ci[CI]
    cp[CreatePipelineService]
  end

  subgraph mr[MergeRequests]
    upw[UpdateHeadPipelineWorker]
  end

  subgraph no[Namespaces::Onboarding]
    pow[PipelinesOnboardedWorker]
  end

  cp -- perform_async --> upw
  cp -- perform_async --> pow

使用 EventStore

Ci::CreatePipelineService 发布一个事件 Ci::PipelineCreatedEvent,其责任到此为止。

MergeRequests:: 领域可以使用一个 worker MergeRequests::UpdateHeadPipelineWorker 来订阅这个事件,所以:

  • 副作用被异步调度,不会影响发出领域事件的主业务事务。
  • 可以在不修改主业务事务的情况下添加更多副作用。
  • 我们可以清楚地看到涉及的领域及其所有权。
  • 我们可以识别系统中发生的事件,因为它们被明确声明。

使用 Gitlab::EventStore 时,订阅者(Sidekiq worker)和领域事件的架构之间仍然存在耦合。 这种耦合程度远小于将主事务(Ci::CreatePipelineService)耦合到:

  • 多个订阅者。
  • 多种调用订阅者的方式(包括条件调用)。
  • 多种传递参数的方式。
graph LR
  subgraph ci[CI]
    cp[CreatePipelineService]
    cp -- publish --> e[PipelineCreateEvent]
  end

  subgraph mr[MergeRequests]
    upw[UpdateHeadPipelineWorker]
  end

  subgraph no[Namespaces::Onboarding]
    pow[PipelinesOnboardedWorker]
  end

  upw -. subscribe .-> e
  pow -. subscribe .-> e

每个订阅者本身就是一个 Sidekiq worker,可以指定与其负责的工作类型相关的任何属性。 例如,一个订阅者可以定义 urgency: high,而另一个不太重要的订阅者可以设置 urgency: low

EventStore 只是一个抽象层,允许我们实现依赖倒置。这有助于 将业务事务与副作用(通常在其他领域执行)分离。

当事件被发布时,EventStore 在每个订阅的 worker 上调用 perform_async, 将事件信息作为参数传递。这基本上在每个订阅者的队列上调度了一个 Sidekiq job。

这意味着订阅者的工作方式没有其他变化,因为它们只是 Sidekiq worker。 例如:如果某个 worker(订阅者)执行 job 失败,该 job 会被放回 Sidekiq 重试。

EventStore 优势

  • 如果副作用很重要,可以通过更改 worker 权重来设置订阅者(Sidekiq worker)运行得更快。
  • 自动强制副作用异步运行。 这使得其他领域可以安全地订阅事件,而不会影响主业务事务的性能。

EventStore 劣势

  • EventStore 基于 Sidekiq 构建。 虽然 Sidekiq worker 支持重试和指数退避, 但有时 worker 会超过重试限制,导致 Sidekiq job 丢失。 此外,作为事件处理和灾难恢复的一部分,Sidekiq job 可能会被丢弃。 尽管许多重要的 GitLab 功能依赖于 Sidekiq 的持久性假设,但对于某些关键的数据完整性功能来说,这可能不可接受。 如果你需要确保工作最终完成,你可能需要在 Postgres 中实现一个队列机制, 其中 job 由 Sidekiq cron worker 拾取。你可以在 ::LooseForeignKeys::CleanupWorker::BatchedGitRefUpdates::ProjectCleanupWorker 中看到这种方法的示例。 通常会创建一个分区表,然后插入数据,稍后由 cron worker 处理, 并在完成某些工作后在数据库中标记为 processed。 还有在 Redis 中实现可靠队列的策略,如在 ::Elastic::ProcessBookkeepingService 中使用的策略。 如果你在代码库中引入新的队列模式,建议尽早寻求维护者的建议。
  • 或者,如果逻辑需要作为主业务事务的一部分处理,而不是副作用, 考虑不使用 EventStore
  • Sidekiq worker 默认没有限制,但如果存在耗尽共享资源的风险, 你应该考虑配置并发限制

定义事件

Event 对象表示在限界上下文中发生的领域事件。 生产者可以通过发布事件来通知其他限界上下文发生了某些事情,以便它们能够对此做出反应。 事件应该命名为 <domain_object><action>Event,其中 action 使用过去时态, 例如 ReviewerAddedEvent 而不是 AddReviewerEvent。 当限界上下文很明显时,可以省略 domain_object, 例如 MergeRequest::ApprovedEvent 而不是 MergeRequest::MergeRequestApprovedEvent

良好事件的指导原则

事件就像 API 或 UI 一样是公共接口。与你的产品和设计团队协作, 确保新事件能够满足订阅者的需求。尽可能让新事件遵循以下原则:

  • 语义化:事件应该描述限界上下文中发生的事情,而不是订阅者的预期操作。
  • 具体化:事件应该被狭义定义,而不必过于精确。这最小化了 订阅者必须执行的事件过滤量,以及它们需要订阅的唯一事件数量。 考虑使用属性来传达附加信息。
  • 作用域化:事件应该限定在其限界上下文中。避免发布关于不被你的限界上下文包含的领域对象的事件。

示例

原则 良好 不良
语义化 MergeRequest::ApprovedEvent MergeRequest::NotifyAuthorEvent
具体化 MergeRequest::ReviewerAddedEvent • MergeRequest::ChangedEvent
• MergeRequest::CodeownerAddedAsReviewerEvent
作用域化 MergeRequest::CreatedEvent Project::MergeRequestCreatedEvent

创建事件架构

app/events/<namespace>/ 下定义新的事件类,名称应代表过去发生的事情:

class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
  def schema
    {
      'type' => 'object',
      'required' => ['pipeline_id'],
      'properties' => {
        'pipeline_id' => { 'type' => 'integer' },
        'ref' => { 'type' => 'string' }
      }
    }
  end
end

架构必须是有效的JSON schema, 由 JSONSchemer gem 进行验证。 验证在初始化事件对象时立即进行,以确保发布者遵循与订阅者的约定。

你应该尽可能使用可选属性,这样架构变更需要的发布更少。 但是,required 属性可用于事件主题的唯一标识符。例如:

  • pipeline_id 可以是 Ci::PipelineCreatedEvent 的必需属性。
  • project_id 可以是 Projects::ProjectDeletedEvent 的必需属性。

只发布订阅者需要的属性,不要针对特定订阅者调整负载。 负载应该完整地表示事件,不应包含松散相关的属性。例如:

Ci::PipelineCreatedEvent.new(data: {
  pipeline_id: pipeline.id,
  # 除非所有订阅者都需要 merge request ID,
  # 否则这些数据可以由订阅者获取。
  merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})

发布更多属性的事件可以为订阅者提供他们最初需要的数据。 否则订阅者必须从数据库中获取额外的数据。 然而,这可能导致架构的持续变化,并可能添加不代表单一事实来源的属性。 最好将此技术用作性能优化。例如:当一个事件有许多订阅者, 它们都从数据库中重新获取相同的数据时。

更新事件

架构或事件名称的变更需要多次发布。当新版本正在部署时:

  • 现有的发布者可以使用旧版本发布事件。
  • 现有的订阅者可以使用旧版本消费事件。
  • 事件作为 job 参数持久化在 Sidekiq 队列中,因此在部署期间我们可能有 2 个版本的架构。

由于架构变更最终影响 Sidekiq 参数,请参考我们的 Sidekiq 风格指南, 了解多次发布的注意事项。

重命名事件

  1. 发布 1:引入新事件并准备订阅者。
    • 使用新名称引入事件的副本(你可以让旧事件继承新事件)。
    • 如果订阅者 worker 知道事件名称,确保它们也能处理新事件。
  2. 发布 2:将新事件路由到订阅者。
    • 更改发布者使用新事件。
    • 将所有使用旧事件的订阅更改为使用新事件。
    • 删除旧事件类。

添加属性

  1. 发布 1:
    • 将新属性添加为可选(不是 required)。
    • 更新订阅者,使其可以消费有和无新属性的事件。
  2. 发布 2:
    • 更改发布者提供新属性
  3. 发布 3:(如果属性应该是 required):
    • 更改架构和订阅者代码,使其始终期望该属性。

删除属性

  1. 发布 1:
    • 如果属性是 required,将其设为可选。
    • 更新订阅者,使其不总是期望该属性。
  2. 发布 2:
    • 从事件发布中删除该属性。
    • 从订阅者中删除处理该属性的代码。

其他变更

对于其他变更,如重命名属性,使用相同的步骤:

  1. 删除旧属性
  2. 添加新属性

发布事件

前面的示例发布事件:

Gitlab::EventStore.publish(
  Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id })
)

事件应该尽可能从相关的 Service 类中分发。存在一些例外, 我们可能允许模型发布事件,如在状态机转换中。 例如,我们可以发布 Ci::BuildFinishedEvent 并让其他领域异步响应, 而不是调度运行一系列副作用的 Ci::BuildFinishedWorker

ActiveRecord 回调对于表示领域事件来说太底层了。它们更多地表示 数据库记录的更改。可能有合理的情况,但我们应该将这些视为例外。

创建订阅者

订阅者是一个包含 Gitlab::EventStore::Subscriber 模块的 Sidekiq worker。 该模块负责 perform 方法,并通过 handle_event 方法提供了更好的抽象来安全地处理事件。例如:

module MergeRequests
  class UpdateHeadPipelineWorker
    include Gitlab::EventStore::Subscriber

    def handle_event(event)
      Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
        # ...
      end
    end
  end
end

将订阅者注册到事件

要在 lib/gitlab/event_store.rb 中将 worker 订阅到特定事件, 在 Gitlab::EventStore.configure! 方法中添加如下一行:

为了确保与金丝雀部署的兼容性, 在注册订阅时,Sidekiq worker 必须在之前的部署中引入,或者我们必须使用功能标志。

module Gitlab
  module EventStore
    def self.configure!(store)
      # ...

      store.subscribe ::Sbom::ProcessTransferEventsWorker, to: ::Projects::ProjectTransferedEvent,
        if: ->(event) do
          actor = ::Project.actor_from_id(event.data[:project_id])
          Feature.enabled?(:sync_project_archival_status_to_sbom_occurrences, actor)
        end

      # ...
    end
  end
end

仅在 EE 代码库中定义的 worker 可以通过在 ee/lib/ee/gitlab/event_store.rb 中声明订阅 以相同方式订阅事件。

订阅在 Rails 应用加载时存储在内存中并立即冻结。 无法在运行时修改订阅。

条件分发事件

订阅可以指定接受事件的条件:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  if: -> (event) { event.data[:merge_request_id].present? }

这告诉事件存储器,如果条件满足,将 Ci::PipelineCreatedEvent 分发给订阅者。

这种技术可以避免在订阅者只对事件的一小部分感兴趣时调度 Sidekiq job。

使用条件分发时,它必须只包含廉价的条件,因为它们在每次发布给定事件时都会同步执行。

对于复杂条件,最好订阅所有事件,然后在订阅者 worker 的 handle_event 方法中处理逻辑。

延迟分发事件

订阅可以指定接收事件的延迟:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  delay: 1.minute

delay 参数将事件的分发切换为使用订阅者 Sidekiq worker 的 perform_in 方法, 而不是 perform_async

这种技术在发布大量事件时很有用,可以利用 Sidekiq 的去重功能。

发布事件组

在某些场景中,我们在单个业务事务中发布多个同类型事件。 这通过为每个事件调用 job 给 Sidekiq 带来了额外负载。 在这种情况下,我们可以通过调用 Gitlab::EventStore.publish_group 来发布事件组。 该方法接受一个相似类型的事件数组。默认情况下,订阅者 worker 接收最多 10 个事件的一组, 但可以通过在创建订阅时定义 group_size 参数来配置。 发布的事件数量根据配置的 group_size 分批分发到订阅者。 如果组数超过 100,我们会在每个组上调度 10 秒的延迟,以减少对 Sidekiq 的负载。

store.subscribe ::Security::RefreshProjectPoliciesWorker,
  to: ::ProjectAuthorizations::AuthorizationsChangedEvent,
  delay: 1.minute,
  group_size: 25

订阅者 worker 中的 handle_event 方法会被组中的每个事件调用。

移除订阅者

由于 Gitlab::EventStore 由 Sidekiq 支持,我们遵循相同的指南来 移除 Sidekiq worker,从以下步骤开始:

  • 移除订阅以移除任何将 job 入队的代码
  • 使订阅者 worker 变为空操作。为此,我们需要从 worker 中移除 Gitlab::EventStore::Subscriber 模块。

测试

测试发布者

发布者的责任是确保事件被正确发布。

为了测试事件是否被正确发布,我们可以使用 RSpec 匹配器 :publish_event

it 'publishes a ProjectDeleted event with project id and namespace id' do
  expected_data = { project_id: project.id, namespace_id: project.namespace_id }

  # 匹配器验证当块被调用时,块发布预期的事件和数据。
  expect { destroy_project(project, user, {}) }
    .to publish_event(Projects::ProjectDeletedEvent)
    .with(expected_data)
end

也可以在 :publish_event 匹配器内组合匹配器。 当我们想要断言事件是用某种类型的值创建的,但我们事先不知道该值时,这很有用。 一个例子是在创建新记录后发布事件。

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # project ID 只会在 `create_project` 在 expect 块中调用时生成。
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
end

当你发布多个事件时,也可以检查未发布的事件。

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # project ID 在 `create_project` 在 `expect` 块中调用时生成。
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
    .and not_publish_event(Projects::ProjectDeletedEvent)
end

测试订阅者

订阅者必须确保发布的事件能够被正确消费。为此 我们添加了 helper 和共享示例来标准化测试订阅者的方式:

RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
  let(:pipeline_created_event) { Ci::PipelineCreatedEvent.new(data: ({ pipeline_id: pipeline.id })) }

  # 这个共享示例确保事件被发布并由当前订阅者(`described_class`)正确处理。
  # 它还确保 worker 是幂等的。
  it_behaves_like 'subscribes to event' do
    let(:event) { pipeline_created_event }
  end

  # 这个共享示例确保发布的事件被忽略。这对于
  # 条件分发测试可能很有用。
  it_behaves_like 'ignores the published event' do
    let(:event) { pipeline_created_event }
  end

  it 'does something' do
    # 这个 helper 直接执行 `perform` 确保 `handle_event` 被正确调用。
    consume_event(subscriber: described_class, event: pipeline_created_event)

    # 运行断言
  end
end

最佳实践

  • 保持 CE & EE 分离和兼容性
    • 在事件始终发生的相同代码中定义事件类并发布事件(CE 或 EE)。
      • 如果事件是由 CE 功能引起的,事件类必须在 CE 中定义和发布。 同样,如果事件是由 EE 功能引起的,事件类必须在 EE 中定义和发布。
    • 在依赖功能存在的相同代码中定义依赖事件的订阅者(CE 或 EE)。
      • 你可以有在 CE 中发布的事件(例如 Projects::ProjectCreatedEvent)和 在 EE 中定义的依赖于此事件的订阅者(例如 Security::SyncSecurityPolicyWorker)。
  • 在同一个限界上下文(顶级 Ruby 命名空间)中定义事件类并发布事件。
    • 给定的限界上下文应该只发布与其自身上下文相关的事件。
  • 评估订阅事件时的信号/噪声比。你在订阅者中处理了多少事件 vs 忽略了多少? 如果你只对事件的一小部分感兴趣,考虑使用条件分发。 在执行同步检查与条件分发或调度可能冗余的 worker 之间取得平衡。