From b7d80002a11ce6aefa3e0dba93c71e502240c84c Mon Sep 17 00:00:00 2001 From: John Stephani Date: Fri, 30 Jan 2026 19:42:40 -0600 Subject: [PATCH] Properly escape post content that contains HTML tags; fix issue with scraping when metadata is in an unanticipated form --- app/YARS/src/yars/yars.py | 550 +++++++++++++++++++------------------- app/app.py | 2 + app/delete_posts.py | 2 +- 3 files changed, 280 insertions(+), 274 deletions(-) diff --git a/app/YARS/src/yars/yars.py b/app/YARS/src/yars/yars.py index aff744e..6e24423 100644 --- a/app/YARS/src/yars/yars.py +++ b/app/YARS/src/yars/yars.py @@ -9,312 +9,316 @@ from urllib3.util.retry import Retry from requests.adapters import HTTPAdapter logger = logging.basicConfig( - filename="YARS.log", - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", + filename="YARS.log", + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", ) class YARS: - __slots__ = ("headers", "session", "proxy", "timeout") + __slots__ = ("headers", "session", "proxy", "timeout") - def __init__(self, proxy=None, timeout=10, random_user_agent=True): - self.session = RandomUserAgentSession() if random_user_agent else requests.Session() - self.proxy = proxy - self.timeout = timeout + def __init__(self, proxy=None, timeout=10, random_user_agent=True): + self.session = RandomUserAgentSession() if random_user_agent else requests.Session() + self.proxy = proxy + self.timeout = timeout - retries = Retry( - total=5, - backoff_factor=2, # Exponential backoff - status_forcelist=[429, 500, 502, 503, 504], - ) + retries = Retry( + total=5, + backoff_factor=2, # Exponential backoff + status_forcelist=[429, 500, 502, 503, 504], + ) - self.session.mount("https://", HTTPAdapter(max_retries=retries)) + self.session.mount("https://", HTTPAdapter(max_retries=retries)) - if proxy: - self.session.proxies.update({"http": proxy, "https": proxy}) - def handle_search(self,url, params, after=None, before=None): - if after: - params["after"] = after - if before: - params["before"] = before + if proxy: + self.session.proxies.update({"http": proxy, "https": proxy}) + def handle_search(self,url, params, after=None, before=None): + if after: + params["after"] = after + if before: + params["before"] = before - try: - response = self.session.get(url, params=params, timeout=self.timeout) - response.raise_for_status() - logging.info("Search request successful") - except Exception as e: - if response.status_code != 200: - logging.info("Search request unsuccessful due to: %s", e) - print(f"Failed to fetch search results: {response.status_code}") - return [] + try: + response = self.session.get(url, params=params, timeout=self.timeout) + response.raise_for_status() + logging.info("Search request successful") + except Exception as e: + if response.status_code != 200: + logging.info("Search request unsuccessful due to: %s", e) + print(f"Failed to fetch search results: {response.status_code}") + return [] - data = response.json() - results = [] - for post in data["data"]["children"]: - post_data = post["data"] - results.append( - { - "title": post_data["title"], - "link": f"https://www.reddit.com{post_data['permalink']}", - "description": post_data.get("selftext", "")[:269], - } - ) - logging.info("Search Results Retrned %d Results", len(results)) - return results - def search_reddit(self, query, limit=10, after=None, before=None): - url = "https://www.reddit.com/search.json" - params = {"q": query, "limit": limit, "sort": "relevance", "type": "link"} - return 
self.handle_search(url, params, after, before) - def search_subreddit(self, subreddit, query, limit=10, after=None, before=None, sort="relevance"): - url = f"https://www.reddit.com/r/{subreddit}/search.json" - params = {"q": query, "limit": limit, "sort": "relevance", "type": "link","restrict_sr":"on"} - return self.handle_search(url, params, after, before) + data = response.json() + results = [] + for post in data["data"]["children"]: + post_data = post["data"] + results.append( + { + "title": post_data["title"], + "link": f"https://www.reddit.com{post_data['permalink']}", + "description": post_data.get("selftext", "")[:269], + } + ) + logging.info("Search Results Retrned %d Results", len(results)) + return results + def search_reddit(self, query, limit=10, after=None, before=None): + url = "https://www.reddit.com/search.json" + params = {"q": query, "limit": limit, "sort": "relevance", "type": "link"} + return self.handle_search(url, params, after, before) + def search_subreddit(self, subreddit, query, limit=10, after=None, before=None, sort="relevance"): + url = f"https://www.reddit.com/r/{subreddit}/search.json" + params = {"q": query, "limit": limit, "sort": "relevance", "type": "link","restrict_sr":"on"} + return self.handle_search(url, params, after, before) - def scrape_post_details(self, permalink): - url = f"https://www.reddit.com{permalink}.json" + def scrape_post_details(self, permalink): + url = f"https://www.reddit.com{permalink}.json" - try: - response = self.session.get(url, timeout=self.timeout) - response.raise_for_status() - logging.info("Post details request successful : %s", url) - except Exception as e: - logging.info("Post details request unsccessful: %e", e) - if response.status_code != 200: - print(f"Failed to fetch post data: {response.status_code}") - return None + try: + response = self.session.get(url, timeout=self.timeout) + response.raise_for_status() + logging.info("Post details request successful : %s", url) + except Exception as e: + logging.info("Post details request unsccessful: %e", e) + if response.status_code != 200: + print(f"Failed to fetch post data: {response.status_code}") + return None - post_data = response.json() - if not isinstance(post_data, list) or len(post_data) < 2: - logging.info("Unexpected post data structre") - print("Unexpected post data structure") - return None + post_data = response.json() + if not isinstance(post_data, list) or len(post_data) < 2: + logging.info("Unexpected post data structre") + print("Unexpected post data structure") + return None - main_post = post_data[0]["data"]["children"][0]["data"] - title = main_post["title"] - body = main_post.get("selftext", "") + main_post = post_data[0]["data"]["children"][0]["data"] + title = main_post["title"] + body = main_post.get("selftext", "") - comments = self._extract_comments(post_data[1]["data"]["children"]) - logging.info("Successfully scraped post: %s", title) - return {"title": title, "body": body, "comments": comments} + comments = self._extract_comments(post_data[1]["data"]["children"]) + logging.info("Successfully scraped post: %s", title) + return {"title": title, "body": body, "comments": comments} - def _extract_comments(self, comments): - logging.info("Extracting comments") - extracted_comments = [] - for comment in comments: - if isinstance(comment, dict) and comment.get("kind") == "t1": - comment_data = comment.get("data", {}) - extracted_comment = { - "author": comment_data.get("author", ""), - "body": comment_data.get("body", ""), - "score": 
comment_data.get("score",""), - "replies": [], - } + def _extract_comments(self, comments): + logging.info("Extracting comments") + extracted_comments = [] + for comment in comments: + if isinstance(comment, dict) and comment.get("kind") == "t1": + comment_data = comment.get("data", {}) + extracted_comment = { + "author": comment_data.get("author", ""), + "body": comment_data.get("body", ""), + "score": comment_data.get("score",""), + "replies": [], + } - replies = comment_data.get("replies", "") - if isinstance(replies, dict): - extracted_comment["replies"] = self._extract_comments( - replies.get("data", {}).get("children", []) - ) - extracted_comments.append(extracted_comment) - logging.info("Successfully extracted comments") - return extracted_comments + replies = comment_data.get("replies", "") + if isinstance(replies, dict): + extracted_comment["replies"] = self._extract_comments( + replies.get("data", {}).get("children", []) + ) + extracted_comments.append(extracted_comment) + logging.info("Successfully extracted comments") + return extracted_comments - def scrape_user_data(self, username, limit=10): - logging.info("Scraping user data for %s, limit: %d", username, limit) - base_url = f"https://www.reddit.com/user/{username}/.json" - params = {"limit": limit, "after": None} - all_items = [] - count = 0 + def scrape_user_data(self, username, limit=10): + logging.info("Scraping user data for %s, limit: %d", username, limit) + base_url = f"https://www.reddit.com/user/{username}/.json" + params = {"limit": limit, "after": None} + all_items = [] + count = 0 - while count < limit: - try: - response = self.session.get( - base_url, params=params, timeout=self.timeout - ) - response.raise_for_status() - logging.info("User data request successful") - except Exception as e: - logging.info("User data request unsuccessful: %s", e) - if response.status_code != 200: - print( - f"Failed to fetch data for user {username}: {response.status_code}" - ) - break - try: - data = response.json() - except ValueError: - print(f"Failed to parse JSON response for user {username}.") - break + while count < limit: + try: + response = self.session.get( + base_url, params=params, timeout=self.timeout + ) + response.raise_for_status() + logging.info("User data request successful") + except Exception as e: + logging.info("User data request unsuccessful: %s", e) + if response.status_code != 200: + print( + f"Failed to fetch data for user {username}: {response.status_code}" + ) + break + try: + data = response.json() + except ValueError: + print(f"Failed to parse JSON response for user {username}.") + break - if "data" not in data or "children" not in data["data"]: - print( - f"No 'data' or 'children' field found in response for user {username}." - ) - logging.info("No 'data' or 'children' field found in response") - break + if "data" not in data or "children" not in data["data"]: + print( + f"No 'data' or 'children' field found in response for user {username}." 
+ ) + logging.info("No 'data' or 'children' field found in response") + break - items = data["data"]["children"] - if not items: - print(f"No more items found for user {username}.") - logging.info("No more items found for user") - break + items = data["data"]["children"] + if not items: + print(f"No more items found for user {username}.") + logging.info("No more items found for user") + break - for item in items: - kind = item["kind"] - item_data = item["data"] - if kind == "t3": - post_url = f"https://www.reddit.com{item_data.get('permalink', '')}" - all_items.append( - { - "type": "post", - "title": item_data.get("title", ""), - "subreddit": item_data.get("subreddit", ""), - "url": post_url, - "created_utc": item_data.get("created_utc", ""), - } - ) - elif kind == "t1": - comment_url = ( - f"https://www.reddit.com{item_data.get('permalink', '')}" - ) - all_items.append( - { - "type": "comment", - "subreddit": item_data.get("subreddit", ""), - "body": item_data.get("body", ""), - "created_utc": item_data.get("created_utc", ""), - "url": comment_url, - } - ) - count += 1 - if count >= limit: - break + for item in items: + kind = item["kind"] + item_data = item["data"] + if kind == "t3": + post_url = f"https://www.reddit.com{item_data.get('permalink', '')}" + all_items.append( + { + "type": "post", + "title": item_data.get("title", ""), + "subreddit": item_data.get("subreddit", ""), + "url": post_url, + "created_utc": item_data.get("created_utc", ""), + } + ) + elif kind == "t1": + comment_url = ( + f"https://www.reddit.com{item_data.get('permalink', '')}" + ) + all_items.append( + { + "type": "comment", + "subreddit": item_data.get("subreddit", ""), + "body": item_data.get("body", ""), + "created_utc": item_data.get("created_utc", ""), + "url": comment_url, + } + ) + count += 1 + if count >= limit: + break - params["after"] = data["data"].get("after") - if not params["after"]: - break + params["after"] = data["data"].get("after") + if not params["after"]: + break - time.sleep(random.uniform(1, 2)) - logging.info("Sleeping for random time") + time.sleep(random.uniform(1, 2)) + logging.info("Sleeping for random time") - logging.info("Successfully scraped user data for %s", username) - return all_items + logging.info("Successfully scraped user data for %s", username) + return all_items - def fetch_subreddit_posts( - self, subreddit, limit=10, category="hot", time_filter="all" - ): - logging.info( - "Fetching subreddit/user posts for %s, limit: %d, category: %s, time_filter: %s", - subreddit, - limit, - category, - time_filter, - ) - if category not in ["hot", "top", "new", "userhot", "usertop", "usernew"]: - raise ValueError("Category for Subredit must be either 'hot', 'top', or 'new' or for User must be 'userhot', 'usertop', or 'usernew'") + def fetch_subreddit_posts( + self, subreddit, limit=10, category="hot", time_filter="all" + ): + logging.info( + "Fetching subreddit/user posts for %s, limit: %d, category: %s, time_filter: %s", + subreddit, + limit, + category, + time_filter, + ) + if category not in ["hot", "top", "new", "userhot", "usertop", "usernew"]: + raise ValueError("Category for Subredit must be either 'hot', 'top', or 'new' or for User must be 'userhot', 'usertop', or 'usernew'") - batch_size = min(100, limit) - total_fetched = 0 - after = None - all_posts = [] + batch_size = min(100, limit) + total_fetched = 0 + after = None + all_posts = [] - while total_fetched < limit: - if category == "hot": - url = f"https://www.reddit.com/r/{subreddit}/hot.json" - elif category == 
"top": - url = f"https://www.reddit.com/r/{subreddit}/top.json" - elif category == "new": - url = f"https://www.reddit.com/r/{subreddit}/new.json" - elif category == "userhot": - url = f"https://www.reddit.com/user/{subreddit}/submitted/hot.json" - elif category == "usertop": - url = f"https://www.reddit.com/user/{subreddit}/submitted/top.json" - else: - url = f"https://www.reddit.com/user/{subreddit}/submitted/new.json" + while total_fetched < limit: + if category == "hot": + url = f"https://www.reddit.com/r/{subreddit}/hot.json" + elif category == "top": + url = f"https://www.reddit.com/r/{subreddit}/top.json" + elif category == "new": + url = f"https://www.reddit.com/r/{subreddit}/new.json" + elif category == "userhot": + url = f"https://www.reddit.com/user/{subreddit}/submitted/hot.json" + elif category == "usertop": + url = f"https://www.reddit.com/user/{subreddit}/submitted/top.json" + else: + url = f"https://www.reddit.com/user/{subreddit}/submitted/new.json" - params = { - "limit": batch_size, - "after": after, - "raw_json": 1, - "t": time_filter, - } - try: - response = self.session.get(url, params=params, timeout=self.timeout) - response.raise_for_status() - logging.info("Subreddit/user posts request successful") - except Exception as e: - logging.info("Subreddit/user posts request unsuccessful: %s", e) - if response.status_code != 200: - print( - f"Failed to fetch posts for subreddit/user {subreddit}: {response.status_code}" - ) - break + params = { + "limit": batch_size, + "after": after, + "raw_json": 1, + "t": time_filter, + } + try: + response = self.session.get(url, params=params, timeout=self.timeout) + response.raise_for_status() + logging.info("Subreddit/user posts request successful") + except Exception as e: + logging.info("Subreddit/user posts request unsuccessful: %s", e) + if response.status_code != 200: + print( + f"Failed to fetch posts for subreddit/user {subreddit}: {response.status_code}" + ) + break - data = response.json() - posts = data["data"]["children"] - if not posts: - break + data = response.json() + posts = data["data"]["children"] + if not posts: + break - for post in posts: - post_data = post["data"] - post_info = { - "title": post_data["title"], + for post in posts: + post_data = post["data"] + post_info = { + "title": post_data["title"], "subreddit": post_data["subreddit"], - "author": post_data["author"], - "permalink": post_data["permalink"], - "score": post_data["score"], - "num_comments": post_data["num_comments"], - "created_utc": post_data["created_utc"], - } - if "selftext" in post_data: - body = post_data["selftext"] - if body != None and len(body)>0: - post_info["body"] = body - if "gallery_data" in post_data and "media_metadata" in post_data: - items = post_data["gallery_data"]["items"] - media_urls = [] - for item in items: - id = item["media_id"] - if id in post_data["media_metadata"]: - metadata = post_data["media_metadata"][id] - m = re.search(r"redd\.it\/(.+)\?", metadata["p"][0]["u"]) - if m: - media_urls.append(f"https://i.redd.it/{m.group(1)}") - post_info["media_urls"] = media_urls - elif "media_metadata" in post_data: - media_urls = [] - for id in post_data["media_metadata"]: - metadata = post_data["media_metadata"][id] - m = re.search(r"redd\.it\/(.+)\?", metadata["p"][0]["u"]) - if m: - media_urls.append(f"https://i.redd.it/{m.group(1)}") - post_info["media_urls"] = media_urls - elif "media" in post_data and post_data["media"] is not None and "reddit_video" in post_data["media"]: - media_url = 
post_data["media"]["reddit_video"]["fallback_url"] - video_url = media_url[:media_url.find('?')] - audio_url = video_url[:video_url.rfind('/')] + "/CMAF_AUDIO_128.mp4" - post_info["media_urls"] = [video_url, audio_url] - elif "url" in post_data: - url = post_data["url"] - if re.fullmatch(r"https:\/\/i\.redd\.it\/.{1,20}", url): - post_info["media_urls"] = [url] - elif "body" not in post_info: - post_info["body"] = url - if "thumbnail" in post_data and post_data["thumbnail"] != "self": - post_info["thumbnail_url"] = post_data["thumbnail"] + "author": post_data["author"], + "permalink": post_data["permalink"], + "score": post_data["score"], + "num_comments": post_data["num_comments"], + "created_utc": post_data["created_utc"], + } + if "selftext" in post_data: + body = post_data["selftext"] + if body != None and len(body)>0: + post_info["body"] = body + if "gallery_data" in post_data and "media_metadata" in post_data: + items = post_data["gallery_data"]["items"] + media_urls = [] + for item in items: + id = item["media_id"] + if id in post_data["media_metadata"]: + metadata = post_data["media_metadata"][id] + if "p" not in metadata: + continue + m = re.search(r"redd\.it\/(.+)\?", metadata["p"][0]["u"]) + if m: + media_urls.append(f"https://i.redd.it/{m.group(1)}") + post_info["media_urls"] = media_urls + elif "media_metadata" in post_data: + media_urls = [] + for id in post_data["media_metadata"]: + metadata = post_data["media_metadata"][id] + if "p" not in metadata: + continue + m = re.search(r"redd\.it\/(.+)\?", metadata["p"][0]["u"]) + if m: + media_urls.append(f"https://i.redd.it/{m.group(1)}") + post_info["media_urls"] = media_urls + elif "media" in post_data and post_data["media"] is not None and "reddit_video" in post_data["media"]: + media_url = post_data["media"]["reddit_video"]["fallback_url"] + video_url = media_url[:media_url.find('?')] + audio_url = video_url[:video_url.rfind('/')] + "/CMAF_AUDIO_128.mp4" + post_info["media_urls"] = [video_url, audio_url] + elif "url" in post_data: + url = post_data["url"] + if re.fullmatch(r"https:\/\/i\.redd\.it\/.{1,20}", url): + post_info["media_urls"] = [url] + elif "body" not in post_info: + post_info["body"] = url + if "thumbnail" in post_data and post_data["thumbnail"] != "self": + post_info["thumbnail_url"] = post_data["thumbnail"] - all_posts.append(post_info) - total_fetched += 1 - if total_fetched >= limit: - break + all_posts.append(post_info) + total_fetched += 1 + if total_fetched >= limit: + break - after = data["data"].get("after") - if not after: - break + after = data["data"].get("after") + if not after: + break - time.sleep(random.uniform(1, 2)) - logging.info("Sleeping for random time") + time.sleep(random.uniform(1, 2)) + logging.info("Sleeping for random time") - logging.info("Successfully fetched subreddit posts for %s", subreddit) - return all_posts + logging.info("Successfully fetched subreddit posts for %s", subreddit) + return all_posts diff --git a/app/app.py b/app/app.py index 6e1cfd4..edfb802 100755 --- a/app/app.py +++ b/app/app.py @@ -4,6 +4,7 @@ from urllib.parse import urlparse import delete_posts import config import json +import html import re import sqlite3 import subprocess @@ -415,6 +416,7 @@ def add_age_to_posts(posts): def reformat_body(posts): for post in posts: if "body" in post and post["body"] is not None: + post["body"] = html.escape(post["body"]) post["body"] = post["body"].rstrip().replace("\n", "
") post["body"] = re.sub(r"\[(.*?)\]\((.*?)\)", r'\1', post["body"]) diff --git a/app/delete_posts.py b/app/delete_posts.py index b25a94c..8962c8d 100644 --- a/app/delete_posts.py +++ b/app/delete_posts.py @@ -36,7 +36,7 @@ def run(): print("Deleting old media db rows") delete = "DELETE FROM media WHERE permalink NOT IN (SELECT permalink FROM post)" cursor.execute(delete) - print("Deleving media db for read posts") + print("Deleting media db for read posts") delete = "DELETE FROM media WHERE permalink IN (SELECT permalink FROM post WHERE hidden = ?)" binds = [True] cursor.execute(delete, binds)