Add support for editing for published statuses (#16697)

* Add support for editing for published statuses

* Fix references to stripped-out code

* Various fixes and improvements

* Further fixes and improvements

* Fix updates being potentially sent to unauthorized recipients

* Various fixes and improvements

* Fix wrong words in test

* Fix notifying accounts that were tagged but were not in the audience

* Fix mistake
This commit is contained in:
Eugen Rochko 2022-01-19 22:37:27 +01:00 committed by GitHub
parent 2d1f082bb6
commit 1060666c58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
56 changed files with 1415 additions and 574 deletions

View file

@ -1,54 +1,32 @@
# frozen_string_literal: true
class ActivityPub::DistributionWorker
include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push'
class ActivityPub::DistributionWorker < ActivityPub::RawDistributionWorker
# Distribute a new status or an edit of a status to all the places
# where the status is supposed to go or where it was interacted with
def perform(status_id)
@status = Status.find(status_id)
@account = @status.account
return if skip_distribution?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
end
relay! if relayable?
distribute!
rescue ActiveRecord::RecordNotFound
true
end
private
def skip_distribution?
@status.direct_visibility? || @status.limited_visibility?
end
def relayable?
@status.public_visibility?
end
protected
def inboxes
# Deliver the status to all followers.
# If the status is a reply to another local status, also forward it to that
# status' authors' followers.
@inboxes ||= if @status.in_reply_to_local_account? && @status.distributable?
@account.followers.or(@status.thread.account.followers).inboxes
else
@account.followers.inboxes
end
@inboxes ||= StatusReachFinder.new(@status).inboxes
end
def payload
@payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account))
@payload ||= Oj.dump(serialize_payload(activity, ActivityPub::ActivitySerializer, signer: @account))
end
def relay!
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
[payload, @account.id, inbox_url]
end
def activity
ActivityPub::ActivityPresenter.from_status(@status)
end
def options
{ synchronize_followers: @status.private_visibility? }
end
end

View file

@ -2,22 +2,47 @@
class ActivityPub::RawDistributionWorker
include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push'
# Base worker for when you want to queue up a bunch of deliveries of
# some payload. In this case, we have already generated JSON and
# we are going to distribute it to the account's followers minus
# the explicitly provided inboxes
def perform(json, source_account_id, exclude_inboxes = [])
@account = Account.find(source_account_id)
@account = Account.find(source_account_id)
@json = json
@exclude_inboxes = exclude_inboxes
ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url|
[json, @account.id, inbox_url]
end
distribute!
rescue ActiveRecord::RecordNotFound
true
end
private
protected
def distribute!
return if inboxes.empty?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, source_account_id, inbox_url, options]
end
end
def payload
@json
end
def source_account_id
@account.id
end
def inboxes
@inboxes ||= @account.followers.inboxes
@inboxes ||= @account.followers.inboxes - @exclude_inboxes
end
def options
nil
end
end

View file

@ -1,34 +0,0 @@
# frozen_string_literal: true
# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
# Should be removed in a subsequent release.
class ActivityPub::ReplyDistributionWorker
include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push'
def perform(status_id)
@status = Status.find(status_id)
@account = @status.thread&.account
return unless @account.present? && @status.distributable?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @status.account_id, inbox_url]
end
rescue ActiveRecord::RecordNotFound
true
end
private
def inboxes
@inboxes ||= @account.followers.inboxes
end
def payload
@payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account))
end
end

View file

@ -1,33 +1,24 @@
# frozen_string_literal: true
class ActivityPub::UpdateDistributionWorker
include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push'
class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
# Distribute an profile update to servers that might have a copy
# of the account in question
def perform(account_id, options = {})
@options = options.with_indifferent_access
@account = Account.find(account_id)
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[signed_payload, @account.id, inbox_url]
end
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
[signed_payload, @account.id, inbox_url]
end
distribute!
rescue ActiveRecord::RecordNotFound
true
end
private
protected
def inboxes
@inboxes ||= @account.followers.inboxes
@inboxes ||= AccountReachFinder.new(@account).inboxes
end
def signed_payload
@signed_payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
def payload
@payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
end
end

View file

@ -3,10 +3,10 @@
class DistributionWorker
include Sidekiq::Worker
def perform(status_id)
def perform(status_id, options = {})
RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
if lock.acquired?
FanOutOnWriteService.new.call(Status.find(status_id))
FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
else
raise Mastodon::RaceConditionError
end

