rupert/src/rupert/ripper.py

373 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import enum
import glob
import io
import logging
import os
import queue
import select
import subprocess
import sys
import threading
import time
from types import TracebackType
from typing import (
TYPE_CHECKING,
Callable,
Iterable,
Optional,
Tuple,
Type,
Union,
)
import mutagen
from . import inotify
from .disc import DiscDrive
from .musicbrainz import AlbumArtNotFound, Release, fetch_album_art
# Logger shared by everything in this module.
log = logging.getLogger(__name__)

# Track selectors accepted by the ripper. Each item is converted with str()
# and appended to the ripper command line.
TrackList = Iterable[Union[str, int]]

# Pillow is an optional dependency: when it cannot be imported, ``Image`` is
# None and callers must check for that before using it.
try:
    from PIL import Image
except ImportError:
    Image = None  # type: ignore
class TrackStatus(enum.Enum):
    """Stages reported for a track while it is ripped and encoded.

    Each member's value is a human-readable label.
    """

    # Reported when the track's .wav file is created.
    ripping = 'Ripping'

    # Reported when the .wav file has been closed after writing and is
    # queued for encoding.
    encoding = 'Encoding'

    # Reported once encoding/tagging has been attempted and the .wav removed.
    done = 'Done'
# Per-track progress hook: called with (wav filename, new status).
StatusCallback = Callable[[str, TrackStatus], None]

# Completion hook: called with (message, is_error).
CompleteCallback = Callable[[str, bool], None]

# Item carried on the status queue. Either (track number, TrackStatus) for
# progress, or (None, (message, is_error)) when a worker has finished.
StatusMessage = Tuple[Optional[int], Union[Tuple[str, bool], TrackStatus]]

# The subscripted queue.Queue forms are only evaluated for the benefit of
# type checkers; at run time the plain class is used.
if not TYPE_CHECKING:
    ProcessQueue = queue.Queue
    StatusQueue = queue.Queue
else:
    ProcessQueue = queue.Queue[  # pylint: disable=unsubscriptable-object
        Optional[str]
    ]
    StatusQueue = queue.Queue[  # pylint: disable=unsubscriptable-object
        StatusMessage
    ]
# Substitutions applied to generated file and directory names: each key found
# in a name is replaced by its value.
#
# NOTE: keys must never be empty -- ``str.replace('', x)`` inserts ``x``
# between every character of the string. The typographic apostrophe is
# written as an escape so that it cannot be lost or confused with the ASCII
# apostrophe when the file is viewed or copied.
# fmt: off
FILENAME_SAFE_MAP = {
    '\u2019': "'",   # RIGHT SINGLE QUOTATION MARK -> ASCII apostrophe
    ':': ' - ',
    '/': '-',
}
# fmt: on
class RipThread(threading.Thread):
    """Thread that runs the external CD ripper over the requested tracks.

    The ripper is ``cd-paranoia`` when ``use_libcdio`` is true and
    ``cdparanoia`` otherwise. Its combined stdout/stderr is written to
    ``rupert-rip.out`` in the current working directory.

    When the ripper has finished, or could not be run at all, the
    ``on_complete`` callback (if set) is invoked as
    ``on_complete(message, is_err)``.
    """

    def __init__(
        self,
        device: DiscDrive,
        tracks: Optional[TrackList] = None,
        use_libcdio: bool = False,
    ):
        """Store the rip parameters.

        :param device: drive to read from; ``device_node``, ``offset`` and
            ``model`` are read from it.
        :param tracks: track selectors appended to the command line; a
            falsy value selects ``'1-'``.
        :param use_libcdio: run ``cd-paranoia`` instead of ``cdparanoia``.
        """
        super().__init__(name='RipThread')
        if not tracks:
            tracks = ('1-',)
        self.tracks = tracks
        self.device = device
        self.use_libcdio = bool(use_libcdio)
        self.on_complete: Optional[CompleteCallback] = None

    def run(self):
        """Run the ripper to completion and report the outcome."""
        log.info('Starting rip from device %s', self.device.device_node)
        log.debug(
            'Using offset %d for %s', self.device.offset, self.device.model
        )
        cmd = [
            'cd-paranoia' if self.use_libcdio else 'cdparanoia',
            '--batch',
            '--quiet',
            '--force-read-speed',
            '1',
            '--output-wav',
            '--sample-offset',
            str(self.device.offset),
            '--force-cdrom-device',
            str(self.device.device_node),
            '--verbose',
        ]
        cmd.extend(str(t) for t in self.tracks)
        log.debug('Running command: %s', cmd)
        try:
            with open('rupert-rip.out', 'wb') as f:
                p = subprocess.run(
                    cmd,
                    stdout=f,
                    stderr=subprocess.STDOUT,
                    stdin=subprocess.DEVNULL,
                    check=False,
                )
        except OSError as e:
            # The log file could not be opened or the program could not be
            # executed (e.g. it is not installed). Completion must still be
            # reported, otherwise whoever waits for it would block forever.
            log.error('Could not run %s: %s', cmd[0], e)
            self._complete(f'{cmd[0]} could not be run: {e}', True)
            return
        self._complete(
            f'{cmd[0]} exited with status {p.returncode}', p.returncode != 0
        )

    def _complete(self, message: str, is_err: bool) -> None:
        """Invoke the ``on_complete`` callback, if one is registered."""
        if self.on_complete is not None:
            self.on_complete(message, is_err)
