Merge commit '32f0e619f0' into kb_migration_development

This commit is contained in:
KMY 2023-05-07 14:51:21 +09:00
commit 7c118ed1d0
446 changed files with 6295 additions and 3456 deletions

View file

@ -67,7 +67,7 @@ class ActivityPub::FetchRemoteActorService < BaseService
end
def split_acct(acct)
acct.gsub(/\Aacct:/, '').split('@')
acct.delete_prefix('acct:').split('@')
end
def supported_context?

View file

@ -2,12 +2,15 @@
class ActivityPub::FetchRemoteStatusService < BaseService
include JsonLdHelper
include DomainControlHelper
include Redisable
DISCOVERIES_PER_REQUEST = 1000
# Should be called when uri has already been checked for locality
def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil, expected_actor_uri: nil, request_id: nil)
return if domain_not_allowed?(uri)
@request_id = request_id || "#{Time.now.utc.to_i}-status-#{uri}"
@json = if prefetched_body.nil?
fetch_resource(uri, id, on_behalf_of)

View file

@ -24,7 +24,7 @@ class ActivityPub::ProcessAccountService < BaseService
# The key does not need to be unguessable, it just needs to be somewhat unique
@options[:request_id] ||= "#{Time.now.utc.to_i}-#{username}@#{domain}"
with_lock("process_account:#{@uri}") do
with_redis_lock("process_account:#{@uri}") do
@account = Account.remote.find_by(uri: @uri) if @options[:only_key]
@account ||= Account.find_remote(@username, @domain)
@old_public_key = @account&.public_key

View file

@ -35,7 +35,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
last_edit_date = @status.edited_at.presence || @status.created_at
# Only allow processing one create/update per status at a time
with_lock("create:#{@uri}") do
with_redis_lock("create:#{@uri}") do
Status.transaction do
record_previous_edit!
update_media_attachments!
@ -58,7 +58,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
end
def handle_implicit_update!
with_lock("create:#{@uri}") do
with_redis_lock("create:#{@uri}") do
update_poll!(allow_significant_changes: false)
queue_poll_notifications!
end

View file

