c3dbdl/c3dbdl/c3dbdl.py

902 lines
31 KiB
Python
Executable File

#!/usr/bin/env python3
# c3dbdl - Customs Creators Collective archive tool
#
# Copyright (C) 2023 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import click
import requests
import re
import json
import os
from time import sleep
from difflib import unified_diff
from colorama import Fore
from bs4 import BeautifulSoup
from urllib.error import HTTPError
from concurrent.futures import ThreadPoolExecutor, as_completed
# Context settings handed to the root click group: "-h" is accepted as a help
# flag alongside "--help", and the maximum help content width is set to 120.
CONTEXT_SETTINGS = {
    "help_option_names": ["-h", "--help"],
    "max_content_width": 120,
}
def fetchSongData(entries):
    """
    Build a single song entry from one pair of song list table rows.

    ``entries`` is a two-element sequence of parsed table rows: ``entries[0]``
    holds the song details (one value per ``<td>``, identified by its column
    position) and ``entries[1]`` holds the per-instrument difficulty links. The
    song's own page is then fetched to collect its c3universe.com download links.

    Returns a ``(messages, song_entry)`` tuple. ``messages`` is a list of
    progress lines for the caller to print. ``song_entry`` is the populated
    dictionary, or ``None`` when the song must not be added to the database:
    the row lacks an author, title or song page link, the song page could not
    be fetched, or no usable download links were found.
    """
    song_entry = {
        "artist": None,
        "title": None,
        "album": None,
        "song_link": None,
        "genre": None,
        "year": None,
        "length": None,
        "author": None,
        "instruments": {},
        "dl_links": [],
    }
    messages = []

    # Find song details; the column index decides what each cell contains
    for idx, td in enumerate(entries[0].find_all("td")):
        if idx == 2:
            # Artist ("/" is replaced because these values are used in file paths)
            song_entry["artist"] = td.find("a").get_text().strip().replace("/", "+")
        elif idx == 3:
            # Song title and album share one cell
            song_entry["title"] = (
                td.find("div", attrs={"class": "c3ttitlemargin"})
                .get_text()
                .strip()
                .replace("/", "+")
            )
            song_entry["album"] = (
                td.find("div", attrs={"class": "c3tartist"})
                .get_text()
                .strip()
                .replace("/", "+")
            )
            # Song page: the first anchor in the cell with a non-empty href
            for link in td.find_all("a", href=True):
                if link.get("href"):
                    song_entry["song_link"] = link.get("href")
                    break
        elif idx == 4:
            # Genre
            song_entry["genre"] = td.find("a").get_text().strip()
        elif idx == 5:
            # Year
            song_entry["year"] = td.find("a").get_text().strip()
        elif idx == 6:
            # Length
            song_entry["length"] = td.find("a").get_text().strip()
        elif idx == 8:
            # Author (of chart)
            song_entry["author"] = td.find("a").get_text().strip().replace("/", "+")

    # Find song instruments and difficulties; only the first cell that contains
    # instrument blocks is used
    instrument_attrs = {"style": "width:110px;float:left"}
    for td in entries[1].find_all("td"):
        instrument_blocks = td.find_all("div", attrs=instrument_attrs)
        if not instrument_blocks:
            continue
        for instrument in instrument_blocks:
            # The second matching anchor links to ".../<prefix>_<instrument>/<number>"
            difficulty_link = (
                instrument.find_all(
                    "a", attrs={"style": "text-decoration: none;color:#000"}
                )[1]
                .get("href")
                .split("/")
            )
            instrument_name = (
                difficulty_link[-2].split("_")[-1].replace("prokeys", "keys")
            )
            instrument_diff = int(difficulty_link[-1])
            # A number below 1 means "no part"; otherwise difficulty is number - 1
            song_entry["instruments"][instrument_name] = (
                None if instrument_diff < 1 else instrument_diff - 1
            )
        break

    # An entry without these fields has no song page to read download links
    # from and would later break filtering, so it is rejected up front.
    if not (
        song_entry["author"] and song_entry["title"] and song_entry["song_link"]
    ):
        messages.append(
            f"Skipping incomplete song entry (artist: {song_entry['artist']}): "
            "missing author, title or song page link"
        )
        return messages, None

    messages.append(
        f"> Found song entry for {song_entry['artist']} - {song_entry['title']} by {song_entry['author']}"
    )

    # Get download links from the actual song page, retrying on connection errors
    sp = None
    for attempt in range(1, 4):
        messages.append(
            f"Parsing song page {song_entry['song_link']} (attempt {attempt}/3)..."
        )
        try:
            sp = requests.get(song_entry["song_link"])
            break
        except requests.RequestException:
            sleep(attempt)

    if sp is None or sp.status_code != 200:
        messages.append("Failed to fetch song page, aborting")
        return messages, None

    song_parsed_html = BeautifulSoup(sp.text, "html.parser")
    download_section = song_parsed_html.find(
        "div", attrs={"class": "portlet light bg-inverse"}
    )
    if download_section is None:
        messages.append(
            "Found no download section on song page, not adding to database"
        )
        return messages, None

    dl_links = []
    for link_entry in download_section.find_all("a", href=True):
        link = link_entry.get("href")
        description = link_entry.get_text().strip()
        # Only links pointing at c3universe.com are kept
        if "c3universe.com" not in link:
            continue
        messages.append(f"Found download link: {link} ({description})")
        dl_links.append({"link": link, "description": description})

    if not dl_links:
        messages.append(
            "Found no c3universe.com download links for song, not adding to database"
        )
        return messages, None

    song_entry["dl_links"] = dl_links

    # Return messages and song entry
    return messages, song_entry
