Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 65 additions & 55 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,23 @@ bool MatchHttp11Line(string_view line) {
absl::EndsWith(line, "HTTP/1.1");
}

void UpdateIoBufCapacity(const io::IoBuf& io_buf, ConnectionStats* stats,
absl::FunctionRef<void()> f) {
const size_t prev_capacity = io_buf.Capacity();
f();
const size_t capacity = io_buf.Capacity();
if (prev_capacity != capacity) {
VLOG(2) << "Grown io_buf to " << capacity;
stats->read_buf_capacity += capacity - prev_capacity;
// RAII scope guard that tracks growth of a connection's read buffer.
// It snapshots the io_buf capacity at construction and, on destruction,
// adds any capacity growth to the thread-local connection stats
// (conn_stats.read_buf_capacity), logging the new capacity at VLOG(2).
// Replaces the callback-style UpdateIoBufCapacity() helper with a plain
// scoped object: any buffer-resizing call made inside the tracker's scope
// is accounted for automatically when the scope exits.
struct ReadBufTracker {
  explicit ReadBufTracker(const io::IoBuf& io_buf)
      : io_buf_(io_buf), last_capacity_(io_buf.Capacity()) {
  }

  // Non-copyable: copying the guard would double-count capacity growth.
  ReadBufTracker(const ReadBufTracker&) = delete;
  ReadBufTracker& operator=(const ReadBufTracker&) = delete;

  ~ReadBufTracker() {
    const size_t capacity = io_buf_.Capacity();
    if (last_capacity_ != capacity) {
      VLOG(2) << "Grown io_buf to " << capacity;
      tl_facade_stats->conn_stats.read_buf_capacity += capacity - last_capacity_;
    }
  }

 private:
  const io::IoBuf& io_buf_;  // observed buffer; must outlive the tracker
  size_t last_capacity_;     // capacity snapshot taken at construction
};

size_t UsedMemoryInternal(const ParsedCommand& msg) {
return msg.GetSize() + msg.HeapMemory();
Expand Down Expand Up @@ -1049,7 +1056,6 @@ io::Result<bool> Connection::CheckForHttpProto() {

size_t last_len = 0;
auto* peer = socket_.get();
auto& conn_stats = tl_facade_stats->conn_stats;
do {
auto buf = io_buf_.AppendBuffer();
DCHECK(!buf.empty());
Expand Down Expand Up @@ -1082,7 +1088,10 @@ io::Result<bool> Connection::CheckForHttpProto() {
return MatchHttp11Line(ib);
}
last_len = io_buf_.InputLen();
UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { io_buf_.EnsureCapacity(128); });
{
ReadBufTracker tracker(io_buf_);
io_buf_.EnsureCapacity(128);
}
} while (last_len < 1024);

return false;
Expand All @@ -1108,7 +1117,7 @@ void Connection::ConnectionFlow() {
if (io_buf_.InputLen() > 0) {
phase_ = PROCESS;
if (redis_parser_ && !ioloop_v2_) {
parse_status = ParseRedis(10000);
parse_status = ParseRedis(io_buf_, 10000);
} else {
parse_status = ParseLoop();
}
Expand All @@ -1118,7 +1127,10 @@ void Connection::ConnectionFlow() {

// Main loop.
if (parse_status != ERROR && !ec) {
UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { io_buf_.EnsureCapacity(64); });
{
ReadBufTracker tracker(io_buf_);
io_buf_.EnsureCapacity(64);
}
variant<error_code, Connection::ParserStatus> res;
if (ioloop_v2_) {
res = IoLoopV2();
Expand Down Expand Up @@ -1266,7 +1278,8 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
}
}

Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool enqueue_only) {
Connection::ParserStatus Connection::ParseRedis(base::IoBuf& io_buf, unsigned max_busy_cycles,
bool enqueue_only) {
uint32_t consumed = 0;
RespSrvParser::Result result = RespSrvParser::OK;

Expand All @@ -1281,7 +1294,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool e
auto* cmd = std::exchange(parsed_cmd_, ptr.release());
EnqueueParsedCommand(cmd);
};
io::Bytes read_buffer = io_buf_.InputBuffer();
io::Bytes read_buffer = io_buf.InputBuffer();
// Keep track of total bytes consumed/parsed. The do/while{} loop below preempts,
// and InputBuffer() size might change between preemption points. There is a corner case,
// that ConsumeInput() will strip a portion of the request which makes the test_publish_stuck
Expand Down Expand Up @@ -1336,7 +1349,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool e
}
} while (RespSrvParser::OK == result && read_buffer.size() > 0 && !reply_builder_->GetError());

io_buf_.ConsumeInput(total_consumed);
io_buf.ConsumeInput(total_consumed);

parser_error_ = result;
if (result == RespSrvParser::OK)
Expand All @@ -1359,7 +1372,7 @@ auto Connection::ParseLoop() -> ParserStatus {

bool commands_parsed = false;
do {
commands_parsed = (this->*parse_func)();
commands_parsed = (this->*parse_func)(io_buf_);

if (!ExecuteBatch())
return ERROR;
Expand Down Expand Up @@ -1476,10 +1489,10 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoop() {
}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
bool reached_capacity = io_buf_.AppendLen() == 0;

if (redis_parser_) {
parse_status = ParseRedis(max_busy_read_cycles_cached);
parse_status = ParseRedis(io_buf_, max_busy_read_cycles_cached);
} else {
DCHECK(memcache_parser_);
parse_status = ParseLoop();
Expand All @@ -1504,19 +1517,16 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoop() {
// (Note: The buffer object is only working in power-of-2 sizes,
// so there's no danger of accidental O(n^2) behavior.)
if (parser_hint > capacity) {
auto& conn_stats = GetLocalConnStats();
UpdateIoBufCapacity(io_buf_, &conn_stats,
[&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
ReadBufTracker tracker(io_buf_);
io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint));
}

// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
auto& conn_stats = GetLocalConnStats();
if (reached_capacity && capacity < max_iobfuf_len / 2) {
// Last io used most of the io_buf to the end.
UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() {
io_buf_.Reserve(capacity * 2); // Valid growth range.
});
ReadBufTracker tracker(io_buf_);
io_buf_.Reserve(capacity * 2); // Valid growth range.
}

if (io_buf_.AppendLen() == 0U) {
Expand Down Expand Up @@ -2306,7 +2316,7 @@ bool Connection::IsReplySizeOverLimit() const {
return over_limit;
}

bool Connection::ParseRedisBatch() {
bool Connection::ParseRedisBatch(base::IoBuf& buf) {
QueueBackpressure& qbp = GetQueueBackpressure();

// Only throttle parsing if this connection is actively contributing to the queue.
Expand All @@ -2319,11 +2329,11 @@ bool Connection::ParseRedisBatch() {
GetLocalConnStats().pipeline_throttle_count++;
return false;
}
return ParseRedis(max_busy_read_cycles_cached, true) == ParserStatus::OK;
return ParseRedis(buf, max_busy_read_cycles_cached, true) == ParserStatus::OK;
}

bool Connection::ParseMCBatch() {
CHECK(io_buf_.InputLen() > 0);
bool Connection::ParseMCBatch(base::IoBuf& io_buf) {
CHECK(io_buf.InputLen() > 0);

do {
if (parsed_cmd_ == nullptr) {
Expand All @@ -2333,9 +2343,9 @@ bool Connection::ParseMCBatch() {
}
uint32_t consumed = 0;
memcache_parser_->set_last_unix_time(time(nullptr));
MemcacheParser::Result result = memcache_parser_->Parse(io::View(io_buf_.InputBuffer()),
MemcacheParser::Result result = memcache_parser_->Parse(io::View(io_buf.InputBuffer()),
&consumed, parsed_cmd_->mc_command());
io_buf_.ConsumeInput(consumed);
io_buf.ConsumeInput(consumed);

DVLOG(2) << "mc_result " << unsigned(result) << " consumed: " << consumed << " type "
<< unsigned(parsed_cmd_->mc_command()->type);
Expand Down Expand Up @@ -2370,7 +2380,7 @@ bool Connection::ParseMCBatch() {
break;
}
}
} while (parsed_cmd_q_len_ < 128 && io_buf_.InputLen() > 0);
} while (parsed_cmd_q_len_ < 128 && io_buf.InputLen() > 0);
return true;
}

Expand Down Expand Up @@ -2453,12 +2463,12 @@ bool Connection::ExecuteBatch() {

bool Connection::ReplyBatch() {
reply_builder_->SetBatchMode(true);
while (HasDispatchedCommands() && parsed_head_->CanReply()) {
while (HasInFlightCommands() && parsed_head_->CanReply()) {
current_wait_.reset(); // Clear the subscription before moving to the next command
auto* cmd = parsed_head_;
parsed_head_ = cmd->next;
cmd->SendReply();
ReleaseParsedCommand(cmd, HasDispatchedCommands() /* is_pipelined */);
ReleaseParsedCommand(cmd, HasInFlightCommands() /* is_pipelined */);
if (reply_builder_->GetError())
return false;
}
Expand Down Expand Up @@ -2663,8 +2673,10 @@ void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n)
pending_input_ = true;
} else if (std::holds_alternative<io::MutableBytes>(n.read_result)) { // provided buffer.
io::MutableBytes buf = std::get<io::MutableBytes>(n.read_result);
UpdateIoBufCapacity(io_buf_, &tl_facade_stats->conn_stats,
[&]() { io_buf_.WriteAndCommit(buf.data(), buf.size()); });
{
ReadBufTracker tracker(io_buf_);
io_buf_.WriteAndCommit(buf.data(), buf.size());
}
last_interaction_ = time(nullptr);
} else {
LOG(FATAL) << "Should not reach here";
Expand Down Expand Up @@ -2700,11 +2712,10 @@ void Connection::ReadPendingInput() {
}
}

void Connection::CheckIoBufCapacity(bool is_iobuf_full) {
auto& conn_stats = tl_facade_stats->conn_stats;
void Connection::CheckIoBufCapacity(bool reached_capacity, base::IoBuf* io_buf) {
size_t max_io_buf_len = GetFlag(FLAGS_max_client_iobuf_len);

size_t capacity = io_buf_.Capacity();
size_t capacity = io_buf->Capacity();
if (capacity < max_io_buf_len) {
size_t parser_hint = 0;
if (redis_parser_)
Expand All @@ -2716,23 +2727,22 @@ void Connection::CheckIoBufCapacity(bool is_iobuf_full) {
// (Note: The buffer object is only working in power-of-2 sizes,
// so there's no danger of accidental O(n^2) behavior.)
if (parser_hint > capacity) {
UpdateIoBufCapacity(io_buf_, &conn_stats,
[&]() { io_buf_.Reserve(std::min(max_io_buf_len, parser_hint)); });
ReadBufTracker tracker(*io_buf);
io_buf->Reserve(std::min(max_io_buf_len, parser_hint));
}

// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (is_iobuf_full && capacity < max_io_buf_len / 2) {
if (reached_capacity && capacity < max_io_buf_len / 2) {
// Last io used most of the io_buf to the end.
UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() {
io_buf_.Reserve(capacity * 2); // Valid growth range.
});
ReadBufTracker tracker(*io_buf);
io_buf->Reserve(capacity * 2); // Valid growth range.
}

if (io_buf_.AppendLen() == 0U) {
if (io_buf->AppendLen() == 0U) {
// it can happen with memcached but not for RedisParser, because RedisParser fully
// consumes the passed buffer
LOG_EVERY_T(WARNING, 10) << "Maximum io_buf length reached " << io_buf_.Capacity()
LOG_EVERY_T(WARNING, 10) << "Maximum io_buf length reached " << io_buf->Capacity()
<< ", consider to increase max_client_iobuf_len flag";
}
}
Expand Down Expand Up @@ -2795,7 +2805,7 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
HandleMigrateRequest();

// Register completion for current head if its pending and we don't wait on current_wait_.
if (HasDispatchedCommands() && !current_wait_.has_value()) {
if (HasInFlightCommands() && !current_wait_.has_value()) {
current_wait_.emplace(parsed_head_, &cmd_completion_waiter);
}

Expand All @@ -2813,20 +2823,20 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {

// We wake up if:
// 1. New data arrived or is pending (io_buf_.InputLen() > 0 || pending_input_).
// 2. A parsed command is ready to execute (HeadReadyToDispatch()).
// 2. A parsed command is ready to execute (HasCommandToExecute()).
// 3. An executed command is ready to send its reply (parsed_head_ &&
// parsed_head_->CanReply()).
// 4. Control-plane messages arrived (!dispatch_q_.empty()).
// 5. The socket encountered an error/closed (io_ec_).
// 6. A migration to another thread was requested AND is actionable now (no subscriptions).
return io_buf_.InputLen() > 0 || pending_input_ || HeadReadyToDispatch() ||
return io_buf_.InputLen() > 0 || pending_input_ || HasCommandToExecute() ||
(parsed_head_ && parsed_head_->CanReply()) || !dispatch_q_.empty() || io_ec_ ||
is_ready_to_migrate();
});
}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
bool reached_capacity = io_buf_.AppendLen() == 0;

// Temporary: Handle dispatch queue items (Control Path) one by one blocking command execution
if (!dispatch_q_.empty()) {
Expand Down Expand Up @@ -2880,7 +2890,7 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
size_t mem_before = conn_stats.pipeline_queue_bytes;

if (parsed_head_) {
if (HeadReadyToDispatch())
if (HasCommandToExecute())
ExecuteBatch();
ReplyBatch();
}
Expand Down Expand Up @@ -2954,7 +2964,7 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {

if (parse_status == NEED_MORE) {
parse_status = OK;
CheckIoBufCapacity(is_iobuf_full);
CheckIoBufCapacity(reached_capacity, &io_buf_);
}
} while (peer->IsOpen());

Expand Down
20 changes: 12 additions & 8 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class Connection : public util::Connection {
// Drains currently available bytes from socket into io_buf_ using non-blocking reads.
void ReadPendingInput();

void CheckIoBufCapacity(bool is_iobuf_full);
void CheckIoBufCapacity(bool reached_capacity, base::IoBuf* buf);
Copy link
Copy Markdown
Contributor

@glevkovich glevkovich Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. nit: personally, I feel like is_iobuf_full is clearer, but I guess it's matter of taste. Even if you decide to keep the new name, there is inconsistency with the local variable naming: if you decided to use reached_capacity, I would suggest to rename local variables as well:
    is_iobuf_full should be renamed to reached_capacity. See both in IoLoopV2() and IoLoop().

  2. Also, I think it would be more consistent, to update this to take a reference (base::IoBuf& buf). The rest of the file uses references for these non-optional mutable buffers.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean is_iobuf_full ? what does it mean a collection (vector) is full?

I am confused about the semantics of this as our collections are resizable and do not have "is_full" state. moreover, when we call CheckIoBufCapacity, io_buf is not "full".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean is_iobuf_full ? what does it mean a collection (vector) is full?

I am confused about the semantics of this as our collections are resizable and do not have "is_full" state. moreover, when we call CheckIoBufCapacity, io_buf is not "full".

OK, make sense, still the suggestion to rename on the 1st part is still valid.
look for (2 places):
bool is_iobuf_full = io_buf_.AppendLen() == 0;


// Main loop reading client messages and passing requests to dispatch queue.
std::variant<std::error_code, ParserStatus> IoLoopV2();
Expand Down Expand Up @@ -319,7 +319,7 @@ class Connection : public util::Connection {
// If add is true, stats are incremented, otherwise decremented.
void UpdateDispatchStats(const MessageHandle& msg, bool add);

ParserStatus ParseRedis(unsigned max_busy_cycles, bool enqueue_only = false);
ParserStatus ParseRedis(base::IoBuf& buf, unsigned max_busy_cycles, bool enqueue_only = false);

void OnBreakCb(int32_t mask);

Expand Down Expand Up @@ -366,9 +366,13 @@ class Connection : public util::Connection {
// Returns true if one or more commands were parsed from the read buffer,
// and false if no complete commands could be parsed (for example, when
// parsing is pending more input).
bool ParseMCBatch();
bool ParseMCBatch(base::IoBuf& buf);

bool ParseRedisBatch();
bool ParseRedisBatch(base::IoBuf& buf);

// Call the appropriate ParseMCBatch or ParseRedisBatch based on the protocol.
// Only CPU-bound work; must not perform I/O or fiber suspension.
void ParseFromBuffer(base::IoBuf& buf);

// Call the appropriate ParseBatch function, then proceed with Execute and Reply while input remains.
ParserStatus ParseLoop();
Expand Down Expand Up @@ -453,13 +457,13 @@ class Connection : public util::Connection {
size_t parsed_cmd_q_bytes_ = 0;

// Returns true if there are dispatched commands that haven't been replied yet.
bool HasDispatchedCommands() const {
bool HasInFlightCommands() const {
return parsed_head_ != parsed_to_execute_;
}

// Returns true if the head command is ready to dispatch (nothing in-flight ahead of it).
bool HeadReadyToDispatch() const {
return parsed_head_ && !HasDispatchedCommands();
// Returns true if the head command is ready to execute (nothing in-flight ahead of it).
bool HasCommandToExecute() const {
return parsed_head_ && !HasInFlightCommands();
}

// Returns true if there are any commands pending in the parsed command queue or dispatch queue.
Expand Down
Loading