1010from dataclasses import dataclass
1111
1212# local
13+ from . import slc as slc_module
1314from . import accessories
1415from ._session_context import TelnetSessionContext
1516
1617log = logging .getLogger (__name__ )
1718
1819# local
20+ from .telopt import LINEMODE # noqa: E402
1921from .accessories import TRACE # noqa: E402
2022from .stream_reader import TelnetReader , TelnetReaderUnicode # noqa: E402
2123from .stream_writer import TelnetWriter , TelnetWriterUnicode # noqa: E402
@@ -180,6 +182,93 @@ class _RawLoopState:
180182 reactivate_repl : bool = False
181183
182184
185+ class LinemodeBuffer :
186+ """
187+ Client-side line buffer for LINEMODE EDIT mode (RFC 1184 §3.1).
188+
189+ Accumulates characters typed by the user, applying local SLC editing functions (erase-char,
190+ erase-line, erase-word) and transmitting complete lines to the server. When TRAPSIG is enabled,
191+ signal characters (^C etc.) are sent as IAC commands instead of buffered.
192+
193+ :param slctab: The writer's current SLC character table.
194+ :param forwardmask: FORWARDMASK received from server, or None.
195+ :param trapsig: When True, signal characters are sent as IAC commands.
196+ """
197+
198+ def __init__ (
199+ self ,
200+ slctab : Dict [bytes , slc_module .SLC ],
201+ forwardmask : Optional [slc_module .Forwardmask ] = None ,
202+ trapsig : bool = False ,
203+ ) -> None :
204+ """Initialize LinemodeBuffer."""
205+ from .telopt import IP , AYT , BRK , EOF , IAC , SUSP , ABORT
206+
207+ self ._buf : list [str ] = []
208+ self .slctab = slctab
209+ self .forwardmask = forwardmask
210+ self .trapsig = trapsig
211+ self ._trapsig_map : Dict [bytes , bytes ] = {
212+ slc_module .SLC_IP : IAC + IP ,
213+ slc_module .SLC_ABORT : IAC + ABORT ,
214+ slc_module .SLC_SUSP : IAC + SUSP ,
215+ slc_module .SLC_EOF : IAC + EOF ,
216+ slc_module .SLC_BRK : IAC + BRK ,
217+ slc_module .SLC_AYT : IAC + AYT ,
218+ }
219+
220+ def _slc_val (self , func : bytes ) -> Optional [int ]:
221+ """Return the active byte value for SLC function, or None if unsupported."""
222+ defn = self .slctab .get (func )
223+ if defn is None or defn .nosupport :
224+ return None
225+ v = defn .val
226+ return ord (v ) if v and v != slc_module .theNULL else None
227+
228+ def feed (self , char : str ) -> Tuple [str , Optional [bytes ]]:
229+ """
230+ Feed one character into the buffer.
231+
232+ :returns: ``(echo, data)`` where ``echo`` is text to display locally
233+ (may be empty) and ``data`` is bytes to send to server, or None
234+ if buffering.
235+ """
236+ b = ord (char )
237+ if self .trapsig :
238+ for func , cmd in self ._trapsig_map .items ():
239+ if b == self ._slc_val (func ):
240+ return ("" , cmd )
241+ if b == self ._slc_val (slc_module .SLC_EC ):
242+ if self ._buf :
243+ self ._buf .pop ()
244+ return ("\b \b " , None )
245+ return ("" , None )
246+ if b == self ._slc_val (slc_module .SLC_EL ):
247+ n = len (self ._buf )
248+ self ._buf .clear ()
249+ return ("\b \b " * n , None )
250+ if b == self ._slc_val (slc_module .SLC_EW ):
251+ popped = 0
252+ # skip trailing spaces (POSIX VWERASE behaviour)
253+ while self ._buf and self ._buf [- 1 ] == " " :
254+ self ._buf .pop ()
255+ popped += 1
256+ while self ._buf and self ._buf [- 1 ] != " " :
257+ self ._buf .pop ()
258+ popped += 1
259+ return ("\b \b " * popped , None )
260+ if char in ("\r " , "\n " ):
261+ line = "" .join (self ._buf ) + char
262+ self ._buf .clear ()
263+ return (char , line .encode ())
264+ if self .forwardmask is not None and b in self .forwardmask :
265+ data = ("" .join (self ._buf ) + char ).encode ()
266+ self ._buf .clear ()
267+ return (char , data )
268+ self ._buf .append (char )
269+ return (char , None )
270+
271+
183272if sys .platform == "win32" :
184273
185274 async def telnet_client_shell (
@@ -314,10 +403,7 @@ def _server_will_sga(self) -> bool:
314403 from .telopt import SGA
315404
316405 w = self .telnet_writer
317- return bool (
318- w .client
319- and (w .remote_option .enabled (SGA ) or w .local_option .enabled (SGA ))
320- )
406+ return bool (w .client and (w .remote_option .enabled (SGA ) or w .local_option .enabled (SGA )))
321407
322408 def check_auto_mode (
323409 self , switched_to_raw : bool , last_will_echo : bool
@@ -334,6 +420,20 @@ def check_auto_mode(
334420 return None
335421 wecho = self .telnet_writer .will_echo
336422 wsga = self ._server_will_sga ()
423+ # LINEMODE EDIT: kernel must handle line editing; keep/restore cooked mode.
424+ # This takes priority over the SGA/ECHO raw-mode heuristics below.
425+ if (
426+ self .telnet_writer .local_option .enabled (LINEMODE )
427+ and self .telnet_writer .linemode .edit
428+ ):
429+ if switched_to_raw :
430+ assert self ._save_mode is not None
431+ self .set_mode (self ._save_mode )
432+ self .telnet_writer .log .debug (
433+ "auto: LINEMODE EDIT confirmed, restoring cooked mode"
434+ )
435+ return (False , wecho , False )
436+ return None
337437 # WILL ECHO alone = line mode with server echo (suppress local echo)
338438 # WILL SGA (with or without ECHO) = raw/character-at-a-time
339439 should_go_raw = not switched_to_raw and wsga
@@ -366,20 +466,35 @@ def determine_mode(self, mode: "Terminal.ModeDef") -> "Terminal.ModeDef":
366466
367467 Auto mode (``_raw_mode is None``): follows the server's negotiation.
368468
369- ================= ======== ========== ================================
469+ ================= ======== ========== ========================================
370470 Server negotiates ICANON ECHO Behavior
371- ================= ======== ========== ================================
471+ ================= ======== ========== ========================================
372472 Nothing on on Line mode, local echo
473+ LINEMODE EDIT **on** on Cooked mode, kernel handles EC/EL/echo
474+ LINEMODE remote **off** **off** Raw, server echoes
373475 WILL SGA only **off** on Character-at-a-time, local echo
374476 WILL ECHO only on **off** Line mode, server echoes
375477 WILL SGA + ECHO **off** **off** Full kludge mode (most common)
376- ================= ======== ========== ================================
478+ ================= ======== ========== ========================================
377479 """
378480 raw_mode = _get_raw_mode (self .telnet_writer )
379481 will_echo = self .telnet_writer .will_echo
380482 will_sga = self ._server_will_sga ()
381483 # Auto mode (None): follow server negotiation
382484 if raw_mode is None :
485+ if self .telnet_writer .local_option .enabled (LINEMODE ):
486+ linemode_mode = self .telnet_writer .linemode
487+ if linemode_mode .edit :
488+ # RFC 1184 / NetBSD reference: LINEMODE EDIT means ICANON on.
489+ # The kernel line discipline handles EC (VERASE), EL (VKILL),
490+ # EW (VWERASE), and echo. No software line editing needed.
491+ self .telnet_writer .log .debug (
492+ "auto: LINEMODE EDIT, cooked mode (kernel line editing)"
493+ )
494+ self .software_echo = False
495+ return mode # keep ICANON on; kernel handles EC/EL/EW and echo
496+ self .telnet_writer .log .debug ("auto: LINEMODE remote, raw input server echo" )
497+ return self ._make_raw (mode , suppress_echo = True )
383498 if will_echo and will_sga :
384499 self .telnet_writer .log .debug ("auto: server echo + SGA, kludge mode" )
385500 return self ._make_raw (mode )
@@ -535,6 +650,18 @@ def _ensure_autoreply_engine(
535650 """Return the autoreply engine from the writer's context, if set."""
536651 return telnet_writer .ctx .autoreply_engine
537652
653+ def _get_linemode_buffer (writer : Union [TelnetWriter , TelnetWriterUnicode ]) -> "LinemodeBuffer" :
654+ """Return (or lazily create) the LinemodeBuffer attached to *writer*."""
655+ buf : Optional [LinemodeBuffer ] = getattr (writer , "_linemode_buf" , None )
656+ if buf is None :
657+ buf = LinemodeBuffer (
658+ slctab = writer .slctab ,
659+ forwardmask = writer .forwardmask ,
660+ trapsig = writer .linemode .trapsig ,
661+ )
662+ writer ._linemode_buf = buf
663+ return buf
664+
538665 async def _raw_event_loop (
539666 telnet_reader : Union [TelnetReader , TelnetReaderUnicode ],
540667 telnet_writer : Union [TelnetWriter , TelnetWriterUnicode ],
@@ -593,7 +720,26 @@ async def _raw_event_loop(
593720 wait_for .remove (telnet_task )
594721 handle_close ("Connection closed." )
595722 break
596- new_timer , has_pending = _send_stdin (inp , telnet_writer , stdout , state .local_echo )
723+ linemode_edit = (
724+ telnet_writer .local_option .enabled (LINEMODE ) and telnet_writer .linemode .edit
725+ )
726+ if linemode_edit and state .switched_to_raw :
727+ # Raw PTY or non-TTY: kernel not doing line editing, use LinemodeBuffer
728+ lmbuf = _get_linemode_buffer (telnet_writer )
729+ for ch in inp .decode (errors = "replace" ):
730+ echo , data = lmbuf .feed (ch )
731+ if echo :
732+ stdout .write (echo .encode ())
733+ if data :
734+ telnet_writer ._write (data )
735+ new_timer , has_pending = None , False
736+ elif linemode_edit :
737+ # Cooked PTY: kernel already handled EC/EL/echo; forward line directly
738+ new_timer , has_pending = _send_stdin (inp , telnet_writer , stdout , False )
739+ else :
740+ new_timer , has_pending = _send_stdin (
741+ inp , telnet_writer , stdout , state .local_echo
742+ )
597743 if has_pending and esc_timer_task not in wait_for :
598744 esc_timer_task = new_timer
599745 if esc_timer_task is not None :
0 commit comments