def _fetchSongTable(page):
    """
    Fetch one page of the song list and return its table body.

    Makes up to three attempts, sleeping 1, 2 and 3 seconds after each failed
    one. An attempt fails on a connection error, a non-200 response, or a page
    without a "portlet-body" div containing a <tbody>. Returns None if every
    attempt failed.
    """
    for attempt in range(1, 4):
        click.echo(f"Parsing page {page} (attempt {attempt}/3)...")
        try:
            p = requests.get(f"{config['base_songs_url']}?page={page}")
        except requests.RequestException:
            p = None

        if p is not None and p.status_code == 200:
            body = BeautifulSoup(p.text, "html.parser").body
            portlet = (
                body.find("div", attrs={"class": "portlet-body"})
                if body is not None
                else None
            )
            table_html = portlet.find("tbody") if portlet is not None else None
            if table_html is not None:
                return table_html

        sleep(attempt)

    return None


def buildDatabase(pages, concurrency):
    """
    Scrape the online song list and return a list of song entries.

    ``pages`` is the number of list pages to scan; when None, the page count
    is read from the "last page" pagination link of the base songs URL.
    ``concurrency`` is the number of worker threads used to run fetchSongData
    for the songs of each page.

    Returns the list of song entry dictionaries that were found. Pages that
    cannot be fetched after three attempts are reported and skipped.

    Raises click.ClickException if ``pages`` is None and the page count cannot
    be determined.
    """
    found_songs = []

    if pages is None:
        try:
            r = requests.get(f"{config['base_songs_url']}")
        except requests.RequestException as e:
            raise click.ClickException(
                f"Failed to fetch '{config['base_songs_url']}': {e}"
            )
        if r.status_code != 200:
            raise click.ClickException(
                f"Failed to fetch '{config['base_songs_url']}': HTTP {r.status_code}"
            )

        root_page_html = BeautifulSoup(r.text, "html.parser")
        try:
            # The last-page link's href has the form "?page=<N>"
            pages = int(
                root_page_html.body.find("a", attrs={"class": "paginationLastPage"})
                .get("href")
                .replace("?page=", "")
            )
        except (AttributeError, ValueError):
            raise click.ClickException(
                "Failed to determine the number of pages; specify it with '--pages'."
            )

    click.echo(f"Collecting data from {pages} pages")

    # Get a list of song URIs
    for i in range(1, pages + 1):
        table_html = _fetchSongTable(i)
        if table_html is None:
            # Never fall through with a missing or previous page's table
            click.echo(f"Failed to fetch page {i} after 3 attempts, skipping")
            continue

        # This is weird, but because of the table layout, there are two table rows for
        # each song: the first is the song info, the second is the instruments
        # So we must make a single "entry" that is a list of the two elements, then
        # handle that later in fetchSongData.
        entries = []
        entry_data = []
        for entry in table_html.find_all("tr", attrs={"class": "odd"}):
            if len(entry) < 1:
                break
            entry_data.append(entry)
            if len(entry_data) == 2:
                entries.append(entry_data)
                entry_data = []

        click.echo("Fetching and parsing song pages...")
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            futures = [executor.submit(fetchSongData, entry) for entry in entries]
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as e:
                    # One unparseable song must not abort the whole build
                    click.echo(f"Failed to parse song entry: {e}")
                    continue
                if result is None:
                    continue
                messages, song = result
                click.echo("\n".join(messages))
                if song is not None:
                    found_songs.append(song)

    return found_songs
