diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6038a4..aa44ad9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,9 +27,9 @@ jobs: python: "3.14" tox: "3.14" coverage: true - # - name: "pyright" - # python: "3.14" - # tox: "pyright" + - name: "pyright" + python: "3.14" + tox: "pyright" - name: "ruff check" python: "3.14" tox: "ruff-check" diff --git a/pyproject.toml b/pyproject.toml index 878623e..eaac69d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ ] dynamic = ["version"] dependencies = [ - "mopidy >= 4.0.0a7", + "mopidy >= 4.0.0rc2", "pykka >= 4.1", "requests >= 2.32", ] @@ -86,16 +86,7 @@ target-version = "py313" select = ["ALL"] ignore = [ # Add rules you want to ignore here - "ANN001", # missing-type-function-argument # TODO - "ANN002", # missing-type-args # TODO - "ANN003", # missing-type-kwargs # TODO - "ANN201", # missing-return-type-undocumented-public-function # TODO - "ANN202", # missing-return-type-private-function # TODO - "ANN204", # missing-return-type-special-method # TODO - "ANN205", # missing-return-type-static-method # TODO "D", # pydocstyle - "D203", # one-blank-line-before-class - "D213", # multi-line-summary-second-line "FIX002", # line-contains-todo # TODO "G004", # logging-f-string "TD002", # missing-todo-author @@ -108,6 +99,7 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "tests/*" = [ + "ANN", # flake8-annotations "ARG", # flake8-unused-arguments "D", # pydocstyle "S101", # assert diff --git a/src/mopidy_beets/__init__.py b/src/mopidy_beets/__init__.py index d2c7ce9..1b48c70 100644 --- a/src/mopidy_beets/__init__.py +++ b/src/mopidy_beets/__init__.py @@ -11,16 +11,16 @@ class Extension(ext.Extension): ext_name = "beets" version = __version__ - def get_default_config(self): + def get_default_config(self) -> str: return config.read(pathlib.Path(__file__).parent / "ext.conf") - def get_config_schema(self): + def get_config_schema(self) -> config.ConfigSchema: schema = super().get_config_schema() schema["hostname"] = config.Hostname() schema["port"] = config.Port() return schema - def setup(self, registry): + def setup(self, registry: ext.Registry) -> None: from mopidy_beets.actor import BeetsBackend # noqa: PLC0415 registry.add("backend", BeetsBackend) diff --git a/src/mopidy_beets/actor.py b/src/mopidy_beets/actor.py index cfe9aa3..eb415cf 100644 --- a/src/mopidy_beets/actor.py +++ b/src/mopidy_beets/actor.py @@ -1,21 +1,28 @@ +from __future__ import annotations + import logging -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar, override import pykka from mopidy import backend -from mopidy.types import UriScheme +from mopidy.types import Uri, UriScheme from .client import BeetsRemoteClient from .library import BeetsLibraryProvider +if TYPE_CHECKING: + from mopidy.audio import AudioProxy + from mopidy.config import Config + logger = logging.getLogger(__name__) class BeetsBackend(pykka.ThreadingActor, backend.Backend): uri_schemes: ClassVar[list[UriScheme]] = [UriScheme("beets")] - def __init__(self, config, audio): - super().__init__() + @override + def __init__(self, *, config: Config, audio: AudioProxy) -> None: + super().__init__(config=config, audio=audio) beets_endpoint = ( f"http://{config['beets']['hostname']}:{config['beets']['port']}" @@ -30,7 +37,8 @@ def __init__(self, config, audio): class BeetsPlaybackProvider(backend.PlaybackProvider): backend: BeetsBackend - def translate_uri(self, uri): + @override + def translate_uri(self, uri: Uri) -> Uri | None: track_id = uri.split(";")[1] logger.debug(f"Getting info for track {uri} with id {track_id}") return self.backend.beets_api.get_track_stream_url(track_id) diff --git a/src/mopidy_beets/browsers/__init__.py b/src/mopidy_beets/browsers/__init__.py index 1896c48..c9ce103 100644 --- a/src/mopidy_beets/browsers/__init__.py +++ b/src/mopidy_beets/browsers/__init__.py @@ -1,9 +1,19 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from mopidy.models import Ref + + from mopidy_beets.client import BeetsRemoteClient + + class GenericBrowserBase: - def __init__(self, ref, api): + def __init__(self, ref: Ref, api: BeetsRemoteClient) -> None: self.ref = ref self.api = api - def get_toplevel(self): + def get_toplevel(self) -> list[Ref]: """deliver the top level directories or tracks for this browser The result is a list of ``mopidy.models.Ref`` objects. @@ -11,7 +21,7 @@ def get_toplevel(self): """ raise NotImplementedError - def get_directory(self, key): + def get_directory(self, key: str) -> list[Ref]: """deliver the corresponding sub items for a given category key The result is a list of ``mopidy.models.Ref`` objects. diff --git a/src/mopidy_beets/browsers/albums.py b/src/mopidy_beets/browsers/albums.py index 75b864f..0964a11 100644 --- a/src/mopidy_beets/browsers/albums.py +++ b/src/mopidy_beets/browsers/albums.py @@ -1,40 +1,52 @@ -from mopidy import models +from __future__ import annotations + +from typing import ClassVar, override + +from mopidy.models import Album, Ref from mopidy_beets.browsers import GenericBrowserBase from mopidy_beets.translator import assemble_uri class AlbumsCategoryBrowser(GenericBrowserBase): - field = None - sort_fields = None - label_fields = None + field: ClassVar[str] + sort_fields: ClassVar[tuple[str, ...]] - def get_toplevel(self): + def get_toplevel(self) -> list[Ref]: keys = self.api.get_sorted_unique_album_attributes(self.field) return [ - models.Ref.directory( - name=str(key), uri=assemble_uri(self.ref.uri, id_value=key) + Ref.directory( + name=str(k), + uri=assemble_uri(self.ref.uri, id_value=k), ) - for key in keys + for k in sorted(keys) ] - def get_directory(self, key): + def get_directory(self, key: str) -> list[Ref]: albums = self.api.get_albums_by( [(self.field, key)], - True, # noqa: FBT003 - self.sort_fields, + exact_text=True, + sort_fields=self.sort_fields, ) return [ - models.Ref.album(uri=album.uri, name=self._get_label(album)) - for album in albums + Ref.album( + uri=a.uri, + name=self._get_label(a), + ) + for a in albums + if a.uri is not None ] + def _get_label(self, album: Album) -> str | None: + raise NotImplementedError + class AlbumsByArtistBrowser(AlbumsCategoryBrowser): field = "albumartist" sort_fields = ("original_year+", "year+", "album+") - def _get_label(self, album): + @override + def _get_label(self, album: Album) -> str | None: return album.name @@ -42,8 +54,9 @@ class AlbumsByGenreBrowser(AlbumsCategoryBrowser): field = "genre" sort_fields = ("albumartist", "original_year+", "year+", "album+") - def _get_label(self, album): - artists = " / ".join([artist.name for artist in album.artists]) + @override + def _get_label(self, album: Album) -> str | None: + artists = " / ".join([a.name for a in album.artists if a.name is not None]) if artists and album.date: return f"{artists} - {album.name} ({album.date.split('-')[0]})" if artists: @@ -61,8 +74,9 @@ class AlbumsByYearBrowser(AlbumsCategoryBrowser): "album+", ) - def _get_label(self, album): - artists = " / ".join([artist.name for artist in album.artists]) + @override + def _get_label(self, album: Album) -> str | None: + artists = " / ".join([a.name for a in album.artists if a.name is not None]) if artists: return f"{artists} - {album.name}" return album.name diff --git a/src/mopidy_beets/client.py b/src/mopidy_beets/client.py index c8bb06c..52714d9 100644 --- a/src/mopidy_beets/client.py +++ b/src/mopidy_beets/client.py @@ -1,36 +1,49 @@ +from __future__ import annotations + import logging import re import time -import urllib.error import urllib.parse import urllib.request from http import HTTPStatus +from typing import TYPE_CHECKING, Any import requests from mopidy import httpclient +from mopidy.models import Album, Track +from mopidy.types import DistinctField, Uri from requests.exceptions import RequestException import mopidy_beets from mopidy_beets.translator import parse_album, parse_track +if TYPE_CHECKING: + from collections.abc import Callable, Iterable + + from mopidy.config import ProxyConfig + logger = logging.getLogger(__name__) class cache: # noqa: N801 - # TODO: merge this to util library - - def __init__(self, ctl=8, ttl=3600): - self.cache = {} + def __init__( + self, + ctl: int = 8, + ttl: int = 3600, + ) -> None: + self.cache: dict[tuple[Any, ...], tuple[Any, float]] = {} self.ctl = ctl self.ttl = ttl self._call_count = 1 + self.func: Callable[..., Any] | None = None - def __call__(self, func): - def _memoized(*args): + def __call__[**P, R](self, func: Callable[P, R]) -> Callable[P, R]: + def _memoized(*args: P.args, **kwargs: P.kwargs) -> R: self.func = func + key = (args, tuple(sorted(kwargs.items()))) now = time.time() try: - value, last_update = self.cache[args] + value, last_update = self.cache[key] age = now - last_update if self._call_count >= self.ctl or age > self.ttl: self._call_count = 1 @@ -39,12 +52,12 @@ def _memoized(*args): self._call_count += 1 except (KeyError, AttributeError): - value = self.func(*args) - self.cache[args] = (value, now) + value = func(*args, **kwargs) + self.cache[key] = (value, now) return value except TypeError: - return self.func(*args) + return func(*args, **kwargs) else: return value @@ -53,51 +66,86 @@ def _memoized(*args): class BeetsRemoteClient: - def __init__(self, endpoint, proxy_config, request_timeout=4): + def __init__( + self, + endpoint: str, + proxy_config: ProxyConfig, + request_timeout: int = 4, + ) -> None: super().__init__() self._request_timeout = request_timeout self.api = self._get_session(proxy_config) self.api_endpoint = endpoint logger.info("Configured for Beets remote library %s", endpoint) - def _get_session(self, proxy_config): - proxy = httpclient.format_proxy(proxy_config) - full_user_agent = httpclient.format_user_agent( - f"{mopidy_beets.Extension.dist_name}/{mopidy_beets.__version__}" - ) + def _get_session(self, proxy_config: ProxyConfig) -> requests.Session: session = requests.Session() - session.proxies.update({"http": proxy, "https": proxy}) - session.headers.update({"user-agent": full_user_agent}) + session.headers["user-agent"] = httpclient.format_user_agent( + f"{mopidy_beets.Extension.dist_name}/{mopidy_beets.Extension.version}" + ) + if proxy := httpclient.format_proxy(proxy_config): + session.proxies.update({"http": proxy, "https": proxy}) # pyright: ignore[reportCallIssue] return session @cache() - def get_tracks(self): - track_ids = self._get("/item/").get("item_ids") or [] - return [self.get_track(track_id) for track_id in track_ids] + def get_tracks(self) -> list[Track]: + if (result := self._get("/item/")) is None: + return [] + track_ids = result.get("item_ids") or [] + return [track for track_id in track_ids if (track := self.get_track(track_id))] @cache(ctl=16) - def get_track(self, track_id): - return parse_track(self._get(f"/item/{track_id}"), self) + def get_track(self, track_id: str | int) -> Track | None: + if (result := self._get(f"/item/{track_id}")) is None: + return None + return parse_track(result, self) @cache(ctl=16) - def get_album(self, album_id): - return parse_album(self._get(f"/album/{album_id}"), self) + def get_album(self, album_id: str | int) -> Album | None: + if (result := self._get(f"/album/{album_id}")) is None: + return None + return parse_album(result, self) @cache() - def get_tracks_by(self, attributes, exact_text, sort_fields): + def get_tracks_by( + self, + attributes: list[tuple[str, str | int]], + *, + exact_text: bool, + sort_fields: Iterable[str], + ) -> list[Track]: tracks = self._get_objects_by_attribute( - "/item", attributes, exact_text, sort_fields + "/item", + attributes, + exact_text=exact_text, + sort_fields=sort_fields, ) return self._parse_multiple_tracks(tracks) @cache() - def get_albums_by(self, attributes, exact_text, sort_fields): + def get_albums_by( + self, + attributes: list[tuple[str, str | int]], + *, + exact_text: bool, + sort_fields: Iterable[str], + ) -> list[Album]: albums = self._get_objects_by_attribute( - "/album", attributes, exact_text, sort_fields + "/album", + attributes, + exact_text=exact_text, + sort_fields=sort_fields, ) return self._parse_multiple_albums(albums) - def _get_objects_by_attribute(self, base_path, attributes, exact_text, sort_fields): # noqa: C901 + def _get_objects_by_attribute( # noqa: C901, PLR0912 + self, + base_path: str, + attributes: list[tuple[str, str | int]], + *, + exact_text: bool, + sort_fields: Iterable[str], + ) -> list[dict[str, Any]]: """The beets web-api accepts queries like: /item/query/album_id:183/track:2 /item/query/album:Foo @@ -111,9 +159,7 @@ def _get_objects_by_attribute(self, base_path, attributes, exact_text, sort_fiel @param exact_text: True for exact matches, False for case-insensitive 'is in' matches (only relevant for text values - not integers) - @type exact_text: bool @param sort_fields: fieldnames, each followed by '+' or '-' - @type sort_fields: list of strings @rtype: list of json datasets describing tracks or albums """ # assemble the query string @@ -121,7 +167,7 @@ def _get_objects_by_attribute(self, base_path, attributes, exact_text, sort_fiel # only used for 'exact_text' exact_query_list = [] - def quote_and_encode(text): + def quote_and_encode(text: str | float) -> str: if isinstance(text, (int, float)): text = str(text) # Escape colons. The beets web API uses the colon to separate @@ -161,8 +207,11 @@ def quote_and_encode(text): logger.info("Beets - invalid sorting field ignore: %s", sort_field) query_string = "/".join(query_parts) query_url = f"{base_path}/query/{query_string}" + logger.debug("Beets query: %s", query_url) - items = self._get(query_url)["results"] + if (result := self._get(query_url)) is None: + return [] + items = result["results"] if exact_text: # verify that text attributes do not just test 'is in', but match # equality @@ -176,18 +225,17 @@ def quote_and_encode(text): return items @cache() - def get_artists(self): + def get_artists(self) -> list[str]: """returns all artists of one or more tracks""" - names = self._get("/artist/")["artist_names"] - names.sort() - # remove empty names - return [name for name in names if name] + if (result := self._get("/artist/")) is None: + return [] + return [name for name in sorted(result["artist_names"]) if name] - def get_sorted_unique_track_attributes(self, field): + def get_sorted_unique_track_attributes(self, field: DistinctField) -> set[str]: sort_field = {"albumartist": "albumartist_sort"}.get(field, field) return self._get_unique_attribute_values("/item", field, sort_field) - def get_sorted_unique_album_attributes(self, field): + def get_sorted_unique_album_attributes(self, field: str) -> set[str]: # Modern Beets exposes the multi-valued "genres" on albums (singular # "genre" was removed after the 2.x series); fall through to the # plural key so both /album/values/... and the legacy fallback work. @@ -196,45 +244,24 @@ def get_sorted_unique_album_attributes(self, field): return self._get_unique_attribute_values("/album", field, sort_field) @cache(ctl=32) - def _get_unique_attribute_values(self, base_url, field, sort_field): - """returns all artists, genres, ... of tracks or albums""" - if not hasattr(self, "__legacy_beets_api_detected"): - try: - result = self._get( - f"{base_url}/values/{field}?sort_key={sort_field}", - raise_not_found=True, - ) - except KeyError: - # The above URL was added to the Beets API after v1.3.17 - # Probably we are working against an older version. - logger.warning( - "Failed to use the /item/unique/KEY feature of the Beets " - "API (introduced in v1.3.18). Falling back to the " - "slower and more resource intensive manual approach. " - "Please upgrade Beets, if possible." - ) - # Warn only once and use the manual approach for all future - # requests. - self.__legacy_beets_api_detected = True - # continue below with the fallback - else: - return result["values"] - # Fallback: use manual filtering (requires too much time and memory for - # most collections). - sorted_items = self._get(f"{base_url}/query/{sort_field}+")["results"] - # extract the wanted field and remove all duplicates - unique_values = [] - for item in sorted_items: - value = item[field] - if not unique_values or (value != unique_values[-1]): - unique_values.append(value) - return unique_values - - def get_track_stream_url(self, track_id): - return f"{self.api_endpoint}/item/{track_id}/file" + def _get_unique_attribute_values( + self, + base_url: str, + field: str, + sort_field: str, + ) -> set[str]: + """Returns all artists, genres, ... of tracks or albums""" + result = self._get( + f"{base_url}/values/{field}?sort_key={sort_field}", + raise_not_found=True, + ) + return set(result["values"]) if result else set() + + def get_track_stream_url(self, track_id: str) -> Uri: + return Uri(f"{self.api_endpoint}/item/{track_id}/file") @cache(ctl=32) - def get_album_art_url(self, album_id): + def get_album_art_url(self, album_id: str) -> Uri | None: # Sadly we cannot determine, if the Beets library really contains album # art. Thus we need to ask for it and check the status code. url = f"{self.api_endpoint}/album/{album_id}/art" @@ -244,9 +271,9 @@ def get_album_art_url(self, album_id): # DNS problem or similar return None request.close() - return url if request.getcode() == HTTPStatus.OK else None + return Uri(url) if request.getcode() == HTTPStatus.OK else None - def _get(self, url, *, raise_not_found=False): + def _get(self, url: str, *, raise_not_found: bool = False) -> dict[str, Any] | None: url = self.api_endpoint + url logger.debug(f"Beets - requesting {url}") try: @@ -267,20 +294,28 @@ def _get(self, url, *, raise_not_found=False): return None return req.json() - def _parse_multiple_albums(self, album_datasets): - albums = [] + def _parse_multiple_albums( + self, + album_datasets: Iterable[dict[str, Any]] | None, + ) -> list[Album]: + albums = list[Album]() for dataset in album_datasets or []: try: - albums.append(parse_album(dataset, self)) + if album := parse_album(dataset, self): + albums.append(album) except (ValueError, KeyError) as exc: logger.info(f"Beets - Failed to parse album data: {exc}") return [album for album in albums if album] - def _parse_multiple_tracks(self, track_datasets): - tracks = [] + def _parse_multiple_tracks( + self, + track_datasets: Iterable[dict[str, Any]] | None, + ) -> list[Track]: + tracks = list[Track]() for dataset in track_datasets or []: try: - tracks.append(parse_track(dataset, self)) + if track := parse_track(dataset, self): + tracks.append(track) except (ValueError, KeyError) as exc: logger.info(f"Beets - Failed to parse track data: {exc}") return [track for track in tracks if track] diff --git a/src/mopidy_beets/library.py b/src/mopidy_beets/library.py index 129c2b1..cb54b34 100644 --- a/src/mopidy_beets/library.py +++ b/src/mopidy_beets/library.py @@ -1,11 +1,13 @@ +from __future__ import annotations + import logging import re -from typing import ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, override -from mopidy import backend, models -from mopidy.models import SearchResult +from mopidy import backend +from mopidy.models import Ref, SearchResult, Track +from mopidy.types import DistinctField, Query, SearchField, Uri -from mopidy_beets.browsers import GenericBrowserBase from mopidy_beets.browsers.albums import ( AlbumsByArtistBrowser, AlbumsByGenreBrowser, @@ -13,6 +15,11 @@ ) from mopidy_beets.translator import assemble_uri, parse_uri +if TYPE_CHECKING: + from mopidy_beets.actor import BeetsBackend + from mopidy_beets.browsers import GenericBrowserBase + from mopidy_beets.client import BeetsRemoteClient + logger = logging.getLogger(__name__) # match dates of the following format: @@ -23,26 +30,37 @@ class BeetsLibraryProvider(backend.LibraryProvider): - root_directory = models.Ref.directory(uri="beets:library", name="Beets library") + root_directory = Ref.directory( + uri=Uri("beets:library"), + name="Beets library", + ) root_categorie_list: ClassVar[list[tuple[str, str, type[GenericBrowserBase]]]] = [ ("albums-by-artist", "Albums by Artist", AlbumsByArtistBrowser), ("albums-by-genre", "Albums by Genre", AlbumsByGenreBrowser), ("albums-by-year", "Albums by Year", AlbumsByYearBrowser), ] - def __init__(self, *args, **kwargs): + backend: BeetsBackend + remote: BeetsRemoteClient + + @override + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + assert self.root_directory # noqa: S101 self.remote = self.backend.beets_api self.category_browsers = [] for key, label, browser_class in self.root_categorie_list: - ref = models.Ref.directory( - name=label, uri=assemble_uri(self.root_directory.uri, key) + ref = Ref.directory( + name=label, + uri=assemble_uri(self.root_directory.uri, key), ) browser = browser_class(ref, self.remote) self.category_browsers.append(browser) - def browse(self, uri): # noqa: PLR0911 + @override + def browse(self, uri: Uri) -> list[Ref]: # noqa: PLR0911 logger.debug("Browsing Beets at: %s", uri) + assert self.root_directory # noqa: S101 path, item_id = parse_uri(uri, uri_prefix=self.root_directory.uri) if path is None: logger.error("Beets - failed to parse uri: %s", uri) @@ -54,6 +72,8 @@ def browse(self, uri): # noqa: PLR0911 return refs if path == "album": # show an album + if item_id is None: + return [] try: album_id = int(item_id) except ValueError: @@ -61,12 +81,10 @@ def browse(self, uri): # noqa: PLR0911 return [] tracks = self.remote.get_tracks_by( [("album_id", album_id)], - True, # noqa: FBT003 - ["track+"], + exact_text=True, + sort_fields=["track+"], ) - return [ - models.Ref.track(uri=track.uri, name=track.name) for track in tracks - ] + return [Ref.track(uri=t.uri, name=t.name) for t in tracks] # show a generic category directory for browser in self.category_browsers: if ( @@ -79,7 +97,13 @@ def browse(self, uri): # noqa: PLR0911 logger.error("Beets - Invalid browse URI: %s / %s", uri, path) return [] - def search(self, query=None, uris=None, exact=False): # noqa: C901, FBT002, PLR0912 + @override + def search( # noqa: C901, PLR0912 + self, + query: Query[SearchField], + uris: list[Uri] | None = None, + exact: bool = False, + ) -> SearchResult: # TODO: restrict the result to 'uris' logger.debug('Beets Query (exact=%s) within "%s": %s', exact, uris, query) self._validate_query(query) @@ -109,7 +133,7 @@ def search(self, query=None, uris=None, exact=False): # noqa: C901, FBT002, PLR # supported date formats: YYYY, YYYY-MM, YYYY-MM-DD # Days and months may consist of one or two digits. # A slash (instead of a dash) is acceptable as a separator. - match = DATE_REGEX.search(val) + match = DATE_REGEX.search(str(val)) if match: # remove None values for key, value in match.groupdict().items(): @@ -125,59 +149,68 @@ def search(self, query=None, uris=None, exact=False): # noqa: C901, FBT002, PLR logger.info("Beets: ignoring unknown query key: %s", field) break logger.debug("Beets search query: %s", search_list) - tracks = self.remote.get_tracks_by(search_list, exact, []) + tracks = self.remote.get_tracks_by( + search_list, + exact_text=exact, + sort_fields=[], + ) uri = "-".join( [ item if isinstance(item, str) else "=".join(map(str, item)) for item in search_list ] ) - return SearchResult(uri="beets:search-" + uri, tracks=tracks) + return SearchResult( + uri=Uri(f"beets:search-{uri}"), + tracks=tuple(tracks), + ) - def lookup(self, uri=None, uris=None): - logger.debug("Beets lookup: %s", uri or uris) - if uri: - # the older method (mopidy < 1.0): return a list of tracks - # handle one or more tracks given with multiple semicolons - logger.debug("Beets lookup: %s", uri) - path, item_id = parse_uri(uri, uri_prefix=self.root_directory.uri) - if path == "track": - tracks = [self.remote.get_track(item_id)] - elif path == "album": - tracks = self.remote.get_tracks_by( - [("album_id", item_id)], - True, # noqa: FBT003 - ("disc+", "track+"), - ) - elif path == "artist": - artist_tracks = self.remote.get_tracks_by( - [("artist", item_id)], - True, # noqa: FBT003 - [], - ) - composer_tracks = self.remote.get_tracks_by( - [("composer", item_id)], - True, # noqa: FBT003 - [], - ) - # Append composer tracks to the artist tracks (unique items). - tracks = list(set(artist_tracks + composer_tracks)) - tracks.sort( - key=lambda t: (t.date or 0, t.disc_no or 0, t.track_no or 0) - ) - else: - logger.info("Unknown Beets lookup URI: %s", uri) - tracks = [] - # remove occourences of None - return [track for track in tracks if track] - # the newer method (mopidy>=1.0): return a dict of uris and tracks - return {uri: self.lookup(uri=uri) for uri in uris} + @override + def lookup(self, uri: Uri) -> list[Track]: + logger.debug("Beets lookup: %s", uri) + assert self.root_directory # noqa: S101 + path, item_id = parse_uri(uri, uri_prefix=self.root_directory.uri) + if item_id is None: + logger.info(f"Unknown item ID in Beets lookup URI: {uri}") + return [] + if path == "track": + tracks = [self.remote.get_track(item_id)] + elif path == "album": + tracks = self.remote.get_tracks_by( + [("album_id", item_id)], + exact_text=True, + sort_fields=("disc+", "track+"), + ) + elif path == "artist": + artist_tracks = self.remote.get_tracks_by( + [("artist", item_id)], + exact_text=True, + sort_fields=[], + ) + composer_tracks = self.remote.get_tracks_by( + [("composer", item_id)], + exact_text=True, + sort_fields=[], + ) + # Append composer tracks to the artist tracks (unique items). + tracks = list(set(artist_tracks + composer_tracks)) + tracks.sort(key=lambda t: (t.date or 0, t.disc_no or 0, t.track_no or 0)) + else: + logger.info("Unknown Beets lookup URI: %s", uri) + tracks = [] + # remove occourences of None + return [t for t in tracks if t] - def get_distinct(self, field, query=None): + @override + def get_distinct( + self, + field: DistinctField, + query: Query[SearchField] | None = None, + ) -> set[str]: logger.debug("Beets distinct query: %s (uri=%s)", field, query) return self.remote.get_sorted_unique_track_attributes(field) - def _validate_query(self, query): + def _validate_query(self, query: Query[SearchField]) -> None: for values in query.values(): if not values: msg = "Missing query" diff --git a/src/mopidy_beets/translator.py b/src/mopidy_beets/translator.py index 63ba4cb..7d38422 100644 --- a/src/mopidy_beets/translator.py +++ b/src/mopidy_beets/translator.py @@ -1,23 +1,30 @@ +from __future__ import annotations + import logging -import urllib.error import urllib.parse -import urllib.request +from typing import TYPE_CHECKING, Any from mopidy.models import Album, Artist, Track +from mopidy.types import Uri + +if TYPE_CHECKING: + from collections.abc import Iterable + + from mopidy_beets.client import BeetsRemoteClient logger = logging.getLogger(__name__) -def parse_date(data): +def parse_date(data: dict[str, Any]) -> str | None: # use 'original' dates if possible if "original_year" in data: - day = data.get("original_day", None) - month = data.get("original_month", None) - year = data.get("original_year", None) + day = data.get("original_day") + month = data.get("original_month") + year = data.get("original_year") elif "year" in data: - day = data.get("day", None) - month = data.get("month", None) - year = data.get("year", None) + day = data.get("day") + month = data.get("month") + year = data.get("year") else: return None # mopidy accepts dates as 'YYYY' or 'YYYY-MM-DD' @@ -26,7 +33,11 @@ def parse_date(data): return f"{year:04d}" -def _apply_beets_mapping(target_class, mapping, data): +def _apply_beets_mapping[T]( + target_class: type[T], + mapping: dict[str, str], + data: dict[str, Any], +) -> T | None: """evaluate a mapping of target keys and their source keys or callables 'target_class' is the Mopidy model to be used for creating the item. @@ -35,25 +46,25 @@ def _apply_beets_mapping(target_class, mapping, data): * string: the key for the corresponding value in 'data' * callable: a function with a dict ('data') as its only parameter """ - kwargs = {} + kwargs: dict[str, Any] = {} for key, map_value in mapping.items(): if map_value is None: value = None elif callable(map_value): value = map_value(data) else: - value = data.get(map_value, None) + value = data.get(map_value) # ignore None, empty strings or zeros (e.g. for length) if value: kwargs[key] = value return target_class(**kwargs) if kwargs else None -def _filter_none(values): +def _filter_none[T](values: Iterable[T | None]) -> list[T]: return [value for value in values if value is not None] -def parse_artist(data, name_keyword): +def parse_artist(data: dict[str, Any], name_keyword: str) -> Artist | None: # see https://docs.mopidy.com/en/latest/api/models/#mopidy.models.Artist mapping = { "uri": lambda d: assemble_uri("beets:library:artist", id_value=d[name_keyword]), @@ -71,7 +82,7 @@ def parse_artist(data, name_keyword): return _apply_beets_mapping(Artist, mapping, data) -def parse_album(data, _api): +def parse_album(data: dict[str, Any], _api: BeetsRemoteClient) -> Album | None: # see https://docs.mopidy.com/en/latest/api/models/#mopidy.models.Album # The order of items is based on the above documentation. # Attributes without corresponding Beets data are mapped to 'None'. @@ -87,7 +98,7 @@ def parse_album(data, _api): return _apply_beets_mapping(Album, mapping, data) -def parse_track(data, api): +def parse_track(data: dict[str, Any], api: BeetsRemoteClient) -> Track | None: # see https://docs.mopidy.com/en/latest/api/models/#mopidy.models.Track # The order of items is based on the above documentation. # Attributes without corresponding Beets data are mapped to 'None'. @@ -113,8 +124,11 @@ def parse_track(data, api): return _apply_beets_mapping(Track, mapping, data) -def parse_uri(uri, uri_prefix=None): - """split a URI into an optional prefix and a value +def parse_uri( + uri: Uri, + uri_prefix: str | None = None, +) -> tuple[str | None, str | int | None]: + """Split a URI into an optional prefix and a value. The format of a uri is similar to this: beets:library:album;Foo%20Bar @@ -123,7 +137,7 @@ def parse_uri(uri, uri_prefix=None): uri_prefix (optional): * remove the string from the beginning of uri * the match is valid only if the prefix is separated from the - remainder of the URI with a color, an ampersand or it is equal + remainder of the URI with a colon, an ampersand or it is equal to the full URI * the function returns 'None' if the uri_prefix cannot be removed (you should consider this an error condition) @@ -159,13 +173,12 @@ def parse_uri(uri, uri_prefix=None): return result_uri, id_value -def assemble_uri(*args, **kwargs): +def assemble_uri(*args: str, id_value: str | None = None) -> Uri: base_path = ":".join(args) - id_value = kwargs.pop("id_value", None) if id_value is None: - return base_path + return Uri(base_path) # convert numbers and other non-strings if not isinstance(id_value, str): id_value = str(id_value) id_string = urllib.parse.quote(id_value) - return f"{base_path};{id_string}" + return Uri(f"{base_path};{id_string}") diff --git a/tests/helper_beets.py b/tests/helper_beets.py index 40b99b0..81f997c 100644 --- a/tests/helper_beets.py +++ b/tests/helper_beets.py @@ -72,6 +72,7 @@ def stop(self): self._server.shutdown() self._server_thread.join() self._server_thread = None + self.teardown_beets() def get_connection_pair(self): return (self._bind_host, self._bind_port) diff --git a/tests/test_extension.py b/tests/test_extension.py index c69350c..96745d8 100644 --- a/tests/test_extension.py +++ b/tests/test_extension.py @@ -33,7 +33,10 @@ def test_get_backend_classes(self): assert mock.call("backend", BeetsBackend) in registry.add.mock_calls def test_init_backend(self): - backend = BeetsBackend(self.get_config(), None) + backend = BeetsBackend( + config=self.get_config(), + audio=None, # ty:ignore[invalid-argument-type] + ) assert backend is not None backend.on_start() backend.on_stop()