Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@
### Added

- Add left_multiplication and right_multiplication to ThreadingInstructions (#99)
- OpenMP parallelism for Li-Stephens Viterbi and hets phases across targets (#127)
- NumPy batch API for zero-copy genotype transfer from Python to C++ (#127)

### Changed

- Build wheels on macOS 14 for arm64 and macOS 15 for x86_64 (#108)
- Replace hash map traceback storage with deque arena in Viterbi (#127)
- Replace hash map state lookup with flat vectors in ThreadsLowMem (#127)
- Boolean array neighbor search in Matcher replaces red-black tree (#127)

### Fixed

- Memory leak: raw HMM pointer replaced with unique_ptr (#127)
- Sign-extension in pair_key/coord_id_key hash functions (#127)
- Bounds check after array access in ThreadsFastLS (#127)
- exit(1) calls replaced with exceptions for proper Python error propagation (#127)

## [0.2.1] - 2025-06-03

Expand Down
26 changes: 18 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,9 @@ authors = [
requires-python = ">=3.9"

dependencies = [
"click",
"xarray",
"h5py",
"pandas",
"numba",
"numpy",
"tszip",
"arg-needle-lib==1.2.1",
"cyvcf2",
"ray",
"pgenlib",
"tqdm"
]
Expand All @@ -43,7 +36,24 @@ classifiers = [

[project.optional-dependencies]
dev = [
"pytest"
"pytest",
"h5py"
]
convert = [
"tszip"
]
impute = [
"pandas",
"scipy"
]
normalize = [
"msprime"
]
all = [
"tszip",
"msprime",
"pandas",
"scipy"
]

[project.scripts]
Expand Down
141 changes: 91 additions & 50 deletions src/AlleleAges.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#include "AlleleAges.hpp"
#include "GenotypeIterator.hpp"

#include <numeric>
#include <execution>
#include <algorithm>
#include <cmath>
#include <vector>
#include <unordered_map>
#include <map>

#include <boost/container/flat_set.hpp>

AgeEstimator::AgeEstimator(const ThreadingInstructions& instructions) {
num_samples = instructions.num_samples;
Expand Down Expand Up @@ -55,7 +52,12 @@ void AgeEstimator::process_site(const std::vector<int>& genotypes) {
} else {
if (genotypes.at(i) == 1) {
int target = threading_iterators.at(i).current_target;
path_lengths[i] = path_lengths[target] + 1;
// Self-referencing targets don't extend carrier chains
if (static_cast<size_t>(target) != i) {
path_lengths[i] = path_lengths[target] + 1;
} else {
path_lengths[i] = 1;
}
} else {
path_lengths[i] = 0;
}
Expand Down Expand Up @@ -84,64 +86,94 @@ void AgeEstimator::process_site(const std::vector<int>& genotypes) {
size_t start_tmp = path_start;
while (start_tmp > 0) {
int next_sample = threading_iterators.at(start_tmp).current_target;
// Skip self-referencing targets to avoid infinite loops
if (next_sample == static_cast<int>(start_tmp)) {
break;
}
double this_tmrca = threading_iterators.at(start_tmp).current_tmrca;
running_max = std::max(running_max, this_tmrca);
tmrcas.at(next_sample) = running_max;
start_tmp = next_sample;
}
if (start_tmp != 0) {
throw std::runtime_error("Invalid threading instruction traversal.");
}

for (int i = 0; i < num_samples; i++) {
if (tmrcas.at(i) < 0) {
int target = threading_iterators.at(i).current_target;
double tmrca = threading_iterators.at(i).current_tmrca;
tmrcas.at(i) = std::max(tmrcas.at(target), tmrca);
if (target == i || target < 0 || tmrcas.at(target) < 0) {
// Self-ref or unfilled target: use own tmrca
tmrcas.at(i) = tmrca;
} else {
tmrcas.at(i) = std::max(tmrcas.at(target), tmrca);
}
}
}

// Create a sorted unique list of coalescence times as transform_reduce
// below must be done in order. For performance, boost's flat_set is
// faster than std::set or sorting a std::vector in this instance.
boost::container::flat_set<double> unique_tmrcas(tmrcas.begin(), tmrcas.end());

// For each sample, check its tmrca with path_start and
// update the score for each tmrca bin accordingly
std::map<double, int> scores;
for (double t : unique_tmrcas) {
scores[t] = std::transform_reduce(
tmrcas.begin(),
tmrcas.end(),
genotypes.begin(),
0,
std::plus<>(),
[t](double tmrca, int genotype) {
bool is_carrier = genotype > 0;
if (is_carrier && tmrca <= t) {
return 1;
}
if (!is_carrier && tmrca > t) {
return 1;
}
return 0;
}
);
// Sort samples by tmrca, then sweep to find the threshold that
// maximizes: carriers_at_or_below(t) + non_carriers_above(t).
struct TmrcaSample {
double tmrca;
int genotype;
};
std::vector<TmrcaSample> sorted_samples;
sorted_samples.reserve(num_samples);
int total_non_carriers = 0;
for (int i = 0; i < num_samples; i++) {
sorted_samples.push_back({tmrcas[i], genotypes[i]});
if (genotypes[i] == 0) total_non_carriers++;
}

std::vector<double> age_bin_boundaries;
for (auto const& imap: scores)
age_bin_boundaries.push_back(imap.first);
std::vector<double> age_bins;

for (size_t k = 0; k < age_bin_boundaries.size(); k++) {
int score = scores.at(age_bin_boundaries.at(k));
if (score > max_score) {
max_score = score;
allele_age = (k == age_bin_boundaries.size() - 1)
? age_bin_boundaries.at(k) + 1
: (age_bin_boundaries.at(k) + age_bin_boundaries.at(k + 1)) / 2.;
// NaN-safe sort: put NaN values last
std::sort(sorted_samples.begin(), sorted_samples.end(),
[](const TmrcaSample& a, const TmrcaSample& b) {
if (std::isnan(a.tmrca)) return false;
if (std::isnan(b.tmrca)) return true;
return a.tmrca < b.tmrca;
});

// Initial score: threshold below all tmrcas → 0 carriers correct,
// all non-carriers correct (they're all above threshold).
int score = total_non_carriers;
int best_score = score;
double best_boundary = sorted_samples.front().tmrca;
double next_boundary = sorted_samples.front().tmrca;

// Sweep through sorted samples in groups of equal tmrca
size_t i_sweep = 0;
size_t n_sorted = sorted_samples.size();
while (i_sweep < n_sorted) {
double current_t = sorted_samples[i_sweep].tmrca;
// Stop at NaN values (they are sorted to the end)
if (std::isnan(current_t)) break;
// Process all samples at this tmrca
int carriers_at_t = 0;
int non_carriers_at_t = 0;
size_t group_end = i_sweep;
while (group_end < n_sorted && sorted_samples[group_end].tmrca == current_t) {
if (sorted_samples[group_end].genotype > 0)
carriers_at_t++;
else
non_carriers_at_t++;
group_end++;
}
// Moving threshold to include this group:
// carriers become correctly classified (+), non-carriers become incorrect (-)
score += carriers_at_t - non_carriers_at_t;

if (score > best_score) {
best_score = score;
best_boundary = current_t;
next_boundary = (group_end < n_sorted)
? sorted_samples[group_end].tmrca
: current_t;
}
i_sweep = group_end;
}

if (best_score > max_score) {
max_score = best_score;
allele_age = (best_boundary == next_boundary)
? best_boundary + 1
: (best_boundary + next_boundary) / 2.;
}
}

Expand All @@ -152,3 +184,12 @@ void AgeEstimator::process_site(const std::vector<int>& genotypes) {
std::vector<double> AgeEstimator::get_inferred_ages() const {
return estimated_ages;
}

std::vector<double> estimate_ages(const ThreadingInstructions& instructions) {
GenotypeIterator gt_it(instructions);
AgeEstimator estimator(instructions);
while (gt_it.has_next_genotype()) {
estimator.process_site(gt_it.next_genotype());
}
return estimator.get_inferred_ages();
}
4 changes: 4 additions & 0 deletions src/AlleleAges.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ class AgeEstimator {
std::vector<double> estimated_ages;
};

// Bulk function: creates GenotypeIterator + AgeEstimator internally,
// processes all sites in C++, returns estimated ages.
std::vector<double> estimate_ages(const ThreadingInstructions& instructions);

#endif // THREADS_ARG_ALLELE_AGES_HPP
27 changes: 27 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
find_package(Boost REQUIRED)
message(STATUS "Found Boost ${Boost_VERSION}")

# Find HDF5 C library (for .threads file I/O)
find_package(HDF5 REQUIRED COMPONENTS C)
message(STATUS "Found HDF5 ${HDF5_VERSION}")

# Optional OpenMP for parallel Viterbi across targets
find_package(OpenMP)
if(OpenMP_CXX_FOUND)
message(STATUS "OpenMP found — parallel Viterbi enabled")
else()
message(STATUS "OpenMP not found — building single-threaded")
endif()

# Threads static library
set(threads_arg_src
Demography.cpp
Expand All @@ -34,6 +46,8 @@ set(threads_arg_src
AlleleAges.cpp
GenotypeIterator.cpp
VCFWriter.cpp
ForwardBackward.cpp
ThreadsIO.cpp
)

set(threads_arg_hdr
Expand All @@ -52,6 +66,8 @@ set(threads_arg_hdr
AlleleAges.hpp
GenotypeIterator.hpp
VCFWriter.hpp
ForwardBackward.hpp
ThreadsIO.hpp
)

add_library(threads_arg STATIC
Expand All @@ -72,8 +88,17 @@ target_link_libraries(threads_arg
PRIVATE
Boost::headers
project_warnings
$<$<BOOL:${OpenMP_CXX_FOUND}>:OpenMP::OpenMP_CXX>
PUBLIC
HDF5::HDF5
)

# Native-architecture tuning + LTO for hot numeric kernels
target_compile_options(threads_arg PRIVATE
$<$<CXX_COMPILER_ID:GNU,Clang>:-march=native -O3>
)
set_property(TARGET threads_arg PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)

# Conditionally create python bindings
if(PYTHON_BINDINGS)
set_target_properties(threads_arg
Expand All @@ -92,5 +117,7 @@ if(PYTHON_BINDINGS)
project_warnings
)

set_property(TARGET threads_arg_python_bindings PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)

install(TARGETS threads_arg_python_bindings LIBRARY DESTINATION .)
endif()
27 changes: 25 additions & 2 deletions src/DataConsistency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#include "DataConsistency.hpp"
#include "GenotypeIterator.hpp"
#include <algorithm>
#include <limits>
#include <iostream>
#include <vector>
Expand Down Expand Up @@ -189,7 +191,9 @@ void ConsistencyWrapper::process_site(std::vector<int>& genotypes) {
// Otherwise we try traversing the local threading graph to find another carrier
int current_target = converter.current_target;
while (current_target != -1 && genotypes.at(current_target) != 1) {
current_target = instruction_converters.at(current_target).current_target;
int next = instruction_converters.at(current_target).current_target;
if (next == current_target) break; // avoid self-referencing loop
current_target = next;
}
if (current_target > 0 && genotypes.at(current_target) == 1) {
new_target = current_target;
Expand All @@ -199,6 +203,12 @@ void ConsistencyWrapper::process_site(std::vector<int>& genotypes) {
}
}
}
// Self-referencing targets break mismatch recording (comparing a
// sample's genotype with itself is always equal), so replace with
// sample 0 which is always correctly reconstructed.
if (new_target == current_hap) {
new_target = 0;
}
if (new_target != converter.current_target) {
// Force a new threading segment bounded by [0, allele_age) and
// using the new target
Expand All @@ -222,9 +232,22 @@ ThreadingInstructions ConsistencyWrapper::get_consistent_instructions() {
// Make output threading instructions
std::vector<ThreadingInstruction> output_instructions;
output_instructions.reserve(instruction_converters.size());
for (InstructionConverter converter : instruction_converters) {
for (auto& converter : instruction_converters) {
output_instructions.push_back(converter.parse_converted_instructions());
}

return ThreadingInstructions(output_instructions, physical_positions);
}

ThreadingInstructions run_consistency(ThreadingInstructions& instructions, const std::vector<double>& allele_ages) {
GenotypeIterator gt_it(instructions);
ConsistencyWrapper cw(instructions, allele_ages);
while (gt_it.has_next_genotype()) {
auto g = gt_it.next_genotype();
// next_genotype returns const ref; process_site takes non-const ref
std::vector<int> genotypes(g.begin(), g.end());
cw.process_site(genotypes);
}
return cw.get_consistent_instructions();
}

4 changes: 4 additions & 0 deletions src/DataConsistency.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ class ConsistencyWrapper {
std::vector<InstructionConverter> instruction_converters;
};

// Bulk function: creates GenotypeIterator + ConsistencyWrapper internally,
// processes all sites in C++, returns consistent instructions.
ThreadingInstructions run_consistency(ThreadingInstructions& instructions, const std::vector<double>& allele_ages);

#endif // THREADS_ARG_DATA_CONSISTENCY_HPP
2 changes: 1 addition & 1 deletion src/Demography.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ double Demography::expected_branch_length(const int N) {

std::ostream& operator<<(std::ostream& os, const Demography& d) {
for (std::size_t i = 0; i < d.sizes.size(); i++) {
std::cout << d.times[i] << " " << d.sizes[i] << " " << d.std_times[i] << std::endl;
os << d.times[i] << " " << d.sizes[i] << " " << d.std_times[i] << "\n";
}
return os;
}
Loading
Loading