class ProcessThread(threading.Thread):
    """Thread that watches the current directory for ripped ``.wav`` files.

    inotify events for the current directory are translated into status
    callbacks, and every ``.wav`` file that has been closed after writing is
    put on the queue ``q``. When the thread ends, for whatever reason,
    ``None`` is put on the queue as an end-of-stream marker.

    The thread is stopped through a pipe: :meth:`stop` closes the write end,
    which makes the read end readable and wakes the ``select`` call.
    """

    # Encoding used to decode the file names carried by inotify events.
    encoding = sys.getfilesystemencoding()

    def __init__(self, q: ProcessQueue) -> None:
        """Create the watcher; ``q`` receives finished ``.wav`` file names."""
        super().__init__(name='ProcessThread')
        self.q = q
        # (read_fd, write_fd) while the write end is still open, else None.
        self.quitpipe: Optional[Tuple[int, int]] = None
        self.on_status: Optional[StatusCallback] = None
        # Guards ``quitpipe`` and ``_stop_requested``, which are touched both
        # by this thread and by whichever thread calls stop().
        self._quit_lock = threading.Lock()
        self._stop_requested = False

    def handle_event(self, evt):
        """Report progress for one inotify event concerning a ``.wav`` file.

        Creation is reported as ``ripping``; close-after-write is reported
        as ``encoding`` and the file name is queued. Other files are ignored.
        """
        filename = evt.name.decode(self.encoding)
        if filename.endswith('.wav'):
            if evt.mask & inotify.IN_CREATE:
                log.debug('Started ripping %s', filename)
                self._status(filename, TrackStatus.ripping)
            if evt.mask & inotify.IN_CLOSE_WRITE:
                log.debug('Finished ripping %s', filename)
                self._status(filename, TrackStatus.encoding)
                self.q.put(filename)

    def run(self):
        """Watch the directory until stopped, then post the end marker."""
        try:
            self._watch()
        finally:
            # Always tell the consumer that no more files are coming, even
            # if setting up or running the watch failed.
            self.q.put(None)

    def _watch(self):
        """Create the quit pipe and process events until it is signalled."""
        read_fd, write_fd = os.pipe()
        with self._quit_lock:
            self.quitpipe = (read_fd, write_fd)
            # stop() may already have been called before the pipe existed.
            stop_requested = self._stop_requested
        try:
            if stop_requested:
                log.debug('Shutting down')
                return
            with inotify.Inotify() as inot:
                inot.add_watch(
                    b'.', inotify.IN_CLOSE_WRITE | inotify.IN_CREATE
                )
                while True:
                    ready = select.select((read_fd, inot), (), ())[0]
                    # Pending events are handled before honouring a stop
                    # request that arrived in the same wake-up.
                    if inot in ready:
                        for evt in inot.read():
                            if not evt.name:
                                continue
                            self.handle_event(evt)
                    if read_fd in ready:
                        log.debug('Shutting down')
                        break
        finally:
            os.close(read_fd)
            # Release the write end too if stop() was never called.
            self._close_write_end()

    def stop(self):
        """Ask the thread to finish.

        Safe to call more than once, and safe to call before the thread has
        created its pipe: the request is remembered and honoured by run().
        """
        log.debug('Stopping process thread')
        with self._quit_lock:
            self._stop_requested = True
        self._close_write_end()

    def _close_write_end(self) -> None:
        """Close the pipe's write end exactly once."""
        with self._quit_lock:
            pipe, self.quitpipe = self.quitpipe, None
        if pipe is not None:
            os.close(pipe[1])

    def _status(self, filename: str, status: TrackStatus) -> None:
        """Invoke the ``on_status`` callback, if one is registered."""
        if self.on_status is not None:
            self.on_status(filename, status)
