Skip to content

Commit b7d2317

Browse files
committed
relax atomic consistency; after all, nothing could possibly go wrong
1 parent 845383c commit b7d2317

2 files changed

Lines changed: 66 additions & 47 deletions

File tree

loaders/common.hpp

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -133,48 +133,56 @@ template <class Item> class BackgroundProducer {
133133
virtual bool Produce(Item &) = 0;
134134

135135
private:
136-
vector<Item> ring_;
136+
// ~constant:
137137
int R_;
138138
unique_ptr<thread> worker_;
139+
140+
// writable by either thread:
139141
atomic<bool> stop_;
142+
143+
// writable by background producer thread:
144+
vector<Item> ring_;
145+
atomic<long long> p_; // # produced
140146
string errmsg_;
141-
atomic<long long> p_, // produced
142-
c_; // if c_>0 then item (c_-1)%R is currently being consumed
143-
chrono::duration<double> p_blocked_ = chrono::duration<double>::zero(),
144-
c_blocked_ = chrono::duration<double>::zero();
145147
chrono::time_point<chrono::high_resolution_clock> t0_;
148+
chrono::duration<double> p_blocked_ = chrono::duration<double>::zero();
146149

147-
bool empty() { return p_ == c_; }
148-
149-
bool full() { return p_ - max(c_.load(), 1LL) == R_ - 1; }
150+
// writable by consumer thread:
151+
atomic<long long> c_; // if c_>0 then item (c_-1)%R is currently being consumed
152+
const Item *item_ = nullptr;
153+
chrono::duration<double> c_blocked_ = chrono::duration<double>::zero();
150154

151155
// Producer loop, run on the worker thread started by next(): repeatedly calls
// Produce() on ring slot p % R_ and, on success, publishes the new count to p_.
// Loops until stop_ is observed true.
// (Diff rendering: bare numbers are gutter line numbers; a trailing '-' marks a
// removed line and a trailing '+' an added line.)
void background_thread() {
152156
t0_ = chrono::high_resolution_clock::now();
153157
do {
154-
assert(p_ >= c_);
// Local snapshot of the produced count; the added lines work on this copy
// instead of re-reading the atomic p_.
158+
auto p = p_.load(memory_order_acquire);
155159
bool ok = false;
156160
try {
157-
ok = Produce(ring_[p_ % R_]);
161+
ok = Produce(ring_[p % R_]);
158162
} catch (exception &exn) {
159163
errmsg_ = exn.what();
160164
if (errmsg_.empty()) {
161165
errmsg_ = "unknown error on producer thread";
162166
}
163-
stop_ = true;
// Release store: errmsg_ was written just above and is read by next() after
// its acquire load of stop_.
167+
stop_.store(true, memory_order_release);
164168
}
165169
if (ok) {
166-
++p_;
167-
if (full()) {
170+
++p;
// Publish the item; pairs with the acquire loads of p_ in next().
171+
p_.store(p, memory_order_release);
// Ring-full test: p - max(c_, 1) == R_ - 1 means the next write slot p % R_
// is the slot (c_ - 1) % R_ that the consumer is still reading (or, while
// c_ == 0, that all R_ slots hold unconsumed items).
172+
if (p - max(c_.load(memory_order_acquire), 1LL) == R_ - 1) {
168173
auto t_spin = chrono::high_resolution_clock::now();
169174
do {
175+
// assumption -- producer will usually be faster than the consumer, and
176+
// the ring (R_ slots) provides buffer if that's occasionally not the case
170177
this_thread::sleep_for(chrono::milliseconds(1));
171-
} while (!stop_ && full());
// Keep sleeping until stop_ is set or the consumer advances c_ (acquire load
// pairs with the release store of c_ in next()).
178+
} while (!stop_.load(memory_order_relaxed) &&
179+
(p - max(c_.load(memory_order_acquire), 1LL) == R_ - 1));
// Accumulate the time spent waiting on a full ring.
172180
p_blocked_ += chrono::high_resolution_clock::now() - t_spin;
173181
}
174182
} else {
175-
stop_ = true;
// NOTE(review): reached when Produce() did not return true. This store is
// relaxed, whereas the exception path above uses release. A consumer that
// sees stop_ == true is then not guaranteed to also see the latest p_ store
// -- confirm next() cannot miss a final item because of this.
183+
stop_.store(true, memory_order_relaxed);
176184
}
177-
} while (!stop_);
185+
} while (!stop_.load(memory_order_relaxed));
178186
}
179187

