Skip to content

Commit 97075af

Browse files
committed
Clarify functions and add additional utilities.
Some of the callback functions specified by the messager struct were badly named; their names have been updated to reflect their actual usage. Additionally, the GLib implementations provide some utility functions that make looping really easy. A complete event loop using poll(2) now looks something like this: while (!curvecpr_client_messager_glib_is_finished(cmg)) { long long timeout = curvecpr_client_messager_glib_next_timeout(cmg) / 1000000; struct pollfd polls[1] = { { .fd = my_socket, .events = POLLIN } }; if (poll(polls, 1, timeout) < 0) break; if (polls[0].revents) { unsigned char buf[1152]; int r = recvfrom(my_socket, buf, 1152, 0, NULL, NULL); if (r <= 0) break; curvecpr_client_messager_glib_recv(cmg, buf, r); } curvecpr_client_messager_glib_process_sendq(cmg); }
1 parent cbdefd4 commit 97075af

7 files changed

Lines changed: 48 additions & 23 deletions

File tree

libcurvecpr-glib/include/curvecpr_glib/client_messager_glib.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ void curvecpr_client_messager_glib_new (struct curvecpr_client_messager_glib *cm
5454
void curvecpr_client_messager_glib_dealloc (struct curvecpr_client_messager_glib *cmg);
5555
int curvecpr_client_messager_glib_connected (struct curvecpr_client_messager_glib *cmg);
5656
int curvecpr_client_messager_glib_send (struct curvecpr_client_messager_glib *cmg, const unsigned char *buf, size_t num);
57-
int curvecpr_client_messager_glib_close (struct curvecpr_client_messager_glib *cmg);
5857
int curvecpr_client_messager_glib_recv (struct curvecpr_client_messager_glib *cmg, const unsigned char *buf, size_t num);
58+
unsigned char curvecpr_client_messager_glib_is_finished (struct curvecpr_client_messager_glib *cmg);
59+
int curvecpr_client_messager_glib_finish (struct curvecpr_client_messager_glib *cmg);
5960
int curvecpr_client_messager_glib_process_sendq (struct curvecpr_client_messager_glib *cmg);
6061
long long curvecpr_client_messager_glib_next_timeout (struct curvecpr_client_messager_glib *cmg);
6162

libcurvecpr-glib/include/curvecpr_glib/messager_glib.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ struct curvecpr_messager_glib {
4949

5050
void curvecpr_messager_glib_new (struct curvecpr_messager_glib *mg, struct curvecpr_messager_glib_cf *cf, unsigned char client);
5151
void curvecpr_messager_glib_dealloc (struct curvecpr_messager_glib *mg);
52-
int curvecpr_messager_glib_close (struct curvecpr_messager_glib *mg);
52+
unsigned char curvecpr_messager_glib_is_finished (struct curvecpr_messager_glib *mg);
53+
int curvecpr_messager_glib_finish (struct curvecpr_messager_glib *mg);
5354
int curvecpr_messager_glib_send (struct curvecpr_messager_glib *mg, const unsigned char *buf, size_t num);
5455
int curvecpr_messager_glib_recv (struct curvecpr_messager_glib *mg, const unsigned char *buf, size_t num);
5556
int curvecpr_messager_glib_process_sendq (struct curvecpr_messager_glib *mg);

libcurvecpr-glib/lib/client_messager_glib.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,14 @@ int curvecpr_client_messager_glib_send (struct curvecpr_client_messager_glib *cm
111111
return curvecpr_messager_glib_send(&cmg->mg, buf, num);
112112
}
113113

114-
int curvecpr_client_messager_glib_close (struct curvecpr_client_messager_glib *cmg)
114+
unsigned char curvecpr_client_messager_glib_is_finished (struct curvecpr_client_messager_glib *cmg)
115115
{
116-
return curvecpr_messager_glib_close(&cmg->mg);
116+
return curvecpr_messager_glib_is_finished(&cmg->mg);
117+
}
118+
119+
int curvecpr_client_messager_glib_finish (struct curvecpr_client_messager_glib *cmg)
120+
{
121+
return curvecpr_messager_glib_finish(&cmg->mg);
117122
}
118123

119124
int curvecpr_client_messager_glib_recv (struct curvecpr_client_messager_glib *cmg, const unsigned char *buf, size_t num)
@@ -123,10 +128,10 @@ int curvecpr_client_messager_glib_recv (struct curvecpr_client_messager_glib *cm
123128

124129
int curvecpr_client_messager_glib_process_sendq (struct curvecpr_client_messager_glib *cmg)
125130
{
126-
return curvecpr_messager_glib_process_sendq(&cmg->mg);
131+
return cmg->client.negotiated == CURVECPR_CLIENT_PENDING ? 0 : curvecpr_messager_glib_process_sendq(&cmg->mg);
127132
}
128133

129134
long long curvecpr_client_messager_glib_next_timeout (struct curvecpr_client_messager_glib *cmg)
130135
{
131-
return curvecpr_messager_glib_next_timeout(&cmg->mg);
136+
return cmg->client.negotiated == CURVECPR_CLIENT_PENDING ? -1 : curvecpr_messager_glib_next_timeout(&cmg->mg);
132137
}

libcurvecpr-glib/lib/messager_glib.c

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,19 @@ static int _sendq_head (struct curvecpr_messager *messager, struct curvecpr_bloc
8484
return -1;
8585
}
8686

87+
static unsigned char _sendq_is_empty (struct curvecpr_messager *messager)
88+
{
89+
struct curvecpr_messager_glib *mg = messager->cf.priv;
90+
91+
return !mg->sendq_head_exists && /* We don't have a block actually waiting to be
92+
written. */
93+
mg->pending_used == 0 && /* We don't have any bytes that we could turn into a
94+
block to be written. */
95+
(!mg->pending_eof || /* The EOF flag isn't set. */
96+
mg->messager.my_eof); /* Even if our EOF flag is set, the messager must
97+
not have sent the EOF message. */
98+
}
99+
87100
static int _sendq_move_to_sendmarkq (struct curvecpr_messager *messager, const struct curvecpr_block *block, struct curvecpr_block **block_stored)
88101
{
89102
struct curvecpr_messager_glib *mg = messager->cf.priv;
@@ -251,7 +264,7 @@ static int _recvmarkq_put (struct curvecpr_messager *messager, const struct curv
251264
return 0;
252265
}
253266

254-
static int _recvmarkq_get (struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored)
267+
static int _recvmarkq_get_nth_unacknowledged (struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored)
255268
{
256269
struct curvecpr_messager_glib *mg = messager->cf.priv;
257270

@@ -280,7 +293,7 @@ static int _recvmarkq_get (struct curvecpr_messager *messager, unsigned int n, s
280293
return -1;
281294
}
282295

283-
static int _recvmarkq_move_range_to_recvq (struct curvecpr_messager *messager, unsigned long long start, unsigned long long end)
296+
static int _recvmarkq_remove_range (struct curvecpr_messager *messager, unsigned long long start, unsigned long long end)
284297
{
285298
struct curvecpr_messager_glib *mg = messager->cf.priv;
286299

@@ -317,17 +330,18 @@ void curvecpr_messager_glib_new (struct curvecpr_messager_glib *mg, struct curve
317330
.ops = {
318331
.sendq_head = _sendq_head,
319332
.sendq_move_to_sendmarkq = _sendq_move_to_sendmarkq,
320-
333+
.sendq_is_empty = _sendq_is_empty,
334+
321335
.sendmarkq_head = _sendmarkq_head,
322336
.sendmarkq_get = _sendmarkq_get,
323337
.sendmarkq_remove_range = _sendmarkq_remove_range,
324338
.sendmarkq_is_full = _sendmarkq_is_full,
325-
339+
326340
.recvmarkq_put = _recvmarkq_put,
327-
.recvmarkq_get = _recvmarkq_get,
341+
.recvmarkq_get_nth_unacknowledged = _recvmarkq_get_nth_unacknowledged,
328342
.recvmarkq_is_full = _recvmarkq_is_full,
329-
.recvmarkq_move_range_to_recvq = _recvmarkq_move_range_to_recvq,
330-
343+
.recvmarkq_remove_range = _recvmarkq_remove_range,
344+
331345
.send = _send
332346
},
333347
.priv = mg
@@ -362,7 +376,12 @@ void curvecpr_messager_glib_dealloc (struct curvecpr_messager_glib *mg)
362376
g_sequence_free(mg->recvmarkq);
363377
}
364378

365-
int curvecpr_messager_glib_close (struct curvecpr_messager_glib *mg)
379+
unsigned char curvecpr_messager_glib_is_finished (struct curvecpr_messager_glib *mg)
380+
{
381+
return mg->messager.my_final && mg->messager.their_final;
382+
}
383+
384+
int curvecpr_messager_glib_finish (struct curvecpr_messager_glib *mg)
366385
{
367386
if (mg->pending_eof)
368387
return -EINVAL;

libcurvecpr/include/curvecpr/messager.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ struct curvecpr_messager;
1313
struct curvecpr_messager_ops {
1414
int (*sendq_head)(struct curvecpr_messager *messager, struct curvecpr_block **block_stored);
1515
int (*sendq_move_to_sendmarkq)(struct curvecpr_messager *messager, const struct curvecpr_block *block, struct curvecpr_block **block_stored);
16+
unsigned char (*sendq_is_empty)(struct curvecpr_messager *messager);
1617

1718
/* The sent-to-be-marked queue (sendmarkq) is a priority queue of blocks ordered by
1819
the time at which they were last sent. */
@@ -22,9 +23,9 @@ struct curvecpr_messager_ops {
2223
unsigned char (*sendmarkq_is_full)(struct curvecpr_messager *messager);
2324

2425
int (*recvmarkq_put)(struct curvecpr_messager *messager, const struct curvecpr_block *block, struct curvecpr_block **block_stored);
25-
int (*recvmarkq_get)(struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored);
26+
int (*recvmarkq_get_nth_unacknowledged)(struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored);
2627
unsigned char (*recvmarkq_is_full)(struct curvecpr_messager *messager);
27-
int (*recvmarkq_move_range_to_recvq)(struct curvecpr_messager *messager, unsigned long long start, unsigned long long end);
28+
int (*recvmarkq_remove_range)(struct curvecpr_messager *messager, unsigned long long start, unsigned long long end);
2829

2930
int (*send)(struct curvecpr_messager *messager, const unsigned char *buf, size_t num);
3031
};

libcurvecpr/lib/messager.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ static int _send_block (struct curvecpr_messager *messager, struct curvecpr_bloc
265265
unsigned long long maximum_gap = UINT32_MAX;
266266

267267
for (;;) {
268-
if (cf->ops.recvmarkq_get(messager, block_num++, &received_block))
268+
if (cf->ops.recvmarkq_get_nth_unacknowledged(messager, block_num++, &received_block))
269269
break;
270270

271271
acknowledgment_ranges[i].exists = 1;
@@ -401,7 +401,7 @@ static int _send_block (struct curvecpr_messager *messager, struct curvecpr_bloc
401401
int i;
402402

403403
for (i = 0; i < 6 && acknowledgment_ranges[i].exists; ++i)
404-
cf->ops.recvmarkq_move_range_to_recvq(messager, acknowledgment_ranges[i].start, acknowledgment_ranges[i].end);
404+
cf->ops.recvmarkq_remove_range(messager, acknowledgment_ranges[i].start, acknowledgment_ranges[i].end);
405405

406406
}
407407

@@ -486,9 +486,7 @@ long long curvecpr_messager_next_timeout (struct curvecpr_messager *messager)
486486

487487
if (!cf->ops.sendmarkq_is_full(messager)) {
488488
/* If we have pending data, we might write it. */
489-
if (cf->ops.sendq_head(messager, &block)) {
490-
/* Nothing to write. */
491-
} else {
489+
if (!cf->ops.sendq_is_empty(messager)) {
492490
/* Write at the write rate. */
493491
if (at > messager->my_sent_clock + chicago->wr_rate)
494492
at = messager->my_sent_clock + chicago->wr_rate;

libcurvecpr/test/messager/test_send_with_1_failure_moves_message_from_sendq.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ static unsigned char t_q_is_full (struct curvecpr_messager *messager)
1616
return 0;
1717
}
1818

19-
static int t_recvmarkq_get (struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored)
19+
static int t_recvmarkq_get_nth_unacknowledged (struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored)
2020
{
2121
return 1;
2222
}
@@ -64,7 +64,7 @@ START_TEST (test_send_with_1_failure_moves_message_from_sendq)
6464
.ops = {
6565
.sendmarkq_is_full = t_q_is_full,
6666
.recvmarkq_is_full = t_q_is_full,
67-
.recvmarkq_get = t_recvmarkq_get,
67+
.recvmarkq_get_nth_unacknowledged = t_recvmarkq_get_nth_unacknowledged,
6868
.sendmarkq_head = t_sendmarkq_head,
6969
.sendq_head = t_sendq_head,
7070
.send = t_send,

0 commit comments

Comments
 (0)