class EncodeThread(threading.Thread):
    """Thread that encodes queued ``.wav`` files to FLAC and tags them.

    File names are taken from the queue ``q`` until ``None`` is received.
    Each file is encoded with the external ``flac`` program, tagged from the
    release metadata (when available), renamed, and the ``.wav`` is removed.
    ``on_status`` is called with ``TrackStatus.done`` after every file and
    ``on_complete`` once the thread is finishing.
    """

    def __init__(
        self, release: Optional[Release], discno: int, q: ProcessQueue
    ) -> None:
        """Store the metadata source and the work queue.

        :param release: release metadata used for tagging, or ``None``.
        :param discno: zero-based index into ``release.medium_list``.
        :param q: queue of ``.wav`` file names terminated by ``None``.
        """
        super().__init__(name='EncodeThread')
        self.release = release
        self.discno = discno
        self.q = q
        self.on_status: Optional[StatusCallback] = None
        self.on_complete: Optional[CompleteCallback] = None

    def encode(self, filename: str) -> None:
        """Run ``flac`` on ``filename``, forwarding its output to the log."""
        log.info('Encoding %s to flac', filename)
        cmd = ['flac', '--silent', filename]
        log.debug('Running command %s', cmd)
        codec = sys.getfilesystemencoding()
        # The context manager closes the output pipe and waits for the
        # process, even if reading its output fails part-way.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
        ) as p:
            assert p.stdout
            for raw in p.stdout:
                log.debug(raw.decode(codec, 'replace').rstrip())
        log.info('flac exited with status %d', p.returncode)

    def tag(self, filename: str) -> None:
        """Tag and rename the FLAC file that corresponds to ``filename``.

        The track number is read from characters 5-6 of the file name
        (presumably names look like ``trackNN.cdda.wav`` -- confirm against
        the ripper's output naming). Without release metadata the file is
        left untagged and keeps its name.

        :raises ValueError: if mutagen does not recognise the file, or the
            track number cannot be parsed.
        """
        basename = os.path.splitext(filename)[0]
        filename = basename + '.flac'
        log.info('Adding tags to %s', filename)
        trackno = int(filename[5:7])
        if not self.release:
            log.warning('Cannot tag track %d: no release metadata', trackno)
            return
        artist = self.release.artist_credit[0].artist
        album = self.release.title
        medium = self.release.medium_list[self.discno]
        track = medium.track_list[trackno - 1]
        tags = mutagen.File(filename, easy=True)
        # Test for None explicitly: the returned object is dict-like, so a
        # file that has no tags yet may well be falsy (TODO confirm), and an
        # ``assert`` would also vanish under ``python -O``.
        if tags is None:
            raise ValueError(f'Unrecognised audio file: {filename}')
        tags['tracknumber'] = str(trackno)
        tags['artist'] = tags['albumartist'] = artist.name
        tags['album'] = album
        tags['title'] = track.recording.title
        tags['date'] = str(self.release.date.year)
        if len(self.release.medium_list) > 1:
            tags['discnumber'] = str(self.discno + 1)
        tags['musicbrainz_albumid'] = self.release.id
        tags['musicbrainz_artistid'] = artist.id
        tags['musicbrainz_releasetrackid'] = track.id
        tags.save()
        newname = safe_name(
            '{track:02} {artist} - {title}.flac'.format(
                track=trackno, artist=artist.name, title=track.recording.title
            )
        )
        # Log the sanitised name, i.e. the one the file really ends up with.
        log.info('Renaming "%s" to "%s"', filename, newname)
        os.rename(filename, newname)

    def run(self) -> None:
        """Process queued files until the ``None`` end marker arrives."""
        try:
            while True:
                filename = self.q.get()
                if filename is None:
                    break
                self._process(filename)
        finally:
            # Report completion even if the loop died unexpectedly, so that
            # whoever waits for this thread is not left blocked forever.
            self._complete()

    def _process(self, filename: str) -> None:
        """Encode and tag one file, remove the ``.wav``, report ``done``."""
        try:
            self.encode(filename)
            self.tag(filename)
        except Exception as e:  # pylint: disable=broad-except
            log.error('Error encoding/tagging %s: %s', filename, e)
        finally:
            try:
                os.unlink(filename)
            except OSError as e:
                # A missing or undeletable .wav must not stop the remaining
                # tracks from being processed.
                log.warning('Could not remove %s: %s', filename, e)
        self._status(filename, TrackStatus.done)

    def _complete(self) -> None:
        """Invoke the ``on_complete`` callback, if one is registered."""
        if self.on_complete is not None:
            self.on_complete('Finished encoding files', False)

    def _status(self, filename: str, status: TrackStatus) -> None:
        """Invoke the ``on_status`` callback, if one is registered."""
        if self.on_status is not None:
            self.on_status(filename, status)