180188
public:
@@ -192,36 +200,36 @@ template <class Item> class BackgroundProducer {
192200
// Advance the consumer to the next item.
// On the first call, fills ring_ up to R_ default-constructed Items and starts
// the worker thread. Then waits (yielding) while no unconsumed item is visible
// and stop_ is not set. On success, points item_ at slot c % R_, publishes
// c + 1 to c_, and returns true. Returns false when stopped with nothing left
// to consume; throws runtime_error(errmsg_) in that case if errmsg_ is set.
// (Diff rendering: bare numbers are gutter line numbers; a trailing '-' marks a
// removed line and a trailing '+' an added line.)
bool next() {
193201
if (!worker_) {
194202
while (ring_.size() < R_) {
195-
ring_.push_back(Item());
203+
ring_.emplace_back();
196204
}
197205
worker_.reset(new thread([this]() { this->background_thread(); }));
198206
}
199-
if (empty()) {
200-
auto t_start = chrono::high_resolution_clock::now();
201-
while (!stop_ && empty()) {
// Snapshot both counters once; only p is refreshed while waiting, since c_ is
// written by this (consumer) thread only.
207+
auto c = c_.load(memory_order_acquire), p = p_.load(memory_order_acquire);
208+
if (c == p) {
209+
auto t_spin = chrono::high_resolution_clock::now();
210+
while (c == p && !stop_.load(memory_order_relaxed)) {
202211
this_thread::yield();
212+
p = p_.load(memory_order_acquire);
203213
}
204-
c_blocked_ += chrono::high_resolution_clock::now() - t_start;
// Accumulate the time spent waiting on an empty ring.
214+
c_blocked_ += chrono::high_resolution_clock::now() - t_spin;
205215
}
206-
if (stop_ && empty()) {
// NOTE(review): p was read BEFORE stop_ was observed and is not reloaded here.
// If the producer publishes an item and then sets stop_ between the last
// p_ load and the stop_ load, c == p is stale and this returns false with an
// item still unconsumed. The removed line re-evaluated empty() after stop_.
// Consider reloading p_ after the acquire load of stop_; also check that
// every producer-side store to stop_ is a release store.
216+
if (c == p && stop_.load(memory_order_acquire)) {
207217
if (!errmsg_.empty()) {
208218
throw runtime_error(errmsg_);
209219
}
210220
return false;
211221
}
212-
assert(c_ < p_);
213-
c_++;
222+
assert(c < p);
// Cache the address of the slot being handed out; item() dereferences item_.
223+
item_ = &ring_[c % R_];
// Release store: tells the producer (acquire load of c_) that the consumer
// has moved on to slot c % R_, so the previously consumed slot may be reused.
224+
c_.store(c + 1, memory_order_release);
214225
return true;
215226
}
216227

217228
// get current item for consumption; defined only after next() returned true
218-
Item &item() {
219-
assert(c_ && errmsg_.empty());
220-
return ring_[(c_ - 1) % R_];
221-
}
229+
const Item &item() { return *item_; }
222230

223231
void abort() {
224-
stop_ = true;
232+
stop_.store(true, memory_order_relaxed);
225233
if (worker_) {
226234
worker_->join();
227235
}

loaders/sam_into_sqlite.cc

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,36 @@ struct SamItem {
112112
shared_ptr<kstring_t> line;
113113
vector<char *> fields;
114114
shared_ptr<bam1_t> rec;
115+
116+
string tags_json;
117+
int rg_id = -1;
118+
119+
SamItem() {
120+
line = kstringXX();
121+
rec = shared_ptr<bam1_t>(bam_init1(), &bam_destroy1);
122+
}
123+
124+
void Clear() {
125+
line->l = 0;
126+
fields.clear();
127+
tags_json.clear();
128+
rg_id = -1;
129+
}
115130
};
116131

117132
class SamReader : public BackgroundProducer<SamItem> {
118133
public:
119-
SamReader(samFile *sam, bam_hdr_t *hdr, int ringsize)
120-
: BackgroundProducer<SamItem>(ringsize), sam_(sam), hdr_(hdr) {}
134+
SamReader(samFile *sam, bam_hdr_t *hdr, const map<string, int> &readgroups, int ringsize)
135+
: BackgroundProducer<SamItem>(ringsize), sam_(sam), hdr_(hdr), readgroups_(readgroups) {}
121136

122137
private:
123138
samFile *sam_;
124139
bam_hdr_t *hdr_;
140+
const map<string, int> &readgroups_;
141+
OStringStream tagsbuf_;
125142

126143
bool Produce(SamItem &it) override {
127-
if (!it.line) {
128-
it.line = kstringXX();
129-
}
130-
it.line->l = 0;
131-
it.fields.clear();
132-
if (!it.rec) {
133-
it.rec = shared_ptr<bam1_t>(bam_init1(), &bam_destroy1);
134-
}
144+
it.Clear();
135145
int ret = sam_read1(sam_, hdr_, it.rec.get());
136146
if (ret == -1) {
137147
return false;
@@ -148,6 +158,10 @@ class SamReader : public BackgroundProducer<SamItem> {
148158
throw runtime_error("Corrupt SAM record; fields.size() = " +
149159
to_string(it.fields.size()));
150160
}
161+
// formulate tags JSON here on the background thread
162+
tagsbuf_.Clear();
163+
it.rg_id = write_tags_json(readgroups_, it.fields, tagsbuf_);
164+
it.tags_json = tagsbuf_.Get();
151165
return true;
152166
}
153167
};
@@ -314,10 +328,9 @@ int main(int argc, char *argv[]) {
314328

315329
// stream bam1_t records
316330
progress &&cerr << "inserting reads...";
317-
SamReader reader(sam.get(), hdr.get(), 64);
318-
OStringStream cigarstr, tagsbuf;
331+
SamReader reader(sam.get(), hdr.get(), readgroups, 64);
319332
while (reader.next()) {
320-
SamItem &it = reader.item();
333+
const SamItem &it = reader.item();
321334
bam1_t *rec = it.rec.get();
322335
auto &sam_fields = it.fields;
323336

@@ -347,10 +360,8 @@ int main(int argc, char *argv[]) {
347360
if (rec->core.isize)
348361
insert_read.bind(9, rec->core.isize); // tlen
349362

350-
tagsbuf.Clear();
351-
int rg_id = write_tags_json(readgroups, sam_fields, tagsbuf);
352-
if (rg_id >= 0 && rg_id < readgroups.size()) {
353-
insert_read.bind(10, rg_id); // rg_id
363+
if (it.rg_id >= 0 && it.rg_id < readgroups.size()) {
364+
insert_read.bind(10, it.rg_id); // rg_id
354365
}
355366

356367
insert_read.exec();
@@ -370,7 +381,7 @@ int main(int argc, char *argv[]) {
370381

371382
insert_tags.reset();
372383
insert_tags.bind(1, rowid);
373-
insert_tags.bind(2, tagsbuf.Get()); // tags_json
384+
insert_tags.bindNoCopy(2, it.tags_json.c_str()); // tags_json
374385
insert_tags.exec();
375386
}
376387
progress &&cerr << reader.log() << endl;

0 commit comments

Comments
 (0)