11from __future__ import annotations
22
3+ import asyncio
34import base64
45import logging
56import re
67from datetime import datetime
8+ from typing import AsyncGenerator
79from typing import Dict
810from typing import List
11+ from typing import Optional
912from typing import Union
1013
1114import aiohttp
1720__all__ = ("Client" ,)
1821
1922AM_URL_REGEX = re .compile (
20- r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^?]+) " ,
23+ r"https?://music\ .apple\ .com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+? )/(?P<id>[^/ ?]+?)(?:/)?(?:\?.*)?$ " ,
2124)
2225AM_SINGLE_IN_ALBUM_REGEX = re .compile (
23- r"https?://music.apple.com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>. +)(\?i=)(?P<id2>.+) " ,
26+ r"https?://music\ .apple\ .com/(?P<country>[a-zA-Z]{2})/(?P<type>album|playlist|song|artist)/(?P<name>.+)/(?P<id>[^/?] +)(\?i=)(?P<id2>[^&]+)(?:&.*)?$ " ,
2427)
2528
2629AM_SCRIPT_REGEX = re .compile (r'<script.*?src="(/assets/index-.*?)"' )
@@ -35,12 +38,14 @@ class Client:
3538 and translating it to a valid Lavalink track. No client auth is required here.
3639 """
3740
38- def __init__ (self ) -> None :
41+ def __init__ (self , * , playlist_concurrency : int = 6 ) -> None :
3942 self .expiry : datetime = datetime (1970 , 1 , 1 )
4043 self .token : str = ""
4144 self .headers : Dict [str , str ] = {}
4245 self .session : aiohttp .ClientSession = None # type: ignore
4346 self ._log = logging .getLogger (__name__ )
47+ # Concurrency knob for parallel playlist page retrieval
48+ self ._playlist_concurrency = max (1 , playlist_concurrency )
4449
4550 async def _set_session (self , session : aiohttp .ClientSession ) -> None :
4651 self .session = session
@@ -167,25 +172,127 @@ async def search(self, query: str) -> Union[Album, Playlist, Song, Artist]:
167172 "This playlist is empty and therefore cannot be queued." ,
168173 )
169174
170- _next = track_data .get ("next" )
171- if _next :
172- next_page_url = AM_BASE_URL + _next
173-
174- while next_page_url is not None :
175- resp = await self .session .get (next_page_url , headers = self .headers )
175+ # Apple Music uses cursor pagination with 'next'. We'll fetch subsequent pages
176+ # concurrently by first collecting cursors in rolling waves.
177+ next_cursor = track_data .get ("next" )
178+ semaphore = asyncio .Semaphore (self ._playlist_concurrency )
176179
180+ async def fetch_page (url : str ) -> List [Song ]:
181+ async with semaphore :
182+ resp = await self .session .get (url , headers = self .headers )
177183 if resp .status != 200 :
178- raise AppleMusicRequestException (
179- f"Error while fetching results: { resp .status } { resp .reason } " ,
180- )
184+ if self ._log :
185+ self ._log .warning (
186+ f"Apple Music page fetch failed { resp .status } { resp .reason } for { url } " ,
187+ )
188+ return []
189+ pj : dict = await resp .json (loads = json .loads )
190+ songs = [Song (track ) for track in pj .get ("data" , [])]
191+ # Return songs; we will look for pj.get('next') in streaming iterator variant
192+ return songs , pj .get ("next" ) # type: ignore
193+
194+ # We'll implement a wave-based approach similar to Spotify but need to follow cursors.
195+ # Because we cannot know all cursors upfront, we'll iteratively fetch waves.
196+ waves : List [List [Song ]] = []
197+ cursors : List [str ] = []
198+ if next_cursor :
199+ cursors .append (next_cursor )
200+
201+ # Limit total waves to avoid infinite loops in malformed responses
202+ max_waves = 50
203+ wave_size = self ._playlist_concurrency * 2
204+ wave_counter = 0
205+ while cursors and wave_counter < max_waves :
206+ current = cursors [:wave_size ]
207+ cursors = cursors [wave_size :]
208+ tasks = [
209+ fetch_page (AM_BASE_URL + cursor ) for cursor in current # type: ignore[arg-type]
210+ ]
211+ results = await asyncio .gather (* tasks , return_exceptions = True )
212+ for res in results :
213+ if isinstance (res , tuple ): # (songs, next)
214+ songs , nxt = res
215+ if songs :
216+ waves .append (songs )
217+ if nxt :
218+ cursors .append (nxt )
219+ wave_counter += 1
220+
221+ for w in waves :
222+ album_tracks .extend (w )
223+
224+ return Playlist (data , album_tracks )
181225
182- next_data : dict = await resp .json (loads = json .loads )
183- album_tracks .extend (Song (track ) for track in next_data ["data" ])
226+ async def iter_playlist_tracks (
227+ self ,
228+ * ,
229+ query : str ,
230+ batch_size : int = 100 ,
231+ ) -> AsyncGenerator [List [Song ], None ]:
232+ """Stream Apple Music playlist tracks in batches.
233+
234+ Parameters
235+ ----------
236+ query: str
237+ Apple Music playlist URL.
238+ batch_size: int
239+ Logical grouping size for yielded batches.
240+ """
241+ if not self .token or datetime .utcnow () > self .expiry :
242+ await self .request_token ()
184243
185- _next = next_data .get ("next" )
186- if _next :
187- next_page_url = AM_BASE_URL + _next
188- else :
189- next_page_url = None
244+ result = AM_URL_REGEX .match (query )
245+ if not result or result .group ("type" ) != "playlist" :
246+ raise InvalidAppleMusicURL ("Provided query is not a valid Apple Music playlist URL." )
190247
191- return Playlist (data , album_tracks )
248+ country = result .group ("country" )
249+ playlist_id = result .group ("id" )
250+ request_url = AM_REQ_URL .format (country = country , type = "playlist" , id = playlist_id )
251+ resp = await self .session .get (request_url , headers = self .headers )
252+ if resp .status != 200 :
253+ raise AppleMusicRequestException (
254+ f"Error while fetching results: { resp .status } { resp .reason } " ,
255+ )
256+ data : dict = await resp .json (loads = json .loads )
257+ playlist_data = data ["data" ][0 ]
258+ track_data : dict = playlist_data ["relationships" ]["tracks" ]
259+
260+ first_page_tracks = [Song (track ) for track in track_data ["data" ]]
261+ for i in range (0 , len (first_page_tracks ), batch_size ):
262+ yield first_page_tracks [i : i + batch_size ]
263+
264+ next_cursor = track_data .get ("next" )
265+ semaphore = asyncio .Semaphore (self ._playlist_concurrency )
266+
267+ async def fetch (cursor : str ) -> tuple [List [Song ], Optional [str ]]:
268+ url = AM_BASE_URL + cursor
269+ async with semaphore :
270+ r = await self .session .get (url , headers = self .headers )
271+ if r .status != 200 :
272+ if self ._log :
273+ self ._log .warning (
274+ f"Skipping Apple Music page due to { r .status } { r .reason } " ,
275+ )
276+ return [], None
277+ pj : dict = await r .json (loads = json .loads )
278+ songs = [Song (track ) for track in pj .get ("data" , [])]
279+ return songs , pj .get ("next" )
280+
281+ # Rolling waves of fetches following cursor chain
282+ max_waves = 50
283+ wave_size = self ._playlist_concurrency * 2
284+ waves = 0
285+ cursors : List [str ] = []
286+ if next_cursor :
287+ cursors .append (next_cursor )
288+ while cursors and waves < max_waves :
289+ current = cursors [:wave_size ]
290+ cursors = cursors [wave_size :]
291+ results = await asyncio .gather (* [fetch (c ) for c in current ])
292+ for songs , nxt in results :
293+ if songs :
294+ for j in range (0 , len (songs ), batch_size ):
295+ yield songs [j : j + batch_size ]
296+ if nxt :
297+ cursors .append (nxt )
298+ waves += 1
0 commit comments