33from __future__ import annotations
44
55# std imports
6+ import zlib
67import asyncio
78import logging
89import weakref
@@ -66,6 +67,12 @@ def __init__(
6667 self .writer : Optional [Union [TelnetWriter , TelnetWriterUnicode ]] = None
6768 self ._limit = limit
6869
70+ # MCCP2: server→client decompression
71+ self ._mccp2_decompressor : Optional [zlib .Decompress ] = None
72+ # MCCP3: client→server compression
73+ self ._mccp3_compressor : Optional [zlib .Compress ] = None
74+ self ._mccp3_orig_write : Any = None
75+
6976 # High-throughput receive pipeline
7077 self ._rx_queue : collections .deque [bytes ] = collections .deque ()
7178 self ._rx_bytes = 0
@@ -93,6 +100,11 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
93100 return
94101 self ._closing = True
95102
103+ # Clean up MCCP compressors/decompressors
104+ self ._mccp2_decompressor = None
105+ self ._mccp3_compressor = None
106+ self ._mccp3_orig_write = None
107+
96108 # Drain any pending rx data before signalling EOF to prevent
97109 # _process_rx from calling feed_data() after feed_eof().
98110 self ._rx_queue .clear ()
@@ -343,6 +355,26 @@ def _process_chunk(self, data: bytes) -> bool:
343355 """Process a chunk of received bytes; return True if any IAC/SB cmd observed."""
344356 self ._last_received = datetime .datetime .now ()
345357
358+ # MCCP2: decompress server→client data when active
359+ if self ._mccp2_decompressor is not None :
360+ try :
361+ data = self ._mccp2_decompressor .decompress (data )
362+ except zlib .error :
363+ self .log .warning ("MCCP2 decompression error, disabling" )
364+ self ._mccp2_end ()
365+ return False
366+ if self ._mccp2_decompressor .eof :
367+ unused = self ._mccp2_decompressor .unused_data
368+ self ._mccp2_end ()
369+ cmd = self ._process_chunk_inner (data )
370+ if unused :
371+ cmd = self ._process_chunk (unused ) or cmd
372+ return cmd
373+
374+ return self ._process_chunk_inner (data )
375+
376+ def _process_chunk_inner (self , data : bytes ) -> bool :
377+ """Inner chunk processing with IAC interpretation and mid-chunk MCCP2 detection."""
346378 try :
347379 mode = self .writer .mode
348380 except Exception :
@@ -355,7 +387,22 @@ def _process_chunk(self, data: bytes) -> bool:
355387 else :
356388 slc_special = None
357389
358- return _process_data_chunk (data , self .writer , self .reader , slc_special , self .log .warning )
390+ cmd_received = _process_data_chunk (
391+ data , self .writer , self .reader , slc_special , self .log .warning
392+ )
393+
394+ if self .writer ._compressed_remainder is not None :
395+ remainder = self .writer ._compressed_remainder
396+ self .writer ._compressed_remainder = None
397+ self ._mccp2_start ()
398+ if remainder :
399+ cmd_received = self ._process_chunk (remainder ) or cmd_received
400+
401+ # MCCP3: start compressor when writer signals activation
402+ if self .writer .mccp3_active and self ._mccp3_compressor is None :
403+ self ._mccp3_start ()
404+
405+ return cmd_received
359406
360407 async def _process_rx (self ) -> None :
361408 """Async processor for receive queue that yields control and applies backpressure."""
@@ -395,6 +442,51 @@ async def _process_rx(self) -> None:
395442 if any_cmd and not self ._waiter_connected .done ():
396443 self ._check_negotiation_timer ()
397444
445+ def _mccp2_start (self ) -> None :
446+ """Start MCCP2 decompression of server→client data."""
447+ self ._mccp2_decompressor = zlib .decompressobj ()
448+ self .log .debug ("MCCP2 decompression started (server→client)" )
449+
450+ def _mccp2_end (self ) -> None :
451+ """Stop MCCP2 decompression."""
452+ self ._mccp2_decompressor = None
453+ self .writer .mccp2_active = False
454+ self .log .debug ("MCCP2 decompression ended (server→client)" )
455+
456+ def _mccp3_start (self ) -> None :
457+ """Start MCCP3 compression of client→server data."""
458+ self ._mccp3_compressor = zlib .compressobj (
459+ zlib .Z_BEST_COMPRESSION , zlib .DEFLATED , 12 , 5 , zlib .Z_DEFAULT_STRATEGY
460+ )
461+ # Wrap transport.write so all outbound bytes are compressed
462+ transport = self .writer ._transport
463+ orig_write = transport .write
464+
465+ def compressed_write (data : bytes ) -> None :
466+ if self ._mccp3_compressor is not None :
467+ compressed = self ._mccp3_compressor .compress (data )
468+ compressed += self ._mccp3_compressor .flush (zlib .Z_SYNC_FLUSH )
469+ orig_write (compressed )
470+ else :
471+ orig_write (data )
472+
473+ transport .write = compressed_write # type: ignore[assignment]
474+ self ._mccp3_orig_write = orig_write
475+ self .log .debug ("MCCP3 compression started (client→server)" )
476+
477+ def _mccp3_end (self ) -> None :
478+ """Stop MCCP3 compression, flush Z_FINISH."""
479+ if self ._mccp3_compressor is not None :
480+ if not self .writer .is_closing ():
481+ self ._mccp3_orig_write (
482+ self ._mccp3_compressor .flush (zlib .Z_FINISH )
483+ )
484+ self ._mccp3_compressor = None
485+ # Restore original transport.write
486+ self .writer ._transport .write = self ._mccp3_orig_write # type: ignore[method-assign]
487+ self .writer .mccp3_active = False
488+ self .log .debug ("MCCP3 compression ended (client→server)" )
489+
398490 def _check_negotiation_timer (self ) -> None :
399491 self ._check_later .cancel ()
400492 self ._tasks .remove (self ._check_later )
0 commit comments