Skip to content

Commit 9fca752

Browse files
authored
Add --always-dont and --always-wont=TEL,OPTS (jquast#134)
This mirrors --always-do and --always-will. If you're making custom clients and servers or automation's you might find issues or bugs in our implementation or the remote's, common ones like TTYPE, NAWS etc, that can be worked around by rejecting them entirely.
1 parent 11ff62c commit 9fca752

4 files changed

Lines changed: 146 additions & 11 deletions

File tree

telnetlib3/client.py

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -655,9 +655,11 @@ async def run_client() -> None:
655655

656656
always_will: set[bytes] = args["always_will"]
657657
always_do: set[bytes] = args["always_do"]
658+
always_wont: set[bytes] = args["always_wont"]
659+
always_dont: set[bytes] = args["always_dont"]
658660

659-
# Wrap client factory to inject always_will/always_do and encoding
660-
# flags before negotiation starts.
661+
# Wrap client factory to inject always_will/always_do/always_wont/always_dont
662+
# and encoding flags before negotiation starts.
661663
encoding_explicit = args["encoding"] not in ("utf8", "utf-8", False)
662664
gmcp_modules: Optional[List[str]] = args.get("gmcp_modules")
663665

@@ -675,6 +677,10 @@ def _patched_connection_made(transport: asyncio.BaseTransport) -> None:
675677
if always_will:
676678
client.writer.always_will = always_will
677679
client.writer.always_do = always_do
680+
if always_wont:
681+
client.writer.always_wont = always_wont
682+
if always_dont:
683+
client.writer.always_dont = always_dont
678684
from .telopt import GMCP as _GMCP
679685

680686
client.writer.passive_do = {_GMCP}
@@ -779,14 +785,32 @@ def _get_argument_parser() -> argparse.ArgumentParser:
779785
action="append",
780786
default=[],
781787
metavar="OPT",
782-
help="always send DO for this option (name like GMCP or number, repeatable)",
788+
help="always send DO for this option (comma-separated, named like GMCP"
789+
" or numeric like 201, repeatable)",
790+
)
791+
parser.add_argument(
792+
"--always-dont",
793+
action="append",
794+
default=[],
795+
metavar="OPT",
796+
help="always send DONT for this option, refusing even natively supported"
797+
" options (comma-separated, named or numeric, repeatable)",
783798
)
784799
parser.add_argument(
785800
"--always-will",
786801
action="append",
787802
default=[],
788803
metavar="OPT",
789-
help="always send WILL for this option (name like MXP or number, repeatable)",
804+
help="always send WILL for this option (comma-separated, named like MXP"
805+
" or numeric like 91, repeatable)",
806+
)
807+
parser.add_argument(
808+
"--always-wont",
809+
action="append",
810+
default=[],
811+
metavar="OPT",
812+
help="always send WONT for this option, refusing even natively supported"
813+
" options (comma-separated, named or numeric, repeatable)",
790814
)
791815
parser.add_argument(
792816
"--ansi-keys",
@@ -925,6 +949,22 @@ def _parse_option_arg(value: str) -> bytes:
925949
return bytes([int(value)])
926950

927951

952+
def _parse_option_list(values: List[str]) -> set[bytes]:
953+
"""
954+
Parse a list of option arguments, splitting comma-separated values.
955+
956+
:param values: List of option strings, each may be comma-separated.
957+
:returns: Set of parsed option bytes.
958+
"""
959+
result: set[bytes] = set()
960+
for v in values:
961+
for item in v.split(","):
962+
item = item.strip()
963+
if item:
964+
result.add(_parse_option_arg(item))
965+
return result
966+
967+
928968
def _transform_args(args: argparse.Namespace) -> Dict[str, Any]:
929969
# Auto-enable force_binary for any non-ASCII encoding that uses high-bit bytes.
930970
from .encodings import FORCE_BINARY_ENCODINGS
@@ -971,8 +1011,10 @@ def _transform_args(args: argparse.Namespace) -> Dict[str, Any]:
9711011
"connect_minwait": args.connect_minwait,
9721012
"connect_timeout": args.connect_timeout or None,
9731013
"send_environ": tuple(v.strip() for v in args.send_environ.split(",") if v.strip()),
974-
"always_will": {_parse_option_arg(v) for v in args.always_will},
975-
"always_do": {_parse_option_arg(v) for v in args.always_do},
1014+
"always_will": _parse_option_list(args.always_will),
1015+
"always_do": _parse_option_list(args.always_do),
1016+
"always_wont": _parse_option_list(args.always_wont),
1017+
"always_dont": _parse_option_list(args.always_dont),
9761018
"raw_mode": raw_mode,
9771019
"ascii_eol": args.ascii_eol,
9781020
"ansi_keys": args.ansi_keys,
@@ -1012,14 +1054,32 @@ def _get_fingerprint_argument_parser() -> argparse.ArgumentParser:
10121054
action="append",
10131055
default=[],
10141056
metavar="OPT",
1015-
help="always send DO for this option (name like GMCP or number, repeatable)",
1057+
help="always send DO for this option (comma-separated, named like GMCP"
1058+
" or numeric like 201, repeatable)",
1059+
)
1060+
parser.add_argument(
1061+
"--always-dont",
1062+
action="append",
1063+
default=[],
1064+
metavar="OPT",
1065+
help="always send DONT for this option, refusing even natively supported"
1066+
" options (comma-separated, named or numeric, repeatable)",
10161067
)
10171068
parser.add_argument(
10181069
"--always-will",
10191070
action="append",
10201071
default=[],
10211072
metavar="OPT",
1022-
help="always send WILL for this option (name like MXP or number, repeatable)",
1073+
help="always send WILL for this option (comma-separated, named like MXP"
1074+
" or numeric like 91, repeatable)",
1075+
)
1076+
parser.add_argument(
1077+
"--always-wont",
1078+
action="append",
1079+
default=[],
1080+
metavar="OPT",
1081+
help="always send WONT for this option, refusing even natively supported"
1082+
" options (comma-separated, named or numeric, repeatable)",
10231083
)
10241084
parser.add_argument(
10251085
"--banner-max-bytes", default=65536, type=int, help="max bytes per banner read call"
@@ -1141,9 +1201,11 @@ async def run_fingerprint_client() -> None:
11411201
banner_max_bytes=args.banner_max_bytes,
11421202
)
11431203

