Skip to content

Commit bd26a7c

Browse files
branch-4.0: [bugfix](arrowflight) should call done run in on_xxx method to make work in async mode #60282 (#60325)
Cherry-picked from #60282 Co-authored-by: yiguolei <guolei@selectdb.com>
1 parent d1e7df5 commit bd26a7c

5 files changed

Lines changed: 35 additions & 11 deletions

File tree

be/src/service/internal_service.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,8 +660,7 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control
660660
PFetchArrowDataResult* result,
661661
google::protobuf::Closure* done) {
662662
bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
663-
brpc::ClosureGuard closure_guard(done);
664-
auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result);
663+
auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result, done);
665664
TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id
666665
std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> arrow_buffer;
667666
auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id, arrow_buffer);

be/src/vec/sink/varrow_flight_result_writer.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ namespace doris::vectorized {
3333
void GetArrowResultBatchCtx::on_failure(const Status& status) {
3434
DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
3535
status.to_protobuf(_result->mutable_status());
36+
_done->Run();
3637
}
3738

3839
void GetArrowResultBatchCtx::on_close(int64_t packet_seq, int64_t /* returned_rows */) {
3940
Status status;
4041
status.to_protobuf(_result->mutable_status());
4142
_result->set_packet_seq(packet_seq);
4243
_result->set_eos(true);
44+
_done->Run();
4345
}
4446

4547
Status GetArrowResultBatchCtx::on_data(const std::shared_ptr<vectorized::Block>& block,
@@ -72,6 +74,8 @@ Status GetArrowResultBatchCtx::on_data(const std::shared_ptr<vectorized::Block>&
7274
_result->clear_block();
7375
}
7476
st.to_protobuf(_result->mutable_status());
77+
78+
_done->Run();
7579
return Status::OK();
7680
}
7781

be/src/vec/sink/varrow_flight_result_writer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class GetArrowResultBatchCtx {
3535
public:
3636
using ResultType = vectorized::Block;
3737
ENABLE_FACTORY_CREATOR(GetArrowResultBatchCtx)
38-
GetArrowResultBatchCtx(PFetchArrowDataResult* result) : _result(result) {}
38+
GetArrowResultBatchCtx(PFetchArrowDataResult* result, google::protobuf::Closure* done)
39+
: _result(result), _done(done) {}
3940
#ifdef BE_TEST
4041
GetArrowResultBatchCtx() = default;
4142
#endif
@@ -53,6 +54,7 @@ class GetArrowResultBatchCtx {
5354
int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
5455
#endif
5556
PFetchArrowDataResult* _result = nullptr;
57+
google::protobuf::Closure* _done = nullptr;
5658
};
5759

5860
class ArrowFlightResultBlockBuffer final : public ResultBlockBuffer<GetArrowResultBatchCtx> {

be/test/vec/sink/arrow_result_block_buffer_test.cpp

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@
3434

3535
namespace doris::vectorized {
3636

37+
class MockClosure : public google::protobuf::Closure {
38+
public:
39+
MockClosure() {}
40+
MockClosure(std::function<void()> cb) : _cb(cb) {}
41+
void Run() override { _cb(); }
42+
43+
std::function<void()> _cb;
44+
};
3745
class ArrowResultBlockBufferTest : public ::testing::Test {
3846
public:
3947
ArrowResultBlockBufferTest() = default;
@@ -44,8 +52,9 @@ class MockGetArrowResultBatchCtx : public GetArrowResultBatchCtx {
4452
public:
4553
ENABLE_FACTORY_CREATOR(MockGetArrowResultBatchCtx)
4654
MockGetArrowResultBatchCtx(std::function<void()> fail_cb, std::function<void()> close_cb,
47-
std::function<void()> data_cb, PFetchArrowDataResult* result)
48-
: GetArrowResultBatchCtx(result),
55+
std::function<void()> data_cb, PFetchArrowDataResult* result,
56+
google::protobuf::Closure* done)
57+
: GetArrowResultBatchCtx(result, done),
4958
_fail_cb(fail_cb),
5059
_close_cb(close_cb),
5160
_data_cb(data_cb) {}
@@ -78,9 +87,11 @@ TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) {
7887
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
7988
buffer.set_dependency(ins_id, dep);
8089
PFetchArrowDataResult presult;
90+
91+
MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
8192
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
8293
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
83-
[&]() -> void { data = true; }, &presult);
94+
[&]() -> void { data = true; }, &presult, &done);
8495

8596
{
8697
auto num_rows = 2;
@@ -201,9 +212,11 @@ TEST_F(ArrowResultBlockBufferTest, TestCancelArrowResultBlockBuffer) {
201212
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
202213
buffer.set_dependency(ins_id, dep);
203214
PFetchArrowDataResult presult;
215+
216+
MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
204217
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
205218
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
206-
[&]() -> void { data = true; }, &presult);
219+
[&]() -> void { data = true; }, &presult, &done);
207220

208221
{
209222
EXPECT_TRUE(buffer.get_batch(ctx).ok());
@@ -273,9 +286,11 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
273286
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
274287
buffer.set_dependency(ins_id, dep);
275288
PFetchArrowDataResult presult;
289+
290+
MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
276291
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
277292
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
278-
[&]() -> void { data = true; }, &presult);
293+
[&]() -> void { data = true; }, &presult, &done);
279294

280295
{
281296
EXPECT_TRUE(buffer.get_batch(ctx).ok());
@@ -330,9 +345,11 @@ TEST_F(ArrowResultBlockBufferTest, TestArrowResultSerializeFailure) {
330345
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
331346
buffer.set_dependency(ins_id, dep);
332347
PFetchArrowDataResult presult;
348+
349+
MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
333350
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
334351
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
335-
[&]() -> void { data = true; }, &presult);
352+
[&]() -> void { data = true; }, &presult, &done);
336353

337354
{
338355
auto num_rows = 2;

be/test/vec/sink/get_result_batch_test.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ class GetResultBatchCtxTest : public ::testing::Test {
4040

4141
class MockClosure : public google::protobuf::Closure {
4242
public:
43+
MockClosure() {}
4344
MockClosure(std::function<void()> cb) : _cb(cb) {}
4445
void Run() override { _cb(); }
4546

46-
private:
4747
std::function<void()> _cb;
4848
};
4949

@@ -126,7 +126,9 @@ TEST_F(GetResultBatchCtxTest, TestGetResultBatchCtx) {
126126

127127
TEST_F(GetResultBatchCtxTest, TestGetArrowResultBatchCtx) {
128128
PFetchArrowDataResult result;
129-
auto ctx = GetArrowResultBatchCtx::create_shared(&result);
129+
MockClosure closure;
130+
closure._cb = [&]() { std::cout << "cb" << std::endl; };
131+
auto ctx = GetArrowResultBatchCtx::create_shared(&result, &closure);
130132

131133
{
132134
// on_failure

0 commit comments

Comments
 (0)