Fix gallery ordering, fix inconsistent line endings

John Stephani 2025-12-26 14:11:32 -06:00
parent 3d806c729c
commit 3d2edfd5cf
4 changed files with 7956 additions and 7945 deletions

File diff suppressed because it is too large


@@ -1,96 +1,96 @@
import os
import csv
import json
import logging
import requests
from urllib.parse import urlparse
from pygments import formatters, highlight, lexers

logging.basicConfig(
    level=logging.INFO, filename="YARS.log", format="%(asctime)s - %(message)s"
)


def display_results(results, title):
    # Pretty-print a list or dict of results as syntax-highlighted JSON.
    try:
        print(f"\n{'-'*20} {title} {'-'*20}")
        if isinstance(results, list):
            for item in results:
                if isinstance(item, dict):
                    formatted_json = json.dumps(item, sort_keys=True, indent=4)
                    colorful_json = highlight(
                        formatted_json,
                        lexers.JsonLexer(),
                        formatters.TerminalFormatter(),
                    )
                    print(colorful_json)
                else:
                    print(item)
        elif isinstance(results, dict):
            formatted_json = json.dumps(results, sort_keys=True, indent=4)
            colorful_json = highlight(
                formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter()
            )
            print(colorful_json)
        else:
            logging.warning(
                "No results to display: expected a list or dictionary, got %s",
                type(results),
            )
            print("No results to display.")
    except Exception as e:
        logging.error("Error displaying results: %s", e)
        print("Error displaying results.")


def download_image(image_url, output_folder="images", session=None):
    # Save the image under output_folder, mirroring the URL path; returns the file path or None.
    os.makedirs(output_folder, exist_ok=True)
    filename = urlparse(image_url).path
    if filename.startswith('/'):
        filename = filename[1:]
    filepath = os.path.join(output_folder, filename)
    os.makedirs(os.path.dirname(filepath), exist_ok=True)

    if session is None:
        session = requests.Session()
    try:
        response = session.get(image_url, stream=True)
        response.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(8192):
                f.write(chunk)
        logging.info("Downloaded: %s", filepath)
        return filepath
    except requests.RequestException as e:
        logging.error("Failed to download %s: %s", image_url, e)
        return None
    except Exception as e:
        logging.error("An error occurred while saving the image: %s", e)
        return None


def export_to_json(data, filename="output.json"):
    # Dump the scraped data to a JSON file.
    try:
        with open(filename, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, indent=4)
        print(f"Data successfully exported to {filename}")
    except Exception as e:
        print(f"Error exporting to JSON: {e}")


def export_to_csv(data, filename="output.csv"):
    # Write a list of dicts to CSV, using the first item's keys as the header row.
    try:
        keys = data[0].keys()
        with open(filename, "w", newline="", encoding="utf-8") as output_file:
            dict_writer = csv.DictWriter(output_file, fieldnames=keys)
            dict_writer.writeheader()
            dict_writer.writerows(data)
        print(f"Data successfully exported to {filename}")
    except Exception as e:
        print(f"Error exporting to CSV: {e}")


@@ -1,309 +1,319 @@
from __future__ import annotations

from .sessions import RandomUserAgentSession

import time
import random
import logging
import re

import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

logging.basicConfig(
    filename="YARS.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


class YARS:
    # Minimal Reddit scraper built on Reddit's public .json endpoints.
    __slots__ = ("headers", "session", "proxy", "timeout")

    def __init__(self, proxy=None, timeout=10, random_user_agent=True):
        self.session = RandomUserAgentSession() if random_user_agent else requests.Session()
        self.proxy = proxy
        self.timeout = timeout

        retries = Retry(
            total=5,
            backoff_factor=2,  # Exponential backoff
            status_forcelist=[429, 500, 502, 503, 504],
        )
        self.session.mount("https://", HTTPAdapter(max_retries=retries))

        if proxy:
            self.session.proxies.update({"http": proxy, "https": proxy})

    def handle_search(self, url, params, after=None, before=None):
        # Shared helper for the search endpoints; returns a list of {title, link, description} dicts.
        if after:
            params["after"] = after
        if before:
            params["before"] = before

        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            logging.info("Search request successful")
        except Exception as e:
            if response.status_code != 200:
                logging.info("Search request unsuccessful due to: %s", e)
                print(f"Failed to fetch search results: {response.status_code}")
                return []

        data = response.json()
        results = []
        for post in data["data"]["children"]:
            post_data = post["data"]
            results.append(
                {
                    "title": post_data["title"],
                    "link": f"https://www.reddit.com{post_data['permalink']}",
                    "description": post_data.get("selftext", "")[:269],
                }
            )

        logging.info("Search Results Returned %d Results", len(results))
        return results

    def search_reddit(self, query, limit=10, after=None, before=None):
        url = "https://www.reddit.com/search.json"
        params = {"q": query, "limit": limit, "sort": "relevance", "type": "link"}
        return self.handle_search(url, params, after, before)

    def search_subreddit(self, subreddit, query, limit=10, after=None, before=None, sort="relevance"):
        url = f"https://www.reddit.com/r/{subreddit}/search.json"
        # Use the caller-supplied sort order.
        params = {"q": query, "limit": limit, "sort": sort, "type": "link", "restrict_sr": "on"}
        return self.handle_search(url, params, after, before)

    def scrape_post_details(self, permalink):
        # Fetch a post's JSON by permalink and return its title, body, and comment tree.
        url = f"https://www.reddit.com{permalink}.json"
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            logging.info("Post details request successful: %s", url)
        except Exception as e:
            logging.info("Post details request unsuccessful: %s", e)
            if response.status_code != 200:
                print(f"Failed to fetch post data: {response.status_code}")
                return None

        post_data = response.json()
        if not isinstance(post_data, list) or len(post_data) < 2:
            logging.info("Unexpected post data structure")
            print("Unexpected post data structure")
            return None

        main_post = post_data[0]["data"]["children"][0]["data"]
        title = main_post["title"]
        body = main_post.get("selftext", "")
        comments = self._extract_comments(post_data[1]["data"]["children"])

        logging.info("Successfully scraped post: %s", title)
        return {"title": title, "body": body, "comments": comments}

    def _extract_comments(self, comments):
        # Recursively flatten "t1" comment objects into author/body/score/replies dicts.
        logging.info("Extracting comments")
        extracted_comments = []

        for comment in comments:
            if isinstance(comment, dict) and comment.get("kind") == "t1":
                comment_data = comment.get("data", {})
                extracted_comment = {
                    "author": comment_data.get("author", ""),
                    "body": comment_data.get("body", ""),
                    "score": comment_data.get("score", ""),
                    "replies": [],
                }

                replies = comment_data.get("replies", "")
                if isinstance(replies, dict):
                    extracted_comment["replies"] = self._extract_comments(
                        replies.get("data", {}).get("children", [])
                    )
                extracted_comments.append(extracted_comment)

        logging.info("Successfully extracted comments")
        return extracted_comments

    def scrape_user_data(self, username, limit=10):
        # Page through a user's recent posts ("t3") and comments ("t1"), up to `limit` items.
        logging.info("Scraping user data for %s, limit: %d", username, limit)
        base_url = f"https://www.reddit.com/user/{username}/.json"
        params = {"limit": limit, "after": None}

        all_items = []
        count = 0
        while count < limit:
            try:
                response = self.session.get(
                    base_url, params=params, timeout=self.timeout
                )
                response.raise_for_status()
                logging.info("User data request successful")
            except Exception as e:
                logging.info("User data request unsuccessful: %s", e)
                if response.status_code != 200:
                    print(
                        f"Failed to fetch data for user {username}: {response.status_code}"
                    )
                    break

            try:
                data = response.json()
            except ValueError:
                print(f"Failed to parse JSON response for user {username}.")
                break

            if "data" not in data or "children" not in data["data"]:
                print(
                    f"No 'data' or 'children' field found in response for user {username}."
                )
                logging.info("No 'data' or 'children' field found in response")
                break

            items = data["data"]["children"]
            if not items:
                print(f"No more items found for user {username}.")
                logging.info("No more items found for user")
                break

            for item in items:
                kind = item["kind"]
                item_data = item["data"]

                if kind == "t3":
                    post_url = f"https://www.reddit.com{item_data.get('permalink', '')}"
                    all_items.append(
                        {
                            "type": "post",
                            "title": item_data.get("title", ""),
                            "subreddit": item_data.get("subreddit", ""),
                            "url": post_url,
                            "created_utc": item_data.get("created_utc", ""),
                        }
                    )
                elif kind == "t1":
                    comment_url = (
                        f"https://www.reddit.com{item_data.get('permalink', '')}"
                    )
                    all_items.append(
                        {
                            "type": "comment",
                            "subreddit": item_data.get("subreddit", ""),
                            "body": item_data.get("body", ""),
                            "created_utc": item_data.get("created_utc", ""),
                            "url": comment_url,
                        }
                    )

                count += 1
                if count >= limit:
                    break

            params["after"] = data["data"].get("after")
            if not params["after"]:
                break

            time.sleep(random.uniform(1, 2))
            logging.info("Sleeping for random time")

        logging.info("Successfully scraped user data for %s", username)
        return all_items

    def fetch_subreddit_posts(
        self, subreddit, limit=10, category="hot", time_filter="all"
    ):
        # Page through subreddit (or user-submitted) listings and collect post metadata and media URLs.
        logging.info(
            "Fetching subreddit/user posts for %s, limit: %d, category: %s, time_filter: %s",
            subreddit,
            limit,
            category,
            time_filter,
        )
        if category not in ["hot", "top", "new", "userhot", "usertop", "usernew"]:
            raise ValueError("Category must be 'hot', 'top', or 'new' for a subreddit, or 'userhot', 'usertop', or 'usernew' for a user")

        batch_size = min(100, limit)
        total_fetched = 0
        after = None
        all_posts = []

        while total_fetched < limit:
            if category == "hot":
                url = f"https://www.reddit.com/r/{subreddit}/hot.json"
            elif category == "top":
                url = f"https://www.reddit.com/r/{subreddit}/top.json"
            elif category == "new":
                url = f"https://www.reddit.com/r/{subreddit}/new.json"
            elif category == "userhot":
                url = f"https://www.reddit.com/user/{subreddit}/submitted/hot.json"
            elif category == "usertop":
                url = f"https://www.reddit.com/user/{subreddit}/submitted/top.json"
            else:
                url = f"https://www.reddit.com/user/{subreddit}/submitted/new.json"

            params = {
                "limit": batch_size,
                "after": after,
                "raw_json": 1,
                "t": time_filter,
            }

            try:
                response = self.session.get(url, params=params, timeout=self.timeout)
                response.raise_for_status()
                logging.info("Subreddit/user posts request successful")
            except Exception as e:
                logging.info("Subreddit/user posts request unsuccessful: %s", e)
                if response.status_code != 200:
                    print(
                        f"Failed to fetch posts for subreddit/user {subreddit}: {response.status_code}"
                    )
                    break

            data = response.json()
            posts = data["data"]["children"]
            if not posts:
                break

            for post in posts:
                post_data = post["data"]
                post_info = {
                    "title": post_data["title"],
                    "author": post_data["author"],
                    "permalink": post_data["permalink"],
                    "score": post_data["score"],
                    "num_comments": post_data["num_comments"],
                    "created_utc": post_data["created_utc"],
                }
                if "selftext" in post_data:
                    body = post_data["selftext"]
                    if body is not None and len(body) > 0:
                        post_info["body"] = body
if "media_metadata" in post_data: if "gallery_data" in post_data and "media_metadata" in post_data:
media_urls = [] items = post_data["gallery_data"]["items"]
for image in post_data["media_metadata"]: media_urls = []
if "m" not in post_data["media_metadata"][image]: for item in items:
continue id = item["media_id"]
content_type = post_data["media_metadata"][image]["m"] if id in post_data["media_metadata"]:
extension = content_type[content_type.find('/')+1:] metadata = post_data["media_metadata"][id]
media_urls.append("https://i.redd.it/{}.{}".format(image, extension)) m = re.search(r"redd\.it\/(.+)\?", metadata["p"][0]["u"])
post_info["media_urls"] = media_urls if m:
elif "media" in post_data and post_data["media"] is not None and "reddit_video" in post_data["media"]: media_urls.append(f"https://i.redd.it/{m.group(1)}")
media_url = post_data["media"]["reddit_video"]["fallback_url"] post_info["media_urls"] = media_urls
video_url = media_url[:media_url.find('?')] elif "media_metadata" in post_data:
audio_url = video_url[:video_url.rfind('/')] + "/CMAF_AUDIO_128.mp4" media_urls = []
post_info["media_urls"] = [video_url, audio_url] for id in post_data["media_metadata"]:
elif "url" in post_data: metadata = post_data["media_metadata"][id]
url = post_data["url"] m = re.search(r"redd\.it\/(.+)\?", metadata["p"][0]["u"])
if re.fullmatch(r"https:\/\/i\.redd\.it\/.{1,20}", url): if m:
post_info["media_urls"] = [url] media_urls.append(f"https://i.redd.it/{m.group(1)}")
elif "body" not in post_info: post_info["media_urls"] = media_urls
post_info["body"] = url elif "media" in post_data and post_data["media"] is not None and "reddit_video" in post_data["media"]:
if "thumbnail" in post_data and post_data["thumbnail"] != "self": media_url = post_data["media"]["reddit_video"]["fallback_url"]
post_info["thumbnail_url"] = post_data["thumbnail"] video_url = media_url[:media_url.find('?')]
audio_url = video_url[:video_url.rfind('/')] + "/CMAF_AUDIO_128.mp4"
all_posts.append(post_info) post_info["media_urls"] = [video_url, audio_url]
total_fetched += 1 elif "url" in post_data:
if total_fetched >= limit: url = post_data["url"]
break if re.fullmatch(r"https:\/\/i\.redd\.it\/.{1,20}", url):
post_info["media_urls"] = [url]
after = data["data"].get("after") elif "body" not in post_info:
if not after: post_info["body"] = url
break if "thumbnail" in post_data and post_data["thumbnail"] != "self":
post_info["thumbnail_url"] = post_data["thumbnail"]
time.sleep(random.uniform(1, 2))
logging.info("Sleeping for random time") all_posts.append(post_info)
total_fetched += 1
logging.info("Successfully fetched subreddit posts for %s", subreddit) if total_fetched >= limit:
return all_posts break
after = data["data"].get("after")
if not after:
break
time.sleep(random.uniform(1, 2))
logging.info("Sleeping for random time")
logging.info("Successfully fetched subreddit posts for %s", subreddit)
return all_posts
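
For context, a rough sketch of how this scraper might be driven, showing where the gallery media URLs end up. The import path, subreddit name, and limits are assumptions for illustration, not taken from the commit.

# Hypothetical driver; `from yars import YARS` and the subreddit name are assumptions.
from yars import YARS

miner = YARS(timeout=10)
posts = miner.fetch_subreddit_posts("pics", limit=5, category="hot", time_filter="day")
for post in posts:
    print(post["title"])
    # For gallery posts, media_urls is built from gallery_data, so it keeps the gallery's original order.
    for media_url in post.get("media_urls", []):
        print("   ", media_url)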


@@ -19,6 +19,7 @@ subreddits = [
("HomeServer", 100), ("HomeServer", 100),
("homelab", 100), ("homelab", 100),
("NonPoliticalTwitter", 100), ("NonPoliticalTwitter", 100),
("comics", 100),
("all", 1000) ("all", 1000)
] ]
max_age_days = 30 max_age_days = 30
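
And a hedged sketch of how a driver script might consume this configuration; the age filter and the import path are assumptions for illustration and are not shown in the diff.

# Illustrative only: fetch each configured subreddit and keep posts newer than max_age_days.
import time
from yars import YARS  # assumed import path, as in the sketch above

miner = YARS()
cutoff = time.time() - max_age_days * 24 * 60 * 60

for name, limit in subreddits:
    posts = miner.fetch_subreddit_posts(name, limit=limit, category="new")
    fresh = [p for p in posts if p.get("created_utc", 0) >= cutoff]
    print(f"{name}: kept {len(fresh)} of {len(posts)} recent posts")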