Skip to content

Commit 03b3017

Browse files
Merge pull request #1 from AndreasAakesson/tcp_rcvwnd
TCP, pmr and microlb fixes
2 parents 8f38456 + 27fd8af commit 03b3017

4 files changed

Lines changed: 60 additions & 21 deletions

File tree

api/util/detail/alloc_pmr.hpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace os::mem::detail {
3333

3434
void* do_allocate(size_t size, size_t align) override {
3535
if (UNLIKELY(size + allocated_ > cap_total_)) {
36+
//printf("pmr about to throw bad alloc: sz=%zu alloc=%zu cap=%zu\n", size, allocated_, cap_total_);
3637
throw std::bad_alloc();
3738
}
3839

@@ -46,6 +47,7 @@ namespace os::mem::detail {
4647
void* buf = memalign(align, size);
4748

4849
if (buf == nullptr) {
50+
//printf("pmr memalign return nullptr, throw bad alloc\n");
4951
throw std::bad_alloc();
5052
}
5153

@@ -152,7 +154,9 @@ namespace os::mem::detail {
152154

153155
std::size_t resource_capacity() {
154156
if (cap_suballoc_ == 0)
157+
{
155158
return cap_total_ / (used_resources_ + os::mem::Pmr_pool::resource_division_offset);
159+
}
156160
return cap_suballoc_;
157161
}
158162

@@ -244,7 +248,9 @@ namespace os::mem {
244248
// Pmr_resource implementation
245249
//
246250
Pmr_resource::Pmr_resource(Pool_ptr p) : pool_{p} {}
247-
std::size_t Pmr_resource::capacity() { return pool_->resource_capacity(); }
251+
std::size_t Pmr_resource::capacity() {
252+
return std::min(pool_->resource_capacity(), pool_->allocatable());
253+
}
248254
std::size_t Pmr_resource::allocatable() {
249255
auto cap = capacity();
250256
if (used > cap)
@@ -267,10 +273,8 @@ namespace os::mem {
267273
}
268274

269275
void* buf = pool_->allocate(size, align);
270-
271276
used += size;
272277
allocs++;
273-
274278
return buf;
275279
}
276280

lib/microLB/micro_lb/balancer.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,12 @@ namespace microLB
259259
auto &session=get_session(idx);
260260

261261
// free session destroying potential unique ptr objects
262-
session.incoming =nullptr;
263-
session.outgoing=nullptr;
262+
session.incoming = nullptr;
263+
auto out_tcp = dynamic_cast<net::tcp::Stream*>(session.outgoing->bottom_transport())->tcp();
264+
session.outgoing = nullptr;
265+
// if we don't have anything to write to the backend, abort it.
266+
if(not out_tcp->sendq_size())
267+
out_tcp->abort();
264268
free_sessions.push_back(session.self);
265269
LBOUT("Session %d destroyed (total = %d)\n", session.self, session_cnt);
266270
}
@@ -359,7 +363,17 @@ namespace microLB
359363
}
360364
void Node::connect()
361365
{
362-
auto outgoing = this->stack.tcp().connect(this->addr);
366+
net::tcp::Connection_ptr outgoing;
367+
try
368+
{
369+
outgoing = this->stack.tcp().connect(this->addr);
370+
}
371+
catch([[maybe_unused]]const net::TCP_error& err)
372+
{
373+
LBOUT("Got exception: %s\n", err.what());
374+
this->restart_active_check();
375+
return;
376+
}
363377
// connecting to node atm.
364378
this->connecting++;
365379
// retry timer when connect takes too long

src/net/tcp/connection.cpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ Connection::Connection(TCP& host, Socket local, Socket remote, ConnectCallback c
5353

5454
Connection::~Connection()
5555
{
56-
//printf("<Connection> Deleted %p %s ACTIVE: %u\n", this,
56+
//printf("<Connection> Deleted %p %s ACTIVE: %zu\n", this,
5757
// to_string().c_str(), host_.active_connections());
58+
5859
rtx_clear();
5960
}
6061

@@ -434,6 +435,8 @@ bool Connection::handle_ack(const Packet_view& in)
434435

435436
if(is_win_update(in, true_win))
436437
{
438+
//if(cb.SND.WND < SMSS()*2)
439+
// printf("Win update: %u => %u\n", cb.SND.WND, true_win);
437440
cb.SND.WND = true_win;
438441
cb.SND.WL1 = in.seq();
439442
cb.SND.WL2 = in.ack();
@@ -613,7 +616,7 @@ void Connection::on_dup_ack(const Packet_view& in)
613616
// 3 dup acks
614617
else if(dup_acks_ == 3)
615618
{
616-
debug("<TCP::Connection::on_dup_ack> Dup ACK == 3 - UNA=%u recover=%u\n", cb.SND.UNA, cb.recover);
619+
//printf("<TCP::Connection::on_dup_ack> Dup ACK == 3 - UNA=%u recover=%u\n", cb.SND.UNA, cb.recover);
617620

618621
if(cb.SND.UNA - 1 > cb.recover)
619622
goto fast_rtx;
@@ -997,8 +1000,8 @@ void Connection::retransmit() {
9971000

9981001
// TODO: Finish to send window zero probe, but only on rtx timeout
9991002

1000-
debug2("<Connection::retransmit> With data (wq.sz=%u) buf.unacked=%u\n",
1001-
writeq.size(), buf->size(), buf->size() - writeq.acked());
1003+
//printf("<Connection::retransmit> With data (wq.sz=%zu) buf.size=%zu buf.unacked=%zu SND.WND=%u CWND=%u\n",
1004+
// writeq.size(), buf->size(), buf->size() - writeq.acked(), cb.SND.WND, cb.cwnd);
10021005
fill_packet(*packet, buf->data() + writeq.acked(), buf->size() - writeq.acked());
10031006
packet->set_flag(PSH);
10041007
}
@@ -1070,8 +1073,8 @@ void Connection::rtx_clear() {
10701073
begins (i.e., after the three-way handshake completes).
10711074
*/
10721075
void Connection::rtx_timeout() {
1073-
debug("<Connection::RTX@timeout> Timed out (RTO %lld ms). FS: %u\n",
1074-
rttm.rto_ms().count(), flight_size());
1076+
//printf("<Connection::RTX@timeout> Timed out (RTO %lld ms). FS: %u usable=%u\n",
1077+
// rttm.rto_ms().count(), flight_size(), usable_window());
10751078

10761079
signal_rtx_timeout();
10771080
// experimental
@@ -1421,12 +1424,12 @@ void Connection::reduce_ssthresh() {
14211424
fs = (fs >= two_seg) ? fs - two_seg : 0;
14221425

14231426
cb.ssthresh = std::max( (fs / 2), two_seg );
1424-
debug2("<TCP::Connection::reduce_ssthresh> Slow start threshold reduced: %u\n",
1425-
cb.ssthresh);
1427+
//printf("<TCP::Connection::reduce_ssthresh> Slow start threshold reduced: %u\n",
1428+
// cb.ssthresh);
14261429
}
14271430

14281431
void Connection::fast_retransmit() {
1429-
debug("<TCP::Connection::fast_retransmit> Fast retransmit initiated.\n");
1432+
//printf("<TCP::Connection::fast_retransmit> Fast retransmit initiated.\n");
14301433
// reduce sshtresh
14311434
reduce_ssthresh();
14321435
// retransmit segment starting SND.UNA
@@ -1441,5 +1444,5 @@ void Connection::finish_fast_recovery() {
14411444
fast_recovery_ = false;
14421445
//cb.cwnd = std::min(cb.ssthresh, std::max(flight_size(), (uint32_t)SMSS()) + SMSS());
14431446
cb.cwnd = cb.ssthresh;
1444-
debug("<TCP::Connection::finish_fast_recovery> Finished Fast Recovery - Cwnd: %u\n", cb.cwnd);
1447+
//printf("<TCP::Connection::finish_fast_recovery> Finished Fast Recovery - Cwnd: %u\n", cb.cwnd);
14451448
}

src/net/tcp/tcp.cpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -492,28 +492,43 @@ bool TCP::unbind(const Socket& socket)
492492
return false;
493493
}
494494

495-
bool TCP::add_connection(tcp::Connection_ptr conn) {
495+
bool TCP::add_connection(tcp::Connection_ptr conn)
496+
{
497+
const size_t alloc_thres = max_bufsize() * Read_request::buffer_limit;
496498
// Stat increment number of incoming connections
497499
(*incoming_connections_)++;
498500

499501
debug("<TCP::add_connection> Connection added %s \n", conn->to_string().c_str());
500-
conn->bufalloc = mempool_.get_resource();
502+
auto resource = mempool_.get_resource();
501503

502504
// Reject connection if we can't allocate memory
503-
if (conn->bufalloc == nullptr
504-
or conn->bufalloc->allocatable() < max_bufsize() * Read_request::buffer_limit){
505+
if(UNLIKELY(resource == nullptr or resource->allocatable() < alloc_thres))
506+
{
505507
conn->_on_cleanup_ = nullptr;
506508
conn->abort();
507509
return false;
508510
}
509511

512+
conn->bufalloc = std::move(resource);
513+
514+
//printf("New inc conn %s allocatable=%zu\n", conn->to_string().c_str(), conn->bufalloc->allocatable());
515+
510516
Expects(conn->bufalloc != nullptr);
511517
conn->_on_cleanup({this, &TCP::close_connection});
512518
return connections_.emplace(conn->tuple(), conn).second;
513519
}
514520

515521
Connection_ptr TCP::create_connection(Socket local, Socket remote, ConnectCallback cb)
516522
{
523+
const size_t alloc_thres = max_bufsize() * Read_request::buffer_limit;
524+
525+
auto resource = mempool_.get_resource();
526+
// Don't create connection if we can't allocate memory
527+
if(UNLIKELY(resource == nullptr or resource->allocatable() < alloc_thres))
528+
{
529+
throw TCP_error{"Unable to create new connection: Not enough allocatable memory"};
530+
}
531+
517532
// Stat increment number of outgoing connections
518533
(*outgoing_connections_)++;
519534

@@ -523,7 +538,10 @@ Connection_ptr TCP::create_connection(Socket local, Socket remote, ConnectCallba
523538
)
524539
).first->second;
525540
conn->_on_cleanup({this, &TCP::close_connection});
526-
conn->bufalloc = mempool_.get_resource();
541+
conn->bufalloc = std::move(resource);
542+
543+
//printf("New out conn %s allocatable=%zu\n", conn->to_string().c_str(), conn->bufalloc->allocatable());
544+
527545
Expects(conn->bufalloc != nullptr);
528546
return conn;
529547
}

0 commit comments

Comments
 (0)