File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -250,10 +250,10 @@ def __init__(self, args=None):
250250 # send options
251251 self .add_config_option (
252252 section = 'send' ,
253- key = 'socket_path ' ,
254- default_value = '/var/run/receptor_controller.sock ' ,
255- value_type = 'path ' ,
256- hint = 'Path to control socket for controller commands.' ,
253+ key = 'peer ' ,
254+ default_value = '' ,
255+ value_type = 'str ' ,
256+ hint = 'The peer to relay the directive through'
257257 )
258258 self .add_config_option (
259259 section = 'send' ,
@@ -276,7 +276,7 @@ def __init__(self, args=None):
276276 long_option = 'send_payload' ,
277277 default_value = '' ,
278278 value_type = 'str' ,
279- hint = 'Payload of the directive to send. Use - for stdin.' ,
279+ hint = 'Payload of the directive to send. Use - for stdin or give the path to a file to transmit the file contents .' ,
280280 )
281281 # Component options. These are also only used in a config section
282282 # like auth, so they also set `subparse=False`.
Original file line number Diff line number Diff line change 11import logging
22import time
33import asyncio
4+ import sys
5+ import os
46
57from prometheus_client import start_http_server
68
79from .controller import Controller
10+ from .messages import Message
811
912logger = logging .getLogger (__name__ )
1013
@@ -84,4 +87,26 @@ async def send_pings():
8487
8588
8689def run_as_send (config ):
87- pass
90+ async def send_entrypoint ():
91+ read_task = controller .loop .create_task (read_responses ())
92+ await controller .add_peer (config .send_peer )
93+ start_wait = time .time ()
94+ while not controller .receptor .router .node_is_known (config .ping_recipient ) and (time .time () - start_wait < 5 ):
95+ await asyncio .sleep (0.1 )
96+ msg = Message (config .send_recipient , config .send_directive )
97+ if config .send_payload == "-" :
98+ msg .buffer (sys .stdin )
99+ elif os .path .exists (config .send_payload ):
100+ msg .file (config .send_payload )
101+ else :
102+ msg .data (config .send_payload )
103+ await controller .send (msg )
104+ await read_task
105+
106+ async def read_responses ():
107+ while True :
108+ print ("{}" .format (await controller .recv ()))
109+
110+ logger .info (f'Sending directive { config .send_directive } to { config .send_recipient } via { config .send_peer } ' )
111+ controller = Controller (config )
112+ controller .run (send_entrypoint )
Original file line number Diff line number Diff line change 1+ from .envelope import Message # noqa
Original file line number Diff line number Diff line change 1818
1919class Message :
2020
21- __slots__ = ("fd" )
21+ __slots__ = ("fd" , "recipient" , "directive" )
2222
2323 def __init__ (self , recipient , directive ):
2424 self .fd = io .BytesIO ()
You can’t perform that action at this time.
0 commit comments