def _downloadToFile(url, target):
    """
    Stream ``url`` into the file ``target``.

    The data is written to "<target>.part" and renamed onto ``target`` only
    after the whole body has been received, so an interrupted download never
    leaves a file at ``target``. Raises HTTPError for any non-200 response;
    other request or file errors propagate unchanged.
    """
    partial = f"{target}.part"
    try:
        with requests.get(url, stream=True) as r:
            if r.status_code != 200:
                raise HTTPError(url, r.status_code, "", None, None)
            directory = os.path.dirname(target)
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(partial, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(partial, target)
    except BaseException:
        # Remove the incomplete file (also on Ctrl-C) before re-raising
        if os.path.exists(partial):
            os.remove(partial)
        raise


def downloadSong(destination, filename, entry, dlid, dldesc):
    """
    Download the file(s) of one song entry.

    ``destination`` is the base download directory and ``filename`` a format
    string for the path below it; it may use the fields genre, artist, album,
    title, year, author and orig_name. ``entry`` is a song entry dictionary
    from the database. ``dlid`` selects a single "dl_links" entry (1 is the
    first) or all of them when None; ``dldesc`` keeps only links whose
    description contains the given text, or all when None.

    Failures are reported with click.echo and the affected link is skipped;
    nothing is returned.
    """
    click.echo(
        f"""> Downloading song "{entry['artist']} - {entry['title']}" by {entry['author']}..."""
    )

    all_links = entry["dl_links"]
    if dlid is None:
        dl_links = all_links
    elif 1 <= dlid <= len(all_links):
        dl_links = [all_links[dlid - 1]]
    else:
        # Includes 0 and negative IDs, which would otherwise index from the end
        click.echo(f"Invalid download link ID {dlid}.")
        return

    if dldesc is not None:
        dl_links = [link for link in dl_links if dldesc in link["description"]]

    if not dl_links:
        if dldesc is None:
            click.echo("No download links found.")
        else:
            click.echo(f'No download link matching description "{dldesc}" found.')
        return

    for dl_link in dl_links:
        # The stored link is an HTML page; the real file URL is the anchor
        # inside its "lock-head" div
        try:
            p = requests.get(dl_link["link"])
            if p.status_code != 200:
                raise HTTPError(dl_link["link"], p.status_code, "", None, None)
            parsed_html = BeautifulSoup(p.text, "html.parser")
            download_url = (
                parsed_html.body.find("div", attrs={"class": "lock-head"})
                .find("a")
                .get("href")
            )
            if not download_url:
                raise ValueError("no download URL found in page")
        except Exception as e:
            click.echo(f"Failed parsing or retrieving HTML link: {e}")
            continue

        download_filename = filename.format(
            genre=entry["genre"],
            artist=entry["artist"],
            album=entry["album"],
            title=entry["title"],
            year=entry["year"],
            author=entry["author"],
            orig_name=download_url.split("/")[-1],
        )
        download_filename = f"{destination}/{download_filename}"

        click.echo(
            f"""Downloading file "{dl_link['description']}" from {download_url}..."""
        )

        if os.path.exists(download_filename):
            click.echo(f"File exists at {download_filename}")
            continue

        # Each attempt issues a fresh request; wait 1s, then 2s, between attempts
        error = None
        for attempt in range(1, 4):
            try:
                _downloadToFile(download_url, download_filename)
            except Exception as e:
                error = e
                if attempt < 3:
                    click.echo(
                        f"Download attempt failed: {e}; retrying {attempt}/3"
                    )
                    sleep(attempt)
            else:
                error = None
                click.echo(f"Successfully downloaded to {download_filename}")
                break

        if error is not None:
            click.echo(f"Download attempt failed: {error}")
@click.command(name="build", short_help="Build the local database.")
@click.option(
    "-o",
    "--overwrite",
    "_overwrite",
    is_flag=True,
    default=False,
    # "C3DLDB_BUILD_OVERWRITE" is the original, misspelled name; it is still
    # accepted so existing scripts keep working.
    envvar=["C3DBDL_BUILD_OVERWRITE", "C3DLDB_BUILD_OVERWRITE"],
    help="Overwrite existing database file.",
)
@click.option(
    "-p",
    "--pages",
    "_pages",
    type=int,
    default=None,
    envvar="C3DBDL_BUILD_PAGES",
    help="Number of pages to scan (default is all).",
)
@click.option(
    "-c",
    "--concurrency",
    "_concurrency",
    type=int,
    default=10,
    envvar="C3DBDL_BUILD_CONCURRENCY",
    help="Number of concurrent song page downloads to perform at once.",
)
def build_database(_overwrite, _pages, _concurrency):
    """
    Initialize the local JSON database of C3DB songs from the website.

    \b
    The following environment variables can be used for scripting purposes:
    * C3DBDL_BUILD_OVERWRITE: equivalent to "--overwrite"
    * C3DLDB_BUILD_OVERWRITE: legacy spelling of C3DBDL_BUILD_OVERWRITE
    * C3DBDL_BUILD_PAGES: equivalent to "--pages"
    * C3DBDL_BUILD_CONCURRENCY: equivalent to "--concurrency"
    """
    database_filename = config["database_filename"]

    if os.path.exists(database_filename) and not _overwrite:
        click.echo(
            f"Database already exists at '{config['database_filename']}'; use '--overwrite' to rebuild."
        )
        raise SystemExit(1)

    click.echo("Building JSON database; this will take a long time...")
    songs_database = buildDatabase(_pages, _concurrency)

    # Never replace an existing database when scraping produced no result at all
    if songs_database is None:
        click.echo("Failed to retrieve the song list; not writing database file.")
        raise SystemExit(1)

    click.echo("")
    click.echo(
        f"Found {len(songs_database)} songs, dumping to database file '{config['database_filename']}'"
    )

    # The download directory is created before the database file is opened
    if not os.path.exists(config["download_directory"]):
        click.echo(f"Creating download directory '{config['download_directory']}'")
        os.makedirs(config["download_directory"], exist_ok=True)

    with open(database_filename, "w") as fh:
        json.dump(songs_database, fh, indent=2)
        fh.write("\n")
@click.command(name="edit", short_help="Edit the local database in EDITOR.")
def edit_database():
    """
    Edit the local JSON database of C3DB songs with your $EDITOR.
    """
    if not os.path.exists(config["database_filename"]):
        click.echo(
            f"WARNING: Database filename '{config['database_filename']}' does not exist!"
        )
        click.echo(
            "Ensure you build a database first with the 'database build' command."
        )
        raise SystemExit(1)

    with open(config["database_filename"], "r") as fh:
        songs_database = fh.read()

    new_songs_database = click.edit(
        text=songs_database, require_save=True, extension=".json"
    )

    # Loop until the edited text is valid JSON or the user gives up
    while True:
        # click.edit returns None when the editor was closed without saving
        if new_songs_database is None:
            click.echo("Aborting with no modifications")
            raise SystemExit(0)

        click.echo("")
        click.echo("Pending modifications:")
        click.echo("")
        diff = unified_diff(
            songs_database.split("\n"),
            new_songs_database.split("\n"),
            fromfile="current",
            tofile="modified",
            fromfiledate="",
            tofiledate="",
            n=3,
            lineterm="",
        )
        for line in diff:
            if line.startswith("+"):
                click.echo(Fore.GREEN + line + Fore.RESET)
            elif line.startswith("-"):
                click.echo(Fore.RED + line + Fore.RESET)
            elif line.startswith("@@"):
                # Hunk headers of a unified diff ("@@ -a,b +c,d @@")
                click.echo(Fore.BLUE + line + Fore.RESET)
            else:
                click.echo(line)
        click.echo("")

        try:
            json.loads(new_songs_database)
            break
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass
            click.echo("ERROR: Invalid JSON syntax.")
            click.confirm("Continue editing?", abort=True)
            new_songs_database = click.edit(
                text=new_songs_database, require_save=True, extension=".json"
            )

    click.confirm("Write modifications to songs database?", abort=True)

    with open(config["database_filename"], "w") as fh:
        fh.write(new_songs_database)
# Command group only: its subcommands are attached with add_command() at the
# bottom of the file, so the callback itself has nothing to do.
@click.group(name="database", short_help="Manage the local database.")
def database():
    """
    Manage the local JSON database of C3DB songs.
    """
def _loadDatabase():
    """
    Load the song list from the JSON database file and report its size.

    Exits with status 1 if the database file does not exist.
    """
    if not os.path.exists(config["database_filename"]):
        click.echo(
            f"WARNING: Database filename '{config['database_filename']}' does not exist!"
        )
        click.echo(
            "Ensure you build a database first with the 'database build' command."
        )
        raise SystemExit(1)

    with open(config["database_filename"], "r") as fh:
        all_songs = json.load(fh)

    click.echo(
        f"Found {len(all_songs)} songs from JSON database file '{config['database_filename']}'"
    )
    return all_songs


def _matchesInformationFilter(song, field, value):
    """
    Return True if ``song[field]`` matches the lower-cased filter ``value``.

    A value starting with "~" is a substring match on the rest of the value;
    anything else must equal the field exactly. The comparison is case
    insensitive and a field stored as None is compared as an empty string.
    Raises KeyError for a field that is missing or does not hold a single value.
    """
    field_value = song[field]
    if isinstance(field_value, (dict, list)):
        # "instruments" and "dl_links" cannot be matched as text
        raise KeyError(field)
    field_text = "" if field_value is None else str(field_value).lower()

    if value.startswith("~"):
        # Strip only the leading marker; later tildes belong to the search text
        return value[1:] in field_text
    return value == field_text


def _matchesInstrumentFilter(song, instrument):
    """
    Return True if the song satisfies one instrument filter.

    ``instrument`` requires the song to have that part; "no-<instrument>"
    requires the part to be absent (stored as None). Raises KeyError for an
    instrument name that the song entry does not list.
    """
    if instrument.startswith("no-"):
        return song["instruments"][instrument[3:]] is None
    return song["instruments"][instrument] is not None


def _filterSongs(all_songs, filters):
    """
    Return the songs from ``all_songs`` that match every (field, value) filter.

    Filters on the field "instrument" are instrument filters, all others are
    information filters. Exits with status 1 after printing a message when a
    filter names an invalid field or instrument.
    """
    # The filters are the same for every song, so split them only once
    information_filters = []
    instrument_filters = []
    for field, value in filters:
        if field.lower() == "instrument":
            instrument_filters.append(value.lower())
        else:
            information_filters.append((field.lower(), value.lower()))

    pending_songs = []
    for song in all_songs:
        # Every filter is evaluated (no short-circuit) so that an invalid field
        # is reported even when an earlier filter already failed.
        try:
            information_results = [
                _matchesInformationFilter(song, field, value)
                for field, value in information_filters
            ]
        except KeyError as e:
            click.echo(f"Invalid filter field {e}")
            raise SystemExit(1)

        try:
            instrument_results = [
                _matchesInstrumentFilter(song, instrument)
                for instrument in instrument_filters
            ]
        except KeyError as e:
            click.echo(f"Invalid instrument value {e}")
            raise SystemExit(1)

        if all(information_results) and all(instrument_results):
            pending_songs.append(song)

    return pending_songs


@click.command(name="download", short_help="Download files from C3DB.")
@click.option(
    "-s",
    "--file-structure",
    "_file_structure",
    envvar="C3DBDL_DL_FILE_STRUCTURE",
    default="{artist}/{album}/{title}.{author}.{orig_name}",
    help="Specify the output file/directory structure.",
)
@click.option(
    "-f",
    "--filter",
    "_filters",
    default=[],
    multiple=True,
    nargs=2,
    help="Add a search filter.",
)
@click.option(
    "-l",
    "--limit",
    "_limit",
    envvar="C3DBDL_DL_LIMIT",
    default=None,
    type=int,
    help="Limit to this many songs (first N matches).",
)
@click.option(
    "-i",
    "--download-id",
    "_id",
    envvar="C3DBDL_DL_ID",
    default=None,
    type=int,
    help='Download only "dl_links" entry N (1 is first, etc.), or all if unspecified.',
)
@click.option(
    "-d",
    "--download-descr",
    "_desc",
    envvar="C3DBDL_DL_DESCR",
    default=None,
    help='Download only "dl_links" entries with this in their description (fuzzy).',
)
def download(_filters, _id, _desc, _limit, _file_structure):
    """
    Download song(s) from the C3DB webpage.

    Filters allow granular selection of the song(s) to download. Multiple filters can be
    specified, and a song is selected only if ALL filters match (logical AND). Filters are
    specified in the form "--filter <field> <value>".

    For a full list of and explanation for filters, see the help output for the "search"
    command ("c3dbdl search --help").

    In addition to filters, each song may have more than one download link, to provide
    multiple versions of the same song (for example, normal and multitracks, or alternate
    charts). For each song, the "-i"/"--download-id" and "-d"/"--download-descr" options
    can help filter these out, or both can be left blank to download all possible files
    for a given song.

    \b
    The output file structure can be specified as a path format with any of the following
    fields included, surrounded by curly braces:
    * genre: The genre of the song.
    * artist: The artist of the song.
    * album: The album of the song.
    * title: The title of the song.
    * year: The year of the album/song.
    * author: The author of the file on C3DB.
    * orig_name: The original filename from the website.

    \b
    The default output file structure is:
    "{artist}/{album}/{title}.{author}.{orig_name}"

    \b
    The following environment variables can be used for scripting purposes:
    * C3DBDL_DL_FILE_STRUCTURE: equivalent to "--file-structure"
    * C3DBDL_DL_LIMIT: equivalent to "--limit"
    * C3DBDL_DL_ID: equivalent to "--download-id"
    * C3DBDL_DL_DESCR: equivalent to "--download-descr"
    """
    all_songs = _loadDatabase()
    pending_songs = _filterSongs(all_songs, _filters)

    if _limit is not None:
        pending_songs = pending_songs[0:_limit]

    click.echo(f"Downloading {len(pending_songs)} songs...")

    for song in pending_songs:
        downloadSong(config["download_directory"], _file_structure, song, _id, _desc)


@click.command(name="search", short_help="Search for songs from local C3DB.")
@click.option(
    "-f",
    "--filter",
    "_filters",
    default=[],
    multiple=True,
    nargs=2,
    help="Add a search filter.",
)
def search(_filters):
    """
    Search for song(s) from the C3DB local database.

    Filters allow granular selection of the song(s) to download. Multiple filters can be
    specified, and a song is selected only if ALL filters match (logical AND). Filters are
    specified in the form "--filter <field> <value>".

    \b
    The valid fields for the "<field>" value are:
    * genre: The genre of the song.
    * artist: The artist of the song.
    * album: The album of the song.
    * title: The title of the song.
    * year: The year of the album/song.
    * author: The author of the file on C3DB.
    * instrument: An instrument chart for the song.

    \b
    For example, to download all songs in the genre "Rock":
    --filter genre Rock

    \b
    Or to download all songs by the artist "Rush" and the author "MyName":
    --filter artist Rush --filter author MyName

    Filter values are case insensitive, and non-instrument filters can be made fuzzy by
    adding a tilde ("~") to the beginning of the "<value>".

    \b
    For example, to match all songs with "Word" in their titles:
    --filter title ~word

    Instrument filters allow selection of the presence of instruments. If an instrument
    filter is given, only songs which contain parts for the given instrument(s) will be
    shown.

    \b
    The valid instruments are:
    * guitar
    * bass
    * drums
    * vocals
    * keys

    To negate an instrument filter and find only entries without the specified
    instrument, prepend "no-" to the instrument name.

    \b
    For example, to download only songs that have a keys part but no vocal part:
    --filter instrument keys --filter instrument no-vocals

    Note that while instrument difficulties are displayed in the output of this command,
    they can not be filtered on; this is up to the user to do manually. The purpose of
    instrument filters is to ensure that songs contain or don't contain given parts, not
    to granularly select the difficulty of said parts (that's for the players of the game
    to do, not us).
    """
    all_songs = _loadDatabase()
    pending_songs = _filterSongs(all_songs, _filters)

    click.echo(f"Found {len(pending_songs)} matching songs:")
    click.echo()

    for entry in pending_songs:
        click.echo(
            f"""> Song: "{entry['artist']} - {entry['title']}" ({entry['length']}, {entry['genre']}) from "{entry['album']} ({entry['year']})" by {entry['author']}"""
        )

        instrument_list = [
            f"{instrument} [{difficulty}]"
            for instrument, difficulty in entry["instruments"].items()
        ]
        click.echo(
            f""" Instruments: {', '.join(instrument_list)}""",
        )

        click.echo(""" Available downloads:""")
        for link in entry["dl_links"]:
            click.echo(f""" * {link['description']}""")
        click.echo()
@click.group(context_settings=CONTEXT_SETTINGS)
@click.option(
    "-u",
    "--base-url",
    "_base_url",
    envvar="C3DBDL_BASE_URL",
    default="https://db.c3universe.com/songs/all",
    show_default=True,
    help="Base URL of the online C3DB songs page",
)
@click.option(
    "-d",
    "--download-directory",
    "_download_directory",
    envvar="C3DBDL_DOWNLOAD_DIRECTORY",
    default="~/Downloads",
    show_default=True,
    help="Download directory for JSON database and songs",
)
@click.option(
    "-j",
    "--json-database",
    "_json_database",
    envvar="C3DBDL_JSON_DATABASE",
    default="c3db.json",
    show_default=True,
    help="JSON database filename within download directory",
)
def cli(_base_url, _download_directory, _json_database):
    """
    Customs Creators Collective archive tool

    The Customs Creators Collective archive tool allows for easy scraping to a local JSON
    database and downloading of files from the C3 (Customs Creators Collective) database,
    a collection of custom songs for Rock Band and similar clone games.

    This tool exists because the C3DB is very hard to mass download from: each song must
    be found in the extensive list, selected manually, and a second link clicked through,
    before a random file name is obtained. This tool simplifies the process by first collecting
    information about all available songs of a particular type, and then is able to download
    songs based on customizable filters (e.g. by genre, artist, author, etc.) and output them
    in a standardized format.

    To use the tool, first use the "database" command to build or modify your local JSON
    database, then use the "download" command to download songs.

    \b
    The following environment variables can be used for scripting purposes:
    * C3DBDL_BASE_URL: equivalent to "--base-url"
    * C3DBDL_DOWNLOAD_DIRECTORY: equivalent to "--download-directory"
    * C3DBDL_JSON_DATABASE: equivalent to "--json-database"
    """
    # Expand any ~ in the download directory pathname
    _download_directory = os.path.expanduser(_download_directory)

    # Populate the module-level configuration store read by the subcommands.
    # The dictionary is only mutated, never rebound, so no "global" is needed.
    config["base_songs_url"] = _base_url
    config["download_directory"] = _download_directory
    config["database_filename"] = f"{_download_directory}/{_json_database}"
# Module-level configuration store; cli() fills it in and the subcommands
# read it.
config = {}

# Assemble the command tree: "database" owns "build" and "edit", while the
# root group owns "database", "download" and "search".
database.add_command(build_database)
database.add_command(edit_database)
cli.add_command(database)
cli.add_command(download)
cli.add_command(search)


def main():
    """Run the click command group and return whatever it returns."""
    return cli(obj={})


if __name__ == "__main__":
    main()