#!/usr/bin/env python3
# c3dbdl - Customs Creators Collective archive tool
#
# Copyright (C) 2023 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import click
import requests
import re
import json
import os
from time import sleep
from difflib import unified_diff
from colorama import Fore
from bs4 import BeautifulSoup
from urllib.error import HTTPError
from concurrent.futures import ThreadPoolExecutor, as_completed
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"], max_content_width=120)
def fetchSongData(entry):
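    """
    Parse one song row ("entry") from the songs listing table, then fetch the song's
    own page to collect its c3universe.com download links.

    Returns a (messages, song_entry) tuple; song_entry is None if the row could not
    be parsed or no usable download links were found.
    """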
song_entry = dict()
messages = list()
for idx, td in enumerate(entry.find_all("td")):
if idx == 2:
# Artist
song_entry["artist"] = td.find("a").get_text().strip().replace("/", "+")
elif idx == 3:
# Song
song_entry["title"] = (
td.find("div", attrs={"class": "c3ttitlemargin"})
.get_text()
.strip()
.replace("/", "+")
)
song_entry["album"] = (
td.find("div", attrs={"class": "c3tartist"})
.get_text()
.strip()
.replace("/", "+")
)
# Song page
tmp_links = td.find_all("a", href=True)
for link in tmp_links:
if link.get("href"):
song_entry["song_link"] = link.get("href")
break
elif idx == 4:
# Genre
song_entry["genre"] = td.find("a").get_text().strip()
elif idx == 5:
# Year
song_entry["year"] = td.find("a").get_text().strip()
elif idx == 6:
# Length
song_entry["length"] = td.find("a").get_text().strip()
elif idx == 8:
# Author (of chart)
song_entry["author"] = td.find("a").get_text().strip().replace("/", "+")
    if (
        song_entry
        and song_entry.get("author")
        and song_entry.get("title")
        and song_entry.get("song_link")
    ):
        messages.append(
            f"> Found song entry for {song_entry.get('artist')} - {song_entry['title']} by {song_entry['author']}"
        )
    else:
        # Skip rows missing an essential field rather than failing further below
        messages.append("> Found incomplete song entry, skipping")
        return messages, None
    for entry_type in ["artist", "album", "genre", "year", "length"]:
        if not song_entry.get(entry_type):
            song_entry[entry_type] = "None"
# Get download links from the actual song page
attempts = 1
sp = None
while attempts <= 3:
try:
messages.append(
f"Parsing song page {song_entry['song_link']} (attempt {attempts}/3)..."
)
sp = requests.get(song_entry["song_link"])
break
except Exception:
sleep(attempts)
attempts += 1
    if sp is None or sp.status_code != 200:
        messages.append("Failed to fetch song page, aborting")
        return messages, None
song_parsed_html = BeautifulSoup(sp.text, "html.parser")
download_section = song_parsed_html.find(
"div", attrs={"class": "portlet light bg-inverse"}
)
    if download_section is None:
        messages.append("Found no download section on song page, not adding to database")
        return messages, None
    download_links = download_section.find_all("a", href=True)
dl_links = list()
for link_entry in download_links:
link = link_entry.get("href")
description = link_entry.get_text().strip()
if "c3universe.com" not in link:
continue
messages.append(f"Found download link: {link} ({description})")
dl_links.append(
{
"link": link,
"description": description,
}
)
if not dl_links:
messages.append(
"Found no c3universe.com download links for song, not adding to database"
)
        return messages, None
song_entry["dl_links"] = dl_links
    # Return the collected messages and the parsed entry to the caller
return messages, song_entry
def buildDatabase(pages, concurrency):
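    """
    Scrape the songs listing pages (all of them, or the first "pages" pages) and fetch
    each listed song's page using up to "concurrency" parallel workers.

    Returns a list of song entry dictionaries for the JSON database.
    """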
found_songs = []
if pages is None:
r = requests.get(f"{config['base_songs_url']}")
        if r.status_code != 200:
            click.echo(
                f"Failed to fetch {config['base_songs_url']} (HTTP {r.status_code}), aborting"
            )
            return found_songs
root_page_html = BeautifulSoup(r.text, "html.parser")
pages = int(
root_page_html.body.find("a", attrs={"class": "paginationLastPage"})
.get("href")
.replace("?page=", "")
)
click.echo(f"Collecting data from {pages} pages")
# Get a list of song URIs
for i in range(1, pages + 1):
        attempts = 1
        p = None
        parsed_html = None
        while attempts <= 3:
            try:
                click.echo(f"Parsing page {i} (attempt {attempts}/3)...")
                p = requests.get(f"{config['base_songs_url']}?page={i}")
                if p is None or p.status_code != 200:
                    raise ValueError(f"Bad HTTP status fetching page {i}")
                parsed_html = BeautifulSoup(p.text, "html.parser")
                if parsed_html.body is None:
                    raise ValueError(f"Page {i} returned no HTML body")
                if (
                    parsed_html.body.find("div", attrs={"class": "portlet-body"})
                    is None
                ):
                    raise ValueError(f"Page {i} contains no song table")
                break
            except Exception:
                parsed_html = None
                sleep(attempts)
                attempts += 1
        if parsed_html is None:
            click.echo(f"Failed to parse page {i} after 3 attempts, skipping")
            continue
        table_html = parsed_html.body.find("div", attrs={"class": "portlet-body"}).find(
            "tbody"
        )
entries = list()
for entry in table_html.find_all("tr", attrs={"class": "odd"}):
if len(entry) < 1:
break
entries.append(entry)
click.echo("Fetching and parsing song pages...")
with ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_song = {
executor.submit(fetchSongData, entry): entry for entry in entries
}
for future in as_completed(future_to_song):
try:
messages, song = future.result()
click.echo("\n".join(messages))
if song is None:
continue
found_songs.append(song)
                except Exception as e:
                    click.echo(f"Failed to process a song entry: {e}")
                    continue
return found_songs
def downloadSong(destination, filename, entry, dlid, dldesc):
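    """
    Download the file(s) for one song entry into "destination", naming each file using
    the "filename" format string. "dlid" and "dldesc" optionally restrict which of the
    entry's "dl_links" are downloaded.
    """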
click.echo(
f"""> Downloading song "{entry['artist']} - {entry['title']}" by {entry['author']}..."""
)
if dlid is None:
dl_links = entry["dl_links"]
else:
try:
dl_links = [entry["dl_links"][dlid - 1]]
except Exception:
click.echo(f"Invalid download link ID {dlid}.")
return
    if dldesc is not None:
        new_dl_links = list()
        # Use a distinct loop variable so the song "entry" dict is not shadowed
        for dl_link in dl_links:
            if dldesc in dl_link["description"]:
                new_dl_links.append(dl_link)
        dl_links = new_dl_links
if not dl_links:
click.echo(f'No download link matching description "{dldesc}" found.')
return
for dl_link in dl_links:
try:
p = requests.get(dl_link["link"])
if p.status_code != 200:
raise HTTPError(dl_link["link"], p.status_code, "", None, None)
parsed_html = BeautifulSoup(p.text, "html.parser")
download_url = (
parsed_html.body.find("div", attrs={"class": "lock-head"})
.find("a")
.get("href")
)
except Exception as e:
click.echo(f"Failed parsing or retrieving HTML link: {e}")
continue
download_filename = filename.format(
genre=entry["genre"],
artist=entry["artist"],
album=entry["album"],
title=entry["title"],
year=entry["year"],
author=entry["author"],
orig_name=download_url.split("/")[-1],
)
download_filename = f"{destination}/{download_filename}"
download_path = "/".join(f"{download_filename}".split("/")[0:-1])
click.echo(
f"""Downloading file "{dl_link['description']}" from {download_url}..."""
)
if os.path.exists(download_filename):
click.echo(f"File exists at {download_filename}")
continue
        # Retry the download request itself up to 3 times before giving up
        attempts = 1
        r = None
        try:
            while attempts <= 3:
                r = requests.get(download_url, stream=True)
                if r.status_code == 200:
                    break
                click.echo(
                    f"Download attempt failed: HTTP {r.status_code}; retrying {attempts}/3"
                )
                r.close()
                sleep(attempts)
                attempts += 1
            if r is None or r.status_code != 200:
                code = r.status_code if r is not None else -1
                raise HTTPError(download_url, code, "", None, None)
            if not os.path.exists(download_path):
                os.makedirs(download_path)
            with r:
                with open(download_filename, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            click.echo(f"Successfully downloaded to {download_filename}")
        except Exception as e:
            click.echo(f"Download attempt failed: {e}")
            continue
@click.command(name="build", short_help="Build the local database.")
@click.option(
"-o",
"--overwrite",
"_overwrite",
is_flag=True,
default=False,
envvar="C3DLDB_BUILD_OVERWRITE",
help="Overwrite existing database file.",
)
@click.option(
"-p",
"--pages",
"_pages",
type=int,
default=None,
envvar="C3DBDL_BUILD_PAGES",
help="Number of pages to scan (default is all).",
)
@click.option(
"-c",
"--concurrency",
"_concurrency",
type=int,
default=10,
envvar="C3DBDL_BUILD_CONCURRENCY",
help="Number of concurrent song page downloads to perform at once.",
)
def build_database(_overwrite, _pages, _concurrency):
"""
Initialize the local JSON database of C3DB songs from the website.
\b
The following environment variables can be used for scripting purposes:
    * C3DBDL_BUILD_OVERWRITE: equivalent to "--overwrite"
    * C3DBDL_BUILD_PAGES: equivalent to "--pages"
    * C3DBDL_BUILD_CONCURRENCY: equivalent to "--concurrency"
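
    \b
    For example (assuming the tool is installed as "c3dbdl"), to rebuild the
    database from only the first 5 listing pages:
        c3dbdl database build --overwrite --pages 5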
"""
if os.path.exists(config["database_filename"]) and not _overwrite:
click.echo(
f"Database already exists at '{config['database_filename']}'; use '--overwrite' to rebuild."
)
exit(1)
click.echo("Building JSON database; this will take a long time...")
songs_database = buildDatabase(_pages, _concurrency)
click.echo("")
click.echo(
f"Found {len(songs_database)} songs, dumping to database file '{config['database_filename']}'"
)
if not os.path.exists(config["download_directory"]):
click.echo(f"Creating download directory '{config['download_directory']}'")
os.makedirs(config["download_directory"])
with open(config["database_filename"], "w") as fh:
json.dump(songs_database, fh, indent=2)
fh.write("\n")
@click.command(name="edit", short_help="Edit the local database in EDITOR.")
def edit_database():
"""
Edit the local JSON database of C3DB songs with your $EDITOR.
"""
if not os.path.exists(config["database_filename"]):
click.echo(
f"WARNING: Database filename '{config['database_filename']}' does not exist!"
)
click.echo(
"Ensure you build a database first with the 'database build' command."
)
exit(1)
with open(config["database_filename"], "r") as fh:
songs_database = fh.read()
new_songs_database = click.edit(
text=songs_database, require_save=True, extension=".json"
)
while True:
if new_songs_database is None:
click.echo("Aborting with no modifications")
exit(0)
click.echo("")
click.echo("Pending modifications:")
click.echo("")
diff = list(
unified_diff(
songs_database.split("\n"),
new_songs_database.split("\n"),
fromfile="current",
tofile="modified",
fromfiledate="",
tofiledate="",
n=3,
lineterm="",
)
)
for line in diff:
if re.match(r"^\+", line) is not None:
click.echo(Fore.GREEN + line + Fore.RESET)
elif re.match(r"^\-", line) is not None:
click.echo(Fore.RED + line + Fore.RESET)
elif re.match(r"^\^", line) is not None:
click.echo(Fore.BLUE + line + Fore.RESET)
else:
click.echo(line)
click.echo("")
try:
json.loads(new_songs_database)
break
except Exception:
click.echo("ERROR: Invalid JSON syntax.")
click.confirm("Continue editing?", abort=True)
new_songs_database = click.edit(
text=new_songs_database, require_save=True, extension=".json"
)
click.confirm("Write modifications to songs database?", abort=True)
with open(config["database_filename"], "w") as fh:
fh.write(new_songs_database)
@click.group(name="database", short_help="Manage the local database.")
def database():
"""
Manage the local JSON database of C3DB songs.
"""
pass
@click.command(name="download", short_help="Download files from C3DB.")
@click.option(
"-s",
"--file-structure",
"_file_structure",
envvar="C3DBDL_DL_FILE_STRUCTURE",
default="{artist}/{album}/{title}.{author}.{orig_name}",
help="Specify the output file/directory stucture.",
)
@click.option(
"-f",
"--filter",
"_filters",
envvar="C3DBDL_DL_FILTERS",
default=[],
multiple=True,
nargs=2,
help="Add a filter option.",
)
@click.option(
"-l",
"--limit",
"_limit",
envvar="C3DBDL_DL_LIMIT",
default=None,
type=int,
help="Limit to this many songs (first N matches).",
)
@click.option(
"-i",
"--download-id",
"_id",
default=None,
type=int,
help='Download only "dl_links" entry N (1 is first, etc.), or all if unspecified.',
)
@click.option(
"-d",
"--download-descr",
"_desc",
default=None,
help='Download only "dl_links" entries with this in their description (fuzzy).',
)
def download(_filters, _id, _desc, _limit, _file_structure):
"""
Download song(s) from the C3DB webpage.
\b
The output file structure can be specified as a path format with any of the following
fields included, surrounded by curly braces:
* genre: The genre of the song.
* artist: The artist of the song.
* album: The album of the song.
* title: The title of the song.
* year: The year of the album/song.
* author: The author of the file on C3DB.
* orig_name: The original filename from the website.
\b
The default output file structure is:
"{artist}/{album}/{title}.{author}.{orig_name}"
Filters allow granular selection of the song(s) to download. Multiple filters can be
specified, and a song is selected only if ALL filters match (logical AND). Each filter
is in the form "--filter [database_key] [value]".
The valid "database_key" values are identical to the output file fields above, except
for "orig_name".
\b
For example, to download all songs in the genre "Rock":
--filter genre Rock
\b
Or to download all songs by the artist "Rush" and the author "MyName":
--filter artist Rush --filter author MyName
In addition to filters, each song may have more than one download link, to provide
multiple versions of the same song (for example, normal and multitracks, or alternate
    charts). For each song, the "-i"/"--download-id" and "-d"/"--download-descr" options
    can be used to select among these, or both can be left unset to download all available
    files for a given song. This is mostly useful when being very restrictive with filters,
    less so when downloading many songs at once.
\b
The following environment variables can be used for scripting purposes:
* C3DBDL_DL_FILE_STRUCTURE: equivalent to "--file-structure"
* C3DBDL_DL_FILTERS: equivalent to "--filter"; limited to one instance
* C3DBDL_DL_LIMIT: equivalent to "--limit"
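
    \b
    For example (assuming the tool is installed as "c3dbdl"), to download the first 10
    songs in the genre "Rock", keeping only download links whose description mentions
    "Multitracks":
        c3dbdl download --filter genre Rock --limit 10 --download-descr Multitracks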
"""
with open(config["database_filename"], "r") as fh:
all_songs = json.load(fh)
click.echo(
f"Found {len(all_songs)} songs from JSON database file '{config['database_filename']}'"
)
pending_songs = list()
for song in all_songs:
if len(_filters) < 1:
add_to_pending = True
else:
add_to_pending = all(song[_filter[0]] == _filter[1] for _filter in _filters)
if add_to_pending:
pending_songs.append(song)
if _limit is not None:
pending_songs = pending_songs[0:_limit]
click.echo(f"Downloading {len(pending_songs)} song files...")
for song in pending_songs:
downloadSong(config["download_directory"], _file_structure, song, _id, _desc)
@click.group(context_settings=CONTEXT_SETTINGS)
@click.option(
"-u",
"--base-url",
"_base_url",
envvar="C3DBDL_BASE_URL",
default="https://db.c3universe.com/songs/all",
show_default=True,
help="Base URL of the online C3DB songs page",
)
@click.option(
"-d",
"--download-directory",
"_download_directory",
envvar="C3DBDL_DOWNLOAD_DIRECTORY",
default="~/Downloads",
show_default=True,
help="Download directory for JSON database and songs",
)
@click.option(
"-j",
"--json-database",
"_json_database",
envvar="C3DBDL_JSON_DATABASE",
default="c3db.json",
show_default=True,
help="JSON database filename within download directory",
)
def cli(_base_url, _download_directory, _json_database):
"""
Customs Creators Collective archive tool
The Customs Creators Collective archive tool allows for easy scraping to a local JSON
database and downloading of files from the C3 (Customs Creators Collective) database,
a collection of custom songs for Rock Band and similar clone games.
    This tool exists because the C3DB is difficult to download from in bulk: each song must
    be found in the extensive list, selected manually, and a second link clicked through
    before an arbitrarily-named file is obtained. This tool simplifies the process by first
    collecting information about all available songs of a particular type, and then downloading
    songs based on customizable filters (e.g. by genre, artist, author, etc.), outputting them
    in a standardized format.
To use the tool, first use the "database" command to build or modify your local JSON
database, then use the "download" command to download songs.
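
    \b
    For example (assuming the tool is installed as "c3dbdl"):
        c3dbdl database build
        c3dbdl download --filter artist Rush
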
\b
The following environment variables can be used for scripting purposes:
* C3DBDL_BASE_URL: equivalent to "--base-url"
    * C3DBDL_DOWNLOAD_DIRECTORY: equivalent to "--download-directory"
* C3DBDL_JSON_DATABASE: equivalent to "--json-database"
"""
global config
# Expand any ~ in the download directory pathname
_download_directory = os.path.expanduser(_download_directory)
# Populate the configuration store
config["base_songs_url"] = _base_url
config["download_directory"] = _download_directory
config["database_filename"] = f"{_download_directory}/{_json_database}"
# Global configuration store, populated by the cli() entry point options
config = dict()
database.add_command(build_database)
database.add_command(edit_database)
cli.add_command(database)
cli.add_command(download)
def main():
return cli(obj={})
if __name__ == "__main__":
main()