1144-
# Parse --always-will/--always-do option names/numbers
1145-
fp_always_will = {_parse_option_arg(v) for v in args.always_will}
1146-
fp_always_do = {_parse_option_arg(v) for v in args.always_do}
1204+
# Parse --always-will/--always-do/--always-wont/--always-dont option names/numbers
1205+
fp_always_will = _parse_option_list(args.always_will)
1206+
fp_always_do = _parse_option_list(args.always_do)
1207+
fp_always_wont = _parse_option_list(args.always_wont)
1208+
fp_always_dont = _parse_option_list(args.always_dont)
11471209

11481210
# Parse --send-env KEY=VALUE pairs
11491211
extra_env: Dict[str, str] = {}
@@ -1178,6 +1240,10 @@ def patched_connection_made(transport: asyncio.BaseTransport) -> None:
11781240
mud_opts = {opt for opt, _, _ in fingerprinting.EXTENDED_OPTIONS}
11791241
client.writer.always_will = fp_always_will | mud_opts
11801242
client.writer.always_do = fp_always_do | mud_opts
1243+
if fp_always_wont:
1244+
client.writer.always_wont = fp_always_wont
1245+
if fp_always_dont:
1246+
client.writer.always_dont = fp_always_dont
11811247

11821248
def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]:
11831249
result = orig_send_env(keys)

