# worker.py
import argparse
import configparser
import json
import os
import shutil
import subprocess
from datetime import datetime

import cutlet
import requests

import channel_meta_archiver
import discord_webhook
import file_util
from archive_api import ArchiveAPI
from sql.sql_handler import SQLHandler
from video_downloaders.bili_downloader import BiliDownloader
from video_downloaders.yt_downloader import YouTubeDownloader
from video_types import VideoType

def parse_arguments():
    parser = argparse.ArgumentParser(description="Patchwork Worker")
    parser.add_argument("--db", action="store_true", help="Read the queue directly from DB instead of the API")
    parser.add_argument("--update_all_channel_meta", action="store_true", help="Update all channel metadata")
    parser.add_argument("--configpath", type=str, default="config.ini", help="Path to worker config.ini file")
    parser.add_argument("--cookies", type=str, default="cookies.txt", help="Path to cookies file")
    return parser.parse_args()


ERROR_WAIT_TIME = 500  # seconds
COOLDOWN_WAIT_TIME = 250  # seconds
CONFIG = file_util.read_config("config.ini")
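# Illustrative config.ini layout, inferred from the CONFIG.get() calls in this
# file (section and key names are the ones actually read; values are placeholders):
#   [path]      rclone_path, output_dir, rclone_video_target, rclone_thumbnail_target,
#               rclone_metadata_target, rclone_captions_target, rclone_banner_target, rclone_pfp_target
#   [database]  host, user, password, database
#   [queue]     base_url, worker_password, worker_name
#   [discord]   webhook
#   [youtube]   api_key, cookies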


def send_heartbeat(status: str):
    """
    Sends a heartbeat to the server
    :param status: str
    """
    config = read_config("config.ini")
    base_url = config.get("queue", "base_url")
    password = config.get("queue", "worker_password")
    name = config.get("queue", "worker_name")
    headers = {'X-AUTHENTICATION': password}
    requests.post(f"{base_url}/api/worker/heartbeat", headers=headers, data={"status": status, "name": name})
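# For reference, send_heartbeat() issues roughly the following request
# (base_url, worker_password, and worker_name come from the [queue] section):
#   POST <base_url>/api/worker/heartbeat
#   Headers:   X-AUTHENTICATION: <worker_password>
#   Form data: status=<status>, name=<worker_name>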


def read_config(file_path: str):
    """
    Reads a config file and returns the parsed configuration
    :param file_path: str
    :return: configparser.ConfigParser
    """
    config = configparser.ConfigParser()
    config.read(file_path)
    return config


def write_debug_log(message: str) -> None:
    """
    Prints a message to the standard output with a timestamp
    :param message: str
    """
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")


def rclone_to_cloud():
    """
    Uploads all files in the output folder to their respective S3 bucket locations
    """
    write_debug_log("Preparing to upload files to cloud...")
    # Each (local subdirectory, config key for the rclone remote) pair gets its own copy
    targets = [
        ("video", "rclone_video_target"),
        ("thumbnail", "rclone_thumbnail_target"),
        ("metadata", "rclone_metadata_target"),
        ("captions", "rclone_captions_target"),
    ]
    for subdir, target_key in targets:
        write_debug_log(f"Upload {subdir} files to cloud using rclone")
        subprocess.run(
            f'{CONFIG.get("path", "rclone_path")} -P copy '
            f'"{CONFIG.get("path", "output_dir")}/{subdir}" "{CONFIG.get("path", target_key)}"',
            shell=True,
        )
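# rclone_to_cloud() shells out to roughly:
#   <rclone_path> -P copy "<output_dir>/<subdir>" "<rclone_*_target>"
# where each target is an rclone remote (e.g. an S3 bucket path) from config.ini.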


def rclone_channel_images_to_cloud(pfp_path: str, banner_path: str):
    """
    Uploads the channel images to their respective S3 bucket locations,
    then deletes the local copies
    :param pfp_path: str
    :param banner_path: str
    """
    write_debug_log("Preparing to upload channel images to cloud...")
    write_debug_log("Upload banner file to cloud using rclone")
    subprocess.run(f'{CONFIG.get("path", "rclone_path")} -P copy "{banner_path}" "{CONFIG.get("path", "rclone_banner_target")}"', shell=True)
    write_debug_log("Upload pfp file to cloud using rclone")
    subprocess.run(f'{CONFIG.get("path", "rclone_path")} -P copy "{pfp_path}" "{CONFIG.get("path", "rclone_pfp_target")}"', shell=True)
    os.remove(pfp_path)
    os.remove(banner_path)


def update_database(video_data: dict, video_type: VideoType, file_ext: str, file_size: float):
    """
    Inserts (or updates) the song, file, romanization, and channel rows for an archived video
    """
    hostname = CONFIG.get("database", "host")
    user = CONFIG.get("database", "user")
    password = CONFIG.get("database", "password")
    database = CONFIG.get("database", "database")
    server = SQLHandler(hostname, user, password, database)
    headers = "video_id, title, channel_name, channel_id, upload_date, description"
    if server.check_row_exists("songs", "video_id", video_data["video_id"]):
        write_debug_log("Video already exists in database. Updating row instead...")
        # Escape single quotes so titles/descriptions with apostrophes don't break
        # the query; a parameterized query would be safer if SQLHandler supports one
        title = video_data["title"].replace("'", "''")
        channel_name = video_data["channel_name"].replace("'", "''")
        description = video_data["description"].replace("'", "''")
        server.execute_query(f"UPDATE songs SET title = '{title}', channel_name = '{channel_name}', channel_id = '{video_data['channel_id']}', upload_date = '{video_data['upload_date']}', description = '{description}' WHERE video_id = '{video_data['video_id']}'")
    else:
        if server.insert_row("songs", headers, (video_data["video_id"], video_data["title"], video_data["channel_name"], video_data["channel_id"], video_data["upload_date"], video_data["description"])) is False:
            write_debug_log("Error inserting row into database")
            return
    if server.insert_row("files", "video_id, size_mb, extension", (video_data["video_id"], file_size, file_ext)) is False:
        write_debug_log("Error inserting file data into database")
        return
    katsu = cutlet.Cutlet()
    romanized_title = katsu.romaji(video_data["title"])
    if server.insert_row("romanized", "video_id, romanized_title", (video_data["video_id"], romanized_title)) is False:
        write_debug_log("Error inserting romanization into database")
    if server.check_row_exists("channels", "channel_id", video_data["channel_id"]) is False and video_type == VideoType.YOUTUBE:
        channel_data = channel_meta_archiver.download_youtube_banner_pfp_desc(video_data["channel_id"], CONFIG.get("youtube", "api_key"))
        romanized_name = katsu.romaji(channel_data.name)
        rclone_channel_images_to_cloud(channel_data.pfp, channel_data.banner)
        server.insert_row("channels", "channel_id, channel_name, romanized_name, description", (video_data["channel_id"], channel_data.name, romanized_name, channel_data.description))
    if server.check_row_exists("channels", "channel_id", video_data["channel_id"]) is False and video_type == VideoType.BILIBILI:
        print("[WARNING] Bilibili channel metadata is not supported yet!")
    server.close_connection()
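# Rough schema implied by the inserts above (column lists come straight from the
# insert_row() calls; column types are not specified in this file):
#   songs(video_id, title, channel_name, channel_id, upload_date, description)
#   files(video_id, size_mb, extension)
#   romanized(video_id, romanized_title)
#   channels(channel_id, channel_name, romanized_name, description)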


def archive_video(url: str, mode: int) -> bool:
    """
    Runs through the full routine of downloading a video, thumbnail, metadata, and captions
    :param url: str
    :param mode: int - 0 for normal archival, 1 for force archival
    :return: bool - True if the video was archived, False if it was skipped
    """
    write_debug_log(f"New task received: {url} || Beginning archival...")
    if os.path.exists(CONFIG.get("path", "output_dir")):
        shutil.rmtree(CONFIG.get("path", "output_dir"))

    def classify_video_type() -> tuple:
        """
        Classifies the video type based on the URL
        :return: tuple - (VideoType, downloader), or (None, None) for unsupported URLs
        """
        if "youtube.com" in url or "youtu.be" in url:
            return VideoType.YOUTUBE, YouTubeDownloader(CONFIG.get("path", "output_dir"), cookies_file=CONFIG.get("youtube", "cookies"))
        elif "bilibili.com" in url:
            return VideoType.BILIBILI, BiliDownloader(CONFIG.get("path", "output_dir"))
        else:
            return None, None

    # Classify once and unpack, instead of building two downloader instances
    video_type, video_downloader = classify_video_type()
    if video_type is None:
        write_debug_log(f"Unsupported URL: {url}. Skipping...")
        return False
    write_debug_log("Classified video type as " + video_type.name)
    archiver_api = ArchiveAPI()
    if mode != 1 and archiver_api.video_is_archived(video_downloader._get_video_id(url)):
        write_debug_log("Video is already archived. Skipping...")
        return False
    file_ext, file_size = video_downloader.download_video(url)
    video_downloader.download_thumbnail(url)
    video_metadata_dict = video_downloader.download_metadata(url)
    update_database(video_metadata_dict, video_type, file_ext, file_size)
    video_downloader.download_captions(url)
    return True


def delete_archived_video(video_id: str):
    """
    Deletes an archived video from the archive
    :param video_id: str
    """
    print(f"Deleting video {video_id} from archive...")
    subprocess.run(f'{CONFIG.get("path", "rclone_path")} delete "{CONFIG.get("path", "rclone_video_target")}/{video_id}.webm"', shell=True)
    print(f"Deleting thumbnail {video_id} from archive...")
    subprocess.run(f'{CONFIG.get("path", "rclone_path")} delete "{CONFIG.get("path", "rclone_thumbnail_target")}/{video_id}.jpg"', shell=True)
    print(f"Deleting metadata {video_id} from archive...")
    subprocess.run(f'{CONFIG.get("path", "rclone_path")} delete "{CONFIG.get("path", "rclone_metadata_target")}/{video_id}.info.json"', shell=True)
    print(f"Deleting captions {video_id} from archive...")
    subprocess.run(f'{CONFIG.get("path", "rclone_path")} delete "{CONFIG.get("path", "rclone_captions_target")}/{video_id}"', shell=True)
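# NOTE: deletion assumes a .webm container and a .jpg thumbnail, but
# archive_video() stores whatever extension download_video() reports;
# videos archived with another extension would be missed here.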


def execute_server_worker(url: str, mode: int = 0):
    """
    To be executed through server.py when deploying an automatic archival
    :param url: str - video URL (treated as a video ID when mode is 2)
    :param mode: int - 0 for normal archival, 1 for force archival, 2 for deletion
    """
    try:
        if mode == 2:
            delete_archived_video(url)
            discord_webhook.send_completed_message(CONFIG.get("discord", "webhook"), url, "Video deleted from archive.")
            return
        archive_result = archive_video(url, mode)
        if archive_result is False:
            return
        rclone_to_cloud()
        discord_webhook.send_completed_message(CONFIG.get("discord", "webhook"), url)
    except Exception as e:
        write_debug_log(f"Error encountered: {e}")
        discord_webhook.send_completed_message(CONFIG.get("discord", "webhook"), url, f"An error occurred while archiving the following video:\n\n{url}\n\nError: {e}")


# This function should only be called manually, when you want to regenerate
# all channel images
def update_all_channels(override: bool = False):
    import csv
    hostname = CONFIG.get("database", "host")
    user = CONFIG.get("database", "user")
    password = CONFIG.get("database", "password")
    database = CONFIG.get("database", "database")
    katsu = cutlet.Cutlet()
    server = SQLHandler(hostname, user, password, database)
    # Context managers ensure both files are closed even if a channel fails midway
    with open('failed_channels.csv', 'w') as failed_file, open('channels_patchwork.csv', 'r') as file:
        reader = csv.reader(file)
        for row in reader:
            channel_id = row[0]
            channel_name = row[1]
            print(f"Processing channel {channel_id}...")
            if server.check_row_exists("channels", "channel_id", channel_id) and not override:
                write_debug_log(f"Channel {channel_id} already exists in database. Skipping...")
                continue
            try:
                channel_data = channel_meta_archiver.download_youtube_banner_pfp_desc(channel_id, CONFIG.get("youtube", "api_key"))
                romanized_name = katsu.romaji(channel_data.name)
                server.insert_row("channels", "channel_id, channel_name, romanized_name, description", (channel_id, channel_data.name, romanized_name, channel_data.description))
                rclone_channel_images_to_cloud(channel_data.pfp, channel_data.banner)
            except Exception as e:
                print(f"Error encountered: {e}")
                failed_file.write(f"{channel_id},{channel_name}\n")
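# channels_patchwork.csv is expected to hold one channel per row as
# "channel_id,channel_name"; failed channels are written back out to
# failed_channels.csv in the same format for retrying.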


def execute_next_task(args):
    """
    Execute the next archival task in queue
    """
    # Re-read the config from the supplied path so the module-level CONFIG
    # (used by all the helpers above) reflects --configpath
    global CONFIG
    config = read_config(args.configpath)
    CONFIG = file_util.read_config(args.configpath)
    base_url = config.get("queue", "base_url")
    password = config.get("queue", "worker_password")
    send_heartbeat("Starting up archival")
    if args.db:
        # TODO: Add logic to use DB directly instead of through the API
        pass
    else:
        headers = {'X-AUTHENTICATION': password}
        has_next_task = True
        while has_next_task:
            next_video = requests.get(f"{base_url}/api/worker/next", headers=headers)
            if next_video.status_code == 200:
                print("Found video to archive. Starting...")
                next_video_data = json.loads(next_video.text)
                send_heartbeat("Archiving " + next_video_data["next_video"])
                mode = next_video_data["mode"]
                execute_server_worker(next_video_data["next_video"], mode)
            elif next_video.status_code == 401:
                print("Invalid credentials. The password may be incorrect")
                send_heartbeat("Invalid credentials. The password may be incorrect")
                has_next_task = False
            else:
                print("No videos to archive at this time. Cooling down...")
                send_heartbeat("Idle. Waiting for work...")
                has_next_task = False
        print("No tasks remaining. Ending job for now. See you soon!")


if __name__ == "__main__":
    # Ideally run as a cronjob, scheduled for how often you want to check for work
    args = parse_arguments()
    if args.update_all_channel_meta:
        update_all_channels(override=True)
    else:
        execute_next_task(args)
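# Example crontab entry (illustrative paths only; adjust to your environment):
#   */15 * * * * cd /opt/patchwork-worker && python3 worker.py --configpath config.ini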