Add support for FASP data sharing (#34415)
This commit is contained in:
parent
3ea1f074ab
commit
a5a2c6dc7e
38 changed files with 1140 additions and 1 deletions
|
@ -0,0 +1,26 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Api::Fasp::DataSharing::V0::BackfillRequestsController < Api::Fasp::BaseController
|
||||
def create
|
||||
backfill_request = current_provider.fasp_backfill_requests.new(backfill_request_params)
|
||||
|
||||
respond_to do |format|
|
||||
format.json do
|
||||
if backfill_request.save
|
||||
render json: { backfillRequest: { id: backfill_request.id } }, status: 201
|
||||
else
|
||||
head 422
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def backfill_request_params
|
||||
params
|
||||
.permit(:category, :maxCount)
|
||||
.to_unsafe_h
|
||||
.transform_keys { |k| k.to_s.underscore }
|
||||
end
|
||||
end
|
|
@ -0,0 +1,10 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Api::Fasp::DataSharing::V0::ContinuationsController < Api::Fasp::BaseController
|
||||
def create
|
||||
backfill_request = current_provider.fasp_backfill_requests.find(params[:backfill_request_id])
|
||||
Fasp::BackfillWorker.perform_async(backfill_request.id)
|
||||
|
||||
head 204
|
||||
end
|
||||
end
|
|
@ -0,0 +1,25 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Api::Fasp::DataSharing::V0::EventSubscriptionsController < Api::Fasp::BaseController
|
||||
def create
|
||||
subscription = current_provider.fasp_subscriptions.create!(subscription_params)
|
||||
|
||||
render json: { subscription: { id: subscription.id } }, status: 201
|
||||
end
|
||||
|
||||
def destroy
|
||||
subscription = current_provider.fasp_subscriptions.find(params[:id])
|
||||
subscription.destroy
|
||||
|
||||
head 204
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def subscription_params
|
||||
params
|
||||
.permit(:category, :subscriptionType, :maxBatchSize, threshold: {})
|
||||
.to_unsafe_h
|
||||
.transform_keys { |k| k.to_s.underscore }
|
||||
end
|
||||
end
|
|
@ -32,6 +32,7 @@ class Fasp::Request
|
|||
def request_headers(verb, url, body = '')
|
||||
result = {
|
||||
'accept' => 'application/json',
|
||||
'content-type' => 'application/json',
|
||||
'content-digest' => content_digest(body),
|
||||
}
|
||||
result.merge(signature_headers(verb, url, result))
|
||||
|
|
|
@ -85,6 +85,7 @@ class Account < ApplicationRecord
|
|||
include Account::Associations
|
||||
include Account::Avatar
|
||||
include Account::Counters
|
||||
include Account::FaspConcern
|
||||
include Account::FinderConcern
|
||||
include Account::Header
|
||||
include Account::Interactions
|
||||
|
|
37
app/models/concerns/account/fasp_concern.rb
Normal file
37
app/models/concerns/account/fasp_concern.rb
Normal file
|
@ -0,0 +1,37 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Account::FaspConcern
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
after_commit :announce_new_account_to_subscribed_fasp, on: :create
|
||||
after_commit :announce_updated_account_to_subscribed_fasp, on: :update
|
||||
after_commit :announce_deleted_account_to_subscribed_fasp, on: :destroy
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def announce_new_account_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless discoverable?
|
||||
|
||||
uri = ActivityPub::TagManager.instance.uri_for(self)
|
||||
Fasp::AnnounceAccountLifecycleEventWorker.perform_async(uri, 'new')
|
||||
end
|
||||
|
||||
def announce_updated_account_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless discoverable? || saved_change_to_discoverable?
|
||||
|
||||
uri = ActivityPub::TagManager.instance.uri_for(self)
|
||||
Fasp::AnnounceAccountLifecycleEventWorker.perform_async(uri, 'update')
|
||||
end
|
||||
|
||||
def announce_deleted_account_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless discoverable?
|
||||
|
||||
uri = ActivityPub::TagManager.instance.uri_for(self)
|
||||
Fasp::AnnounceAccountLifecycleEventWorker.perform_async(uri, 'delete')
|
||||
end
|
||||
end
|
17
app/models/concerns/favourite/fasp_concern.rb
Normal file
17
app/models/concerns/favourite/fasp_concern.rb
Normal file
|
@ -0,0 +1,17 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Favourite::FaspConcern
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
after_commit :announce_trends_to_subscribed_fasp, on: :create
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def announce_trends_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
|
||||
Fasp::AnnounceTrendWorker.perform_async(status_id, 'favourite')
|
||||
end
|
||||
end
|
53
app/models/concerns/status/fasp_concern.rb
Normal file
53
app/models/concerns/status/fasp_concern.rb
Normal file
|
@ -0,0 +1,53 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Status::FaspConcern
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
after_commit :announce_new_content_to_subscribed_fasp, on: :create
|
||||
after_commit :announce_updated_content_to_subscribed_fasp, on: :update
|
||||
after_commit :announce_deleted_content_to_subscribed_fasp, on: :destroy
|
||||
after_commit :announce_trends_to_subscribed_fasp, on: :create
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def announce_new_content_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless account_indexable? && public_visibility?
|
||||
|
||||
# We need the uri here, but it is set in another `after_commit`
|
||||
# callback. Hooks included from modules are run before the ones
|
||||
# in the class itself and can neither be reordered nor is there
|
||||
# a way to declare dependencies.
|
||||
store_uri if uri.nil?
|
||||
Fasp::AnnounceContentLifecycleEventWorker.perform_async(uri, 'new')
|
||||
end
|
||||
|
||||
def announce_updated_content_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless account_indexable? && public_visibility?
|
||||
|
||||
Fasp::AnnounceContentLifecycleEventWorker.perform_async(uri, 'update')
|
||||
end
|
||||
|
||||
def announce_deleted_content_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless account_indexable? && public_visibility?
|
||||
|
||||
Fasp::AnnounceContentLifecycleEventWorker.perform_async(uri, 'delete')
|
||||
end
|
||||
|
||||
def announce_trends_to_subscribed_fasp
|
||||
return unless Mastodon::Feature.fasp_enabled?
|
||||
return unless account_indexable?
|
||||
|
||||
candidate_id, trend_source =
|
||||
if reblog_of_id
|
||||
[reblog_of_id, 'reblog']
|
||||
elsif in_reply_to_id
|
||||
[in_reply_to_id, 'reply']
|
||||
end
|
||||
Fasp::AnnounceTrendWorker.perform_async(candidate_id, trend_source) if candidate_id
|
||||
end
|
||||
end
|
|
@ -1,6 +1,8 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Fasp
|
||||
DATA_CATEGORIES = %w(account content).freeze
|
||||
|
||||
def self.table_name_prefix
|
||||
'fasp_'
|
||||
end
|
||||
|
|
67
app/models/fasp/backfill_request.rb
Normal file
67
app/models/fasp/backfill_request.rb
Normal file
|
@ -0,0 +1,67 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# == Schema Information
|
||||
#
|
||||
# Table name: fasp_backfill_requests
|
||||
#
|
||||
# id :bigint(8) not null, primary key
|
||||
# category :string not null
|
||||
# cursor :string
|
||||
# fulfilled :boolean default(FALSE), not null
|
||||
# max_count :integer default(100), not null
|
||||
# created_at :datetime not null
|
||||
# updated_at :datetime not null
|
||||
# fasp_provider_id :bigint(8) not null
|
||||
#
|
||||
class Fasp::BackfillRequest < ApplicationRecord
|
||||
belongs_to :fasp_provider, class_name: 'Fasp::Provider'
|
||||
|
||||
validates :category, presence: true, inclusion: Fasp::DATA_CATEGORIES
|
||||
validates :max_count, presence: true,
|
||||
numericality: { only_integer: true }
|
||||
|
||||
after_commit :queue_fulfillment_job, on: :create
|
||||
|
||||
def next_objects
|
||||
@next_objects ||= base_scope.to_a
|
||||
end
|
||||
|
||||
def next_uris
|
||||
next_objects.map { |o| ActivityPub::TagManager.instance.uri_for(o) }
|
||||
end
|
||||
|
||||
def more_objects_available?
|
||||
return false if next_objects.empty?
|
||||
|
||||
base_scope.where(id: ...(next_objects.last.id)).any?
|
||||
end
|
||||
|
||||
def advance!
|
||||
if more_objects_available?
|
||||
update!(cursor: next_objects.last.id)
|
||||
else
|
||||
update!(fulfilled: true)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def base_scope
|
||||
result = category_scope.limit(max_count).order(id: :desc)
|
||||
result = result.where(id: ...cursor) if cursor.present?
|
||||
result
|
||||
end
|
||||
|
||||
def category_scope
|
||||
case category
|
||||
when 'account'
|
||||
Account.discoverable.without_instance_actor
|
||||
when 'content'
|
||||
Status.indexable
|
||||
end
|
||||
end
|
||||
|
||||
def queue_fulfillment_job
|
||||
Fasp::BackfillWorker.perform_async(id)
|
||||
end
|
||||
end
|
|
@ -22,7 +22,9 @@
|
|||
class Fasp::Provider < ApplicationRecord
|
||||
include DebugConcern
|
||||
|
||||
has_many :fasp_backfill_requests, inverse_of: :fasp_provider, class_name: 'Fasp::BackfillRequest', dependent: :delete_all
|
||||
has_many :fasp_debug_callbacks, inverse_of: :fasp_provider, class_name: 'Fasp::DebugCallback', dependent: :delete_all
|
||||
has_many :fasp_subscriptions, inverse_of: :fasp_provider, class_name: 'Fasp::Subscription', dependent: :delete_all
|
||||
|
||||
validates :name, presence: true
|
||||
validates :base_url, presence: true, url: true
|
||||
|
|
43
app/models/fasp/subscription.rb
Normal file
43
app/models/fasp/subscription.rb
Normal file
|
@ -0,0 +1,43 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# == Schema Information
|
||||
#
|
||||
# Table name: fasp_subscriptions
|
||||
#
|
||||
# id :bigint(8) not null, primary key
|
||||
# category :string not null
|
||||
# max_batch_size :integer not null
|
||||
# subscription_type :string not null
|
||||
# threshold_likes :integer
|
||||
# threshold_replies :integer
|
||||
# threshold_shares :integer
|
||||
# threshold_timeframe :integer
|
||||
# created_at :datetime not null
|
||||
# updated_at :datetime not null
|
||||
# fasp_provider_id :bigint(8) not null
|
||||
#
|
||||
class Fasp::Subscription < ApplicationRecord
|
||||
TYPES = %w(lifecycle trends).freeze
|
||||
|
||||
belongs_to :fasp_provider, class_name: 'Fasp::Provider'
|
||||
|
||||
validates :category, presence: true, inclusion: Fasp::DATA_CATEGORIES
|
||||
validates :subscription_type, presence: true,
|
||||
inclusion: TYPES
|
||||
|
||||
scope :category_content, -> { where(category: 'content') }
|
||||
scope :category_account, -> { where(category: 'account') }
|
||||
scope :lifecycle, -> { where(subscription_type: 'lifecycle') }
|
||||
scope :trends, -> { where(subscription_type: 'trends') }
|
||||
|
||||
def threshold=(threshold)
|
||||
self.threshold_timeframe = threshold['timeframe'] || 15
|
||||
self.threshold_shares = threshold['shares'] || 3
|
||||
self.threshold_likes = threshold['likes'] || 3
|
||||
self.threshold_replies = threshold['replies'] || 3
|
||||
end
|
||||
|
||||
def timeframe_start
|
||||
threshold_timeframe.minutes.ago
|
||||
end
|
||||
end
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
class Favourite < ApplicationRecord
|
||||
include Paginable
|
||||
include Favourite::FaspConcern
|
||||
|
||||
update_index('statuses', :status)
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ class Status < ApplicationRecord
|
|||
include Discard::Model
|
||||
include Paginable
|
||||
include RateLimitable
|
||||
include Status::FaspConcern
|
||||
include Status::FetchRepliesConcern
|
||||
include Status::SafeReblogInsert
|
||||
include Status::SearchConcern
|
||||
|
@ -181,7 +182,7 @@ class Status < ApplicationRecord
|
|||
],
|
||||
thread: :account
|
||||
|
||||
delegate :domain, to: :account, prefix: true
|
||||
delegate :domain, :indexable?, to: :account, prefix: true
|
||||
|
||||
REAL_TIME_WINDOW = 6.hours
|
||||
|
||||
|
|
28
app/workers/fasp/announce_account_lifecycle_event_worker.rb
Normal file
28
app/workers/fasp/announce_account_lifecycle_event_worker.rb
Normal file
|
@ -0,0 +1,28 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Fasp::AnnounceAccountLifecycleEventWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'fasp', retry: 5
|
||||
|
||||
def perform(uri, event_type)
|
||||
Fasp::Subscription.includes(:fasp_provider).category_account.lifecycle.each do |subscription|
|
||||
announce(subscription, uri, event_type)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def announce(subscription, uri, event_type)
|
||||
Fasp::Request.new(subscription.fasp_provider).post('/data_sharing/v0/announcements', body: {
|
||||
source: {
|
||||
subscription: {
|
||||
id: subscription.id.to_s,
|
||||
},
|
||||
},
|
||||
category: 'account',
|
||||
eventType: event_type,
|
||||
objectUris: [uri],
|
||||
})
|
||||
end
|
||||
end
|
28
app/workers/fasp/announce_content_lifecycle_event_worker.rb
Normal file
28
app/workers/fasp/announce_content_lifecycle_event_worker.rb
Normal file
|
@ -0,0 +1,28 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Fasp::AnnounceContentLifecycleEventWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'fasp', retry: 5
|
||||
|
||||
def perform(uri, event_type)
|
||||
Fasp::Subscription.includes(:fasp_provider).category_content.lifecycle.each do |subscription|
|
||||
announce(subscription, uri, event_type)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def announce(subscription, uri, event_type)
|
||||
Fasp::Request.new(subscription.fasp_provider).post('/data_sharing/v0/announcements', body: {
|
||||
source: {
|
||||
subscription: {
|
||||
id: subscription.id.to_s,
|
||||
},
|
||||
},
|
||||
category: 'content',
|
||||
eventType: event_type,
|
||||
objectUris: [uri],
|
||||
})
|
||||
end
|
||||
end
|
61
app/workers/fasp/announce_trend_worker.rb
Normal file
61
app/workers/fasp/announce_trend_worker.rb
Normal file
|
@ -0,0 +1,61 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Fasp::AnnounceTrendWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'fasp', retry: 5
|
||||
|
||||
def perform(status_id, trend_source)
|
||||
status = ::Status.includes(:account).find(status_id)
|
||||
return unless status.account.indexable?
|
||||
|
||||
Fasp::Subscription.includes(:fasp_provider).category_content.trends.each do |subscription|
|
||||
announce(subscription, status.uri) if trending?(subscription, status, trend_source)
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
# status might not exist anymore, in which case there is nothing to do
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def trending?(subscription, status, trend_source)
|
||||
scope = scope_for(status, trend_source)
|
||||
threshold = threshold_for(subscription, trend_source)
|
||||
scope.where(created_at: subscription.timeframe_start..).count >= threshold
|
||||
end
|
||||
|
||||
def scope_for(status, trend_source)
|
||||
case trend_source
|
||||
when 'favourite'
|
||||
status.favourites
|
||||
when 'reblog'
|
||||
status.reblogs
|
||||
when 'reply'
|
||||
status.replies
|
||||
end
|
||||
end
|
||||
|
||||
def threshold_for(subscription, trend_source)
|
||||
case trend_source
|
||||
when 'favourite'
|
||||
subscription.threshold_likes
|
||||
when 'reblog'
|
||||
subscription.threshold_shares
|
||||
when 'reply'
|
||||
subscription.threshold_replies
|
||||
end
|
||||
end
|
||||
|
||||
def announce(subscription, uri)
|
||||
Fasp::Request.new(subscription.fasp_provider).post('/data_sharing/v0/announcements', body: {
|
||||
source: {
|
||||
subscription: {
|
||||
id: subscription.id.to_s,
|
||||
},
|
||||
},
|
||||
category: 'content',
|
||||
eventType: 'trending',
|
||||
objectUris: [uri],
|
||||
})
|
||||
end
|
||||
end
|
32
app/workers/fasp/backfill_worker.rb
Normal file
32
app/workers/fasp/backfill_worker.rb
Normal file
|
@ -0,0 +1,32 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class Fasp::BackfillWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'fasp', retry: 5
|
||||
|
||||
def perform(backfill_request_id)
|
||||
backfill_request = Fasp::BackfillRequest.find(backfill_request_id)
|
||||
|
||||
announce(backfill_request)
|
||||
|
||||
backfill_request.advance!
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
# ignore missing backfill requests
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def announce(backfill_request)
|
||||
Fasp::Request.new(backfill_request.fasp_provider).post('/data_sharing/v0/announcements', body: {
|
||||
source: {
|
||||
backfillRequest: {
|
||||
id: backfill_request.id.to_s,
|
||||
},
|
||||
},
|
||||
category: backfill_request.category,
|
||||
objectUris: backfill_request.next_uris,
|
||||
moreObjectsAvailable: backfill_request.more_objects_available?,
|
||||
})
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue