Refactor data fetching logic using stdlib

Removed dependencies on external libraries such as `requests`, `requests-html`, and `playwright` in favor of Python's standard libraries like `urllib` for HTTP requests and parsing functionality. A more robust and efficient data update function was introduced to scrape API keys and fetch project data using Typesense. Transitioned from a manual browser-based scraping approach to an API-based one for improved stability and performance. Added logging for better monitoring and debuggability. Error handling now leverages `HTTPError` from `urllib.error`. Shifted the configuration of debug mode to rely on an environment variable, aligning with Twelve-Factor principles. Removed unused functions and streamlined the handling of various routes within the Flask app. This change shifts the project towards a more maintainable codebase by using built-in libraries, reduces external dependencies, and improves resilience and scalability of the web scraping components.
This commit is contained in:
Kumi 2024-01-16 17:13:59 +01:00
parent d269a9992e
commit 29592de90a
Signed by: kumi
GPG key ID: ECBCC9082395383F
2 changed files with 201 additions and 173 deletions

359
main.py
View file

@ -9,90 +9,121 @@ from flask import (
stream_with_context,
)
import requests
import re
from bs4 import BeautifulSoup
from urllib.parse import quote, unquote
from urllib.request import Request, urlopen
from urllib.error import HTTPError
from traceback import print_exc
from requests_html import HTMLSession
from playwright.sync_api import sync_playwright
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse
from argparse import ArgumentParser
from configparser import ConfigParser
from werkzeug.exceptions import BadRequest, abort, InternalServerError, NotFound
from bs4 import BeautifulSoup
import os
import json
import re
import logging
# Configure the root logger to emit messages from DEBUG level upwards.
logging.basicConfig(level=logging.DEBUG)
# In-memory store of scraped listings keyed by path; update_data() fills the
# "/projects" entry and project_list() reads it back.
global_ibles = {}
def proxy(src):
return "/proxy/?url=" + quote(str(src))
def proxy(url):
    """Return the local ``/proxy/`` path that serves *url* through this app.

    The target is percent-encoded before it is placed in the query string, so
    characters such as ``&``, ``?`` or ``#`` inside *url* stay part of the
    ``url`` parameter instead of being parsed as further parameters or as a
    fragment.

    Args:
        url: Address of the remote resource; converted with ``str()`` first.

    Returns:
        A relative URL of the form ``/proxy/?url=<percent-encoded url>``.
    """
    logging.debug("Generating proxy URL for %s", url)
    return "/proxy/?url=" + quote(str(url))
def get_typesense_api_key():
    """Scrape the Typesense API key from the Instructables front page.

    Downloads https://www.instructables.com/ and scans its ``<script>``
    elements for a ``"typesenseApiKey": "..."`` entry.

    Returns:
        The key as a string, or ``None`` (after logging an error) when no
        script element contains one.

    Raises:
        urllib.error.URLError: Propagated from ``urlopen`` when the page
            cannot be fetched (``HTTPError`` is a subclass).
    """
    logging.debug("Getting Typesense API key...")

    # Read the body inside a context manager so the HTTP response is closed
    # as soon as it is no longer needed.
    with urlopen("https://www.instructables.com/") as response:
        html = response.read().decode()

    soup = BeautifulSoup(html, "html.parser")

    for script in soup.select("script"):
        # Cheap substring test first; only run the regex on likely candidates.
        if "typesense" not in script.text:
            continue

        matches = re.search(r'"typesenseApiKey":\s?"(.*?)"', script.text)
        if matches:
            api_key = matches.group(1)
            logging.debug(f"Identified Typesense API key as {api_key}")
            return api_key

    logging.error("Failed to get Typesense API key")
    return None
# Resolved once when the module is loaded (this performs an HTTP request).
# The value is None when get_typesense_api_key() finds no key.
TYPESENSE_API_KEY = get_typesense_api_key()
def projects_search(
    query="*",
    category="",
    channel="",
    filter_by="featureFlag:=true",
    page=1,
    per_page=50,
):
    """Query the Instructables project search endpoint and return its hits.

    Args:
        query: Search string sent as the ``q`` parameter (percent-encoded).
        category: When non-empty, adds a ``category:=<category>`` clause to
            the filter.
        channel: When non-empty, adds a ``channel:=<channel>`` clause to the
            filter.
        filter_by: Base filter expression; additional clauses are appended
            to it joined by ``" && "``.
        page: Value of the ``page`` parameter.
        per_page: Value of the ``per_page`` parameter.

    Returns:
        The list stored under ``"hits"`` in the decoded JSON response.

    Raises:
        urllib.error.URLError: Propagated from ``urlopen`` when the request
            fails (``HTTPError`` is a subclass).
        KeyError: If the response JSON has no ``"hits"`` entry.
    """
    # Collect the filter clauses and join them once, instead of patching the
    # separator into the string at every step.
    clauses = [filter_by] if filter_by else []
    if category:
        clauses.append(f"category:={category}")
    if channel:
        clauses.append(f"channel:={channel}")

    query = quote(query)
    filter_by = quote(" && ".join(clauses))

    logging.debug(f"Searching projects with query {query} and filter {filter_by}")

    url = (
        "https://www.instructables.com/api_proxy/search/collections/projects/documents/search"
        f"?q={query}"
        "&query_by=title,stepBody,screenName"
        f"&page={page}"
        "&sort_by=publishDate:desc"
        "&include_fields=title,urlString,coverImageUrl,screenName,favorites,views,"
        "primaryClassification,featureFlag,prizeLevel,IMadeItCount"
        f"&filter_by={filter_by}"
        f"&per_page={per_page}"
    )

    projects_headers = {"x-typesense-api-key": TYPESENSE_API_KEY}
    projects_request = Request(url, headers=projects_headers)

    # The context manager closes the HTTP response once the body is read.
    with urlopen(projects_request) as projects_data:
        project_obj = json.loads(projects_data.read().decode())

    project_ibles = project_obj["hits"]

    logging.debug(f"Got {len(project_ibles)} projects")

    return project_ibles
def update_data():
playwright = sync_playwright().start()
browser = playwright.chromium.launch(headless=True)
page = browser.new_page()
logging.debug("Updating data...")
channels = []
data = requests.get(f"https://www.instructables.com/sitemap/")
soup = BeautifulSoup(data.text, "html.parser")
main = soup.select("div.sitemap-content")[0]
sitemap_data = urlopen("https://www.instructables.com/sitemap/")
sitemap_soup = BeautifulSoup(sitemap_data.read().decode(), "html.parser")
main = sitemap_soup.select("div.sitemap-content")[0]
groups = []
for group in main.select("div.group-section"):
channels.append(group.select("h2 a")[0].text.lower())
global_ibles["/projects"] = []
page.goto("https://www.instructables.com/projects")
project_ibles = projects_search()
while len(global_ibles["/projects"]) <= 0:
for ible in page.query_selector_all(".ibleCard__QPJVm"):
link = (
ible.query_selector("a")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
img = proxy(ible.query_selector("img").get_attribute("src"))
for ible in project_ibles:
link = f"/{ible['document']['urlString']}"
img = proxy(ible["document"]["coverImageUrl"])
title = ible.query_selector(".title__t0fGQ").inner_text()
author = ible.query_selector("a[href^='/member/']").inner_text()
author_link = (
ible.query_selector("a[href^='/member/']")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
title = ible["document"]["title"]
author = ible["document"]["screenName"]
author_link = f"/member/{author}"
for c in channels:
try:
channel = ible.query_selector("a[href^='/" + c + "']").inner_text()
channel_link = (
ible.query_selector("a[href^='/" + c + "']")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
except:
try:
channel = ible.query_selector("a[href^='/projects/']").inner_text()
channel_link = (
ible.query_selector("a[href^='/projects/']")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
except:
pass
channel = ible["document"]["primaryClassification"]
channel_link = f"/channel/{channel}"
stats = ible.query_selector(".stats__GFKyl")
views = 0
if stats.query_selector("div[title$=' views']"):
views = stats.query_selector("div[title$=' views']").inner_text()
favorites = 0
if stats.query_selector("div[title$=' favorites']"):
favorites = stats.query_selector("div[title$=' favorites']").inner_text()
views = ible["document"]["views"]
favorites = ible["document"]["favorites"]
global_ibles["/projects"].append(
[
@ -108,10 +139,8 @@ def update_data():
]
)
browser.close()
playwright.stop()
debugmode = False
# Environment variables are strings, so a raw value such as "0" or "false"
# would be truthy. Parse it so that only an affirmative setting enables
# debug mode; unset or empty means disabled.
debugmode = os.environ.get("FLASK_DEBUG", "").strip().lower() not in (
    "",
    "0",
    "false",
    "no",
)
if __name__ == "__main__":
parser = ArgumentParser()
@ -147,14 +176,16 @@ print("Started!")
app = Flask(__name__, template_folder="templates", static_folder="static")
def get_instance_root_url(request):
    """Return the ``url_root`` attribute of the given request object."""
    root_url = request.url_root
    return root_url
if debugmode:
app.logger.setLevel(logging.DEBUG)
@app.route("/cron/")
def cron():
    """Run ``update_data()`` and acknowledge the request with ``"OK"``."""
    update_data()
    acknowledgement = "OK"
    return acknowledgement
def explore_lists(soup):
list_ = []
for ible in soup.select(".home-content-explore-ible"):
@ -199,8 +230,6 @@ def member_header(header):
profile_top = header.select("div.profile-top")[0]
print(header.encode_contents())
# stats_text = profile_top.select("div.profile-header-stats")[0]
# stats_num = header.select("div.profile-top div.profile-header-stats")[1]
@ -260,11 +289,12 @@ def member_header(header):
def category_page(path, name, teachers=False):
data = requests.get("https://www.instructables.com" + path)
if data.status_code != 200:
abort(data.status_code)
try:
data = urlopen("https://www.instructables.com" + path)
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
channels = []
for card in soup.select("div.scrollable-cards-inner div.scrollable-card"):
@ -328,70 +358,39 @@ def category_page(path, name, teachers=False):
def project_list(path, head, sort=""):
playwright = sync_playwright().start()
browser = playwright.chromium.launch(headless=True)
page = browser.new_page()
page.goto(urljoin("https://www.instructables.com", path))
head = f"{head + ' ' if head != '' else ''}Projects" + sort
path_ = path.rsplit("/", 1)[0]
path = urlparse(path).path
if path == "/projects/" or path == "/projects":
if path in ("/projects/", "/projects"):
ibles = global_ibles["/projects"]
else:
if not "projects" in path.split("/"):
abort(404)
ibles = []
for ible in page.query_selector_all(".ibleCard__QPJVm"):
link = (
ible.query_selector("a")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
img = proxy(
ible.find_elements(By.CSS_SELECTOR, "img")[0].get_attribute("src")
)
parts = path.split("/")
title = ible.find_elements(By.CLASS_NAME, "title__t0fGQ")[0].text
author = ible.find_elements(By.CSS_SELECTOR, "a[href^='/member/']")[0].text
author_link = (
ible.find_elements(By.CSS_SELECTOR, "a[href^='/member/']")[0]
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
category = parts[1]
channel = "" if parts[2] == "projects" else parts[2]
channel = "TEST"
channel_link = "TEST"
# TODO: Add pagination, popular, etc.
for c in channels:
try:
channel = ible.query_selector("a[href^='/" + c + "']").inner_text()
channel_link = (
ible.query_selector("a[href^='/" + c + "']")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
except:
try:
channel = ible.query_selector("a[href^='/projects/'] span").inner_text()
channel_link = (
ible.query_selector("a[href^='/projects/']")
.get_attribute("href")
.replace("https://www.instructables.com", "{instance_root_url}")
)
except:
pass
project_ibles = projects_search(category=category, channel=channel)
stats = ible.query_selector(".stats__GFKyl")
views = 0
for ible in project_ibles:
link = f"/{ible['document']['urlString']}"
img = proxy(ible["document"]["coverImageUrl"])
if stats.query_selector("div[title$=' views']"):
views = stats.query_selector("div[title$=' views']").inner_text()
title = ible["document"]["title"]
author = ible["document"]["screenName"]
author_link = f"/member/{author}"
favorites = 0
channel = ible["document"]["primaryClassification"]
channel_link = f"/channel/{channel}"
if stats.query_selector("div[title$=' favorites']"):
favorites = stats.query_selector("div[title$=' favorites']").inner_text()
views = ible["document"]["views"]
favorites = ible["document"]["favorites"]
ibles.append(
[
@ -410,23 +409,26 @@ def project_list(path, head, sort=""):
if len(ibles) >= 8:
break
browser.close()
playwright.stop()
return render_template("projects.html", data=[head, ibles, path])
return render_template("projects.html", data=[head, ibles, path_])
@app.route("/sitemap/")
def route_sitemap():
data = requests.get(f"https://www.instructables.com/sitemap/")
if data.status_code != 200:
abort(data.status_code)
@app.route("/sitemap/<path:path>")
def route_sitemap(path=""):
try:
data = urlopen("https://www.instructables.com/sitemap/" + path)
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
main = soup.select("div.sitemap-content")[0]
group_section = main.select("div.group-section")
if group_section:
groups = []
for group in main.select("div.group-section"):
for group in group_section:
category = group.select("h2 a")[0].text
category_link = group.select("h2 a")[0].get("href")
channels = []
@ -436,6 +438,15 @@ def route_sitemap():
channels.append([channel, channel_link])
groups.append([category, category_link, channels])
else:
groups = []
channels = []
for li in main.select("ul.sitemap-listing li"):
channel = li.a.text
channel_link = li.a["href"]
channels.append([channel, channel_link])
groups.append(["", "", channels])
return render_template("sitemap.html", data=groups)
@ -444,11 +455,13 @@ def route_contest_archive():
page = 1
if request.args.get("page") != None:
page = request.args.get("page")
data = requests.get(f"https://www.instructables.com/contest/archive/?page={page}")
if data.status_code != 200:
abort(data.status_code)
soup = BeautifulSoup(data.text, "html.parser")
try:
data = urlopen(f"https://www.instructables.com/contest/archive/?page={page}")
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.read().decode(), "html.parser")
main = soup.select("div#contest-archive-wrapper")[0]
@ -481,11 +494,12 @@ def route_contest_archive():
@app.route("/contest/<contest>/")
def route_contest(contest):
data = requests.get(f"https://www.instructables.com/contest/{contest}/")
if data.status_code != 200:
abort(data.status_code)
try:
data = urlopen(f"https://www.instructables.com/contest/{contest}/")
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
title = soup.select('meta[property="og:title"]')[0].get("content")
@ -500,7 +514,7 @@ def route_contest(contest):
info.select("div#site-announcements-page")[0].decompose()
info.select("h3")[0].decompose()
info.select("div#contest-body-nav")[0].decompose()
info = str(info).replace("https://www.instructables.com", "{instance_root_url}")
info = str(info).replace("https://www.instructables.com", "/")
entries = body.select("span.contest-entity-count")[0].text
@ -535,11 +549,12 @@ def route_contest(contest):
@app.route("/contest/")
def route_contests():
data = requests.get("https://www.instructables.com/contest/")
if data.status_code != 200:
abort(data.status_code)
try:
data = urlopen("https://www.instructables.com/contest/")
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
contest_count = str(soup.select("p.contest-count")[0])
@ -660,11 +675,12 @@ def route_sitemap_circuits(category, subcategory):
@app.route("/member/<member>/instructables/")
def route_member_instructables(member):
data = requests.get(f"https://www.instructables.com/member/{member}/instructables")
if data.status_code != 200:
abort(data.status_code)
try:
data = urlopen(f"https://www.instructables.com/member/{member}/instructables/")
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
header = soup.select(".profile-header.profile-header-social")[0]
header_content = member_header(header)
@ -697,13 +713,16 @@ def route_member(member):
"User-Agent": "Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/113.0"
}
data = requests.get(
request = Request(
f"https://www.instructables.com/member/{member}/", headers=headers
)
if data.status_code != 200:
abort(data.status_code)
soup = BeautifulSoup(data.text, "html.parser")
try:
data = urlopen(request)
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.read().decode(), "html.parser")
header_content = member_header(soup)
@ -753,11 +772,12 @@ def route_member(member):
@app.route("/<article>/")
def route_article(article):
data = requests.get(f"https://www.instructables.com/{article}/")
if data.status_code != 200:
abort(data.status_code)
try:
data = urlopen(f"https://www.instructables.com/{article}/")
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
try:
header = soup.select("header")
@ -800,9 +820,7 @@ def route_article(article):
step_text = str(step.select("div.step-body")[0])
step_text = step_text.replace(
"https://content.instructables.com",
"{instance_root_url}/proxy/?url=https://content.instructables.com".format(
instance_root_url=get_instance_root_url(request)
),
"/proxy/?url=https://content.instructables.com",
)
steps.append([step_title, step_imgs, step_text, step_videos])
@ -941,6 +959,7 @@ def route_article(article):
@app.route("/<category>/<channel>/")
def route_channel_redirect(category, channel):
# TODO: Just check if the channel exists
if (
category == "circuits"
or category == "workshop"
@ -957,11 +976,12 @@ def route_channel_redirect(category, channel):
@app.route("/")
def route_explore():
data = requests.get("https://www.instructables.com/")
if data.status_code != 200:
abort(data.status_code)
try:
data = urlopen("https://www.instructables.com/")
except HTTPError as e:
abort(e.code)
soup = BeautifulSoup(data.text, "html.parser")
soup = BeautifulSoup(data.read().decode(), "html.parser")
explore = soup.select(".home-content-explore-wrap")[0]
@ -994,32 +1014,43 @@ def route_proxy():
if url.startswith("https://cdn.instructables.com/") or url.startswith(
"https://content.instructables.com/"
):
data = requests.get(unquote(url))
return Response(data.content, content_type=data.headers["content-type"])
try:
data = urlopen(unquote(url))
except HTTPError as e:
abort(e.code)
return Response(data.read(), content_type=data.headers["content-type"])
else:
raise BadRequest()
else:
raise BadRequest()
@app.route("/privacypolicy/")
def privacypolicy():
    """Render the ``privacypolicy.html`` template."""
    # TODO: Make this dynamic
    rendered_page = render_template("privacypolicy.html")
    return rendered_page
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page.

    Args:
        e: The error passed in by Flask (unused).

    Returns:
        A ``(body, status)`` tuple so the response keeps the 404 status.
    """
    # Without an explicit status Flask would send the error page as 200 OK.
    return render_template("404.html"), 404
@app.errorhandler(400)
def bad_request(e):
    """Render the custom 400 page.

    Args:
        e: The error passed in by Flask (unused).

    Returns:
        A ``(body, status)`` tuple so the response keeps the 400 status.
    """
    # Without an explicit status Flask would send the error page as 200 OK.
    return render_template("400.html"), 400
@app.errorhandler(429)
def too_many_requests(e):
    """Render the custom 429 page.

    Args:
        e: The error passed in by Flask (unused).

    Returns:
        A ``(body, status)`` tuple so the response keeps the 429 status.
    """
    # Without an explicit status Flask would send the error page as 200 OK.
    return render_template("429.html"), 429
@app.errorhandler(500)
def internal_server_error(e):
    """Render the custom 500 page.

    Args:
        e: The error passed in by Flask (unused).

    Returns:
        A ``(body, status)`` tuple so the response keeps the 500 status.
    """
    # Without an explicit status Flask would send the error page as 200 OK.
    return render_template("500.html"), 500
if __name__ == '__main__':
if __name__ == "__main__":
app.run(port=args.port, host=args.listen_host, debug=debugmode)

View file

@ -1,5 +1,2 @@
bs4
requests
flask
requests-html
playwright