Merge commit 'f877aa9d70
' into kb_migration
This commit is contained in:
commit
32f0e619f0
440 changed files with 6249 additions and 3435 deletions
13
app/workers/bulk_import_worker.rb
Normal file
13
app/workers/bulk_import_worker.rb
Normal file
|
@ -0,0 +1,13 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class BulkImportWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: false
|
||||
|
||||
def perform(import_id)
|
||||
import = BulkImport.find(import_id)
|
||||
import.update!(state: :in_progress)
|
||||
BulkImportService.new.call(import)
|
||||
end
|
||||
end
|
|
@ -6,7 +6,7 @@ class DistributionWorker
|
|||
include Lockable
|
||||
|
||||
def perform(status_id, options = {})
|
||||
with_lock("distribute:#{status_id}") do
|
||||
with_redis_lock("distribute:#{status_id}") do
|
||||
FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# NOTE: This is a deprecated worker, only kept to not break ongoing imports
|
||||
# on upgrade. See `Import::RowWorker` for its replacement.
|
||||
|
||||
class Import::RelationshipWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
|
|
33
app/workers/import/row_worker.rb
Normal file
33
app/workers/import/row_worker.rb
Normal file
|
@ -0,0 +1,33 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Import::RowWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 6, dead: false
|
||||
|
||||
sidekiq_retries_exhausted do |msg, _exception|
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
# Increment the total number of processed items, and bump the state of the import if needed
|
||||
bulk_import_id = BulkImportRow.where(id: msg['args'][0]).pick(:id)
|
||||
BulkImport.progress!(bulk_import_id) unless bulk_import_id.nil?
|
||||
end
|
||||
end
|
||||
|
||||
def perform(row_id)
|
||||
row = BulkImportRow.eager_load(bulk_import: :account).find_by(id: row_id)
|
||||
return true if row.nil?
|
||||
|
||||
imported = BulkImportRowService.new.call(row)
|
||||
|
||||
mark_as_processed!(row, imported)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def mark_as_processed!(row, imported)
|
||||
bulk_import_id = row.bulk_import_id
|
||||
row.destroy! if imported
|
||||
|
||||
BulkImport.progress!(bulk_import_id, imported: imported)
|
||||
end
|
||||
end
|
|
@ -1,5 +1,8 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# NOTE: This is a deprecated worker, only kept to not break ongoing imports
|
||||
# on upgrade. See `ImportWorker` for its replacement.
|
||||
|
||||
class ImportWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
|
|
|
@ -8,9 +8,9 @@ class MoveWorker
|
|||
@target_account = Account.find(target_account_id)
|
||||
|
||||
if @target_account.local? && @source_account.local?
|
||||
nb_moved = rewrite_follows!
|
||||
@source_account.update_count!(:followers_count, -nb_moved)
|
||||
@target_account.update_count!(:followers_count, nb_moved)
|
||||
num_moved = rewrite_follows!
|
||||
@source_account.update_count!(:followers_count, -num_moved)
|
||||
@target_account.update_count!(:followers_count, num_moved)
|
||||
else
|
||||
queue_follow_unfollows!
|
||||
end
|
||||
|
@ -29,12 +29,44 @@ class MoveWorker
|
|||
private
|
||||
|
||||
def rewrite_follows!
|
||||
num_moved = 0
|
||||
|
||||
# First, approve pending follow requests for the new account,
|
||||
# this allows correctly processing list memberships with pending
|
||||
# follow requests
|
||||
FollowRequest.where(account: @source_account.followers, target_account_id: @target_account.id).find_each do |follow_request|
|
||||
ListAccount.where(follow_id: follow_request.id).includes(:list).find_each do |list_account|
|
||||
list_account.list.accounts << @target_account
|
||||
rescue ActiveRecord::RecordInvalid
|
||||
nil
|
||||
end
|
||||
|
||||
follow_request.authorize!
|
||||
end
|
||||
|
||||
# Then handle accounts that follow both the old and new account
|
||||
@source_account.passive_relationships
|
||||
.where(account: Account.local)
|
||||
.where(account: @target_account.followers.local)
|
||||
.in_batches do |follows|
|
||||
ListAccount.where(follow: follows).includes(:list).find_each do |list_account|
|
||||
list_account.list.accounts << @target_account
|
||||
rescue ActiveRecord::RecordInvalid
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
# Finally, handle the common case of accounts not following the new account
|
||||
@source_account.passive_relationships
|
||||
.where(account: Account.local)
|
||||
.where.not(account: @target_account.followers.local)
|
||||
.where.not(account_id: @target_account.id)
|
||||
.in_batches
|
||||
.update_all(target_account_id: @target_account.id)
|
||||
.in_batches do |follows|
|
||||
ListAccount.where(follow: follows).in_batches.update_all(account_id: @target_account.id)
|
||||
num_moved += follows.update_all(target_account_id: @target_account.id)
|
||||
end
|
||||
|
||||
num_moved
|
||||
end
|
||||
|
||||
def queue_follow_unfollows!
|
||||
|
|
|
@ -38,17 +38,37 @@ class Scheduler::AccountsStatusesCleanupScheduler
|
|||
return if under_load?
|
||||
|
||||
budget = compute_budget
|
||||
first_policy_id = last_processed_id
|
||||
|
||||
# If the budget allows it, we want to consider all accounts with enabled
|
||||
# auto cleanup at least once.
|
||||
#
|
||||
# We start from `first_policy_id` (the last processed id in the previous
|
||||
# run) and process each policy until we loop to `first_policy_id`,
|
||||
# recording into `affected_policies` any policy that caused posts to be
|
||||
# deleted.
|
||||
#
|
||||
# After that, we set `full_iteration` to `false` and continue looping on
|
||||
# policies from `affected_policies`.
|
||||
first_policy_id = last_processed_id || 0
|
||||
first_iteration = true
|
||||
full_iteration = true
|
||||
affected_policies = []
|
||||
|
||||
loop do
|
||||
num_processed_accounts = 0
|
||||
|
||||
scope = AccountStatusesCleanupPolicy.where(enabled: true)
|
||||
scope = scope.where(id: first_policy_id...) if first_policy_id.present?
|
||||
scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
|
||||
scope.find_each(order: :asc) do |policy|
|
||||
num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
|
||||
num_processed_accounts += 1 unless num_deleted.zero?
|
||||
budget -= num_deleted
|
||||
|
||||
unless num_deleted.zero?
|
||||
num_processed_accounts += 1
|
||||
affected_policies << policy.id if full_iteration
|
||||
end
|
||||
|
||||
full_iteration = false if !first_iteration && policy.id >= first_policy_id
|
||||
|
||||
if budget.zero?
|
||||
save_last_processed_id(policy.id)
|
||||
break
|
||||
|
@ -57,9 +77,10 @@ class Scheduler::AccountsStatusesCleanupScheduler
|
|||
|
||||
# The idea here is to loop through all policies at least once until the budget is exhausted
|
||||
# and start back after the last processed account otherwise
|
||||
break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
|
||||
break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)
|
||||
|
||||
first_policy_id = nil
|
||||
full_iteration = false unless first_iteration
|
||||
first_iteration = false
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -76,12 +97,28 @@ class Scheduler::AccountsStatusesCleanupScheduler
|
|||
|
||||
private
|
||||
|
||||
def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
|
||||
scope = AccountStatusesCleanupPolicy.where(enabled: true)
|
||||
|
||||
if full_iteration
|
||||
# If we are doing a full iteration, examine all policies we have not examined yet
|
||||
if first_iteration
|
||||
scope.where(id: first_policy_id...)
|
||||
else
|
||||
scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
|
||||
end
|
||||
else
|
||||
# Otherwise, examine only policies that previously yielded posts to delete
|
||||
scope.where(id: affected_policies)
|
||||
end
|
||||
end
|
||||
|
||||
def queue_under_load?(name, max_latency)
|
||||
Sidekiq::Queue.new(name).latency > max_latency
|
||||
end
|
||||
|
||||
def last_processed_id
|
||||
redis.get('account_statuses_cleanup_scheduler:last_policy_id')
|
||||
redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
|
||||
end
|
||||
|
||||
def save_last_processed_id(id)
|
||||
|
|
|
@ -23,6 +23,7 @@ class Scheduler::VacuumScheduler
|
|||
backups_vacuum,
|
||||
access_tokens_vacuum,
|
||||
feeds_vacuum,
|
||||
imports_vacuum,
|
||||
]
|
||||
end
|
||||
|
||||
|
@ -50,6 +51,10 @@ class Scheduler::VacuumScheduler
|
|||
Vacuum::FeedsVacuum.new
|
||||
end
|
||||
|
||||
def imports_vacuum
|
||||
Vacuum::ImportsVacuum.new
|
||||
end
|
||||
|
||||
def content_retention_policy
|
||||
ContentRetentionPolicy.current
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue