#!/usr/bin/env python3
"""Rip an audio CD to tagged FLAC files using MusicBrainz metadata.

The rip itself is delegated to ``cdparanoia`` (or ``cd-paranoia`` from
libcdio).  While the rip runs, the working directory is watched with inotify;
every ``.wav`` file that is closed after writing is encoded with ``flac``,
tagged with ``mutagen`` and renamed, and the WAV file is then removed.
"""
import argparse
import contextlib
import datetime
import io
import logging
import os
import queue
import select
import subprocess
import sys
import threading

import PIL.Image
import musicbrainzngs
import mutagen
import pyudev
from linuxapi import inotify

try:
    from libdiscid.compat import discid
except ImportError:
    import discid


log = logging.getLogger('ripcd2')

filesystemencoding = sys.getfilesystemencoding()

# Value passed to cdparanoia's --sample-offset, keyed by the drive's udev
# ID_MODEL property.
DRIVE_OFFSETS = {
    'ASUS_BW-12B1ST_a': 6,
    'ATAPI_iHES212_3': 702,
}

RELEASE_INFO_TMPL = '{artist} - {title} ({year}): {more_info}'

# Substrings replaced by safe_name() before a string is used as a file or
# directory name.
FILENAME_SAFE_MAP = {
    '’': "'",
    ':': ' - ',
    # A path separator inside a title would make os.rename()/os.mkdir()
    # target a non-existent subdirectory.
    '/': '-',
}

# Date layouts tried by release_year(), most specific first.
RELEASE_DATE_FORMATS = ('%Y-%m-%d', '%Y-%m', '%Y')


musicbrainzngs.set_useragent('DCPlayer', '0.0.1', 'https://dcplayer.audio/')


class RipThread(threading.Thread):
    """Run cdparanoia against a drive in a background thread.

    :param device: pyudev device object for the CD drive.
    :param tracks: cdparanoia track specifications; defaults to ``('1-',)``.
    :param use_libcdio: run ``cd-paranoia`` instead of ``cdparanoia``.
    """

    def __init__(self, device, tracks=None, use_libcdio=False):
        super().__init__()
        self.tracks = tracks or ('1-',)
        self.device = device
        self.use_libcdio = bool(use_libcdio)

    def run(self):
        """Build the cdparanoia command line and run it to completion."""
        log.debug('Starting rip')
        model = self.device.properties['ID_MODEL']
        try:
            offset = DRIVE_OFFSETS[model]
        except KeyError:
            # Do not rip with a guessed offset; make the reason obvious
            # instead of dying with a bare KeyError traceback.
            log.error(
                'No sample offset known for drive model %r; '
                'add it to DRIVE_OFFSETS',
                model,
            )
            return
        log.info('Using offset %d for %s', offset, model)
        cmd = ['cd-paranoia' if self.use_libcdio else 'cdparanoia']
        cmd += (
            '--batch',
            '--force-read-speed', '1',
            '--sample-offset', str(offset),
            '--force-cdrom-device', self.device.device_node,
            '--verbose',
        )
        cmd += self.tracks
        log.debug('Running command: %s', cmd)
        result = subprocess.run(cmd)
        if result.returncode != 0:
            log.error('%s exited with status %d', cmd[0], result.returncode)


class ProcessThread(threading.Thread):
    """Watch the current directory and queue finished WAV files for encoding.

    :param release: MusicBrainz release dict used for tagging.
    :param discno: zero-based index of the disc within the release.
    """

    def __init__(self, release, discno):
        super().__init__()
        self.release = release
        self.discno = discno
        # Created here rather than in run() so that stop() works even when it
        # is called before the thread has begun executing.
        self.quitpipe = os.pipe()
        self._quit_sent = False
        self.q = queue.Queue()
        self.encoder = EncodeThread(self.q, self.release, self.discno)

    def handle_event(self, evt):
        """Queue the file named by an inotify event if it is a ``.wav``."""
        filename = evt.name.decode(filesystemencoding)
        if filename.endswith('.wav'):
            self.q.put(filename)

    def run(self):
        """Dispatch inotify events until stop() is called."""
        self.encoder.start()
        try:
            with contextlib.closing(inotify.Inotify()) as inot:
                inot.add_watch(b'.', inotify.IN_CLOSE_WRITE)
                while True:
                    ready = select.select(
                        (self.quitpipe[0], inot), (), (),
                    )[0]
                    # Drain pending events before honouring a quit request so
                    # files closed just before stop() are still queued.
                    if inot in ready:
                        for evt in inot.read():
                            log.debug('Dispatching inotify event')
                            self.handle_event(evt)
                    if self.quitpipe[0] in ready:
                        log.debug('Shutting down')
                        break
        finally:
            os.close(self.quitpipe[0])
            # None is the sentinel that ends EncodeThread.run().
            self.q.put(None)
            log.debug('Waiting for encoder thread')
            self.encoder.join()

    def stop(self):
        """Ask run() to finish; safe to call more than once."""
        log.debug('Stopping process thread')
        if not self._quit_sent:
            self._quit_sent = True
            # Closing the write end makes the read end readable (EOF), which
            # wakes the select() call in run().
            os.close(self.quitpipe[1])


class EncodeThread(threading.Thread):
    """Encode, tag and rename WAV files taken from a queue.

    :param q: queue of WAV file names; ``None`` ends the thread.
    :param release: MusicBrainz release dict used for tagging.
    :param discno: zero-based index of the disc within the release.
    """

    def __init__(self, q, release, discno):
        super().__init__()
        self.q = q
        self.release = release
        self.discno = discno
        year = release_year(self.release)
        # Stored as a string for the 'date' tag; None when the year is unknown.
        self.release_year = None if year is None else str(year)

    def encode(self, filename):
        """Run ``flac`` on *filename*.

        :raises subprocess.CalledProcessError: if flac exits non-zero.
        """
        log.info('Encoding %s to flac', filename)
        cmd = ['flac', filename]
        log.debug('Running command %s', cmd)
        # check=True so a failed encode is never tagged nor its WAV deleted.
        subprocess.run(cmd, check=True)

    def tag(self, filename):
        """Tag the FLAC produced from *filename* and rename it.

        The track number is read from characters 5-6 of the file name, which
        matches names of the form ``trackNN.cdda.wav``.
        """
        basename, ext = os.path.splitext(filename)
        filename = basename + '.flac'
        log.info('Adding tags to %s', filename)
        trackno = int(filename[5:7])
        artist = self.release['artist-credit'][0]['artist']
        album = self.release['title']
        medium = self.release['medium-list'][self.discno]
        track = medium['track-list'][trackno - 1]
        title = track['recording']['title']

        tags = mutagen.File(filename, easy=True)
        tags['tracknumber'] = str(trackno)
        tags['artist'] = tags['albumartist'] = artist['name']
        tags['album'] = album
        tags['title'] = title
        if self.release_year is not None:
            tags['date'] = self.release_year
        if len(self.release['medium-list']) > 1:
            tags['discnumber'] = str(self.discno + 1)
        tags['musicbrainz_albumid'] = self.release['id']
        tags['musicbrainz_artistid'] = artist['id']
        tags['musicbrainz_releasetrackid'] = track['id']
        tags.save()

        # Sanitise first so the log shows the name that is actually used.
        newname = safe_name('{track:02} {artist} - {title}.flac'.format(
            track=trackno,
            artist=artist['name'],
            title=title,
        ))
        log.info('Renaming %s to %s', filename, newname)
        os.rename(filename, newname)

    def run(self):
        """Process queued files until the ``None`` sentinel arrives."""
        while True:
            filename = self.q.get()
            if filename is None:
                break
            try:
                self.encode(filename)
                self.tag(filename)
                os.unlink(filename)
            except Exception:
                # One bad track must not stop the remaining tracks from being
                # encoded; the WAV is left in place for manual recovery.
                log.exception('Failed to process %s', filename)


class MockRipThread(threading.Thread):
    """Debugging stand-in for RipThread that just creates one empty WAV."""

    def run(self):
        log.debug('Mock rip thread')
        import time
        time.sleep(2)
        open('track01.cdda.wav', 'ab').close()


def fetch_albumart(release):
    """Download the release's front cover and save it as ``folder.jpg``.

    The image is shrunk to fit within 1000x1000 pixels.
    """
    data = musicbrainzngs.get_image_front(release['id'])
    img = PIL.Image.open(io.BytesIO(data))
    img.thumbnail((1000, 1000))
    # JPEG cannot store alpha or palette images (e.g. PNG cover art).
    if img.mode not in ('RGB', 'L', 'CMYK'):
        img = img.convert('RGB')
    with open('folder.jpg', 'wb') as f:
        img.save(f, format='JPEG')


def format_release(release):
    """Return a one-line, human-readable description of *release*."""
    more_info = [
        '{} disc(s)'.format(len(release['medium-list'])),
    ]
    if 'packaging' in release:
        more_info.append(release['packaging'])
    if 'country' in release:
        more_info.append(release['country'])
    # Only the first catalog number found is shown.
    for label_info in release.get('label-info-list', ()):
        if 'catalog-number' in label_info:
            more_info.append(label_info['catalog-number'])
            break
    year = release_year(release)
    return RELEASE_INFO_TMPL.format(
        artist=release['artist-credit-phrase'],
        title=release['title'],
        year='????' if year is None else year,
        more_info=', '.join(more_info),
    )


def get_device(path):
    """Return the pyudev device for *path*, or the first CD drive if None.

    :raises SystemExit: if *path* is None and no CD-ROM drive is found.
    """
    udev = pyudev.Context()
    if path is not None:
        return pyudev.Devices.from_device_file(udev, path)
    block_devices = udev.list_devices(subsystem='block')
    device = next(iter(block_devices.match_property('ID_CDROM', 1)), None)
    if device is None:
        raise SystemExit('No CD-ROM drive found')
    return device


def get_release_by_id(mbid):
    """Fetch a single release, with artists, recordings and labels, by MBID."""
    res = musicbrainzngs.get_release_by_id(
        mbid,
        includes=[
            'artists',
            'recordings',
            'labels',
        ],
    )
    return res['release']


