Add: #595 リモート保留中アカウントからメンションが来た場合にuriを記録し、承認時にフェッチしに行く処理 (#620)

* Add: #591 リモート保留中アカウントからメンションが来た場合にuriを記録し、承認時にフェッチしに行く処理

* Rename fetch_remove_status_worker.rb to fetch_remote_status_worker.rb

* Wip

* Add lock code
This commit is contained in:
KMY(雪あすか) 2024-02-29 12:54:47 +09:00 committed by GitHub
parent b2acc7dbb8
commit 2ab9ea642a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 307 additions and 15 deletions

View file

@ -50,6 +50,11 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def create_status
return reject_payload! if unsupported_object_type? || non_matching_uri_hosts?(@account.uri, object_uri) || tombstone_exists? || !related_to_local_activity?
if @account.suspended?
process_pending_status if @account.remote_pending?
return
end
with_redis_lock("create:#{object_uri}") do
return if delete_arrived_first?(object_uri) || poll_vote?
@ -405,6 +410,20 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
ActivityPub::DistributePollUpdateWorker.perform_in(3.minutes, replied_to_status.id) unless replied_to_status.preloadable_poll.hide_totals?
end
def process_pending_status
with_redis_lock("pending_status:#{@object['id']}") do
return if PendingStatus.exists?(uri: @object['id'])
fetch_account = as_array(@object['tag'])
.filter_map { |tag| equals_or_includes?(tag['type'], 'Mention') && tag['href'] && ActivityPub::TagManager.instance.local_uri?(tag['href']) && ActivityPub::TagManager.instance.uri_to_resource(tag['href'], Account) }
.first
fetch_account ||= (audience_to + audience_cc).filter_map { |uri| ActivityPub::TagManager.instance.local_uri?(uri) && ActivityPub::TagManager.instance.uri_to_resource(uri, Account) }.first
fetch_account ||= Account.representative
PendingStatus.create!(account: @account, uri: @object['id'], fetch_account: fetch_account)
end
end
def resolve_thread(status)
return unless status.reply? && status.thread.nil? && Request.valid_url?(in_reply_to_uri)

View file

@ -20,6 +20,11 @@ class ActivityPub::Activity::Follow < ActivityPub::Activity
return
end
if @account.suspended?
PendingFollowRequest.create!(account: @account, target_account: target_account, uri: @json['id']) if @account.remote_pending?
return
end
if target_account.blocking?(@account) || target_account.domain_blocking?(@account.domain) || target_account.moved? || target_account.instance_actor? || block_new_follow?
reject_follow_request!(target_account)
return
@ -33,13 +38,6 @@ class ActivityPub::Activity::Follow < ActivityPub::Activity
return
end
if @account.suspended? && @account.remote_pending?
PendingFollowRequest.create!(account: @account, target_account: target_account, uri: @json['id'])
return
elsif @account.suspended?
return
end
follow_request = FollowRequest.create!(account: @account, target_account: target_account, uri: @json['id'])
if request_pending_follow?(@account, target_account)

View file

@ -298,14 +298,19 @@ class Account < ApplicationRecord
end
def approve_remote!
return unless remote_pending
update!(remote_pending: false)
unsuspend!
EnableFollowRequestsWorker.perform_async(id)
ActivateRemoteAccountWorker.perform_async(id)
end
def reject_remote!
return unless remote_pending
update!(remote_pending: false, suspension_origin: :local)
pending_follow_requests.destroy_all
pending_statuses.destroy_all
suspend!
end

View file

@ -50,6 +50,11 @@ module Account::Associations
has_many :account_warnings, dependent: :destroy, inverse_of: :account
has_many :strikes, class_name: 'AccountWarning', foreign_key: :target_account_id, dependent: :destroy, inverse_of: :target_account
# Remote pendings
has_many :pending_follow_requests, dependent: :destroy
has_many :pending_statuses, dependent: :destroy
has_many :fetchable_pending_statuses, class_name: 'PendingStatus', foreign_key: :fetch_account_id, dependent: :destroy, inverse_of: :fetch_account
# Antennas (that the account is on, not owned by the account)
has_many :antenna_accounts, inverse_of: :account, dependent: :destroy
has_many :joined_antennas, class_name: 'Antenna', through: :antenna_accounts, source: :antenna