telnetlib3/stream_writer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,16 @@ def __init__(
234234
#: DONT rejection in :meth:`handle_will`.
235235
self.always_do: set[bytes] = set()
236236

237+
#: Set of option byte(s) for which the client always sends WONT
238+
#: in response to DO, refusing the option even when natively
239+
#: supported. Overrides the default WILL in :meth:`handle_do`.
240+
self.always_wont: set[bytes] = set()
241+
242+
#: Set of option byte(s) for which the client always sends DONT
243+
#: in response to WILL, refusing the option even when natively
244+
#: supported. Overrides the default DO in :meth:`handle_will`.
245+
self.always_dont: set[bytes] = set()
246+
237247
#: Set of option byte(s) for which the client sends DO only
238248
#: in response to a server WILL (passive negotiation).
239249
self.passive_do: set[bytes] = set()
@@ -1824,6 +1834,11 @@ def handle_do(self, opt: bytes) -> bool:
18241834
# False for unsupported option, or an option invalid in that context,
18251835
# such as LOGOUT.
18261836
self.log.debug("handle_do(%s)", name_command(opt))
1837+
if opt in self.always_wont:
1838+
self.log.debug("DO %s: always-wont, declining.", name_command(opt))
1839+
if not self.local_option.enabled(opt):
1840+
self.iac(WONT, opt)
1841+
return False
18271842
if opt == ECHO and self.client:
18281843
# What do we have here? A Telnet Server attempting to
18291844
# fingerprint us as a broken 4.4BSD Telnet Client, which
@@ -1978,6 +1993,12 @@ def handle_will(self, opt: bytes) -> None:
19781993
"""
19791994
self.log.debug("handle_will(%s)", name_command(opt))
19801995

1996+
if opt in self.always_dont:
1997+
self.log.debug("WILL %s: always-dont, refusing.", name_command(opt))
1998+
self.iac(DONT, opt)
1999+
self.remote_option[opt] = False
2000+
return
2001+
19812002
if opt in (
19822003
BINARY,
19832004
SGA,

telnetlib3/tests/test_stream_writer_full.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,38 @@ def test_handle_will_always_do_sends_do():
11111111
assert AUTHENTICATION not in w.rejected_will
11121112

11131113

1114+
def test_handle_will_always_dont_refuses():
1115+
w, t, p = new_writer(server=True)
1116+
w.always_dont.add(SGA)
1117+
w.handle_will(SGA)
1118+
assert t.writes[-1] == IAC + DONT + SGA
1119+
assert not w.remote_option.enabled(SGA)
1120+
1121+
1122+
def test_handle_do_always_wont_refuses():
1123+
w, t, p = new_writer(server=False, client=True)
1124+
w.always_wont.add(TTYPE)
1125+
result = w.handle_do(TTYPE)
1126+
assert result is False
1127+
assert t.writes[-1] == IAC + WONT + TTYPE
1128+
1129+
1130+
def test_always_dont_overrides_native_support():
1131+
w, t, p = new_writer(server=True)
1132+
w.always_dont.add(BINARY)
1133+
w.handle_will(BINARY)
1134+
assert t.writes[-1] == IAC + DONT + BINARY
1135+
assert not w.remote_option.enabled(BINARY)
1136+
1137+
1138+
def test_always_wont_overrides_native_support():
1139+
w, t, p = new_writer(server=False, client=True)
1140+
w.always_wont.add(NAWS)
1141+
result = w.handle_do(NAWS)
1142+
assert result is False
1143+
assert t.writes[-1] == IAC + WONT + NAWS
1144+
1145+
11141146
def test_write_non_bytes_raises_type_error():
11151147
w, t, p = new_writer(server=True)
11161148
with pytest.raises(TypeError, match="buf expected bytes"):

telnetlib3/tests/test_tls.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,22 @@ async def test_parse_option_arg(input_val, expected):
406406
assert _parse_option_arg(input_val) == expected
407407

408408

409+
@pytest.mark.parametrize(
410+
"values, expected",
411+
[
412+
pytest.param(["TTYPE,NAWS"], {bytes([24]), bytes([31])}, id="comma-separated"),
413+
pytest.param(["91,TTYPE"], {bytes([91]), bytes([24])}, id="mixed-comma"),
414+
pytest.param(["TTYPE", "91"], {bytes([24]), bytes([91])}, id="repeated"),
415+
pytest.param(["TTYPE, 91"], {bytes([24]), bytes([91])}, id="comma-with-spaces"),
416+
pytest.param([], set(), id="empty"),
417+
],
418+
)
419+
async def test_parse_option_list(values, expected):
420+
from telnetlib3.client import _parse_option_list
421+
422+
assert _parse_option_list(values) == expected
423+
424+
409425
_MAX_SUBPROC_SECONDS = 8
410426

411427

0 commit comments

Comments
 (0)