View file

@ -3,9 +3,10 @@
class FeedInsertWorker
include Sidekiq::Worker
def perform(status_id, id, type = :home)
@type = type.to_sym
@status = Status.find(status_id)
def perform(status_id, id, type = :home, options = {})
@type = type.to_sym
@status = Status.find(status_id)
@options = options.symbolize_keys
case @type
when :home
@ -23,10 +24,12 @@ class FeedInsertWorker
private
def check_and_insert
return if feed_filtered?
perform_push
perform_notify if notify?
if feed_filtered?
perform_unpush if update?
else
perform_push
perform_notify if notify?
end
end
def feed_filtered?
@ -47,13 +50,26 @@ class FeedInsertWorker
def perform_push
case @type
when :home
FeedManager.instance.push_to_home(@follower, @status)
FeedManager.instance.push_to_home(@follower, @status, update: update?)
when :list
FeedManager.instance.push_to_list(@list, @status)
FeedManager.instance.push_to_list(@list, @status, update: update?)
end
end
def perform_unpush
case @type
when :home
FeedManager.instance.unpush_from_home(@follower, @status, update: true)
when :list
FeedManager.instance.unpush_from_list(@list, @status, update: true)
end
end
def perform_notify
NotifyService.new.call(@follower, :status, @status)
end
def update?
@options[:update]
end
end

View file

@ -12,6 +12,8 @@ class LocalNotificationWorker
activity = activity_class_name.constantize.find(activity_id)
end
return if Notification.where(account: receiver, activity: activity).any?
NotifyService.new.call(receiver, type || activity_class_name.underscore, activity)
rescue ActiveRecord::RecordNotFound
true

View file

@ -6,19 +6,44 @@ class PollExpirationNotifyWorker
sidekiq_options lock: :until_executed
def perform(poll_id)
poll = Poll.find(poll_id)
@poll = Poll.find(poll_id)
# Notify poll owner and remote voters
if poll.local?
ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id)
NotifyService.new.call(poll.account, :poll, poll)
end
return if does_not_expire?
requeue! && return if not_due_yet?
# Notify local voters
poll.votes.includes(:account).group(:account_id).select(:account_id).map(&:account).select(&:local?).each do |account|
NotifyService.new.call(account, :poll, poll)
end
notify_remote_voters_and_owner! if @poll.local?
notify_local_voters!
rescue ActiveRecord::RecordNotFound
true
end
def self.remove_from_scheduled(poll_id)
queue = Sidekiq::ScheduledSet.new
queue.select { |scheduled| scheduled.klass == name && scheduled.args[0] == poll_id }.map(&:delete)
end
private
def does_not_expire?
@poll.expires_at.nil?
end
def not_due_yet?
@poll.expires_at.present? && !@poll.expired?
end
def requeue!
PollExpirationNotifyWorker.perform_at(@poll.expires_at + 5.minutes, @poll.id)
end
def notify_remote_voters_and_owner!
ActivityPub::DistributePollUpdateWorker.perform_async(@poll.status.id)
NotifyService.new.call(@poll.account, :poll, @poll)
end
def notify_local_voters!
@poll.voters.merge(Account.local).find_each do |account|
NotifyService.new.call(account, :poll, @poll)
end
end
end

View file

@ -2,15 +2,38 @@
class PushUpdateWorker
include Sidekiq::Worker
include Redisable
def perform(account_id, status_id, timeline_id = nil)
account = Account.find(account_id)
status = Status.find(status_id)
message = InlineRenderer.render(status, account, :status)
timeline_id = "timeline:#{account.id}" if timeline_id.nil?
def perform(account_id, status_id, timeline_id = nil, options = {})
@account = Account.find(account_id)
@status = Status.find(status_id)
@timeline_id = timeline_id || "timeline:#{account.id}"
@options = options.symbolize_keys
Redis.current.publish(timeline_id, Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
publish!
rescue ActiveRecord::RecordNotFound
true
end
private
def payload
InlineRenderer.render(@status, @account, :status)
end
def message
Oj.dump(
event: update? ? :'status.update' : :update,
payload: payload,
queued_at: (Time.now.to_f * 1000.0).to_i
)
end
def publish!
redis.publish(@timeline_id, message)
end
def update?
@options[:update]
end
end