View file

@ -74,7 +74,6 @@ module Account::Interactions
included do
# Follow relations
has_many :follow_requests, dependent: :destroy
has_many :pending_follow_requests, dependent: :destroy
with_options class_name: 'Follow', dependent: :destroy do
has_many :active_relationships, foreign_key: 'account_id', inverse_of: :account

View file

@ -0,0 +1,18 @@
# frozen_string_literal: true
# == Schema Information
#
# Table name: pending_statuses
#
# id :bigint(8) not null, primary key
# account_id :bigint(8) not null
# fetch_account_id :bigint(8) not null
# uri :string not null
# created_at :datetime not null
# updated_at :datetime not null
#
class PendingStatus < ApplicationRecord
belongs_to :account
belongs_to :fetch_account, class_name: 'Account'
end

View file

@ -1,6 +1,6 @@
# frozen_string_literal: true
class EnableFollowRequestsService < BaseService
class ActivateFollowRequestsService < BaseService
include Payloadable
include FollowHelper

View file

@ -0,0 +1,31 @@
# frozen_string_literal: true
class ActivateRemoteStatusesService < BaseService
include Payloadable
include FollowHelper
def call(account)
@account = account
PendingStatus.transaction do
PendingStatus.where(account: account).find_each do |status_info|
approve_status!(status_info)
end
end
end
private
def approve_status!(pending)
account_id = pending.account_id
fetch_account_id = pending.fetch_account_id
fetch_account = pending.fetch_account
uri = pending.uri
pending.destroy!
return if fetch_account.suspended?
return if ActivityPub::TagManager.instance.uri_to_resource(uri, Status).present?
ActivityPub::FetchRemoteStatusWorker.perform_async(uri, account_id, fetch_account_id)
end
end

View file

@ -56,7 +56,7 @@ class ActivityPub::ProcessCollectionService < BaseService
end
def activity_allowed_while_remote_pending?
%w(Follow).include?(@json['type']) || activity_allowed_while_suspended?
%w(Follow Create).include?(@json['type']) || activity_allowed_while_suspended?
end
def process_items(items)

View file

@ -20,6 +20,7 @@ class DeleteAccountService < BaseService
devices
domain_blocks
featured_tags
fetchable_pending_statuses
follow_requests
list_accounts
migrations
@ -29,6 +30,7 @@ class DeleteAccountService < BaseService
owned_lists
passive_relationships
pending_follow_requests
pending_statuses
report_notes
scheduled_statuses
scheduled_expiration_statuses
@ -51,6 +53,7 @@ class DeleteAccountService < BaseService
devices
domain_blocks
featured_tags
fetchable_pending_statuses
follow_requests
list_accounts
migrations
@ -59,6 +62,7 @@ class DeleteAccountService < BaseService
notifications
owned_lists
pending_follow_requests
pending_statuses
scheduled_statuses
scheduled_expiration_statuses
status_pins

View file

@ -1,6 +1,6 @@
# frozen_string_literal: true
class EnableFollowRequestsWorker
class ActivateRemoteAccountWorker
include Sidekiq::Worker
def perform(account_id)
@ -8,6 +8,7 @@ class EnableFollowRequestsWorker
return true if account.nil?
return true if account.suspended?
EnableFollowRequestsService.new.call(account)
ActivateFollowRequestsService.new.call(account)
ActivateRemoteStatusesService.new.call(account)
end
end

View file

@ -0,0 +1,17 @@
# frozen_string_literal: true
class ActivityPub::FetchRemoteStatusWorker
include Sidekiq::Worker
include Redisable
sidekiq_options queue: 'pull', retry: 3
def perform(uri, author_account_id, on_behalf_of_account_id)
author = Account.find(author_account_id)
on_behalf_of = on_behalf_of_account_id.present? ? Account.find(on_behalf_of_account_id) : nil
ActivityPub::FetchRemoteStatusService.new.call(uri, on_behalf_of: on_behalf_of, expected_actor_uri: ActivityPub::TagManager.instance.uri_for(author), request_id: uri)
rescue ActiveRecord::RecordNotFound, Mastodon::RaceConditionError
true
end
end

