99 NamedTuple ,
1010 Any ,
1111 Set ,
12- Generic ,
13- TypeVar ,
1412 Optional ,
1513 Iterator ,
1614 AsyncIterator ,
17- Awaitable ,
18- TypeVar ,
1915 Union ,
2016)
2117
@@ -107,17 +103,19 @@ class ElementState(NamedTuple):
107103
108104class Layout (AbstractLayout ):
109105
110- __slots__ = "_event_handlers"
106+ __slots__ = "_event_handlers" , "_rendering_queue"
111107
112108 def __init__ (
113109 self , root : "AbstractElement" , loop : Optional [asyncio .AbstractEventLoop ] = None
114110 ) -> None :
115111 super ().__init__ (root , loop )
116112 self ._event_handlers : Dict [str , EventHandler ] = {}
113+ self ._rendering_queue = _ElementQueue ()
114+ self ._rendering_queue .put (self ._root )
117115
118116 def update (self , element : "AbstractElement" ) -> None :
119117 try :
120- self ._rendering_queue .put (self . _create_layout_update ( element ) )
118+ self ._rendering_queue .put (element )
121119 except CannotAccessResource :
122120 logger .info (f"Did not update { element } - resources of { self } are closed" )
123121
@@ -131,16 +129,10 @@ async def trigger(self, event: LayoutEvent) -> None:
131129 await handler (event .data )
132130
133131 async def render (self ) -> Dict [str , Any ]:
134- return await self ._rendering_queue .get ()
135-
136- @async_resource
137- async def _rendering_queue (self ) -> AsyncIterator ["FutureQueue[LayoutUpdate]" ]:
138- queue : FutureQueue [LayoutUpdate ] = FutureQueue ()
139- queue .put (self ._create_layout_update (self ._root ))
140- try :
141- yield queue
142- finally :
143- await queue .cancel ()
132+ while True :
133+ element = await self ._rendering_queue .get ()
134+ if element .id in self ._element_states :
135+ return await self ._create_layout_update (element )
144136
145137 @async_resource
146138 async def _element_states (self ) -> AsyncIterator [ElementState ]:
@@ -289,47 +281,21 @@ def _render_with_life_cycle_hook(element_state: ElementState) -> Iterator[None]:
289281 element_state .life_cycle_hook .unset_current ()
290282
291283
292- # future queue type
293- _FQT = TypeVar ("_FQT" )
284+ class _ElementQueue :
294285
286+ __slots__ = "_queue" , "_pending"
295287
296- class FutureQueue (Generic [_FQT ]):
297- """A queue which returns the result of futures as they complete."""
288+ def __init__ (self ):
289+ self ._queue = asyncio .Queue ()
290+ self ._pending = set ()
298291
299- def __init__ (self ) -> None :
300- self ._loop = asyncio .get_event_loop ()
301- self ._pending : Dict [int , asyncio .Future [_FQT ]] = {}
302- self ._done : asyncio .Queue [asyncio .Future [_FQT ]] = asyncio .Queue ()
303-
304- def put (self , awaitable : Awaitable [_FQT ]) -> None :
305- """Put an awaitable in the queue
306-
307- The result will be returned by a call to :meth:`FutureQueue.get` only
308- when the awaitable has completed.
309- """
310-
311- async def wrapper () -> None :
312- future = asyncio .ensure_future (awaitable )
313- self ._pending [id (future )] = future
314- try :
315- await future
316- finally :
317- del self ._pending [id (future )]
318- await self ._done .put (future )
319- return None
320-
321- asyncio .run_coroutine_threadsafe (wrapper (), self ._loop )
292+ def put (self , element : AbstractElement ) -> None :
293+ if element .id not in self ._pending :
294+ self ._pending .add (element .id )
295+ self ._queue .put_nowait (element )
322296 return None
323297
324- async def get (self ) -> _FQT :
325- """Get the result of a queued awaitable that has completed."""
326- future = await self ._done .get ()
327- return await future
328-
329- async def cancel (self ) -> None :
330- for f in self ._pending .values ():
331- f .cancel ()
332- if self ._pending :
333- await asyncio .wait (
334- list (self ._pending .values ()), return_when = asyncio .ALL_COMPLETED
335- )
298+ async def get (self ) -> AbstractElement :
299+ element = await self ._queue .get ()
300+ self ._pending .remove (element .id )
301+ return element
0 commit comments