Skip to content

Commit 35e1739

Browse files
authored
Remove prefix bytes shenanigans (#88)
Why === There was a world where we muxed crosis/pid1 and river/pid2 over one websocket, so we had some prefix bytes. We killed that months ago. Just cleaning up. What changed ============ :yeet: Test plan ========= river rivers
1 parent b500a0d commit 35e1739

4 files changed

Lines changed: 2 additions & 25 deletions

File tree

replit_river/client_transport.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,6 @@ async def websocket_closed_callback() -> None:
250250
payload=handshake_request.model_dump(),
251251
),
252252
ws=websocket,
253-
prefix_bytes=self._transport_options.get_prefix_bytes(),
254253
websocket_closed_callback=websocket_closed_callback,
255254
)
256255
return handshake_request

replit_river/messages.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,19 @@ class FailedSendingMessageException(Exception):
3131

3232
PROTOCOL_VERSION = "v1.1"
3333

34-
CROSIS_PREFIX_BYTES = b"\x00\x00"
35-
PID2_PREFIX_BYTES = b"\xff\xff"
36-
3734

3835
async def send_transport_message(
3936
msg: TransportMessage,
4037
ws: WebSocketCommonProtocol,
4138
websocket_closed_callback: Callable[[], Coroutine[Any, Any, None]],
42-
prefix_bytes: bytes = b"",
4339
) -> None:
4440
logger.debug("sending a message %r to ws %s", msg, ws)
4541
try:
4642
packed = msgpack.packb(
4743
msg.model_dump(by_alias=True, exclude_none=True), datetime=True
4844
)
4945
assert isinstance(packed, bytes)
50-
await ws.send(prefix_bytes + packed)
46+
await ws.send(packed)
5147
except websockets.exceptions.ConnectionClosed as e:
5248
await websocket_closed_callback()
5349
raise WebsocketClosedException("Websocket closed during send message") from e
@@ -73,15 +69,6 @@ def parse_transport_msg(
7369
raise IgnoreMessageException(
7470
f"ignored a message beacuse it was a text frame: {message}"
7571
)
76-
if transport_options.use_prefix_bytes:
77-
if message.startswith(CROSIS_PREFIX_BYTES):
78-
raise IgnoreMessageException("Skip crosis message")
79-
elif message.startswith(PID2_PREFIX_BYTES):
80-
message = message[len(PID2_PREFIX_BYTES) :]
81-
else:
82-
raise InvalidMessageException(
83-
f"Got message without prefix bytes: {formatted_bytes(message)[:5]}"
84-
)
8572
try:
8673
# :param int timestamp:
8774
# Control how timestamp type is unpacked:

replit_river/session.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ async def _send_buffered_messages(
317317
await self._send_transport_message(
318318
msg,
319319
websocket,
320-
prefix_bytes=self._transport_options.get_prefix_bytes(),
321320
)
322321
except WebsocketClosedException:
323322
logger.info(
@@ -332,11 +331,10 @@ async def _send_transport_message(
332331
self,
333332
msg: TransportMessage,
334333
websocket: websockets.WebSocketCommonProtocol,
335-
prefix_bytes: bytes = b"",
336334
) -> None:
337335
try:
338336
await send_transport_message(
339-
msg, websocket, self._begin_close_session_countdown, prefix_bytes
337+
msg, websocket, self._begin_close_session_countdown
340338
)
341339
except WebsocketClosedException as e:
342340
raise e
@@ -401,7 +399,6 @@ async def send_message(
401399
await self._send_transport_message(
402400
msg,
403401
self._ws_wrapper.ws,
404-
prefix_bytes=self._transport_options.get_prefix_bytes(),
405402
)
406403
except WebsocketClosedException as e:
407404
logger.debug(

replit_river/transport_options.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
from pydantic import BaseModel
55

6-
CROSIS_PREFIX_BYTES = b"\x00\x00"
7-
PID2_PREFIX_BYTES = b"\xff\xff"
86
MAX_MESSAGE_BUFFER_SIZE = 1024
97

108

@@ -25,15 +23,11 @@ class TransportOptions(BaseModel):
2523
heartbeat_ms: float = 2_500
2624
# TODO: This should have a better name like max_failed_heartbeats
2725
heartbeats_until_dead: int = 4
28-
use_prefix_bytes: bool = False
2926
close_session_check_interval_ms: float = 100
3027
connection_retry_options: ConnectionRetryOptions = ConnectionRetryOptions()
3128
buffer_size: int = 1_000
3229
transparent_reconnect: bool = True
3330

34-
def get_prefix_bytes(self) -> bytes:
35-
return PID2_PREFIX_BYTES if self.use_prefix_bytes else b""
36-
3731
def websocket_disconnect_grace_ms(self) -> float:
3832
return self.heartbeat_ms * self.heartbeats_until_dead
3933

0 commit comments

Comments
 (0)