Skip to content

Commit 32e2e05

Browse files
committed
loaders: factor out BackgroundProducer
1 parent 4ee578d commit 32e2e05

2 files changed

Lines changed: 146 additions & 96 deletions

File tree

loaders/common.hpp

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
#include "SQLiteCpp/SQLiteCpp.h"
44
#include "genomicsqlite.h"
55
#include "strlcpy.h"
6+
#include <atomic>
7+
#include <chrono>
68
#include <iostream>
79
#include <memory>
810
#include <sqlite3.h>
911
#include <string>
12+
#include <thread>
13+
#include <vector>
1014

1115
#ifndef NDEBUG
1216
#define _DBG cerr << __FILE__ << ":" << __LINE__ << ": "
@@ -43,7 +47,7 @@ void split(string &s, char delim, Out result, uint64_t maxsplit = ULLONG_MAX) {
4347
split(&s[0], delim, result, maxsplit);
4448
}
4549

46-
// because std::ostringstream is too slow :(
50+
// because ostringstream is too slow :(
4751
class OStringStream {
4852
public:
4953
OStringStream(size_t initial_capacity) : buf_size_(initial_capacity), cursor_(0) {
@@ -84,7 +88,7 @@ class OStringStream {
8488
Add(s);
8589
return *this;
8690
}
87-
inline OStringStream &operator<<(const std::string &s) { return *this << s.c_str(); }
91+
inline OStringStream &operator<<(const string &s) { return *this << s.c_str(); }
8892

8993
inline const char *Get() const {
9094
assert(buf_[cursor_] == 0);
@@ -118,3 +122,117 @@ class OStringStream {
118122
unique_ptr<char[]> buf_;
119123
size_t buf_size_, cursor_;
120124
};
125+
126+
// Producer-consumer pattern: background producer thread preprocessing "Items" to be consumed by
127+
// the main thread. Queues up to `ringsize` prepared items.
128+
template <class Item> class BackgroundProducer {
129+
protected:
130+
// Will be called on the background thread to populate the next item. Fills it out in-place
131+
// from undefined initial state (avoids reallocating); returns true on success, false if the
132+
// item stream is successfully complete, or throws an exception.
133+
virtual bool Produce(Item &) = 0;
134+
135+
private:
136+
vector<Item> ring_;
137+
int R_;
138+
unique_ptr<thread> worker_;
139+
atomic<bool> stop_;
140+
string errmsg_;
141+
atomic<long long> p_, // produced
142+
c_; // if c_>0 then item (c_-1)%R is currently being consumed
143+
chrono::duration<double> p_blocked_ = chrono::duration<double>::zero(),
144+
c_blocked_ = chrono::duration<double>::zero();
145+
chrono::time_point<chrono::high_resolution_clock> t0_;
146+
147+
bool empty() { return p_ == c_; }
148+
149+
bool full() { return p_ - max(c_.load(), 1LL) == R_ - 1; }
150+
151+
void background_thread() {
152+
t0_ = chrono::high_resolution_clock::now();
153+
do {
154+
assert(p_ >= c_);
155+
bool ok = false;
156+
try {
157+
ok = Produce(ring_[p_ % R_]);
158+
} catch (exception &exn) {
159+
errmsg_ = exn.what();
160+
if (errmsg_.empty()) {
161+
errmsg_ = "unknown error on producer thread";
162+
}
163+
stop_ = true;
164+
}
165+
if (ok) {
166+
++p_;
167+
if (full()) {
168+
auto t_spin = chrono::high_resolution_clock::now();
169+
do {
170+
this_thread::sleep_for(chrono::milliseconds(1));
171+
} while (!stop_ && full());
172+
p_blocked_ += chrono::high_resolution_clock::now() - t_spin;
173+
}
174+
} else {
175+
stop_ = true;
176+
}
177+
} while (!stop_);
178+
}
179+
180+
public:
181+
BackgroundProducer(int ringsize) : R_(ringsize) {
182+
assert(R_ > 1);
183+
stop_ = false;
184+
p_ = 0;
185+
c_ = 0;
186+
}
187+
188+
virtual ~BackgroundProducer() { abort(); }
189+
190+
// advance to next item for consumption, return false when item stream has ended successfully,
191+
// or throw an exception.
192+
bool next() {
193+
if (!worker_) {
194+
while (ring_.size() < R_) {
195+
ring_.push_back(Item());
196+
}
197+
worker_.reset(new thread([this]() { this->background_thread(); }));
198+
}
199+
if (empty()) {
200+
auto t_start = chrono::high_resolution_clock::now();
201+
while (!stop_ && empty()) {
202+
this_thread::yield();
203+
}
204+
c_blocked_ += chrono::high_resolution_clock::now() - t_start;
205+
}
206+
if (stop_ && empty()) {
207+
if (!errmsg_.empty()) {
208+
throw runtime_error(errmsg_);
209+
}
210+
return false;
211+
}
212+
assert(c_ < p_);
213+
c_++;
214+
return true;
215+
}
216+
217+
// get current item for consumption; defined only after next() returned true
218+
Item &item() {
219+
assert(c_ && errmsg_.empty());
220+
return ring_[(c_ - 1) % R_];
221+
}
222+
223+
void abort() {
224+
stop_ = true;
225+
if (worker_) {
226+
worker_->join();
227+
}
228+
}
229+
230+
string log() {
231+
chrono::duration<double> elapsed = chrono::high_resolution_clock::now() - t0_;
232+
OStringStream ans;
233+
ans << to_string(c_) << " item(s) processed in " << to_string(elapsed.count()) << "s"
234+
<< "; producer blocked for " << to_string(p_blocked_.count()) << "s"
235+
<< "; consumer blocked for " << to_string(c_blocked_.count()) << "s";
236+
return string(ans.Get());
237+
}
238+
};

loaders/vcf_into_sqlite.cc

Lines changed: 26 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,14 @@
77
* - array values are stored as JSON
88
*/
99
#include "common.hpp"
10-
#include <atomic>
11-
#include <chrono>
1210
#include <cmath>
1311
#include <functional>
1412
#include <getopt.h>
1513
#include <htslib/vcf.h>
1614
#include <map>
1715
#include <set>
1816
#include <sstream>
19-
#include <thread>
2017
#include <unistd.h>
21-
#include <vector>
2218

2319
// unpack each bcf_hrec_t with the key type (e.g. INFO, FORMAT) into an easier-to-use map
2420
vector<map<string, string>> extract_hrecs(bcf_hdr_t *hdr, const char *key,
@@ -513,98 +509,33 @@ void insert_genotypes(bcf_hdr_t *hdr, bcf1_t *rec, vector<map<string, string>> &
513509
}
514510

515511
// stream BCF records using background thread
516-
class BCFReader {
512+
class BCFReader : public BackgroundProducer<shared_ptr<bcf1_t>> {
513+
public:
514+
BCFReader(vcfFile *vcf, bcf_hdr_t *hdr, int ringsize)
515+
: BackgroundProducer<shared_ptr<bcf1_t>>(ringsize), vcf_(vcf), hdr_(hdr) {}
516+
517+
private:
517518
vcfFile *vcf_;
518519
bcf_hdr_t *hdr_;
519520

520-
vector<unique_ptr<bcf1_t, void (*)(bcf1_t *)>> ring_;
521-
unique_ptr<thread> worker_;
522-
atomic<bool> stop_;
523-
int err_ = 0, errcode_ = 0;
524-
atomic<long long> p_, // produced
525-
c_; // if c_>0 then item (c_-1)%R is currently being consumed
526-
chrono::duration<double> p_spin_ = chrono::duration<double>::zero(),
527-
c_spin_ = chrono::duration<double>::zero();
528-
chrono::time_point<chrono::high_resolution_clock> t0_;
529-
530-
void background() {
531-
t0_ = chrono::high_resolution_clock::now();
532-
auto R = ring_.size();
533-
do {
534-
assert(p_ >= c_);
535-
bcf1_t *rec = ring_[p_ % R].get();
536-
int ret = bcf_read(vcf_, hdr_, rec);
537-
if (ret != 0) {
538-
if (ret != -1 || rec->errcode) {
539-
err_ = ret;
540-
errcode_ = rec->errcode;
541-
}
542-
stop_ = true;
543-
} else {
544-
ret = bcf_unpack(rec, BCF_UN_ALL);
545-
if (ret != 0) {
546-
err_ = ret;
547-
stop_ = true;
548-
} else {
549-
++p_;
550-
}
551-
}
552-
auto t_spin = chrono::high_resolution_clock::now();
553-
for (int i = 0; !stop_ && p_ - max(c_.load(), 1LL) == R - 1; ++i) {
554-
this_thread::sleep_for(chrono::milliseconds(1));
555-
}
556-
p_spin_ += chrono::high_resolution_clock::now() - t_spin;
557-
} while (!stop_);
558-
}
559-
560-
public:
561-
BCFReader(vcfFile *vcf, bcf_hdr_t *hdr, int ringsize) : vcf_(vcf), hdr_(hdr) {
562-
assert(ringsize > 1);
563-
stop_ = false;
564-
p_ = 0;
565-
c_ = 0;
566-
for (int i = 0; i < ringsize; i++) {
567-
ring_.emplace_back(bcf_init(), &bcf_destroy);
521+
bool Produce(shared_ptr<bcf1_t> &it) override {
522+
if (!it) {
523+
it = shared_ptr<bcf1_t>(bcf_init(), &bcf_destroy);
568524
}
569-
}
570-
571-
bcf1_t *read() {
572-
if (!worker_) {
573-
worker_.reset(new thread([this]() { this->background(); }));
574-
}
575-
auto t_spin = chrono::high_resolution_clock::now();
576-
while (!stop_ && p_ == c_)
577-
this_thread::yield();
578-
c_spin_ += chrono::high_resolution_clock::now() - t_spin;
579-
if (stop_) {
580-
if (err_ || errcode_) {
581-
worker_->join();
582-
ostringstream msg;
583-
msg << "vcf_into_sqlite: failed reading VCF; bcf_read() -> " << err_
584-
<< " bcf1_t::errcode = " << errcode_ << '\n';
585-
throw runtime_error(msg.str());
586-
}
587-
if (c_ == p_) {
588-
worker_->join();
589-
return nullptr;
590-
}
525+
int ret = bcf_read(vcf_, hdr_, it.get());
526+
if (ret == -1) {
527+
return false;
528+
} else if (ret != 0 || it->errcode) {
529+
ostringstream msg;
530+
msg << "VCF parser failed: bcf1_read() -> " << ret
531+
<< ", bcf1_t::errcode = " << it->errcode;
532+
throw runtime_error(msg.str());
591533
}
592-
assert(c_ < p_);
593-
return ring_[c_++ % ring_.size()].get();
594-
}
595-
596-
void cancel() {
597-
stop_ = true;
598-
if (worker_) {
599-
worker_->join();
534+
ret = bcf_unpack(it.get(), BCF_UN_ALL);
535+
if (ret != 0) {
536+
throw runtime_error("Corrupt VCF/BCF record; bcf_unpack() -> " + to_string(ret));
600537
}
601-
}
602-
603-
void log() {
604-
chrono::duration<double> elapsed = chrono::high_resolution_clock::now() - t0_;
605-
cerr << c_ << " record(s) processed in " << elapsed.count() << "s"
606-
<< "; producer thread spun for " << p_spin_.count() << "s"
607-
<< "; consumer thread spun for " << c_spin_.count() << "s" << endl;
538+
return true;
608539
}
609540
};
610541

@@ -778,8 +709,9 @@ int main(int argc, char *argv[]) {
778709
<< (format_hrecs.empty() ? "..." : " & genotypes...") << endl;
779710

780711
BCFReader reader(vcf.get(), hdr.get(), 64);
781-
bcf1_t *rec;
782-
while ((rec = reader.read())) {
712+
while (reader.next()) {
713+
bcf1_t *rec = reader.item().get();
714+
assert(rec);
783715
try {
784716
insert_variant(hdr.get(), rec, info_hrecs, *insert_variant_stmt);
785717
if (!format_hrecs.empty()) {
@@ -791,11 +723,11 @@ int main(int argc, char *argv[]) {
791723
*insert_genotype_stmt);
792724
}
793725
} catch (exception &exn) {
794-
reader.cancel();
726+
reader.abort();
795727
throw exn;
796728
}
797729
}
798-
progress && (reader.log(), true);
730+
progress &&cerr << reader.log() << endl;
799731

800732
// create GRI
801733
if (gri) {

0 commit comments

Comments
 (0)