ripcd/ripcd.py

404 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
from linuxapi import inotify
import PIL.Image
import argparse
import contextlib
import datetime
import io
import logging
import musicbrainzngs
import mutagen
import os
import pyudev
import queue
import select
import subprocess
import sys
import threading
try:
from libdiscid.compat import discid
except ImportError:
import discid
log = logging.getLogger('ripcd2')
filesystemencoding = sys.getfilesystemencoding()
DATE_FMTS = [
'%Y-%m-%d',
'%Y-%m',
'%Y',
]
DRIVE_OFFSETS = {
'ASUS_BW-12B1ST_a': 6,
'ATAPI_iHES212_3': 702,
}
RELEASE_INFO_TMPL = '{artist} - {title} ({year}): {more_info}'
FILENAME_SAFE_MAP = {
'': "'",
':': ' - ',
}
musicbrainzngs.set_useragent('DCPlayer', '0.0.1', 'https://dcplayer.audio/')
class RipThread(threading.Thread):
def __init__(self, device, tracks=None, use_libcdio=False):
super(RipThread, self).__init__()
if not tracks:
tracks = ('1-',)
self.tracks = tracks
self.device = device
self.use_libcdio = bool(use_libcdio)
def run(self):
log.debug('Starting rip')
model = self.device.properties['ID_MODEL']
offset = DRIVE_OFFSETS[model]
log.info('Using offset %d for %s', offset, model)
if self.use_libcdio:
cmd = ['cd-paranoia']
else:
cmd = ['cdparanoia']
cmd += (
'--batch',
'--force-read-speed', '1',
'--sample-offset', str(offset),
'--force-cdrom-device', self.device.device_node,
'--verbose',
)
cmd += self.tracks
log.debug('Running command: %s', cmd)
subprocess.run(cmd)
class ProcessThread(threading.Thread):
def __init__(self, release, discno):
super(ProcessThread, self).__init__()
self.release = release
self.discno = discno
self.quitpipe = None
self.q = queue.Queue()
self.encoder = EncodeThread(self.q, self.release, self.discno)
def handle_event(self, evt):
filename = evt.name.decode(filesystemencoding)
if filename.endswith('.wav'):
self.q.put(filename)
def run(self):
self.quitpipe = os.pipe()
self.encoder.start()
try:
with contextlib.closing(inotify.Inotify()) as inot:
inot.add_watch(b'.', inotify.IN_CLOSE_WRITE)
while True:
ready = select.select((self.quitpipe[0], inot), (), ())[0]
if inot in ready:
for evt in inot.read():
log.debug('Dispatching inotify event')
self.handle_event(evt)
if self.quitpipe[0] in ready:
log.debug('Shutting down')
break
finally:
os.close(self.quitpipe[0])
self.q.put(None)
log.debug('Waiting for encoder thread')
self.encoder.join()
def stop(self):
log.debug('Stopping process thread')
if self.quitpipe is not None:
os.close(self.quitpipe[1])
class EncodeThread(threading.Thread):
def __init__(self, q, release, discno):
super(EncodeThread, self).__init__()
self.q = q
self.release = release
self.discno = discno
self.release_year = str(release_year(self.release))
def encode(self, filename):
log.info('Encoding %s to flac', filename)
cmd = ['flac', filename]
log.debug('Running command %s', cmd)
subprocess.run(cmd)
def tag(self, filename):
basename, ext = os.path.splitext(filename)
filename = basename + '.flac'
log.info('Adding tags to %s', filename)
trackno = int(filename[5:7])
artist = self.release['artist-credit'][0]['artist']
album = self.release['title']
medium = self.release['medium-list'][self.discno]
track = medium['track-list'][trackno - 1]
tags = mutagen.File(filename, easy=True)
tags['tracknumber'] = str(trackno)
tags['artist'] = tags['albumartist'] = artist['name']
tags['album'] = album
tags['title'] = track['recording']['title']
tags['date'] = self.release_year
if len(self.release['medium-list']) > 1:
tags['discnumber'] = str(self.discno + 1)
tags['musicbrainz_albumid'] = self.release['id']
tags['musicbrainz_artistid'] = artist['id']
tags['musicbrainz_releasetrackid'] = track['id']
tags.save()
newname = '{track:02} {artist} - {title}.flac'.format(
track=trackno,
artist=artist['name'],
title=track['recording']['title'],
)
log.info('Renaming %s to %s', filename, newname)
os.rename(filename, safe_name(newname))
def run(self):
while True:
filename = self.q.get()
if filename is None:
break
self.encode(filename)
self.tag(filename)
os.unlink(filename)
class MockRipThread(threading.Thread):
def run(self):
log.debug('Mock rip thread')
import time; time.sleep(2)
open('track01.cdda.wav', 'ab').close()
def fetch_albumart(release):
data = musicbrainzngs.get_image_front(release['id'])
buf = io.BytesIO(data)
img = PIL.Image.open(buf)
img.thumbnail((1000, 1000))
with open('folder.jpg', 'wb') as f:
img.save(f)
def format_release(release):
more_info = [
'{} disc(s)'.format(len(release['medium-list'])),
]
if 'packaging' in release:
more_info.append(release['packaging'])
if 'country' in release:
more_info.append(release['country'])
if 'label-info-list' in release:
for label_info in release['label-info-list']:
if 'catalog-number' in label_info:
more_info.append(label_info['catalog-number'])
break
return RELEASE_INFO_TMPL.format(
artist=release['artist-credit-phrase'],
title=release['title'],
year=release_year(release),
more_info=', '.join(more_info),
)
def get_device(path):
udev = pyudev.Context()
if path is None:
block_devices = udev.list_devices(subsystem='block')
return next(iter(block_devices.match_property('ID_CDROM', 1)))
else:
return pyudev.Devices.from_device_file(udev, path)
def get_release_by_id(mbid):
res = musicbrainzngs.get_release_by_id(
mbid,
includes=[
'artists',
'recordings',
'labels',
],
)
import pdb; pdb.set_trace()
return res['release']
def get_release_list_from_device(device):
disc = discid.read(device)
log.info('Found disc ID %s at %s', disc.id, device)
res = musicbrainzngs.get_releases_by_discid(
disc.id,
toc=disc.toc_string,
includes=[
'artists',
'recordings',
'labels',
],
)
if 'disc' in res:
return res['disc']['release-list']
else:
return res['release-list']
def parse_date(datestr):
for fmt in DATE_FMTS:
try:
return datetime.datetime.strptime(datestr, fmt)
except ValueError:
continue
else:
raise ValueError('Could not parse date string "{}"'.format(datestr))
def prompt(text, validate=None):
while True:
try:
inp = input(text)
except (EOFError, KeyboardInterrupt):
print('')
raise SystemExit(1)
if validate:
try:
return validate(inp)
except ValueError as e:
sys.stderr.write('Invalid input: {}\n'.format(e))
def prompt_menu(choices):
max_ = 0
for idx, choice in enumerate(choices):
print('{})'.format(idx + 1), choice)
max_ += 1
def validate(inp):
i = int(inp)
if i < 1 or i > max_:
raise ValueError('Invalid selection: {}'.format(i))
return i
return prompt('Selection: ', validate=validate) - 1
def prompt_select_disc(num_discs):
print(
'Found part of a multi-disc album. Please select the disc number:'
)
return prompt_menu('Disc {}'.format(x) for x in range(1, num_discs + 1))
def prompt_select_release(release_list):
print(
'Mutliple matching releases found. Please select the correct release:'
)
choice = prompt_menu(format_release(r) for r in release_list)
return release_list[choice]
def release_year(release):
try:
return parse_date(release['date']).year
except (KeyError, ValueError):
return None
def safe_name(name):
for k, v in FILENAME_SAFE_MAP.items():
name = name.replace(k, v)
return name
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'--use-libcdio',
action='store_true',
default=False,
)
parser.add_argument(
'--device', '-d',
)
parser.add_argument(
'--mbid', '-i',
)
parser.add_argument(
'tracks',
nargs='*',
)
return parser.parse_args()
def main():
args = parse_args()
device = get_device(args.device)
if args.mbid:
release = get_release_by_id(args.mbid)
else:
release_list = get_release_list_from_device(device.device_node)
if len(release_list) > 1:
release = prompt_select_release(release_list)
elif len(release_list) == 1:
release = release_list[0]
print('Ripping', format_release(release))
dirname = safe_name('{artist} - {album}'.format(
artist=release['artist-credit-phrase'],
album=release['title'],
))
if not os.path.isdir(dirname):
log.info('Creating directory: %s', dirname)
os.mkdir(dirname)
os.chdir(dirname)
num_discs = len(release['medium-list'])
if num_discs > 1:
discno = prompt_select_disc(num_discs)
subdirname = 'Disc {}'.format(discno + 1)
if not os.path.isdir(subdirname):
log.info('Creating directory: %s', subdirname)
os.mkdir(subdirname)
os.chdir(subdirname)
else:
discno = 0
ripthread = RipThread(device, args.tracks, args.use_libcdio)
#ripthread = MockRipThread()
ripthread.start()
processthread = ProcessThread(release, discno)
try:
processthread.start()
try:
fetch_albumart(release)
except:
log.exception('Error fetching album art')
ripthread.join()
finally:
processthread.stop()
processthread.join()
if __name__ == '__main__':
main()