Move DB queries related to channels in a separate module
This commit is contained in:
parent
d94d4c2045
commit
c021b93b5c
9 changed files with 164 additions and 52 deletions
|
@ -649,13 +649,7 @@ get "/subscription_manager" do |env|
|
|||
format = env.params.query["format"]?
|
||||
format ||= "rss"
|
||||
|
||||
if user.subscriptions.empty?
|
||||
values = "'{}'"
|
||||
else
|
||||
values = "VALUES #{user.subscriptions.map { |id| %(('#{id}')) }.join(",")}"
|
||||
end
|
||||
|
||||
subscriptions = PG_DB.query_all("SELECT * FROM channels WHERE id = ANY(#{values})", as: InvidiousChannel)
|
||||
subscriptions = Invidious::Database::Channels.select(user.subscriptions)
|
||||
subscriptions.sort_by!(&.author.downcase)
|
||||
|
||||
if action_takeout
|
||||
|
|
|
@ -152,21 +152,14 @@ def get_batch_channels(channels, db, refresh = false, pull_all_videos = true, ma
|
|||
end
|
||||
|
||||
def get_channel(id, db, refresh = true, pull_all_videos = true)
|
||||
if channel = db.query_one?("SELECT * FROM channels WHERE id = $1", id, as: InvidiousChannel)
|
||||
if channel = Invidious::Database::Channels.select(id)
|
||||
if refresh && Time.utc - channel.updated > 10.minutes
|
||||
channel = fetch_channel(id, db, pull_all_videos: pull_all_videos)
|
||||
channel_array = channel.to_a
|
||||
args = arg_array(channel_array)
|
||||
|
||||
db.exec("INSERT INTO channels VALUES (#{args}) \
|
||||
ON CONFLICT (id) DO UPDATE SET author = $2, updated = $3", args: channel_array)
|
||||
Invidious::Database::Channels.insert(channel, update_on_conflict: true)
|
||||
end
|
||||
else
|
||||
channel = fetch_channel(id, db, pull_all_videos: pull_all_videos)
|
||||
channel_array = channel.to_a
|
||||
args = arg_array(channel_array)
|
||||
|
||||
db.exec("INSERT INTO channels VALUES (#{args})", args: channel_array)
|
||||
Invidious::Database::Channels.insert(channel)
|
||||
end
|
||||
|
||||
return channel
|
||||
|
@ -241,10 +234,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
|
|||
|
||||
# We don't include the 'premiere_timestamp' here because channel pages don't include them,
|
||||
# meaning the above timestamp is always null
|
||||
was_insert = db.query_one("INSERT INTO channel_videos VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \
|
||||
ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \
|
||||
updated = $4, ucid = $5, author = $6, length_seconds = $7, \
|
||||
live_now = $8, views = $10 returning (xmax=0) as was_insert", *video.to_tuple, as: Bool)
|
||||
was_insert = Invidious::Database::ChannelVideos.insert(video)
|
||||
|
||||
if was_insert
|
||||
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions")
|
||||
|
@ -284,10 +274,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
|
|||
# We are notified of Red videos elsewhere (PubSub), which includes a correct published date,
|
||||
# so since they don't provide a published date here we can safely ignore them.
|
||||
if Time.utc - video.published > 1.minute
|
||||
was_insert = db.query_one("INSERT INTO channel_videos VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \
|
||||
ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \
|
||||
updated = $4, ucid = $5, author = $6, length_seconds = $7, \
|
||||
live_now = $8, views = $10 returning (xmax=0) as was_insert", *video.to_tuple, as: Bool)
|
||||
was_insert = Invidious::Database::ChannelVideos.insert(video)
|
||||
|
||||
db.exec("UPDATE users SET notifications = array_append(notifications, $1), \
|
||||
feed_needs_update = true WHERE $2 = ANY(subscriptions)", video.id, video.ucid) if was_insert
|
||||
|
|
149
src/invidious/database/channels.cr
Normal file
149
src/invidious/database/channels.cr
Normal file
|
@ -0,0 +1,149 @@
|
|||
require "./base.cr"
|
||||
|
||||
#
|
||||
# This module contains functions related to the "channels" table.
|
||||
#
|
||||
module Invidious::Database::Channels
|
||||
extend self
|
||||
|
||||
# -------------------
|
||||
# Insert / delete
|
||||
# -------------------
|
||||
|
||||
def insert(channel : InvidiousChannel, update_on_conflict : Bool = false)
|
||||
channel_array = channel.to_a
|
||||
|
||||
request = <<-SQL
|
||||
INSERT INTO channels
|
||||
VALUES (#{arg_array(channel_array)})
|
||||
SQL
|
||||
|
||||
if update_on_conflict
|
||||
request += <<-SQL
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET author = $2, updated = $3
|
||||
SQL
|
||||
end
|
||||
|
||||
PG_DB.exec(request, args: channel_array)
|
||||
end
|
||||
|
||||
# -------------------
|
||||
# Update
|
||||
# -------------------
|
||||
|
||||
def update_author(id : String, author : String)
|
||||
request = <<-SQL
|
||||
UPDATE channels
|
||||
SET updated = $1, author = $2, deleted = false
|
||||
WHERE id = $3
|
||||
SQL
|
||||
|
||||
PG_DB.exec(request, Time.utc, author, id)
|
||||
end
|
||||
|
||||
def update_mark_deleted(id : String)
|
||||
request = <<-SQL
|
||||
UPDATE channels
|
||||
SET updated = $1, deleted = true
|
||||
WHERE id = $2
|
||||
SQL
|
||||
|
||||
PG_DB.exec(request, Time.utc, id)
|
||||
end
|
||||
|
||||
# -------------------
|
||||
# Select
|
||||
# -------------------
|
||||
|
||||
def select(id : String) : InvidiousChannel?
|
||||
request = <<-SQL
|
||||
SELECT * FROM channels
|
||||
WHERE id = $1
|
||||
SQL
|
||||
|
||||
return PG_DB.query_one?(request, id, as: InvidiousChannel)
|
||||
end
|
||||
|
||||
def select(ids : Array(String)) : Array(InvidiousChannel)?
|
||||
return [] of InvidiousChannel if ids.empty?
|
||||
values = ids.map { |id| %(('#{id}')) }.join(",")
|
||||
|
||||
request = <<-SQL
|
||||
SELECT * FROM channels
|
||||
WHERE id = ANY(VALUES #{values})
|
||||
SQL
|
||||
|
||||
return PG_DB.query_all(request, as: InvidiousChannel)
|
||||
end
|
||||
end
|
||||
|
||||
#
|
||||
# This module contains functions related to the "channel_videos" table.
|
||||
#
|
||||
module Invidious::Database::ChannelVideos
|
||||
extend self
|
||||
|
||||
# -------------------
|
||||
# Insert
|
||||
# -------------------
|
||||
|
||||
# This function returns the status of the query (i.e: success?)
|
||||
def insert(video : ChannelVideo, with_premiere_timestamp : Bool = false) : Bool
|
||||
if with_premiere_timestamp
|
||||
last_items = "premiere_timestamp = $9, views = $10"
|
||||
else
|
||||
last_items = "views = $10"
|
||||
end
|
||||
|
||||
request = <<-SQL
|
||||
INSERT INTO channel_videos
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET title = $2, published = $3, updated = $4, ucid = $5,
|
||||
author = $6, length_seconds = $7, live_now = $8, #{last_items}
|
||||
RETURNING (xmax=0) AS was_insert
|
||||
SQL
|
||||
|
||||
return PG_DB.query_one(request, *video.to_tuple, as: Bool)
|
||||
end
|
||||
|
||||
# -------------------
|
||||
# Select
|
||||
# -------------------
|
||||
|
||||
def select(ids : Array(String)) : Array(ChannelVideo)
|
||||
return [] of ChannelVideo if ids.empty?
|
||||
|
||||
request = <<-SQL
|
||||
SELECT * FROM channel_videos
|
||||
WHERE id IN (#{arg_array(ids)})
|
||||
ORDER BY published DESC
|
||||
SQL
|
||||
|
||||
return PG_DB.query_all(request, args: ids, as: ChannelVideo)
|
||||
end
|
||||
|
||||
def select_notfications(ucid : String, since : Time) : Array(ChannelVideo)
|
||||
request = <<-SQL
|
||||
SELECT * FROM channel_videos
|
||||
WHERE ucid = $1 AND published > $2
|
||||
ORDER BY published DESC
|
||||
LIMIT 15
|
||||
SQL
|
||||
|
||||
return PG_DB.query_all(request, ucid, since, as: ChannelVideo)
|
||||
end
|
||||
|
||||
def select_popular_videos : Array(ChannelVideo)
|
||||
request = <<-SQL
|
||||
SELECT DISTINCT ON (ucid) *
|
||||
FROM channel_videos
|
||||
WHERE ucid IN (SELECT channel FROM (SELECT UNNEST(subscriptions) AS channel FROM users) AS d
|
||||
GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40)
|
||||
ORDER BY ucid, published DESC
|
||||
SQL
|
||||
|
||||
PG_DB.query_all(request, as: ChannelVideo)
|
||||
end
|
||||
end
|
|
@ -235,11 +235,12 @@ def create_notification_stream(env, topics, connection_channel)
|
|||
spawn do
|
||||
begin
|
||||
if since
|
||||
since_unix = Time.unix(since.not_nil!)
|
||||
|
||||
topics.try &.each do |topic|
|
||||
case topic
|
||||
when .match(/UC[A-Za-z0-9_-]{22}/)
|
||||
PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15",
|
||||
topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video|
|
||||
Invidious::Database::ChannelVideos.select_notfications(topic, since_unix).each do |video|
|
||||
response = JSON.parse(video.to_json(locale))
|
||||
|
||||
if fields_text = env.params.query["fields"]?
|
||||
|
|
|
@ -1,11 +1,4 @@
|
|||
class Invidious::Jobs::PullPopularVideosJob < Invidious::Jobs::BaseJob
|
||||
QUERY = <<-SQL
|
||||
SELECT DISTINCT ON (ucid) *
|
||||
FROM channel_videos
|
||||
WHERE ucid IN (SELECT channel FROM (SELECT UNNEST(subscriptions) AS channel FROM users) AS d
|
||||
GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40)
|
||||
ORDER BY ucid, published DESC
|
||||
SQL
|
||||
POPULAR_VIDEOS = Atomic.new([] of ChannelVideo)
|
||||
private getter db : DB::Database
|
||||
|
||||
|
@ -14,7 +7,7 @@ class Invidious::Jobs::PullPopularVideosJob < Invidious::Jobs::BaseJob
|
|||
|
||||
def begin
|
||||
loop do
|
||||
videos = db.query_all(QUERY, as: ChannelVideo)
|
||||
videos = Invidious::Database::ChannelVideos.select_popular_videos
|
||||
.sort_by!(&.published)
|
||||
.reverse!
|
||||
|
||||
|
|
|
@ -35,11 +35,11 @@ class Invidious::Jobs::RefreshChannelsJob < Invidious::Jobs::BaseJob
|
|||
lim_fibers = max_fibers
|
||||
|
||||
LOGGER.trace("RefreshChannelsJob: #{id} fiber : Updating DB")
|
||||
db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id)
|
||||
Invidious::Database::Channels.update_author(id, channel.author)
|
||||
rescue ex
|
||||
LOGGER.error("RefreshChannelsJob: #{id} : #{ex.message}")
|
||||
if ex.message == "Deleted or invalid channel"
|
||||
db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id)
|
||||
Invidious::Database::Channels.update_mark_deleted(id)
|
||||
else
|
||||
lim_fibers = 1
|
||||
LOGGER.error("RefreshChannelsJob: #{id} fiber : backing off for #{backoff}s")
|
||||
|
|
|
@ -72,13 +72,7 @@ module Invidious::Routes::API::V1::Authenticated
|
|||
env.response.content_type = "application/json"
|
||||
user = env.get("user").as(User)
|
||||
|
||||
if user.subscriptions.empty?
|
||||
values = "'{}'"
|
||||
else
|
||||
values = "VALUES #{user.subscriptions.map { |id| %(('#{id}')) }.join(",")}"
|
||||
end
|
||||
|
||||
subscriptions = PG_DB.query_all("SELECT * FROM channels WHERE id = ANY(#{values})", as: InvidiousChannel)
|
||||
subscriptions = Invidious::Database::Channels.select(user.subscriptions)
|
||||
|
||||
JSON.build do |json|
|
||||
json.array do
|
||||
|
|
|
@ -416,10 +416,7 @@ module Invidious::Routes::Feeds
|
|||
views: video.views,
|
||||
})
|
||||
|
||||
was_insert = PG_DB.query_one("INSERT INTO channel_videos VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT (id) DO UPDATE SET title = $2, published = $3,
|
||||
updated = $4, ucid = $5, author = $6, length_seconds = $7,
|
||||
live_now = $8, premiere_timestamp = $9, views = $10 returning (xmax=0) as was_insert", *video.to_tuple, as: Bool)
|
||||
was_insert = Invidious::Database::ChannelVideos.insert(video, with_premiere_timestamp: true)
|
||||
|
||||
PG_DB.exec("UPDATE users SET notifications = array_append(notifications, $1),
|
||||
feed_needs_update = true WHERE $2 = ANY(subscriptions)", video.id, video.ucid) if was_insert
|
||||
|
|
|
@ -242,10 +242,7 @@ def get_subscription_feed(db, user, max_results = 40, page = 1)
|
|||
|
||||
if user.preferences.notifications_only && !notifications.empty?
|
||||
# Only show notifications
|
||||
|
||||
args = arg_array(notifications)
|
||||
|
||||
notifications = db.query_all("SELECT * FROM channel_videos WHERE id IN (#{args}) ORDER BY published DESC", args: notifications, as: ChannelVideo)
|
||||
notifications = Invidious::Database::ChannelVideos.select(notifications)
|
||||
videos = [] of ChannelVideo
|
||||
|
||||
notifications.sort_by!(&.published).reverse!
|
||||
|
|
Loading…
Reference in a new issue