Merge commit '71db616fed' into kb_migration

This commit is contained in:
KMY 2023-07-14 12:43:55 +09:00
commit f18fa97f0c
607 changed files with 3491 additions and 2677 deletions

View file

@ -3,6 +3,8 @@
class Trends::Links < Trends::Base
PREFIX = 'trending_links'
BATCH_SIZE = 100
self.default_options = {
threshold: 5,
review_threshold: 3,
@ -67,8 +69,21 @@ class Trends::Links < Trends::Base
end
def refresh(at_time = Time.now.utc)
preview_cards = PreviewCard.where(id: (recently_used_ids(at_time) + PreviewCardTrend.pluck(:preview_card_id)).uniq)
calculate_scores(preview_cards, at_time)
# First, recalculate scores for links that were trending previously. We split the queries
# to avoid having to load all of the IDs into Ruby just to send them back into Postgres
PreviewCard.where(id: PreviewCardTrend.select(:preview_card_id)).find_in_batches(batch_size: BATCH_SIZE) do |preview_cards|
calculate_scores(preview_cards, at_time)
end
# Then, calculate scores for links that were used today. There are potentially some
# duplicate items here that we might process one more time, but that should be fine
PreviewCard.where(id: recently_used_ids(at_time)).find_in_batches(batch_size: BATCH_SIZE) do |preview_cards|
calculate_scores(preview_cards, at_time)
end
# Now that all trends have up-to-date scores, and all the ones below the threshold have
# been removed, we can recalculate their positions
PreviewCardTrend.connection.exec_update('UPDATE preview_card_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM preview_card_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE preview_card_trends.id = t0.id')
end
def request_review
@ -139,10 +154,7 @@ class Trends::Links < Trends::Base
to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
PreviewCardTrend.transaction do
PreviewCardTrend.upsert_all(to_insert.map { |(score, preview_card)| { preview_card_id: preview_card.id, score: score, language: preview_card.language, allowed: preview_card.trendable? || false } }, unique_by: :preview_card_id) if to_insert.any?
PreviewCardTrend.where(preview_card_id: to_delete.map { |(_, preview_card)| preview_card.id }).delete_all if to_delete.any?
PreviewCardTrend.connection.exec_update('UPDATE preview_card_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM preview_card_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE preview_card_trends.id = t0.id')
end
PreviewCardTrend.upsert_all(to_insert.map { |(score, preview_card)| { preview_card_id: preview_card.id, score: score, language: preview_card.language, allowed: preview_card.trendable? || false } }, unique_by: :preview_card_id) if to_insert.any?
PreviewCardTrend.where(preview_card_id: to_delete.map { |(_, preview_card)| preview_card.id }).delete_all if to_delete.any?
end
end

View file

@ -68,12 +68,10 @@ class Trends::Query
alias to_a to_ary
def to_arel
tmp_ids = ids
if tmp_ids.empty?
if ids_for_key.empty?
klass.none
else
scope = klass.joins("join unnest(array[#{tmp_ids.join(',')}]) with ordinality as x (id, ordering) on #{klass.table_name}.id = x.id").reorder('x.ordering')
scope = klass.joins(sanitized_join_sql).reorder('x.ordering')
scope = scope.offset(@offset) if @offset.present?
scope = scope.limit(@limit) if @limit.present?
scope
@ -95,8 +93,22 @@ class Trends::Query
self
end
def ids
redis.zrevrange(key, 0, -1).map(&:to_i)
def ids_for_key
@ids_for_key ||= redis.zrevrange(key, 0, -1).map(&:to_i)
end
def sanitized_join_sql
ActiveRecord::Base.sanitize_sql_array(join_sql_array)
end
def join_sql_array
[join_sql_query, ids_for_key]
end
def join_sql_query
<<~SQL.squish
JOIN unnest(array[?]) WITH ordinality AS x (id, ordering) ON #{klass.table_name}.id = x.id
SQL
end
def perform_queries

View file

@ -3,6 +3,8 @@
class Trends::Statuses < Trends::Base
PREFIX = 'trending_statuses'
BATCH_SIZE = 100
self.default_options = {
threshold: 5,
review_threshold: 3,
@ -58,8 +60,21 @@ class Trends::Statuses < Trends::Base
end
def refresh(at_time = Time.now.utc)
statuses = Status.where(id: (recently_used_ids(at_time) + StatusTrend.pluck(:status_id)).uniq).includes(:status_stat, :account)
calculate_scores(statuses, at_time)
# First, recalculate scores for statuses that were trending previously. We split the queries
# to avoid having to load all of the IDs into Ruby just to send them back into Postgres
Status.where(id: StatusTrend.select(:status_id)).includes(:status_stat, :account).find_in_batches(batch_size: BATCH_SIZE) do |statuses|
calculate_scores(statuses, at_time)
end
# Then, calculate scores for statuses that were used today. There are potentially some
# duplicate items here that we might process one more time, but that should be fine
Status.where(id: recently_used_ids(at_time)).includes(:status_stat, :account).find_in_batches(batch_size: BATCH_SIZE) do |statuses|
calculate_scores(statuses, at_time)
end
# Now that all trends have up-to-date scores, and all the ones below the threshold have
# been removed, we can recalculate their positions
StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id')
end
def request_review
@ -117,10 +132,7 @@ class Trends::Statuses < Trends::Base
to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
StatusTrend.transaction do
StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any?
StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any?
StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id')
end
StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any?
StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any?
end
end