Skip to content

Commit 7def409

Browse files
tcp: Added virtual Stream class for exposing a more general TCP Connection
1 parent 7dcb123 commit 7def409

14 files changed

Lines changed: 267 additions & 92 deletions

File tree

api/net/http/client_connection.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace http {
3636
using timeout_duration = std::chrono::milliseconds;
3737

3838
public:
39-
explicit Client_connection(Client&, TCP_conn);
39+
explicit Client_connection(Client&, Stream_ptr);
4040

4141
bool available() const
4242
{ return on_response_ == nullptr && keep_alive_; }

api/net/http/connection.hpp

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,30 @@ namespace http {
2929

3030
class Connection {
3131
public:
32-
using TCP_conn = net::tcp::Connection_ptr;
32+
using Stream = net::tcp::Connection::Stream;
33+
using Stream_ptr = std::unique_ptr<Stream>;
3334
using Peer = net::tcp::Socket;
3435
using buffer_t = net::tcp::buffer_t;
3536

3637
public:
37-
inline explicit Connection(TCP_conn tcpconn, bool keep_alive = true);
38+
inline explicit Connection(Stream_ptr stream, bool keep_alive = true);
3839

3940
template <typename TCP>
4041
explicit Connection(TCP&, Peer);
4142

4243
inline explicit Connection() noexcept;
4344

4445
net::tcp::port_t local_port() const noexcept
45-
{ return (tcpconn_) ? tcpconn_->local_port() : 0; }
46+
{ return (stream_) ? stream_->local_port() : 0; }
4647

4748
Peer peer() const noexcept
4849
{ return peer_; }
4950

5051
void timeout()
51-
{ tcpconn_->is_closing() ? tcpconn_->abort() : tcpconn_->close(); }
52+
{ stream_->is_closing() ? stream_->abort() : stream_->close(); }
5253

53-
auto&& tcp() const
54-
{ return tcpconn_; }
54+
auto& stream() const
55+
{ return stream_; }
5556

5657
/**
5758
* @brief Shutdown the underlying TCP connection
@@ -64,15 +65,15 @@ namespace http {
6465
*
6566
* @return The underlying TCP connection
6667
*/
67-
inline TCP_conn release();
68+
inline Stream_ptr release();
6869

6970
/**
7071
* @brief Whether the underlying TCP connection has been released or not
7172
*
7273
* @return true if the underlying TCP connection is released
7374
*/
7475
bool released() const
75-
{ return tcpconn_ == nullptr; }
76+
{ return stream_ == nullptr; }
7677

7778
static Connection& empty() noexcept
7879
{
@@ -101,52 +102,50 @@ namespace http {
101102
virtual ~Connection() {}
102103

103104
protected:
104-
TCP_conn tcpconn_;
105+
Stream_ptr stream_;
105106
bool keep_alive_;
106107
Peer peer_;
107108

108109
virtual void close() {}
109110

110111
}; // < class Connection
111112

112-
inline Connection::Connection(TCP_conn tcpconn, bool keep_alive)
113-
: tcpconn_{std::move(tcpconn)},
113+
inline Connection::Connection(Stream_ptr stream, bool keep_alive)
114+
: stream_{std::move(stream)},
114115
keep_alive_{keep_alive},
115-
peer_{tcpconn_->remote()}
116+
peer_{stream_->remote()}
116117
{
117-
Ensures(tcpconn_ != nullptr);
118+
Ensures(stream_ != nullptr);
118119
debug("<http::Connection> Created %u -> %s %p\n", local_port(), peer().to_string().c_str(), this);
119120
}
120121

121122
template <typename TCP>
122123
Connection::Connection(TCP& tcp, Peer addr)
123-
: Connection(tcp.connect(addr))
124+
: Connection(std::make_unique<Stream>(tcp.connect(addr)))
124125
{
125126
}
126127

127128
inline Connection::Connection() noexcept
128-
: tcpconn_(nullptr),
129+
: stream_(nullptr),
129130
keep_alive_(false),
130131
peer_{}
131132
{
132133
}
133134

134135
inline void Connection::shutdown()
135136
{
136-
if(not released() and not tcpconn_->is_closing())
137-
tcpconn_->close();
137+
if(not released() and not stream_->is_closing())
138+
stream_->close();
138139
}
139140

140-
inline Connection::TCP_conn Connection::release()
141+
inline Connection::Stream_ptr Connection::release()
141142
{
142-
auto copy = tcpconn_;
143+
auto copy = std::move(stream_);
143144

144145
// this is expensive and may be unecessary,
145146
// but just to be safe for now
146147
copy->reset_callbacks();
147148

148-
tcpconn_ = nullptr;
149-
150149
return copy;
151150
}
152151

api/net/http/server.hpp

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ namespace http {
6363
*
6464
* @param[in] port The port to listen on
6565
*/
66-
void listen(uint16_t port);
66+
virtual void listen(uint16_t port);
6767

6868
/**
6969
* @brief Setup handler for when a Request is received
@@ -90,21 +90,6 @@ namespace http {
9090
*/
9191
Response_ptr create_response(status_t code = http::OK) const;
9292

93-
/**
94-
* @brief Returns a vector of all TCP connections currently connected to the server
95-
*
96-
* @return A vector of TCP connections
97-
*/
98-
std::vector<TCP_conn> active_tcp_connections() const;
99-
100-
/**
101-
* @brief Reconnects a TCP connection by creating a Server connection
102-
*
103-
* @param[in] conn The TCP connection
104-
*/
105-
void reconnect(TCP_conn conn)
106-
{ if (conn != nullptr) connect(conn); }
107-
10893
~Server();
10994

11095
private:
@@ -124,7 +109,10 @@ namespace http {
124109
Stat& stat_req_bad_;
125110
Stat& stat_timeouts_;
126111

127-
void connect(TCP_conn conn);
112+
void connect(TCP_conn conn)
113+
{ connect(std::make_unique<Connection::Stream>(conn)); }
114+
115+
void connect(Connection::Stream_ptr stream);
128116

129117
void close(Server_connection&);
130118

api/net/http/server_connection.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ namespace http {
3333
static constexpr size_t DEFAULT_BUFSIZE = 1460;
3434

3535
public:
36-
explicit Server_connection(Server&, TCP_conn, size_t idx, const size_t bufsize = DEFAULT_BUFSIZE);
36+
explicit Server_connection(Server&, Stream_ptr, size_t idx, const size_t bufsize = DEFAULT_BUFSIZE);
3737

3838
void send(Response_ptr res);
3939

api/net/tcp/connection.hpp

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ class Connection : public std::enable_shared_from_this<Connection> {
5656
struct Disconnect;
5757
/** Reason for packet being dropped */
5858
enum class Drop_reason;
59+
/** A Connection stream */
60+
class Stream;
61+
5962
using Byte = uint8_t;
6063

6164
using WriteBuffer = Write_queue::WriteBuffer;
@@ -214,6 +217,205 @@ class Connection : public std::enable_shared_from_this<Connection> {
214217
*/
215218
inline void abort();
216219

220+
/**
221+
* @brief Exposes a TCP Connection as a Stream with only the most necessary features.
222+
* May be overrided by extensions like TLS etc for additional functionality.
223+
*/
224+
class Stream {
225+
public:
226+
/**
227+
* @brief Construct a Stream for a Connection ptr
228+
*
229+
* @param[in] conn The connection
230+
*/
231+
Stream(Connection_ptr conn)
232+
: ptr{std::move(conn)}
233+
{}
234+
235+
/** Called when the stream is ready to use. */
236+
using ConnectCallback = delegate<void(Stream& self)>;
237+
/**
238+
* @brief Event when the stream is connected/established/ready to use.
239+
*
240+
* @param[in] cb The connect callback
241+
*/
242+
virtual void on_connect(ConnectCallback cb)
243+
{
244+
ptr->on_connect(Connection::ConnectCallback::make_packed(
245+
[this, cb] (Connection_ptr)
246+
{ cb(*this); }));
247+
}
248+
249+
/** Called with a shared buffer and the length of the data when received. */
250+
using ReadCallback = delegate<void(buffer_t, size_t)>;
251+
/**
252+
* @brief Event when data is received.
253+
*
254+
* @param[in] n The size of the receive buffer
255+
* @param[in] cb The read callback
256+
*/
257+
virtual void on_read(size_t n, ReadCallback cb)
258+
{ ptr->on_read(n, cb); }
259+
260+
/** Called with nothing ¯\_(ツ)_/¯ */
261+
using CloseCallback = delegate<void()>;
262+
/**
263+
* @brief Event for when the Stream is being closed.
264+
*
265+
* @param[in] cb The close callback
266+
*/
267+
virtual void on_close(CloseCallback cb)
268+
{ ptr->on_close(cb); }
269+
270+
/** Called with the number of bytes written. */
271+
using WriteCallback = delegate<void(size_t)>;
272+
/**
273+
* @brief Event for when data has been written.
274+
*
275+
* @param[in] cb The write callback
276+
*/
277+
virtual void on_write(WriteCallback cb)
278+
{ ptr->on_write(cb); }
279+
280+
/**
281+
* @brief Async write of a data with a length.
282+
*
283+
* @param[in] buf data
284+
* @param[in] n length
285+
*/
286+
virtual void write(const void* buf, size_t n)
287+
{ ptr->write(buf, n); }
288+
289+
/**
290+
* @brief Async write of a chunk.
291+
*
292+
* @param[in] c A chunk
293+
*/
294+
virtual void write(Chunk c)
295+
{ ptr->write(c); }
296+
297+
/**
298+
* @brief Async write of a shared buffer with a length.
299+
* Calls write(Chunk c).
300+
*
301+
* @param[in] buffer shared buffer
302+
* @param[in] n length
303+
*/
304+
virtual void write(buffer_t buf, size_t n)
305+
{ ptr->write(buf, n); }
306+
307+
/**
308+
* @brief Async write of a string.
309+
* Calls write(const void* buf, size_t n)
310+
*
311+
* @param[in] str The string
312+
*/
313+
void write(const std::string& str)
314+
{ write(str.data(), str.size()); }
315+
316+
/**
317+
* @brief Closes the stream.
318+
*/
319+
virtual void close()
320+
{ ptr->close(); }
321+
322+
/**
323+
* @brief Aborts (terminates) the stream.
324+
*/
325+
virtual void abort()
326+
{ ptr->abort(); }
327+
328+
/**
329+
* @brief Resets all callbacks.
330+
*/
331+
virtual void reset_callbacks()
332+
{ ptr->reset_callbacks(); }
333+
334+
/**
335+
* @brief Returns the streams local socket.
336+
*
337+
* @return A TCP Socket
338+
*/
339+
tcp::Socket local() const
340+
{ return ptr->local(); }
341+
342+
/**
343+
* @brief Returns the streams remote socket.
344+
*
345+
* @return A TCP Socket
346+
*/
347+
tcp::Socket remote() const
348+
{ return ptr->remote(); }
349+
350+
/**
351+
* @brief Returns the local port.
352+
*
353+
* @return A TCP port
354+
*/
355+
uint16_t local_port() const
356+
{ return ptr->local_port(); }
357+
358+
/**
359+
* @brief Returns a string representation of the stream.
360+
*
361+
* @return String representation of the stream.
362+
*/
363+
virtual std::string to_string() const noexcept
364+
{ return ptr->to_string(); }
365+
366+
/**
367+
* @brief Determines if connected (established).
368+
*
369+
* @return True if connected, False otherwise.
370+
*/
371+
virtual bool is_connected() const noexcept
372+
{ return ptr->is_connected(); }
373+
374+
/**
375+
* @brief Determines if writable. (write is allowed)
376+
*
377+
* @return True if writable, False otherwise.
378+
*/
379+
virtual bool is_writable() const noexcept
380+
{ return ptr->is_writable(); }
381+
382+
/**
383+
* @brief Determines if readable. (data can be received)
384+
*
385+
* @return True if readable, False otherwise.
386+
*/
387+
virtual bool is_readable() const noexcept
388+
{ return ptr->is_readable(); }
389+
390+
/**
391+
* @brief Determines if closing.
392+
*
393+
* @return True if closing, False otherwise.
394+
*/
395+
virtual bool is_closing() const noexcept
396+
{ return ptr->is_closing(); }
397+
398+
/**
399+
* @brief Determines if closed.
400+
*
401+
* @return True if closed, False otherwise.
402+
*/
403+
virtual bool is_closed() const noexcept
404+
{ return ptr->is_closed(); };
405+
406+
protected:
407+
/**
408+
* @brief Returns the underlying TCP connection.
409+
*
410+
* @return A TCP Connection ptr
411+
*/
412+
Connection_ptr tcp()
413+
{ return ptr; };
414+
415+
private:
416+
Connection_ptr ptr;
417+
418+
}; // < class Connection::Stream
217419

218420
/**
219421
* @brief Reason for disconnect event.

0 commit comments

Comments
 (0)