Merge commit '9c2d5b534f' into upstream-20250314

This commit is contained in:
KMY 2025-03-14 08:35:27 +09:00
commit 6548462ecb
84 changed files with 1719 additions and 418 deletions

View file

@ -0,0 +1,68 @@
# frozen_string_literal: true
# Fetch all replies to a status, querying recursively through
# ActivityPub replies collections, fetching any statuses that
# we either don't already have or we haven't checked for new replies
# in the Status::FETCH_REPLIES_COOLDOWN_MINUTES interval
class ActivityPub::FetchAllRepliesWorker
include Sidekiq::Worker
include ExponentialBackoff
include JsonLdHelper
sidekiq_options queue: 'pull', retry: 3
# Global max replies to fetch per request (all replies, recursively)
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i
MAX_PAGES = (ENV['FETCH_REPLIES_MAX_PAGES'] || 500).to_i
def perform(root_status_id, options = {})
@root_status = Status.remote.find_by(id: root_status_id)
return unless @root_status&.should_fetch_replies?
@root_status.touch(:fetched_replies_at)
Rails.logger.debug { "FetchAllRepliesWorker - #{@root_status.uri}: Fetching all replies for status: #{@root_status}" }
uris_to_fetch, n_pages = get_replies(@root_status.uri, MAX_PAGES, options)
return if uris_to_fetch.nil?
fetched_uris = uris_to_fetch.clone.to_set
until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES || n_pages >= MAX_PAGES
next_reply = uris_to_fetch.pop
next if next_reply.nil?
new_reply_uris, new_n_pages = get_replies(next_reply, MAX_PAGES - n_pages, options)
next if new_reply_uris.nil?
new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) }
uris_to_fetch.concat(new_reply_uris)
fetched_uris = fetched_uris.merge(new_reply_uris)
n_pages += new_n_pages
end
Rails.logger.debug { "FetchAllRepliesWorker - #{@root_status.uri}: fetched #{fetched_uris.length} replies" }
# Workers shouldn't be returning anything, but this is used in tests
fetched_uris
end
private
def get_replies(status_uri, max_pages, options = {})
replies_collection_or_uri = get_replies_uri(status_uri)
return if replies_collection_or_uri.nil?
ActivityPub::FetchAllRepliesService.new.call(status_uri, replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys)
end
def get_replies_uri(parent_status_uri)
fetch_resource(parent_status_uri, true)&.fetch('replies', nil)
rescue => e
Rails.logger.info { "FetchAllRepliesWorker - #{@root_status.uri}: Caught exception while resolving replies URI #{parent_status_uri}: #{e} - #{e.message}" }
# Raise if we can't get the collection for top-level status to trigger retry
raise e if parent_status_uri == @root_status.uri
nil
end
end

View file

@ -7,7 +7,7 @@ class ActivityPub::FetchRepliesWorker
sidekiq_options queue: 'pull', retry: 3
def perform(parent_status_id, replies_uri, options = {})
ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys)
ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id).account.uri, replies_uri, **options.deep_symbolize_keys)
rescue ActiveRecord::RecordNotFound
true
end

View file

@ -1,6 +1,8 @@
# frozen_string_literal: true
class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
DEBOUNCE_DELAY = 5.seconds
sidekiq_options queue: 'push', lock: :until_executed, lock_ttl: 1.day.to_i
# Distribute an profile update to servers that might have a copy