Help us learn about your current experience with the documentation. Take the survey.
在数据库表中去重数据库记录
本指南介绍了一种为现有数据表引入数据库级别唯一性约束(唯一索引)的策略。
要求:
- 与列相关的属性修改(
INSERT,UPDATE)只能通过 ActiveRecord 进行(该技术依赖于 AR 回调)。 - 重复数据很少见,大多数是由于并发记录创建导致的。可以通过 teleport 检查生产数据库表来验证这一点(请联系数据库管理员寻求帮助)。
总运行时间主要取决于数据库表中的记录数。迁移需要扫描所有记录;为了满足部署后迁移的运行时间限制(约 10 分钟),可以将少于 1000 万行的数据库表视为小表。
小表的去重策略
该策略需要 3 个里程碑。例如,我们将基于 title 列对 issues 表进行去重,其中对于给定的 project_id 列,title 必须是唯一的。
里程碑 1:
- 通过迁移后脚本(post-migration)向表中添加一个新的数据库索引(非唯一)(如果尚不存在)。
- 添加模型级别的唯一性验证,以减少重复数据的可能性(如果尚不存在)。
- 添加一个事务级别的 advisory lock 以防止创建重复记录。
仅第二步本身并不能防止重复记录,更多信息请参阅 Rails guides。
创建索引的迁移后脚本:
def up
add_concurrent_index :issues, [:project_id, :title], name: INDEX_NAME
end
def down
remove_concurrent_index_by_name :issues, INDEX_NAME
endIssue 模型的验证和 advisory lock:
class Issue < ApplicationRecord
validates :title, uniqueness: { scope: :project_id }
before_validation :prevent_concurrent_inserts
private
# 当另一个数据库事务尝试插入相同数据时,此方法将会阻塞。
# 在其他事务释放锁之后,唯一性验证可能会因记录不唯一的验证错误而失败。
# 没有这个代码块,唯一性验证将无法检测到重复记录,因为事务之间看不到彼此的更改。
def prevent_concurrent_inserts
return if project_id.nil? || title.nil?
lock_key = ['issues', project_id, title].join('-')
lock_expression = "hashtext(#{connection.quote(lock_key)})"
connection.execute("SELECT pg_advisory_xact_lock(#{lock_expression})")
end
end里程碑 2:
- 在部署后迁移中实现去重逻辑。
- 将现有索引替换为唯一索引。
如何解决重复问题(例如,合并属性、保留最新记录)取决于构建在数据库表之上的功能。在本例中,我们保留最新的记录。
def up
model = define_batchable_model('issues')
# 对表进行单次遍历
model.each_batch do |batch|
# 查找重复的 (project_id, title) 对
duplicates = model
.where("(project_id, title) IN (#{batch.select(:project_id, :title).to_sql})")
.group(:project_id, :title)
.having('COUNT(*) > 1')
.pluck(:project_id, :title)
value_list = Arel::Nodes::ValuesList.new(duplicates).to_sql
# 通过 (project_id, title) 对定位所有记录,并保留最新的记录。
# 如果重复数据很少,查找速度应该足够快。
cleanup_query = <<~SQL
WITH duplicated_records AS MATERIALIZED (
SELECT
id,
ROW_NUMBER() OVER (PARTITION BY project_id, title ORDER BY project_id, title, id DESC) AS row_number
FROM issues
WHERE (project_id, title) IN (#{value_list})
ORDER BY project_id, title
)
DELETE FROM issues
WHERE id IN (
SELECT id FROM duplicated_records WHERE row_number > 1
)
SQL
model.connection.execute(cleanup_query)
end
end
def down
# no-op
end这是一个破坏性操作,无法回滚。请确保对去重逻辑进行了彻底的测试。
将旧索引替换为唯一索引:
def up
add_concurrent_index :issues, [:project_id, :title], name: UNIQUE_INDEX_NAME, unique: true
remove_concurrent_index_by_name :issues, INDEX_NAME
end
def down
add_concurrent_index :issues, [:project_id, :title], name: INDEX_NAME
remove_concurrent_index_by_name :issues, UNIQUE_INDEX_NAME
end里程碑 3:
- 通过移除
prevent_concurrent_insertsActiveRecord 回调方法来移除 advisory lock。
此里程碑必须在 required stop 之后。
大表的去重策略
在对大表进行去重时,我们可以将批处理和去重逻辑移至 batched background migration。
里程碑 1:
- 通过迁移后脚本(post migration)向表中添加一个新的数据库索引(非唯一)。
- 添加模型级别的唯一性验证,以减少重复数据的可能性(如果尚不存在)。
- 添加一个事务级别的 advisory lock 以防止创建重复记录。
里程碑 2:
- 在批处理后台迁移中实现去重逻辑,并在部署后迁移中将其排队。
里程碑 3:
- 完成批处理后台迁移。
- 将现有索引替换为唯一索引。
- 通过移除
prevent_concurrent_insertsActiveRecord 回调方法来移除 advisory lock。
此里程碑必须在 required stop 之后。