def get_release_list_from_device(device):
    """Look up the disc in *device* on MusicBrainz.

    :param device: device node path of the CD drive.
    :returns: list of matching release dicts; empty if nothing matched.
    """
    disc = discid.read(device)
    log.info('Found disc ID %s at %s', disc.id, device)
    try:
        res = musicbrainzngs.get_releases_by_discid(
            disc.id,
            toc=disc.toc_string,
            includes=[
                'artists',
                'recordings',
                'labels',
            ],
        )
    except musicbrainzngs.ResponseError as e:
        # Raised for "disc not found" and other bad responses.
        log.warning('MusicBrainz lookup failed: %s', e)
        return []
    if 'disc' in res:
        return res['disc']['release-list']
    # A response with neither 'disc' nor 'release-list' (e.g. a CD stub)
    # carries no usable release.
    return res.get('release-list', [])


def prompt(text, validate=None):
    """Read a line from the user, re-asking until *validate* accepts it.

    :param text: prompt string.
    :param validate: optional callable that returns the converted value or
        raises ValueError; without it the raw input is returned.
    :raises SystemExit: on EOF or Ctrl-C.
    """
    while True:
        try:
            inp = input(text)
        except (EOFError, KeyboardInterrupt):
            print('')
            raise SystemExit(1)
        if validate is None:
            return inp
        try:
            return validate(inp)
        except ValueError as e:
            sys.stderr.write('Invalid input: {}\n'.format(e))


def prompt_menu(choices):
    """Print *choices* as a numbered menu and return the chosen 0-based index."""
    choices = list(choices)
    for number, choice in enumerate(choices, start=1):
        print('{})'.format(number), choice)

    def validate(inp):
        selection = int(inp)
        if not 1 <= selection <= len(choices):
            raise ValueError('Invalid selection: {}'.format(selection))
        return selection

    return prompt('Selection: ', validate=validate) - 1


def prompt_select_disc(num_discs):
    """Ask which disc of a multi-disc release is inserted (0-based result)."""
    print(
        'Found part of a multi-disc album. Please select the disc number:'
    )
    return prompt_menu('Disc {}'.format(x) for x in range(1, num_discs + 1))


def prompt_select_release(release_list):
    """Ask the user to pick one release from *release_list* and return it."""
    print(
        'Multiple matching releases found. Please select the correct release:'
    )
    choice = prompt_menu(format_release(r) for r in release_list)
    return release_list[choice]


def release_year(release):
    """Return the release year as an int, or None if it is unknown.

    Accepts full (``YYYY-MM-DD``) as well as partial (``YYYY-MM``, ``YYYY``)
    dates; a missing, empty or unparseable date yields None.
    """
    date = release.get('date') or ''
    for fmt in RELEASE_DATE_FORMATS:
        try:
            return datetime.datetime.strptime(date, fmt).year
        except ValueError:
            continue
    return None


def safe_name(name):
    """Return *name* with every FILENAME_SAFE_MAP key replaced by its value."""
    for k, v in FILENAME_SAFE_MAP.items():
        name = name.replace(k, v)
    return name


def parse_args():
    """Parse the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--use-libcdio',
        action='store_true',
        default=False,
    )
    parser.add_argument(
        '--device', '-d',
    )
    parser.add_argument(
        '--mbid', '-i',
    )
    parser.add_argument(
        'tracks',
        nargs='*',
    )
    return parser.parse_args()


def _enter_directory(name):
    """Create directory *name* if needed and make it the working directory."""
    if not os.path.isdir(name):
        log.info('Creating directory: %s', name)
        os.mkdir(name)
    os.chdir(name)


def main():
    """Identify the disc, pick a release, then rip, encode and tag it."""
    args = parse_args()
    device = get_device(args.device)
    if args.mbid:
        release = get_release_by_id(args.mbid)
    else:
        release_list = get_release_list_from_device(device.device_node)
        if not release_list:
            raise SystemExit(
                'No matching release found; use --mbid to specify one'
            )
        if len(release_list) > 1:
            release = prompt_select_release(release_list)
        else:
            release = release_list[0]
    print('Ripping', format_release(release))

    _enter_directory(safe_name('{artist} - {album}'.format(
        artist=release['artist-credit-phrase'],
        album=release['title'],
    )))
    num_discs = len(release['medium-list'])
    if num_discs > 1:
        discno = prompt_select_disc(num_discs)
        _enter_directory('Disc {}'.format(discno + 1))
    else:
        discno = 0

    # Swap in MockRipThread() here to exercise the pipeline without a drive.
    ripthread = RipThread(device, args.tracks, args.use_libcdio)
    ripthread.start()
    processthread = ProcessThread(release, discno)
    try:
        processthread.start()
        try:
            fetch_albumart(release)
        except Exception:
            # Cover art is best-effort; the rip continues without it.
            log.exception('Error fetching album art')
        ripthread.join()
    finally:
        processthread.stop()
        processthread.join()


if __name__ == '__main__':
    main()