410 lines
11 KiB
Python
Executable File
410 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
from linuxapi import inotify
|
||
import PIL.Image
|
||
import argparse
|
||
import contextlib
|
||
import datetime
|
||
import io
|
||
import logging
|
||
import musicbrainzngs
|
||
import mutagen
|
||
import os
|
||
import pyudev
|
||
import queue
|
||
import select
|
||
import subprocess
|
||
import sys
|
||
import threading
|
||
|
||
try:
|
||
from libdiscid.compat import discid
|
||
except ImportError:
|
||
import discid
|
||
|
||
|
||
log = logging.getLogger('ripcd2')
|
||
|
||
|
||
filesystemencoding = sys.getfilesystemencoding()
|
||
|
||
|
||
DATE_FMTS = [
|
||
'%Y-%m-%d',
|
||
'%Y-%m',
|
||
'%Y',
|
||
]
|
||
|
||
|
||
DRIVE_OFFSETS = {
|
||
'ASUS_BW-12B1ST_a': 6,
|
||
'ATAPI_iHES212_3': 702,
|
||
}
|
||
|
||
|
||
RELEASE_INFO_TMPL = '{artist} - {title} ({year}): {more_info}'
|
||
|
||
|
||
FILENAME_SAFE_MAP = {
|
||
'’': "'",
|
||
':': ' - ',
|
||
'/': '-',
|
||
}
|
||
|
||
|
||
musicbrainzngs.set_useragent('DCPlayer', '0.0.1', 'https://dcplayer.audio/')
|
||
|
||
|
||
class RipThread(threading.Thread):
|
||
|
||
def __init__(self, device, tracks=None, use_libcdio=False):
|
||
super(RipThread, self).__init__()
|
||
if not tracks:
|
||
tracks = ('1-',)
|
||
self.tracks = tracks
|
||
self.device = device
|
||
self.use_libcdio = bool(use_libcdio)
|
||
|
||
def run(self):
|
||
log.debug('Starting rip')
|
||
model = self.device.properties['ID_MODEL']
|
||
offset = DRIVE_OFFSETS[model]
|
||
log.info('Using offset %d for %s', offset, model)
|
||
|
||
if self.use_libcdio:
|
||
cmd = ['cd-paranoia']
|
||
else:
|
||
cmd = ['cdparanoia']
|
||
cmd += (
|
||
'--batch',
|
||
'--force-read-speed', '1',
|
||
'--sample-offset', str(offset),
|
||
'--force-cdrom-device', self.device.device_node,
|
||
'--verbose',
|
||
)
|
||
cmd += self.tracks
|
||
log.debug('Running command: %s', cmd)
|
||
subprocess.run(cmd)
|
||
|
||
|
||
class ProcessThread(threading.Thread):
|
||
|
||
def __init__(self, release, discno):
|
||
super(ProcessThread, self).__init__()
|
||
self.release = release
|
||
self.discno = discno
|
||
self.quitpipe = None
|
||
self.q = queue.Queue()
|
||
self.encoder = EncodeThread(self.q, self.release, self.discno)
|
||
|
||
def handle_event(self, evt):
|
||
filename = evt.name.decode(filesystemencoding)
|
||
if filename.endswith('.wav'):
|
||
self.q.put(filename)
|
||
|
||
def run(self):
|
||
self.quitpipe = os.pipe()
|
||
self.encoder.start()
|
||
try:
|
||
with contextlib.closing(inotify.Inotify()) as inot:
|
||
inot.add_watch(b'.', inotify.IN_CLOSE_WRITE)
|
||
while True:
|
||
ready = select.select((self.quitpipe[0], inot), (), ())[0]
|
||
if inot in ready:
|
||
for evt in inot.read():
|
||
log.debug('Dispatching inotify event')
|
||
self.handle_event(evt)
|
||
if self.quitpipe[0] in ready:
|
||
log.debug('Shutting down')
|
||
break
|
||
finally:
|
||
os.close(self.quitpipe[0])
|
||
self.q.put(None)
|
||
log.debug('Waiting for encoder thread')
|
||
self.encoder.join()
|
||
|
||
def stop(self):
|
||
log.debug('Stopping process thread')
|
||
if self.quitpipe is not None:
|
||
os.close(self.quitpipe[1])
|
||
|
||
|
||
class EncodeThread(threading.Thread):
|
||
|
||
def __init__(self, q, release, discno):
|
||
super(EncodeThread, self).__init__()
|
||
self.q = q
|
||
self.release = release
|
||
self.discno = discno
|
||
self.release_year = str(release_year(self.release))
|
||
|
||
def encode(self, filename):
|
||
log.info('Encoding %s to flac', filename)
|
||
cmd = ['flac', filename]
|
||
log.debug('Running command %s', cmd)
|
||
subprocess.run(cmd)
|
||
|
||
def tag(self, filename):
|
||
basename, ext = os.path.splitext(filename)
|
||
filename = basename + '.flac'
|
||
|
||
log.info('Adding tags to %s', filename)
|
||
trackno = int(filename[5:7])
|
||
artist = self.release['artist-credit'][0]['artist']
|
||
album = self.release['title']
|
||
medium = self.release['medium-list'][self.discno]
|
||
track = medium['track-list'][trackno - 1]
|
||
tags = mutagen.File(filename, easy=True)
|
||
tags['tracknumber'] = str(trackno)
|
||
tags['artist'] = tags['albumartist'] = artist['name']
|
||
tags['album'] = album
|
||
tags['title'] = track['recording']['title']
|
||
tags['date'] = self.release_year
|
||
if len(self.release['medium-list']) > 1:
|
||
tags['discnumber'] = str(self.discno + 1)
|
||
tags['musicbrainz_albumid'] = self.release['id']
|
||
tags['musicbrainz_artistid'] = artist['id']
|
||
tags['musicbrainz_releasetrackid'] = track['id']
|
||
tags.save()
|
||
|
||
newname = '{track:02} {artist} - {title}.flac'.format(
|
||
track=trackno,
|
||
artist=artist['name'],
|
||
title=track['recording']['title'],
|
||
)
|
||
log.info('Renaming %s to %s', filename, newname)
|
||
os.rename(filename, safe_name(newname))
|
||
|
||
def run(self):
|
||
while True:
|
||
filename = self.q.get()
|
||
if filename is None:
|
||
break
|
||
self.encode(filename)
|
||
self.tag(filename)
|
||
os.unlink(filename)
|
||
|
||
|
||
class MockRipThread(threading.Thread):
|
||
|
||
def run(self):
|
||
log.debug('Mock rip thread')
|
||
import time; time.sleep(2)
|
||
open('track01.cdda.wav', 'ab').close()
|
||
|
||
|
||
def fetch_albumart(release):
|
||
data = musicbrainzngs.get_image_front(release['id'])
|
||
buf = io.BytesIO(data)
|
||
img = PIL.Image.open(buf)
|
||
img.thumbnail((1000, 1000))
|
||
with open('folder.jpg', 'wb') as f:
|
||
img.save(f)
|
||
|
||
|
||
def format_release(release):
|
||
more_info = [
|
||
'{} disc(s)'.format(len(release['medium-list'])),
|
||
]
|
||
if 'packaging' in release:
|
||
more_info.append(release['packaging'])
|
||
if 'country' in release:
|
||
more_info.append(release['country'])
|
||
if 'label-info-list' in release:
|
||
for label_info in release['label-info-list']:
|
||
if 'catalog-number' in label_info:
|
||
more_info.append(label_info['catalog-number'])
|
||
break
|
||
return RELEASE_INFO_TMPL.format(
|
||
artist=release['artist-credit-phrase'],
|
||
title=release['title'],
|
||
year=release_year(release),
|
||
more_info=', '.join(more_info),
|
||
)
|
||
|
||
|
||
def get_device(path):
|
||
udev = pyudev.Context()
|
||
if path is None:
|
||
block_devices = udev.list_devices(subsystem='block')
|
||
return next(iter(block_devices.match_property('ID_CDROM', 1)))
|
||
else:
|
||
return pyudev.Devices.from_device_file(udev, path)
|
||
|
||
|
||
def get_release_by_id(mbid):
|
||
res = musicbrainzngs.get_release_by_id(
|
||
mbid,
|
||
includes=[
|
||
'artists',
|
||
'recordings',
|
||
'labels',
|
||
],
|
||
)
|
||
return res['release']
|
||
|
||
|
||
def get_release_list_from_device(device):
|
||
disc = discid.read(device)
|
||
log.info('Found disc ID %s at %s', disc.id, device)
|
||
res = musicbrainzngs.get_releases_by_discid(
|
||
disc.id,
|
||
toc=disc.toc_string,
|
||
includes=[
|
||
'artists',
|
||
'recordings',
|
||
'labels',
|
||
],
|
||
)
|
||
if 'disc' in res:
|
||
return res['disc']['release-list']
|
||
else:
|
||
try:
|
||
return res['release-list']
|
||
except KeyError:
|
||
return []
|
||
|
||
|
||
def parse_date(datestr):
|
||
for fmt in DATE_FMTS:
|
||
try:
|
||
return datetime.datetime.strptime(datestr, fmt)
|
||
except ValueError:
|
||
continue
|
||
else:
|
||
raise ValueError('Could not parse date string "{}"'.format(datestr))
|
||
|
||
|
||
def prompt(text, validate=None):
|
||
while True:
|
||
try:
|
||
inp = input(text)
|
||
except (EOFError, KeyboardInterrupt):
|
||
print('')
|
||
raise SystemExit(1)
|
||
if validate:
|
||
try:
|
||
return validate(inp)
|
||
except ValueError as e:
|
||
sys.stderr.write('Invalid input: {}\n'.format(e))
|
||
|
||
|
||
def prompt_menu(choices):
|
||
max_ = 0
|
||
for idx, choice in enumerate(choices):
|
||
print('{})'.format(idx + 1), choice)
|
||
max_ += 1
|
||
|
||
def validate(inp):
|
||
i = int(inp)
|
||
if i < 1 or i > max_:
|
||
raise ValueError('Invalid selection: {}'.format(i))
|
||
return i
|
||
|
||
return prompt('Selection: ', validate=validate) - 1
|
||
|
||
|
||
def prompt_select_disc(num_discs):
|
||
print(
|
||
'Found part of a multi-disc album. Please select the disc number:'
|
||
)
|
||
return prompt_menu('Disc {}'.format(x) for x in range(1, num_discs + 1))
|
||
|
||
|
||
def prompt_select_release(release_list):
|
||
print(
|
||
'Mutliple matching releases found. Please select the correct release:'
|
||
)
|
||
choice = prompt_menu(format_release(r) for r in release_list)
|
||
return release_list[choice]
|
||
|
||
|
||
def release_year(release):
|
||
try:
|
||
return parse_date(release['date']).year
|
||
except (KeyError, ValueError):
|
||
return None
|
||
|
||
|
||
def safe_name(name):
|
||
for k, v in FILENAME_SAFE_MAP.items():
|
||
name = name.replace(k, v)
|
||
return name
|
||
|
||
|
||
def parse_args():
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument(
|
||
'--use-libcdio',
|
||
action='store_true',
|
||
default=False,
|
||
)
|
||
parser.add_argument(
|
||
'--device', '-d',
|
||
)
|
||
parser.add_argument(
|
||
'--mbid', '-i',
|
||
)
|
||
parser.add_argument(
|
||
'tracks',
|
||
nargs='*',
|
||
)
|
||
return parser.parse_args()
|
||
|
||
|
||
def main():
|
||
args = parse_args()
|
||
|
||
device = get_device(args.device)
|
||
if args.mbid:
|
||
release = get_release_by_id(args.mbid)
|
||
else:
|
||
release_list = get_release_list_from_device(device.device_node)
|
||
if len(release_list) > 1:
|
||
release = prompt_select_release(release_list)
|
||
elif len(release_list) == 1:
|
||
release = release_list[0]
|
||
else:
|
||
sys.stderr.write('Could not find a matching MusicBrainz release\n')
|
||
raise SystemExit(1)
|
||
|
||
print('Ripping', format_release(release))
|
||
|
||
dirname = safe_name('{artist} - {album}'.format(
|
||
artist=release['artist-credit-phrase'],
|
||
album=release['title'],
|
||
))
|
||
if not os.path.isdir(dirname):
|
||
log.info('Creating directory: %s', dirname)
|
||
os.mkdir(dirname)
|
||
os.chdir(dirname)
|
||
|
||
num_discs = len(release['medium-list'])
|
||
if num_discs > 1:
|
||
discno = prompt_select_disc(num_discs)
|
||
subdirname = 'Disc {}'.format(discno + 1)
|
||
if not os.path.isdir(subdirname):
|
||
log.info('Creating directory: %s', subdirname)
|
||
os.mkdir(subdirname)
|
||
os.chdir(subdirname)
|
||
else:
|
||
discno = 0
|
||
|
||
ripthread = RipThread(device, args.tracks, args.use_libcdio)
|
||
#ripthread = MockRipThread()
|
||
ripthread.start()
|
||
processthread = ProcessThread(release, discno)
|
||
try:
|
||
processthread.start()
|
||
try:
|
||
fetch_albumart(release)
|
||
except:
|
||
log.exception('Error fetching album art')
|
||
ripthread.join()
|
||
finally:
|
||
processthread.stop()
|
||
processthread.join()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|