scripts/ripcd.py

341 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python3.4
from lxml import etree
import asyncio
import aiohttp
import argparse
import chardet
import fnmatch
import functools
import glob
import multiprocessing
import mutagen
import os
import re
import sys
import urllib.parse
try:
from titlecase import titlecase
except ImportError:
titlecase = str.title
class TocProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that fills a table of contents from tool output.

    Every complete output line is inspected for the album title, the
    CDINDEX disc id and track titles; matches are stored on ``toc``.
    ``done`` is a future that resolves to ``True`` once the process exits.
    """

    def __init__(self, toc):
        self.toc = toc
        self.done = asyncio.Future()
        # Trailing bytes of an unterminated line, kept per pipe fd so that a
        # line split across two reads is parsed as a whole.
        self._pending = {}

    @staticmethod
    def _decode(data):
        """Decode ``data``, trying UTF-8 before falling back to chardet."""
        try:
            return data.decode('utf-8')
        except UnicodeDecodeError:
            pass
        # chardet reports ``None`` when it cannot guess an encoding.
        encoding = chardet.detect(data)['encoding'] or 'latin-1'
        try:
            return data.decode(encoding, 'replace')
        except LookupError:
            # chardet named a codec Python does not know.
            return data.decode('latin-1')

    def _parse_line(self, line):
        """Store the value carried by a single output line on the TOC."""
        if line.startswith('Album title:'):
            m = self.toc.ALBUM_TITLE_RE.match(line)
            if m:
                self.toc.album = m.group('album')
                self.toc.artist = m.group('artist')
        elif line.startswith('CDINDEX'):
            _, colon, value = line.partition(':')
            if colon:
                self.toc.discid = value.strip()
        elif line.startswith('Track'):
            _, colon, value = line.partition(':')
            # Lines without a colon carry no title and are skipped.
            if colon:
                self.toc.tracks.append(value.strip().strip("'"))

    def _consume(self, data):
        """Decode a run of complete lines and parse each non-empty one."""
        for line in self._decode(data).splitlines():
            if line:
                self._parse_line(line)

    def pipe_data_received(self, fd, data):
        """Parse all complete lines received so far on pipe ``fd``."""
        buffered = self._pending.pop(fd, b'') + data
        complete, newline, rest = buffered.rpartition(b'\n')
        if rest:
            self._pending[fd] = rest
        if newline:
            self._consume(complete)

    def process_exited(self):
        """Flush unterminated trailing output and resolve ``done``."""
        leftovers, self._pending = self._pending, {}
        for data in leftovers.values():
            self._consume(data)
        if not self.done.done():
            self.done.set_result(True)
class TableOfContents(object):
    """Disc id, artist, album and track titles read from a CD."""

    ALBUM_TITLE_RE = re.compile(
        r'''^Album title: '(?P<album>.*)'\s+\[from (?P<artist>.*)\]\s*$'''
    )

    def __init__(self):
        self.discid = None
        self.artist = None
        self.album = None
        self.tracks = []

    @classmethod
    @asyncio.coroutine
    def from_device(cls, device=None):
        """Build a table of contents by running ``icedax --info-only``.

        :param device: optional CD-ROM device passed via ``--device``
        :returns: a populated :class:`TableOfContents`
        """
        cmd = ['icedax']
        if device:
            cmd.extend(('--device', device))
        cmd.extend((
            '--info-only',
            '--no-infofile',
            '--verbose-level', 'catalog,titles',
            '--quiet',
            '--silent-scsi',
        ))
        # Force untranslated messages: the parser matches English prefixes.
        env = os.environ.copy()
        env['LC_MESSAGES'] = env['LANG'] = 'C'

        toc = cls()
        loop = asyncio.get_event_loop()
        factory = functools.partial(TocProtocol, toc)
        transport, protocol = yield from loop.subprocess_exec(
            factory, *cmd, env=env)
        try:
            yield from protocol.done
        finally:
            # Close the transport even when waiting is cancelled or fails.
            transport.close()
        return toc
class Track(object):
    """A ripped WAV file plus the CD-TEXT tags read from its ``.inf`` file.

    Subclasses set ``extension`` and complete :meth:`encode` by running the
    actual encoder and releasing the lock acquired here.
    """

    # Maps keys found in the .inf file to the tag names written by mutagen.
    CDTEXT_TAGS = {
        'Albumperformer': 'albumartist',
        'Albumtitle': 'album',
        'Performer': 'artist',
        'Tracknumber': 'tracknumber',
        'Tracktitle': 'title',
    }
    FILENAME_FORMAT = '{tracknumber:0>2} {artist} - {title}.{ext}'

    def __init__(self):
        self.filename = None
        self.tags = {}

    @staticmethod
    def _decode(data):
        """Decode ``data``, trying UTF-8 before falling back to chardet."""
        try:
            return data.decode('utf-8')
        except UnicodeDecodeError:
            pass
        # chardet reports ``None`` when it cannot guess an encoding.
        encoding = chardet.detect(data)['encoding'] or 'latin-1'
        try:
            return data.decode(encoding, 'replace')
        except LookupError:
            return data.decode('latin-1')

    @property
    def outfile(self):
        """Name of the encoded file.

        Built from the tags when a title and every other field required by
        ``FILENAME_FORMAT`` are known; otherwise the input file name with
        the codec extension.
        """
        if self.tags.get('title'):
            try:
                name = self.FILENAME_FORMAT.format(
                    ext=self.extension, **self.tags)
            except KeyError:
                # A tag needed by the format is missing; use the fallback.
                pass
            else:
                # Tag text such as "AC/DC" must not introduce directories.
                for sep in (os.sep, os.altsep):
                    if sep:
                        name = name.replace(sep, '_')
                return name
        return '{filename}.{ext}'.format(
            filename=os.path.splitext(self.filename)[0],
            ext=self.extension,
        )

    @classmethod
    def from_file(cls, filename):
        """Create a track for the given ripped audio file."""
        track = cls()
        track.filename = filename
        return track

    def _parse_inf(self):
        """Fill ``self.tags`` from the ``.inf`` file next to the audio file.

        A missing or unreadable file is reported on stderr and leaves the
        tags untouched.
        """
        assert self.filename
        basename = os.path.splitext(self.filename)[0]
        infname = '{}.inf'.format(basename)
        try:
            inf = open(infname, 'rb')
        except OSError as e:
            sys.stderr.write('Could not read track info: {}\n'.format(e))
            return
        with inf:
            for raw in inf:
                # Everything after '#' is treated as a comment.
                line = self._decode(raw).split('#')[0]
                # Split on the first '=' only so values may contain '='.
                key, equals, value = line.partition('=')
                if not equals:
                    continue
                tag = self.CDTEXT_TAGS.get(key.strip())
                if tag is None:
                    continue
                self.tags[tag] = titlecase(value.strip().strip("'"))

    @asyncio.coroutine
    def encode(self, lock=None):
        """Read the tags, then acquire ``lock`` and announce the encode.

        The lock is NOT released here; the subclass releases it once its
        encoder process has finished.
        """
        assert self.filename
        loop = asyncio.get_event_loop()
        yield from loop.run_in_executor(None, self._parse_inf)
        if lock:
            yield from lock.acquire()
        print('Encoding {} as {}'.format(self.filename, self.outfile))

    def write_tags(self):
        """Write ``self.tags`` into the encoded file using mutagen."""
        tags = mutagen.File(self.outfile, easy=True)
        if tags is None:
            # mutagen returns None when it cannot identify the file type.
            sys.stderr.write(
                'Could not write tags to {}\n'.format(self.outfile))
            return
        tags.update(self.tags)
        tags.save()
class VorbisTrack(Track):
    """Track encoded to Ogg Vorbis with ``oggenc``."""

    extension = 'ogg'

    @asyncio.coroutine
    def encode(self, lock=None):
        """Encode the track with ``oggenc`` and tag the result.

        :param lock: optional semaphore limiting concurrent encoders; it is
            acquired by the base class and always released here.
        """
        yield from super(VorbisTrack, self).encode(lock)
        try:
            cmd = ['oggenc', '-q', '9', '-Q', '-o', self.outfile,
                   self.filename]
            p = yield from asyncio.create_subprocess_exec(*cmd)
            yield from p.wait()
        finally:
            # Release the slot even if the encoder could not be started.
            if lock:
                lock.release()
        if p.returncode != 0:
            sys.stderr.write('Failed to encode {}\n'.format(self.filename))
            # No usable output file, so there is nothing to tag.
            return
        if self.tags:
            loop = asyncio.get_event_loop()
            yield from loop.run_in_executor(None, self.write_tags)
class FlacTrack(Track):
    """Track encoded to FLAC with ``flac``."""

    extension = 'flac'

    @asyncio.coroutine
    def encode(self, lock=None):
        """Encode the track with ``flac`` and tag the result.

        :param lock: optional semaphore limiting concurrent encoders; it is
            acquired by the base class and always released here.
        """
        yield from super(FlacTrack, self).encode(lock)
        try:
            cmd = ['flac', '-s', '-o', self.outfile, self.filename]
            p = yield from asyncio.create_subprocess_exec(*cmd)
            yield from p.wait()
        finally:
            # Release the slot even if the encoder could not be started.
            if lock:
                lock.release()
        if p.returncode != 0:
            sys.stderr.write('Failed to encode {}\n'.format(self.filename))
            # No usable output file, so there is nothing to tag.
            return
        if self.tags:
            loop = asyncio.get_event_loop()
            yield from loop.run_in_executor(None, self.write_tags)
@asyncio.coroutine
def fetch_album_art(discid):
    """Download front cover art for ``discid`` into ``folder.jpg``.

    The disc id is looked up on MusicBrainz; the first release that
    reports cover art is fetched from the Cover Art Archive.  Missing
    artwork, a non-200 download response and file errors are reported on
    stderr.
    """
    headers = {
        'Accept': 'application/json',
    }
    url = 'http://musicbrainz.org/ws/2/discid/{}'.format(
        urllib.parse.quote(discid, safe=''))
    res = yield from aiohttp.request('GET', url, headers=headers)
    try:
        metadata = yield from res.json()
    finally:
        # Close the response even when the body is not valid JSON.
        res.close()

    release = None
    for candidate in metadata.get('releases', ()):
        art = candidate.get('cover-art-archive') or {}
        if art.get('count', 0):
            release = candidate
            break
    if release is None:
        sys.stderr.write('No cover artwork available\n')
        return

    url = 'http://coverartarchive.org/release/{}/front'.format(
        release['id'])
    res = yield from aiohttp.request('GET', url)
    try:
        if res.status != 200:
            # Do not save an error page as the cover image.
            sys.stderr.write(
                'Could not download album art: HTTP {}\n'.format(res.status))
            return
        with open('folder.jpg', 'wb') as f:
            while True:
                data = yield from res.content.read(8192)
                if not data:
                    break
                f.write(data)
    except OSError as e:
        sys.stderr.write('Could not save album art: {}\n'.format(e))
    finally:
        res.close()
@asyncio.coroutine
def rip_info(device):
    """Run ``icedax --info-only`` for the disc in ``device``.

    Unlike the table-of-contents query this call does not pass
    ``--no-infofile`` (presumably so the per-track info files get written;
    confirm against the icedax documentation).  A non-zero exit status is
    reported on stderr.
    """
    device_args = ['--device', device] if device else []
    argv = ['icedax'] + device_args + [
        '--info-only',
        '--quiet',
        '--silent-scsi',
    ]
    proc = yield from asyncio.create_subprocess_exec(*argv)
    yield from proc.wait()
    if proc.returncode == 0:
        return
    sys.stderr.write('Failed to rip CD info\n')
@asyncio.coroutine
def rip_tracks(device, codec, num_encoders=None):
    """Rip every track with icedax, then encode the WAV files concurrently.

    :param device: optional CD-ROM device passed via ``--device``
    :param codec: ``'vorbis'`` or ``'flac'``
    :param num_encoders: maximum number of simultaneous encoder processes;
        defaults to the CPU count.  Numeric strings are accepted.
    :raises ValueError: if ``codec`` is not supported
    """
    track_types = {
        'vorbis': VorbisTrack,
        'flac': FlacTrack,
    }
    # Validate before the (slow) rip instead of after it.
    if codec not in track_types:
        raise ValueError('Unsupported codec: {}'.format(codec))
    track_type = track_types[codec]

    if num_encoders:
        # The command line may hand the count over as a string.
        num_encoders = int(num_encoders)
    else:
        num_encoders = multiprocessing.cpu_count()

    cmd = ['icedax']
    if device:
        cmd.extend(('--device', device))
    cmd.extend((
        '--max',
        '--alltracks',
        '--no-infofile',
        '--verbose-level', 'summary',
        '--silent-scsi',
    ))
    p = yield from asyncio.create_subprocess_exec(*cmd)
    yield from p.wait()
    if p.returncode != 0:
        sys.stderr.write('Failed to rip CD tracks\n')
        return

    loop = asyncio.get_event_loop()
    lock = asyncio.Semaphore(num_encoders)
    tasks = [
        loop.create_task(track_type.from_file(filename).encode(lock))
        for filename in glob.glob('*.wav')
    ]
    if not tasks:
        # asyncio.wait() rejects an empty collection.
        sys.stderr.write('No ripped tracks found to encode\n')
        return
    done, _ = yield from asyncio.wait(tasks)
    for task in done:
        if task.cancelled():
            continue
        error = task.exception()
        if error is not None:
            sys.stderr.write('Failed to encode track: {}\n'.format(error))
def cleanup():
    """Remove the temporary rip files from the current directory.

    Deletes ``*.wav`` and ``*.inf`` entries plus ``audio.cddb`` and
    ``audio.cdindex``.  An entry that cannot be removed is reported on
    stderr and does not stop the remaining cleanup.
    """
    patterns = ('*.wav', '*.inf')
    exact_names = ('audio.cddb', 'audio.cdindex')
    for f in os.listdir():
        matches = f in exact_names or any(
            fnmatch.fnmatch(f, pattern) for pattern in patterns)
        if not matches:
            continue
        try:
            os.unlink(f)
        except OSError as e:
            sys.stderr.write('Could not remove {}: {}\n'.format(f, e))
@asyncio.coroutine
def rip_cd(args):
    """Rip and encode a whole CD according to the parsed arguments.

    Reads the table of contents, runs the info pass, then fetches album
    art (when a disc id is known) while the tracks are ripped and encoded.
    Errors raised by either job are reported on stderr.  Temporary files
    are removed afterwards unless ``args.cleanup`` is false.
    """
    loop = asyncio.get_event_loop()
    toc = yield from TableOfContents.from_device(args.device)
    print('Found CD: {} by {}'.format(toc.album, toc.artist))
    yield from rip_info(args.device)

    tasks = []
    if toc.discid:
        tasks.append(loop.create_task(fetch_album_art(toc.discid)))
    tasks.append(loop.create_task(rip_tracks(args.device, args.codec,
                                             args.num_encoders)))
    done, _ = yield from asyncio.wait(tasks)
    for task in done:
        if task.cancelled():
            continue
        # Retrieve the exception so a failed job is not silently dropped.
        error = task.exception()
        if error is not None:
            sys.stderr.write('Error: {}\n'.format(error))

    if args.cleanup:
        cleanup()
def parse_args():
    """Parse the command line and return the resulting namespace.

    The namespace carries ``num_encoders`` (int or ``None``), ``cleanup``
    (bool), ``codec`` (``'vorbis'`` or ``'flac'``) and ``device`` (str or
    ``None``).
    """
    parser = argparse.ArgumentParser()
    # type=int: the value is later used as a semaphore count.
    parser.add_argument('--num-encoders', metavar='COUNT', type=int,
                        help='Number of simultaneous encoder processes')
    parser.add_argument('--no-clean', dest='cleanup', action='store_false',
                        default=True,
                        help='Do not remove temporary files')
    parser.add_argument('--codec', '-c', choices=('vorbis', 'flac'),
                        default='vorbis',
                        help='Encode audio with specific codec')
    parser.add_argument('device', nargs='?',
                        help='CD-ROM device to use')
    return parser.parse_args()
def main():
    """Parse the command line and run the rip on the asyncio event loop."""
    args = parse_args()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(rip_cd(args))
    finally:
        # Close the loop even when the rip raises or is interrupted.
        loop.close()
# Start ripping only when executed as a script, not when imported.
if __name__ == '__main__':
    main()