View file

@ -0,0 +1,13 @@
# frozen_string_literal: true
class CreatePendingStatuses < ActiveRecord::Migration[7.1]
def change
create_table :pending_statuses do |t|
t.references :account, null: false, foreign_key: { on_delete: :cascade }
t.references :fetch_account, null: false, foreign_key: { to_table: 'accounts', on_delete: :cascade }
t.string :uri, null: false, index: { unique: true }
t.timestamps
end
end
end

View file

@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.1].define(version: 2024_02_27_222450) do
ActiveRecord::Schema[7.1].define(version: 2024_02_27_225017) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
@ -1023,6 +1023,17 @@ ActiveRecord::Schema[7.1].define(version: 2024_02_27_222450) do
t.index ["uri"], name: "index_pending_follow_requests_on_uri", unique: true
end
create_table "pending_statuses", force: :cascade do |t|
t.bigint "account_id", null: false
t.bigint "fetch_account_id", null: false
t.string "uri", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["account_id"], name: "index_pending_statuses_on_account_id"
t.index ["fetch_account_id"], name: "index_pending_statuses_on_fetch_account_id"
t.index ["uri"], name: "index_pending_statuses_on_uri", unique: true
end
create_table "pghero_space_stats", force: :cascade do |t|
t.text "database"
t.text "schema"
@ -1617,6 +1628,8 @@ ActiveRecord::Schema[7.1].define(version: 2024_02_27_222450) do
add_foreign_key "one_time_keys", "devices", on_delete: :cascade
add_foreign_key "pending_follow_requests", "accounts", column: "target_account_id", on_delete: :cascade
add_foreign_key "pending_follow_requests", "accounts", on_delete: :cascade
add_foreign_key "pending_statuses", "accounts", column: "fetch_account_id", on_delete: :cascade
add_foreign_key "pending_statuses", "accounts", on_delete: :cascade
add_foreign_key "poll_votes", "accounts", on_delete: :cascade
add_foreign_key "poll_votes", "polls", on_delete: :cascade
add_foreign_key "polls", "accounts", on_delete: :cascade

View file

