Skip to content

Commit 5ec02a9

Browse files
committed
raw sockets are functional now, logging now has node id
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 23be559 commit 5ec02a9

3 files changed

Lines changed: 23 additions & 17 deletions

File tree

receptor/__main__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def main(args=None):
2121
'disable_existing_loggers': False,
2222
'formatters': {
2323
'verbose': {
24-
'format': '{levelname} {asctime} {module} {message}',
24+
'format': '{levelname} {asctime} {node_id} {module} {message}',
2525
'style': '{',
2626
}
2727
},
@@ -40,6 +40,13 @@ def main(args=None):
4040
}
4141
)
4242

43+
def _f(record):
44+
record.node_id = config.default_node_id
45+
return True
46+
47+
for h in logging.getLogger('receptor').handlers:
48+
h.addFilter(_f)
49+
4350
try:
4451
config.go()
4552
except Exception:

receptor/connection/__init__.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
11
import logging
22

33
import asyncio
4+
from collections.abc import AsyncIterator
5+
from abc import abstractmethod, abstractproperty
46

57
from ..messages.envelope import FramedBuffer
68

79
logger = logging.getLogger(__name__)
810

911

10-
class Transport:
11-
def __aiter__(self):
12-
return self
13-
14-
async def __anext__(self):
15-
raise NotImplementedError("subclasses should implement this")
12+
class Transport(AsyncIterator):
1613

14+
@abstractmethod
1715
async def close(self):
18-
raise NotImplementedError("subclasses should implement this")
16+
pass
1917

20-
@property
18+
@abstractproperty
2119
def closed(self):
22-
raise NotImplementedError("subclasses should implement this")
20+
pass
2321

22+
@abstractmethod
2423
async def send(self, bytes_):
25-
raise NotImplementedError("subclasses should implement this")
24+
pass
2625

2726

2827
async def watch_queue(conn, buf):
@@ -41,7 +40,6 @@ async def watch_queue(conn, buf):
4140
logger.exception("watch_queue: error received trying to write")
4241
await buf.put(msg)
4342
return await conn.close()
44-
logger.debug("watch_queue: ws is now closed")
4543

4644

4745
class Worker:
@@ -56,7 +54,6 @@ def __init__(self, receptor, loop):
5654
self.write_task = None
5755

5856
def start_receiving(self):
59-
logger.debug("starting recv")
6057
self.read_task = self.loop.create_task(self.receive())
6158

6259
async def receive(self):
@@ -98,7 +95,7 @@ async def start_processing(self):
9895
return await self.write_task
9996

10097
async def _wait_handshake(self):
101-
logger.debug("serve: waiting for HI")
98+
logger.debug("waiting for HI")
10299
response = await self.buf.get() # TODO: deal with timeout
103100
self.remote_id = response.header["id"]
104101
self.register()
@@ -110,7 +107,7 @@ async def client(self, transport):
110107
await self.hello()
111108
await self._wait_handshake()
112109
await self.start_processing()
113-
logger.debug("connect: normal exit")
110+
logger.debug("normal exit")
114111
finally:
115112
self.unregister()
116113

receptor/connection/sock.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66

77

88
class RawSocket(Transport):
9-
def __init__(self, reader, writer):
9+
def __init__(self, reader, writer, chunk_size=2 ** 8):
1010
self.reader = reader
1111
self.writer = writer
1212
self._closed = False
13+
self.chunk_size = chunk_size
1314

1415
async def __anext__(self):
15-
return await self.reader.read()
16+
bytes_ = await self.reader.read(self.chunk_size)
17+
return bytes_
1618

1719
@property
1820
def closed(self):

0 commit comments

Comments
 (0)