From 0337df3a427bf7b6b99f397d1643868b98987750 Mon Sep 17 00:00:00 2001
From: Claire <claire.github-309c@sitedethib.com>
Date: Thu, 2 Nov 2023 15:58:37 +0100
Subject: [PATCH] Fix posts from threads received out-of-order sometimes not
 being inserted into timelines (#27653)

---
 app/services/fan_out_on_write_service.rb     |   8 +-
 app/workers/thread_resolve_worker.rb         |   9 +-
 spec/lib/activitypub/activity/create_spec.rb | 103 +++++++++++++++++++
 3 files changed, 116 insertions(+), 4 deletions(-)

diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb
index 2554756a5d..f2a79c9fc9 100644
--- a/app/services/fan_out_on_write_service.rb
+++ b/app/services/fan_out_on_write_service.rb
@@ -8,6 +8,7 @@ class FanOutOnWriteService < BaseService
   # @param [Hash] options
   # @option options [Boolean] update
   # @option options [Array<Integer>] silenced_account_ids
+  # @option options [Boolean] skip_notifications
   def call(status, options = {})
     @status    = status
     @account   = status.account
@@ -37,8 +38,11 @@ class FanOutOnWriteService < BaseService
 
   def fan_out_to_local_recipients!
     deliver_to_self!
-    notify_mentioned_accounts!
-    notify_about_update! if update?
+
+    unless @options[:skip_notifications]
+      notify_mentioned_accounts!
+      notify_about_update! if update?
+    end
 
     case @status.visibility.to_sym
     when :public, :unlisted, :private
diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb
index 3206c45f63..d4cefb3fdc 100644
--- a/app/workers/thread_resolve_worker.rb
+++ b/app/workers/thread_resolve_worker.rb
@@ -7,13 +7,18 @@ class ThreadResolveWorker
   sidekiq_options queue: 'pull', retry: 3
 
   def perform(child_status_id, parent_url, options = {})
-    child_status  = Status.find(child_status_id)
-    parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
+    child_status = Status.find(child_status_id)
+    return if child_status.in_reply_to_id.present?
+
+    parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
+    parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
 
     return if parent_status.nil?
 
     child_status.thread = parent_status
     child_status.save!
+
+    DistributionWorker.perform_async(child_status_id, { 'skip_notifications' => true }) if child_status.within_realtime_window?
   rescue ActiveRecord::RecordNotFound
     true
   end
diff --git a/spec/lib/activitypub/activity/create_spec.rb b/spec/lib/activitypub/activity/create_spec.rb
index f6c24754c0..8425f2127c 100644
--- a/spec/lib/activitypub/activity/create_spec.rb
+++ b/spec/lib/activitypub/activity/create_spec.rb
@@ -23,6 +23,109 @@ RSpec.describe ActivityPub::Activity::Create do
     stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' })
   end
 
+  describe 'processing posts received out of order' do
+    let(:follower) { Fabricate(:account, username: 'bob') }
+
+    let(:object_json) do
+      {
+        id: [ActivityPub::TagManager.instance.uri_for(sender), 'post1'].join('/'),
+        type: 'Note',
+        to: [
+          'https://www.w3.org/ns/activitystreams#Public',
+          ActivityPub::TagManager.instance.uri_for(follower),
+        ],
+        content: '@bob lorem ipsum',
+        published: 1.hour.ago.utc.iso8601,
+        updated: 1.hour.ago.utc.iso8601,
+        tag: {
+          type: 'Mention',
+          href: ActivityPub::TagManager.instance.uri_for(follower),
+        },
+      }
+    end
+
+    let(:reply_json) do
+      {
+        id: [ActivityPub::TagManager.instance.uri_for(sender), 'reply'].join('/'),
+        type: 'Note',
+        inReplyTo: object_json[:id],
+        to: [
+          'https://www.w3.org/ns/activitystreams#Public',
+          ActivityPub::TagManager.instance.uri_for(follower),
+        ],
+        content: '@bob lorem ipsum',
+        published: Time.now.utc.iso8601,
+        updated: Time.now.utc.iso8601,
+        tag: {
+          type: 'Mention',
+          href: ActivityPub::TagManager.instance.uri_for(follower),
+        },
+      }
+    end
+
+    def activity_for_object(json)
+      {
+        '@context': 'https://www.w3.org/ns/activitystreams',
+        id: [json[:id], 'activity'].join('/'),
+        type: 'Create',
+        actor: ActivityPub::TagManager.instance.uri_for(sender),
+        object: json,
+      }.with_indifferent_access
+    end
+
+    before do
+      follower.follow!(sender)
+    end
+
+    around do |example|
+      Sidekiq::Testing.fake! do
+        example.run
+        Sidekiq::Worker.clear_all
+      end
+    end
+
+    it 'correctly processes posts and inserts them in timelines', :aggregate_failures do
+      # Simulate a temporary failure preventing from fetching the parent post
+      stub_request(:get, object_json[:id]).to_return(status: 500)
+
+      # When receiving the reply…
+      described_class.new(activity_for_object(reply_json), sender, delivery: true).perform
+
+      # NOTE: Refering explicitly to the workers is a bit awkward
+      DistributionWorker.drain
+      FeedInsertWorker.drain
+
+      # …it creates a status with an unknown parent
+      reply = Status.find_by(uri: reply_json[:id])
+      expect(reply.reply?).to be true
+      expect(reply.in_reply_to_id).to be_nil
+
+      # …and creates a notification
+      expect(LocalNotificationWorker.jobs.size).to eq 1
+
+      # …but does not insert it into timelines
+      expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_nil
+
+      # When receiving the parent…
+      described_class.new(activity_for_object(object_json), sender, delivery: true).perform
+
+      Sidekiq::Worker.drain_all
+
+      # …it creates a status and insert it into timelines
+      parent = Status.find_by(uri: object_json[:id])
+      expect(parent.reply?).to be false
+      expect(parent.in_reply_to_id).to be_nil
+      expect(reply.reload.in_reply_to_id).to eq parent.id
+
+      # Check that the both statuses have been inserted into the home feed
+      expect(redis.zscore(FeedManager.instance.key(:home, follower.id), parent.id)).to be_within(0.1).of(parent.id.to_f)
+      expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_within(0.1).of(reply.id.to_f)
+
+      # Creates two notifications
+      expect(Notification.count).to eq 2
+    end
+  end
+
   describe '#perform' do
     context 'when fetching' do
       subject { described_class.new(json, sender) }