@ -1,59 +1,67 @@
# frozen_string_literal: true
require 'rubygems/package'
require 'zip'
class BackupService < BaseService
include Payloadable
include ContextHelper
attr_reader :account, :backup, :collection
attr_reader :account, :backup
def call(backup)
@backup = backup
@account = backup.user.account
build_json!
build_archive!
end
private
def build_json!
@collection = serialize(collection_presenter, ActivityPub::CollectionSerializer)
def build_outbox_json!(file)
skeleton = serialize(collection_presenter, ActivityPub::CollectionSerializer)
skeleton[:@context] = full_context
skeleton[:orderedItems] = ['!PLACEHOLDER!']
skeleton = Oj.dump(skeleton)
prepend, append = skeleton.split('"!PLACEHOLDER!"')
add_comma = false
file.write(prepend)
account.statuses.with_includes.reorder(nil).find_in_batches do |statuses|
statuses.each do |status|
item = serialize_payload(ActivityPub::ActivityPresenter.from_status(status), ActivityPub::ActivitySerializer, signer: @account)
item.delete(:@context)
file.write(',') if add_comma
add_comma = true
file.write(statuses.map do |status|
item = serialize_payload(ActivityPub::ActivityPresenter.from_status(status), ActivityPub::ActivitySerializer)
item.delete('@context')
unless item[:type] == 'Announce' || item[:object][:attachment].blank?
item[:object][:attachment].each do |attachment|
attachment[:url] = Addressable::URI.parse(attachment[:url]).path.gsub(/\A\/system\//, '')
attachment[:url] = Addressable::URI.parse(attachment[:url]).path.delete_prefix('/system/')
end
end
@collection[:orderedItems] << item
end
Oj.dump(item)
end.join(','))
GC.start
end
file.write(append)
end
def build_archive!
tmp_file = Tempfile.new(%w(archive .tar.gz))
tmp_file = Tempfile.new(%w(archive .zip))
File.open(tmp_file, 'wb') do |file|
Zlib::GzipWriter.wrap(file) do |gz|
Gem::Package::TarWriter.new(gz) do |tar|
dump_media_attachments!(tar)
dump_outbox!(tar)
dump_likes!(tar)
dump_bookmarks!(tar)
dump_actor!(tar)
end
end
Zip::File.open(tmp_file, create: true) do |zipfile|
dump_outbox!(zipfile)
dump_media_attachments!(zipfile)
dump_likes!(zipfile)
dump_bookmarks!(zipfile)
dump_actor!(zipfile)
end
archive_filename = "#{['archive', Time.now.utc.strftime('%Y%m%d%H%M%S'), SecureRandom.hex(16)].join('-')}.tar.gz"
archive_filename = "#{['archive', Time.now.utc.strftime('%Y%m%d%H%M%S'), SecureRandom.hex(16)].join('-')}.zip"
@backup.dump = ActionDispatch::Http::UploadedFile.new(tempfile: tmp_file, filename: archive_filename)
@backup.processed = true
@ -63,27 +71,28 @@ class BackupService < BaseService
tmp_file.unlink
end
def dump_media_attachments!(tar)
def dump_media_attachments!(zipfile)
MediaAttachment.attached.where(account: account).reorder(nil).find_in_batches do |media_attachments|
media_attachments.each do |m|
next unless m.file&.path
path = m.file&.path
next unless path
download_to_tar(tar, m.file, m.file.path)
path = path.gsub(/\A.*\/system\//, '')
path = path.gsub(/\A\/+/, '')
download_to_zip(zipfile, m.file, path)
end
GC.start
end
end
def dump_outbox!(tar)
json = Oj.dump(collection)
tar.add_file_simple('outbox.json', 0o444, json.bytesize) do |io|
io.write(json)
def dump_outbox!(zipfile)
zipfile.get_output_stream('outbox.json') do |io|
build_outbox_json!(io)
end
end
def dump_actor!(tar)
def dump_actor!(zipfile)
actor = serialize(account, ActivityPub::ActorSerializer)
actor[:icon][:url] = "avatar#{File.extname(actor[:icon][:url])}" if actor[:icon]
@ -92,51 +101,66 @@ class BackupService < BaseService
actor[:likes] = 'likes.json'
actor[:bookmarks] = 'bookmarks.json'
download_to_tar(tar, account.avatar, "avatar#{File.extname(account.avatar.path)}") if account.avatar.exists?
download_to_tar(tar, account.header, "header#{File.extname(account.header.path)}") if account.header.exists?
download_to_zip(tar, account.avatar, "avatar#{File.extname(account.avatar.path)}") if account.avatar.exists?
download_to_zip(tar, account.header, "header#{File.extname(account.header.path)}") if account.header.exists?
json = Oj.dump(actor)
tar.add_file_simple('actor.json', 0o444, json.bytesize) do |io|
zipfile.get_output_stream('actor.json') do |io|
io.write(json)
end
end
def dump_likes!(tar)
collection = serialize(ActivityPub::CollectionPresenter.new(id: 'likes.json', type: :ordered, size: 0, items: []), ActivityPub::CollectionSerializer)
def dump_likes!(zipfile)
skeleton = serialize(ActivityPub::CollectionPresenter.new(id: 'likes.json', type: :ordered, size: 0, items: []), ActivityPub::CollectionSerializer)
skeleton.delete(:totalItems)
skeleton[:orderedItems] = ['!PLACEHOLDER!']
skeleton = Oj.dump(skeleton)
prepend, append = skeleton.split('"!PLACEHOLDER!"')
Status.reorder(nil).joins(:favourites).includes(:account).merge(account.favourites).find_in_batches do |statuses|
statuses.each do |status|
collection[:totalItems] += 1
collection[:orderedItems] << ActivityPub::TagManager.instance.uri_for(status)
zipfile.get_output_stream('likes.json') do |io|
io.write(prepend)
add_comma = false
Status.reorder(nil).joins(:favourites).includes(:account).merge(account.favourites).find_in_batches do |statuses|
io.write(',') if add_comma
add_comma = true
io.write(statuses.map do |status|
Oj.dump(ActivityPub::TagManager.instance.uri_for(status))
end.join(','))
GC.start
end
GC.start
end
json = Oj.dump(collection)
tar.add_file_simple('likes.json', 0o444, json.bytesize) do |io|
io.write(json)
io.write(append)
end
end
def dump_bookmarks!(tar)
collection = serialize(ActivityPub::CollectionPresenter.new(id: 'bookmarks.json', type: :ordered, size: 0, items: []), ActivityPub::CollectionSerializer)
def dump_bookmarks!(zipfile)
skeleton = serialize(ActivityPub::CollectionPresenter.new(id: 'bookmarks.json', type: :ordered, size: 0, items: []), ActivityPub::CollectionSerializer)
skeleton.delete(:totalItems)
skeleton[:orderedItems] = ['!PLACEHOLDER!']
skeleton = Oj.dump(skeleton)
prepend, append = skeleton.split('"!PLACEHOLDER!"')
Status.reorder(nil).joins(:bookmarks).includes(:account).merge(account.bookmarks).find_in_batches do |statuses|
statuses.each do |status|
collection[:totalItems] += 1
collection[:orderedItems] << ActivityPub::TagManager.instance.uri_for(status)
zipfile.get_output_stream('bookmarks.json') do |io|
io.write(prepend)
add_comma = false
Status.reorder(nil).joins(:bookmarks).includes(:account).merge(account.bookmarks).find_in_batches do |statuses|
io.write(',') if add_comma
add_comma = true
io.write(statuses.map do |status|
Oj.dump(ActivityPub::TagManager.instance.uri_for(status))
end.join(','))
GC.start
end
GC.start
end
json = Oj.dump(collection)
tar.add_file_simple('bookmarks.json', 0o444, json.bytesize) do |io|
io.write(json)
io.write(append)
end
end
@ -159,10 +183,10 @@ class BackupService < BaseService
CHUNK_SIZE = 1.megabyte
def download_to_tar(tar, attachment, filename)
def download_to_zip(zipfile, attachment, filename)
adapter = Paperclip.io_adapters.for(attachment)
tar.add_file_simple(filename, 0o444, adapter.size) do |io|
zipfile.get_output_stream(filename) do |io|
while (buffer = adapter.read(CHUNK_SIZE))
io.write(buffer)
end

View file

@ -0,0 +1,60 @@
# frozen_string_literal: true

class BulkImportRowService
  # Import a single row from a bulk import. First resolves the remote
  # target referenced by the row (an account or a status), then applies
  # the action implied by the parent import's type.
  #
  # Returns true when the row was applied, false when the target could
  # not be resolved or the row is otherwise not importable.
  def call(row)
    @account = row.bulk_import.account
    @data = row.data
    @type = row.bulk_import.type.to_sym

    # Phase 1: resolve the target this row refers to.
    case @type
    when :following, :blocking, :muting
      acct = @data['acct']
      remote_domain = domain(acct)
      @target_account = stoplight_wrap_request(remote_domain) { ResolveAccountService.new.call(acct, { check_delivery_availability: true }) }
      return false if @target_account.nil?
    when :bookmarks
      uri = @data['uri']
      remote_domain = Addressable::URI.parse(uri).normalized_host
      @target_status = ActivityPub::TagManager.instance.uri_to_resource(uri, Status)
      # A local URI that does not resolve to a local status can never be fetched.
      return false if @target_status.nil? && ActivityPub::TagManager.instance.local_uri?(uri)

      @target_status ||= stoplight_wrap_request(remote_domain) { ActivityPub::FetchRemoteStatusService.new.call(uri) }
      return false if @target_status.nil?
    end

    # Phase 2: perform the action against the resolved target.
    case @type
    when :following
      FollowService.new.call(@account, @target_account, reblogs: @data['show_reblogs'], notify: @data['notify'], languages: @data['languages'])
    when :blocking
      BlockService.new.call(@account, @target_account)
    when :muting
      MuteService.new.call(@account, @target_account, notifications: @data['hide_notifications'])
    when :bookmarks
      return false unless StatusPolicy.new(@account, @target_status).show?

      @account.bookmarks.find_or_create_by!(status: @target_status)
    end

    true
  rescue ActiveRecord::RecordNotFound
    false
  end

  # Returns the normalized remote domain for the given acct string (or
  # Account), or nil when it belongs to the local instance.
  def domain(uri)
    raw_domain = uri.is_a?(Account) ? uri.domain : uri.split('@')[1]

    TagManager.instance.local_domain?(raw_domain) ? nil : TagManager.instance.normalize_domain(raw_domain)
  end

  # Run the block behind a per-domain circuit breaker so that repeated
  # failures against one host do not stall the whole import. Network-level
  # errors trip the breaker; anything else is re-raised. Falls back to nil
  # when the breaker is open or the request fails.
  def stoplight_wrap_request(domain, &block)
    return yield if domain.blank?

    Stoplight("source:#{domain}", &block)
      .with_fallback { nil }
      .with_threshold(1)
      .with_cool_off_time(5.minutes.seconds)
      .with_error_handler { |error, handle| error.is_a?(HTTP::Error) || error.is_a?(OpenSSL::SSL::SSLError) ? handle.call(error) : raise(error) }
      .run
  end
end

View file

@ -0,0 +1,160 @@
# frozen_string_literal: true

class BulkImportService < BaseService
  # Process a BulkImport end-to-end, dispatching on its type. Rows that
  # need remote resolution are fanned out to Import::RowWorker; purely
  # local work (domain blocks, overwrite reconciliation) happens inline.
  #
  # Marks the import finished immediately when all items were processed
  # inline; otherwise the workers complete it. On error the import is
  # still marked finished before the exception propagates, so it is never
  # left stuck in an in-progress state.
  def call(import)
    @import = import
    @account = @import.account

    case @import.type.to_sym
    when :following
      import_follows!
    when :blocking
      import_blocks!
    when :muting
      import_mutes!
    when :domain_blocking
      import_domain_blocks!
    when :bookmarks
      import_bookmarks!
    end

    @import.update!(state: :finished, finished_at: Time.now.utc) if @import.processed_items == @import.total_items
  rescue
    @import.update!(state: :finished, finished_at: Time.now.utc)
    raise
  end

  private

  # Index rows by acct, with the local-domain suffix stripped so that
  # "user@local.domain" and "user" compare equal against Account#acct.
  def extract_rows_by_acct
    local_domain_suffix = "@#{Rails.configuration.x.local_domain}"
    @import.rows.to_a.index_by { |row| row.data['acct'].delete_suffix(local_domain_suffix) }
  end

  def import_follows!
    rows_by_acct = extract_rows_by_acct

    if @import.overwrite?
      @account.following.find_each do |followee|
        row = rows_by_acct.delete(followee.acct)

        if row.nil?
          # Not present in the import: overwrite semantics mean unfollow.
          UnfollowService.new.call(@account, followee)
        else
          row.destroy
          @import.processed_items += 1
          @import.imported_items += 1

          # Since we're updating the settings of an existing relationship, we can safely call
          # FollowService directly
          FollowService.new.call(@account, followee, reblogs: row.data['show_reblogs'], notify: row.data['notify'], languages: row.data['languages'])
        end
      end

      # Save pending infos due to `overwrite?` handling
      @import.save!
    end

    # Remaining rows are new follows that may need remote resolution.
    Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
      [row.id]
    end
  end

  def import_blocks!
    rows_by_acct = extract_rows_by_acct

    if @import.overwrite?
      @account.blocking.find_each do |blocked_account|
        row = rows_by_acct.delete(blocked_account.acct)

        if row.nil?
          UnblockService.new.call(@account, blocked_account)
        else
          row.destroy
          @import.processed_items += 1
          @import.imported_items += 1

          BlockService.new.call(@account, blocked_account)
        end
      end

      # Save pending infos due to `overwrite?` handling
      @import.save!
    end

    Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
      [row.id]
    end
  end

  def import_mutes!
    rows_by_acct = extract_rows_by_acct

    if @import.overwrite?
      @account.muting.find_each do |muted_account|
        row = rows_by_acct.delete(muted_account.acct)

        if row.nil?
          UnmuteService.new.call(@account, muted_account)
        else
          row.destroy
          @import.processed_items += 1
          @import.imported_items += 1

          MuteService.new.call(@account, muted_account, notifications: row.data['hide_notifications'])
        end
      end

      # Save pending infos due to `overwrite?` handling
      @import.save!
    end

    Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
      [row.id]
    end
  end

  # Domain blocks need no remote resolution, so they are processed
  # entirely inline rather than through Import::RowWorker.
  def import_domain_blocks!
    domains = @import.rows.map { |row| row.data['domain'] }

    if @import.overwrite?
      @account.domain_blocks.find_each do |domain_block|
        # BUGFIX: `domains` holds domain strings, so we must delete by
        # `domain_block.domain` — deleting by the record itself never
        # matched, causing every existing block to be dropped and re-added.
        domain = domains.delete(domain_block.domain)

        @account.unblock_domain!(domain_block.domain) if domain.nil?
      end
    end

    @import.rows.delete_all
    domains.each { |domain| @account.block_domain!(domain) }
    @import.update!(processed_items: @import.total_items, imported_items: @import.total_items)

    AfterAccountDomainBlockWorker.push_bulk(domains) do |domain|
      [@account.id, domain]
    end
  end

  def import_bookmarks!
    rows_by_uri = @import.rows.index_by { |row| row.data['uri'] }

    if @import.overwrite?
      @account.bookmarks.includes(:status).find_each do |bookmark|
        row = rows_by_uri.delete(ActivityPub::TagManager.instance.uri_for(bookmark.status))

        if row.nil?
          bookmark.destroy!
        else
          row.destroy
          @import.processed_items += 1
          @import.imported_items += 1
        end
      end

      # Save pending infos due to `overwrite?` handling
      @import.save!
    end

    Import::RowWorker.push_bulk(rows_by_uri.values) do |row|
      [row.id]
    end
  end
end

View file

@ -122,12 +122,17 @@ class FanOutOnWriteService < BaseService
domain = @account.domain || Rails.configuration.x.local_domain
antennas = Antenna.availables
antennas = antennas.left_joins(:antenna_accounts).where(any_accounts: true).or(Antenna.availables.left_joins(:antenna_accounts).where(antenna_accounts: { exclude: false, account: @account }))
antennas = antennas.left_joins(:antenna_domains).where(any_domains: true).or(Antenna.availables.left_joins(:antenna_accounts).left_joins(:antenna_domains).where(antenna_domains: { exclude: false, name: domain }))
antennas = antennas.left_joins(:antenna_tags).where(any_tags: true).or(Antenna.availables.left_joins(:antenna_accounts).left_joins(:antenna_domains).left_joins(:antenna_tags).where(antenna_tags: { exclude: false, tag: @status.tags }))
antennas = antennas.where(account: @account.followers) if @status.visibility.to_sym == :unlisted
antennas = antennas.left_joins(:antenna_domains).where(any_domains: true).or(Antenna.left_joins(:antenna_domains).where(antenna_domains: { name: domain }))
antennas = antennas.where(with_media_only: false) unless @status.with_media?
antennas = antennas.where.not(account: @account.blocking)
antennas = Antenna.where(id: antennas.select(:id))
antennas = antennas.left_joins(:antenna_accounts).where(any_accounts: true).or(Antenna.left_joins(:antenna_accounts).where(antenna_accounts: { account: @account }))
tag_ids = @status.tags.pluck(:id)
antennas = Antenna.where(id: antennas.select(:id))
antennas = antennas.left_joins(:antenna_tags).where(any_tags: true).or(Antenna.left_joins(:antenna_tags).where(antenna_tags: { tag_id: tag_ids }))
antennas.in_batches do |ans|
ans.each do |antenna|
next unless antenna.enabled?

View file

@ -23,7 +23,7 @@ class FetchLinkCardService < BaseService
@url = @original_url.to_s
with_lock("fetch:#{@original_url}") do
with_redis_lock("fetch:#{@original_url}") do
@card = PreviewCard.find_by(url: @url)
process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image?
end

View file

@ -4,6 +4,7 @@ class FetchResourceService < BaseService
include JsonLdHelper
ACCEPT_HEADER = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", text/html;q=0.1'
ACTIVITY_STREAM_LINK_TYPES = ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].freeze
attr_reader :response_code
@ -65,7 +66,7 @@ class FetchResourceService < BaseService
def process_html(response)
page = Nokogiri::HTML(response.body_with_limit)
json_link = page.xpath('//link[@rel="alternate"]').find { |link| ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(link['type']) }
json_link = page.xpath('//link[@rel="alternate"]').find { |link| ACTIVITY_STREAM_LINK_TYPES.include?(link['type']) }
process(json_link['href'], terminal: true) unless json_link.nil?
end

View file

@ -9,10 +9,10 @@ class FollowMigrationService < FollowService
def call(source_account, target_account, old_target_account, bypass_locked: false)
@old_target_account = old_target_account
follow = source_account.active_relationships.find_by(target_account: old_target_account)
reblogs = follow&.show_reblogs?
notify = follow&.notify?
languages = follow&.languages
@original_follow = source_account.active_relationships.find_by(target_account: old_target_account)
reblogs = @original_follow&.show_reblogs?
notify = @original_follow&.notify?
languages = @original_follow&.languages
super(source_account, target_account, reblogs: reblogs, notify: notify, languages: languages, bypass_locked: bypass_locked, bypass_limit: true)
end
@ -21,6 +21,7 @@ class FollowMigrationService < FollowService
def request_follow!
follow_request = @source_account.request_follow!(@target_account, **follow_options.merge(rate_limit: @options[:with_rate_limit], bypass_limit: @options[:bypass_limit]))
migrate_list_accounts!
if @target_account.local?
LocalNotificationWorker.perform_async(@target_account.id, follow_request.id, follow_request.class.name, 'follow_request')
@ -32,9 +33,30 @@ class FollowMigrationService < FollowService
follow_request
end
def change_follow_options!
migrate_list_accounts!
super
end
def change_follow_request_options!
migrate_list_accounts!
super
end
def direct_follow!
follow = super
migrate_list_accounts!
UnfollowService.new.call(@source_account, @old_target_account, skip_unmerge: true)
follow
end
def migrate_list_accounts!
ListAccount.where(follow_id: @original_follow.id).includes(:list).find_each do |list_account|
list_account.list.accounts << @target_account
rescue ActiveRecord::RecordInvalid
nil
end
end
end

View file

@ -2,6 +2,9 @@
require 'csv'
# NOTE: This is a deprecated service, only kept to not break ongoing imports
# on upgrade. See `BulkImportService` for its replacement.
class ImportService < BaseService
ROWS_PROCESSING_LIMIT = 20_000

View file

@ -18,7 +18,7 @@ class RemoveStatusService < BaseService
@account = status.account
@options = options
with_lock("distribute:#{@status.id}") do
with_redis_lock("distribute:#{@status.id}") do
@status.discard_with_reblogs
StatusPin.find_by(status: @status)&.destroy
@ -88,7 +88,7 @@ class RemoveStatusService < BaseService
status_reach_finder = StatusReachFinder.new(@status, unsafe: true)
ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes, limit: 1_000) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes + status_reach_finder.inboxes_for_misskey, limit: 1_000) do |inbox_url|
[signed_activity_json, @account.id, inbox_url]
end
end

View file

@ -100,13 +100,13 @@ class ResolveAccountService < BaseService
end
def split_acct(acct)
acct.gsub(/\Aacct:/, '').split('@')
acct.delete_prefix('acct:').split('@')
end
def fetch_account!
return unless activitypub_ready?
with_lock("resolve:#{@username}@#{@domain}") do
with_redis_lock("resolve:#{@username}@#{@domain}") do
@account = ActivityPub::FetchRemoteAccountService.new.call(actor_url, suppress_errors: @options[:suppress_errors])
end

View file

@ -68,7 +68,7 @@ class SuspendAccountService < BaseService
@account.media_attachments.find_each do |media_attachment|
attachment_names.each do |attachment_name|
attachment = media_attachment.public_send(attachment_name)
styles = [:original] | attachment.styles.keys
styles = MediaAttachment::DEFAULT_STYLES | attachment.styles.keys
next if attachment.blank?

View file

@ -2,7 +2,7 @@
class TagSearchService < BaseService
def call(query, options = {})
@query = query.strip.gsub(/\A#/, '')
@query = query.strip.delete_prefix('#')
@offset = options.delete(:offset).to_i
@limit = options.delete(:limit).to_i
@options = options

View file

@ -15,7 +15,7 @@ class UnfollowService < BaseService
@target_account = target_account
@options = options
with_lock("relationship:#{[source_account.id, target_account.id].sort.join(':')}") do
with_redis_lock("relationship:#{[source_account.id, target_account.id].sort.join(':')}") do
unfollow! || undo_follow_request!
end
end

View file

@ -64,7 +64,7 @@ class UnsuspendAccountService < BaseService
@account.media_attachments.find_each do |media_attachment|
attachment_names.each do |attachment_name|
attachment = media_attachment.public_send(attachment_name)
styles = [:original] | attachment.styles.keys
styles = MediaAttachment::DEFAULT_STYLES | attachment.styles.keys
next if attachment.blank?

View file

@ -18,7 +18,7 @@ class VoteService < BaseService
already_voted = true
with_lock("vote:#{@poll.id}:#{@account.id}") do
with_redis_lock("vote:#{@poll.id}:#{@account.id}") do
already_voted = @poll.votes.where(account: @account).exists?
ApplicationRecord.transaction do