@ -91,6 +91,7 @@ namespace :dangerous do
20240218233621
20240227033337
20240227222450
20240227225017
)
# Removed: account_groups
target_tables = %w(
@ -111,6 +112,7 @@ namespace :dangerous do
ng_rule_histories
ngword_histories
pending_follow_requests
pending_statuses
scheduled_expiration_statuses
status_capability_tokens
status_references

View file

@ -0,0 +1,7 @@
# frozen_string_literal: true
Fabricator(:pending_status) do
account { Fabricate.build(:account) }
fetch_account { Fabricate.build(:account) }
uri { "https://example.com/#{Time.now.utc.nsec}" }
end

View file

@ -2468,6 +2468,40 @@ RSpec.describe ActivityPub::Activity::Create do
end
end
context 'when sender is in remote pending' do
subject { described_class.new(json, sender, delivery: true) }
let!(:local_account) { Fabricate(:account) }
let(:object_json) do
{
id: [ActivityPub::TagManager.instance.uri_for(sender), '#bar'].join,
type: 'Note',
content: 'Lorem ipsum',
to: local_account ? ActivityPub::TagManager.instance.uri_for(local_account) : 'https://www.w3.org/ns/activitystreams#Public',
}
end
before do
sender.update(suspended_at: Time.now.utc, suspension_origin: :local, remote_pending: true)
subject.perform
end
it 'does not create a status' do
status = sender.statuses.first
expect(status).to be_nil
end
it 'pending data is created' do
pending = PendingStatus.find_by(account: sender)
expect(pending).to_not be_nil
expect(pending.uri).to eq object_json[:id]
expect(pending.account_id).to eq sender.id
expect(pending.fetch_account_id).to eq local_account.id
end
end
context 'when sender is followed by local users' do
subject { described_class.new(json, sender, delivery: true) }

View file

@ -434,6 +434,18 @@ RSpec.describe Account do
end
end
describe '#approve_remote!' do
it 'calls worker' do
account = Fabricate(:account, suspended_at: Time.now.utc, suspension_origin: :local, remote_pending: true)
allow(ActivateRemoteAccountWorker).to receive(:perform_async)
account.approve_remote!
expect(account.remote_pending).to be false
expect(account.suspended?).to be false
expect(ActivateRemoteAccountWorker).to have_received(:perform_async).with(account.id)
end
end
describe '#favourited?' do
subject { Fabricate(:account) }

View file

@ -2,7 +2,7 @@
require 'rails_helper'
RSpec.describe EnableFollowRequestsService, type: :service do
RSpec.describe ActivateFollowRequestsService, type: :service do
subject { described_class.new.call(sender) }
let(:sender) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/actor') }

View file

@ -0,0 +1,67 @@
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe ActivateRemoteStatusesService, type: :service do
subject { described_class.new.call(sender) }
let(:sender) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/actor') }
let(:alice) { Fabricate(:account) }
let!(:pending_status) { Fabricate(:pending_status, account: sender, fetch_account: alice, uri: 'https://example.com/note') }
let(:payload) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: pending_status.uri,
attributedTo: sender.uri,
type: 'Note',
content: 'Lorem ipsum',
to: 'https://www.w3.org/ns/activitystreams#Public',
tag: [
{
type: 'Mention',
href: ActivityPub::TagManager.instance.uri_for(alice),
},
],
}
end
let(:json) { Oj.dump(payload) }
before do
stub_request(:get, 'https://example.com/note').to_return(status: 200, body: json, headers: { 'Content-Type': 'application/activity+json' })
end
context 'when has a pending status' do
before do
subject
end
it 'original status is fetched', :sidekiq_inline do
status = sender.statuses.first
expect(status).to_not be_nil
expect(status.text).to eq 'Lorem ipsum'
end
it 'pending request is removed' do
expect { pending_status.reload }.to raise_error ActiveRecord::RecordNotFound
end
end
context 'when target_account is suspended' do
before do
alice.suspend!
subject
end
it 'original status is not fetched', :sidekiq_inline do
status = sender.statuses.first
expect(status).to be_nil
end
it 'pending request is removed' do
expect { pending_status.reload }.to raise_error ActiveRecord::RecordNotFound
end
end
end

View file

@ -48,6 +48,9 @@ RSpec.describe DeleteAccountService, type: :service do
let!(:account_note) { Fabricate(:account_note, account: account) }
let!(:ng_rule_history) { Fabricate(:ng_rule_history, account: account) }
let!(:pending_follow_request) { Fabricate(:pending_follow_request, account: account) }
let!(:pending_status) { Fabricate(:pending_status, account: account, uri: 'https://example.com/note1') }
let!(:fetchable_pending_status) { Fabricate(:pending_status, fetch_account: account, uri: 'https://example.com/note2') }
it 'deletes associated owned and target records and target notifications' do
subject
@ -77,6 +80,9 @@ RSpec.describe DeleteAccountService, type: :service do
expect { circle_account.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect { circle_status.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect { bookmark_category_status.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect { pending_follow_request.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect { pending_status.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect { fetchable_pending_status.reload }.to raise_error(ActiveRecord::RecordNotFound)
end
def expect_deletion_of_associated_owned_records

View file

@ -0,0 +1,41 @@
# frozen_string_literal: true
require 'rails_helper'
describe ActivityPub::FetchRemoteStatusWorker do
subject { described_class.new }
let(:sender) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/actor') }
let(:payload) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: 'https://example.com/note',
attributedTo: sender.uri,
type: 'Note',
content: 'Lorem ipsum',
to: 'https://www.w3.org/ns/activitystreams#Public',
tag: [
{
type: 'Mention',
href: ActivityPub::TagManager.instance.uri_for(Fabricate(:account)),
},
],
}
end
let(:json) { Oj.dump(payload) }
before do
stub_request(:get, 'https://example.com/note').to_return(status: 200, body: json, headers: { 'Content-Type': 'application/activity+json' })
end
describe '#perform' do
it 'original status is fetched' do
subject.perform('https://example.com/note', sender.id, Fabricate(:account).id)
status = sender.statuses.first
expect(status).to_not be_nil
expect(status.text).to eq 'Lorem ipsum'
end
end
end