Skip to content

Commit ffacd83

Browse files
authored
Merge pull request #60 from matburt/send_support
Add send support back in
2 parents 4140062 + fe03f22 commit ffacd83

4 files changed

Lines changed: 33 additions & 7 deletions

File tree

receptor/config.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,10 @@ def __init__(self, args=None):
250250
# send options
251251
self.add_config_option(
252252
section='send',
253-
key='socket_path',
254-
default_value='/var/run/receptor_controller.sock',
255-
value_type='path',
256-
hint='Path to control socket for controller commands.',
253+
key='peer',
254+
default_value='',
255+
value_type='str',
256+
hint='The peer to relay the directive through'
257257
)
258258
self.add_config_option(
259259
section='send',
@@ -276,7 +276,7 @@ def __init__(self, args=None):
276276
long_option='send_payload',
277277
default_value='',
278278
value_type='str',
279-
hint='Payload of the directive to send. Use - for stdin.',
279+
hint='Payload of the directive to send. Use - for stdin or give the path to a file to transmit the file contents.',
280280
)
281281
# Component options. These are also only used in a config section
282282
# like auth, so they also set `subparse=False`.

receptor/entrypoints.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import logging
22
import time
33
import asyncio
4+
import sys
5+
import os
46

57
from prometheus_client import start_http_server
68

79
from .controller import Controller
10+
from .messages import Message
811

912
logger = logging.getLogger(__name__)
1013

@@ -84,4 +87,26 @@ async def send_pings():
8487

8588

8689
def run_as_send(config):
87-
pass
90+
async def send_entrypoint():
91+
read_task = controller.loop.create_task(read_responses())
92+
await controller.add_peer(config.send_peer)
93+
start_wait = time.time()
94+
while not controller.receptor.router.node_is_known(config.ping_recipient) and (time.time() - start_wait < 5):
95+
await asyncio.sleep(0.1)
96+
msg = Message(config.send_recipient, config.send_directive)
97+
if config.send_payload == "-":
98+
msg.buffer(sys.stdin)
99+
elif os.path.exists(config.send_payload):
100+
msg.file(config.send_payload)
101+
else:
102+
msg.data(config.send_payload)
103+
await controller.send(msg)
104+
await read_task
105+
106+
async def read_responses():
107+
while True:
108+
print("{}".format(await controller.recv()))
109+
110+
logger.info(f'Sending directive {config.send_directive} to {config.send_recipient} via {config.send_peer}')
111+
controller = Controller(config)
112+
controller.run(send_entrypoint)

receptor/messages/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .envelope import Message # noqa

receptor/messages/envelope.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
class Message:
2020

21-
__slots__ = ("fd")
21+
__slots__ = ("fd", "recipient", "directive")
2222

2323
def __init__(self, recipient, directive):
2424
self.fd = io.BytesIO()

0 commit comments

Comments
 (0)