class Ripper:
    """Coordinates ripping a disc, encoding the tracks and reporting progress.

    Three worker threads are wired together: ``RipThread`` runs the external
    ripper, ``ProcessThread`` watches for the ``.wav`` files it produces, and
    ``EncodeThread`` encodes and tags them. Their callbacks feed an internal
    status queue which :meth:`rip` relays to the caller.
    """

    def __init__(
        self,
        device: DiscDrive,
        release: Optional[Release],
        discno: int,
        tracks: Optional[TrackList] = None,
        use_libcdio: bool = False,
    ) -> None:
        """Create the worker threads (they are started by :meth:`rip`).

        :param device: drive to rip from.
        :param release: release metadata, or ``None`` if unknown.
        :param discno: zero-based index of this disc within the release.
        :param tracks: optional track selectors passed on to ``RipThread``.
        :param use_libcdio: passed on to ``RipThread``.
        """
        self.device = device
        self.release = release
        self.discno = discno
        self.tracks = tracks
        self.use_libcdio = use_libcdio
        self._status_queue: StatusQueue = queue.Queue()
        # Carries finished .wav file names from the watcher to the encoder.
        q: ProcessQueue = queue.Queue()
        self._rip_thread = RipThread(
            self.device, self.tracks, self.use_libcdio
        )
        self._rip_thread.on_complete = self.on_complete
        self._process_thread = ProcessThread(q)
        self._process_thread.on_status = self.on_status
        self._encode_thread = EncodeThread(self.release, self.discno, q)
        self._encode_thread.on_status = self.on_status
        self._encode_thread.on_complete = self.on_complete

    def __enter__(self) -> Ripper:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Do nothing on exit; exceptions are not suppressed.

        All three arguments are ``None`` when the ``with`` body finished
        without raising.
        """

    def rip(self) -> Iterable[StatusMessage]:
        """Rip and encode the disc, yielding status messages as they arrive.

        NOTE: this changes the process's working directory to the album
        directory (and the ``Disc N`` sub-directory for multi-disc releases)
        and does not change it back. Left-over ``track*.cdda.wav`` files in
        that directory are deleted before ripping starts.

        Yielded items are ``(track, TrackStatus)`` for progress and
        ``(None, (message, is_error))`` when the rip thread, and later the
        encode thread, report completion.
        """
        start = time.monotonic()
        if self.release:
            dirname = safe_name(
                f'{self.release.artist_credit_phrase} - {self.release.title}'
            )
        else:
            dirname = 'Unknown Artist - Unknown Album'
        self._enter_directory(dirname)
        if self.release and len(self.release.medium_list) > 1:
            self._enter_directory(f'Disc {self.discno + 1}')
        for filename in glob.glob('track*.cdda.wav'):
            os.unlink(filename)
        if self.release and Image is not None:
            threading.Thread(target=self.fetch_album_art).start()
        self._process_thread.start()
        self._encode_thread.start()
        self._rip_thread.start()
        # First completion message: the ripper has finished.
        yield from self._relay_until_complete()
        # Stopping the watcher ends the encoder's input, so the second
        # completion message comes from the encode thread.
        self._process_thread.stop()
        yield from self._relay_until_complete()
        end = time.monotonic()
        log.info('Ripping/encoding took %d seconds', end - start)

    @staticmethod
    def _enter_directory(name: str) -> None:
        """Create directory ``name`` if needed and make it the cwd."""
        if not os.path.isdir(name):
            log.info('Creating directory: %s', name)
            os.mkdir(name)
        os.chdir(name)

    def _relay_until_complete(self) -> Iterable[StatusMessage]:
        """Yield queued messages up to and including a completion message."""
        while True:
            message = self._status_queue.get()
            yield message
            if message[0] is None:
                return

    def on_status(self, filename: str, status: TrackStatus) -> None:
        """Queue a progress message for a ``trackNN.cdda.wav`` file.

        Other file names are ignored.
        """
        log.debug('Filename: %s, Status: %s', filename, status.name)
        if filename.startswith('track') and filename.endswith('.cdda.wav'):
            try:
                track = int(filename[5:-9])
            except ValueError:
                # Not a numbered track file (e.g. 'trackfoo.cdda.wav'); do
                # not let the error propagate into the calling worker thread.
                log.debug('Ignoring non-track file %s', filename)
                return
            self._status_queue.put((track, status))

    def on_complete(self, message: str, is_err: bool) -> None:
        """Queue a completion message from one of the worker threads."""
        self._status_queue.put((None, (message, is_err)))

    def fetch_album_art(self) -> None:
        """Download the release's cover art and save it as ``folder.jpg``.

        The image is scaled down to fit within 1000x1000 pixels and written
        to the current working directory. Requires Pillow and a release.
        """
        assert self.release
        assert Image
        try:
            data = fetch_album_art(self.release)
        except AlbumArtNotFound:
            log.error('No album art found for %s', self.release.title)
            return
        buf = io.BytesIO(data)
        img = Image.open(buf)
        # JPEG cannot store palette or alpha images (e.g. PNG cover art);
        # saving those modes raises OSError, so convert them first.
        if img.mode not in ('L', 'RGB', 'CMYK'):
            img = img.convert('RGB')
        img.thumbnail((1000, 1000))
        with open('folder.jpg', 'wb') as f:
            img.save(f)
def safe_name(name: str) -> str:
    """Return ``name`` with every ``FILENAME_SAFE_MAP`` substitution applied.

    Each key of the map that occurs in ``name`` is replaced by its value, in
    the map's iteration order.
    """
    for old, new in FILENAME_SAFE_MAP.items():
        if not old:
            # str.replace('', new) would insert ``new`` between every
            # character of the name, so an empty key is never applied.
            continue
        name = name.replace(old, new)
    return name