Skip to content

Commit 75c07f7

Browse files
authored
Merge pull request #48 from jksr/master
ballc format support
2 parents d899f95 + 35199d8 commit 75c07f7

4 files changed

Lines changed: 107 additions & 18 deletions

File tree

ALLCools/_allc_to_region_count.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ def _map_to_sparse_chrom_bin(site_bed, out_bed, chrom_size_path, bin_size=500):
135135
)
136136
def allc_to_region_count(
137137
allc_path: str,
138+
cmeta_path: str,
138139
output_prefix: str,
139140
chrom_size_path: str,
140141
mc_contexts: List[str],
@@ -215,6 +216,7 @@ def allc_to_region_count(
215216
strandness = "split" if split_strand else "both"
216217
output_paths_dict = extract_allc(
217218
allc_path=allc_path,
219+
cmeta_path=cmeta_path,
218220
output_prefix=output_prefix,
219221
mc_contexts=mc_contexts,
220222
strandness=strandness,
@@ -332,10 +334,11 @@ def batch_allc_to_region_count(
332334

333335
with ProcessPoolExecutor(cpu) as executor:
334336
futures = {}
335-
for cell_id, allc_path in allc_series.iteritems():
337+
for cell_id, (allc_path, cmeta_path) in allc_series.iterrows():
336338
future = executor.submit(
337339
allc_to_region_count,
338340
allc_path=allc_path,
341+
cmeta_path=cmeta_path,
339342
output_prefix=str(output_dir / cell_id),
340343
chrom_size_path=chrom_size_path,
341344
mc_contexts=mc_contexts,

ALLCools/_extract_allc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ def _extract_allc_parallel(
219219
)
220220
def extract_allc(
221221
allc_path: str,
222+
cmeta_path: str,
222223
output_prefix: str,
223224
mc_contexts: Union[str, list],
224225
chrom_size_path: str,
@@ -346,7 +347,7 @@ def extract_allc(
346347

347348
# split file first
348349
# strandness function
349-
with open_allc(allc_path, region=region) as allc:
350+
with open_allc(allc_path, region=region, cmeta_path=cmeta_path) as allc:
350351
if strandness == "Split":
351352
for line in allc:
352353
cur_line = line.split("\t")

ALLCools/_open.py

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,80 @@ def _raise_if_error(self):
206206
"""
207207
Raise an exception if the gzip process has exited with an error.
208208
209-
Raise IOError if process is not running anymore and the
209+
Raise OSError if process is not running anymore and the
210+
exit code is nonzero.
211+
"""
212+
return_code = self.process.poll()
213+
if return_code is not None and return_code != 0:
214+
message = self._stderr.read().strip()
215+
raise OSError(message)
216+
217+
def read(self, *args):
218+
data = self._file.read(*args)
219+
if len(args) == 0 or args[0] <= 0:
220+
# wait for process to terminate until we check the exit code
221+
self.process.wait()
222+
self._raise_if_error()
223+
return data
224+
225+
226+
class PipedBAllCReader(Closing):
227+
def __init__(self, path, cmeta_path=None, region=None, mode="r"):
228+
if mode not in ("r"):
229+
raise ValueError(f"Mode can only be 'r' for ballc file")
230+
if cmeta_path is None:
231+
raise NotImplementedError
232+
cmeta_path = pathlib.Path(cmeta_path).resolve()
233+
if not cmeta_path.exists():
234+
raise FileNotFoundError(f"{cmeta_path} does not exist.")
235+
cmeta_path = str(cmeta_path)
236+
237+
if region is None:
238+
self.process = Popen(
239+
["ballcools", "query", path, '"*"', "-c", cmeta_path],
240+
# stdout=PIPE, stderr=PIPE, encoding="utf8")
241+
stdout=PIPE,
242+
stderr=PIPE,
243+
encoding=None,
244+
)
245+
else:
246+
self.process = Popen(
247+
["ballcools", "query", path] + region.split(" ") + ["-c", cmeta_path],
248+
stdout=PIPE,
249+
stderr=PIPE,
250+
# encoding="utf8",)
251+
encoding=None,
252+
)
253+
254+
self.name = path
255+
self._file = self.process.stdout
256+
self._stderr = self.process.stderr
257+
self.closed = False
258+
# Give ballcools a little bit of time to report any errors
259+
# (such as a non-existing file)
260+
time.sleep(0.01)
261+
self._raise_if_error()
262+
263+
def close(self):
264+
self.closed = True
265+
return_code = self.process.poll()
266+
if return_code is None:
267+
# still running
268+
self.process.terminate()
269+
self._raise_if_error()
270+
271+
def __iter__(self):
272+
for line in self._file:
273+
yield line.decode("utf-8")
274+
self.process.wait()
275+
self._raise_if_error()
276+
277+
def readline(self):
278+
return self._file.readline().decode("utf-8")
279+
280+
def _raise_if_error(self):
281+
"""
282+
Raise OSError if process is not running anymore and the
210283
exit code is nonzero.
211284
"""
212285
return_code = self.process.poll()
@@ -275,7 +348,7 @@ def readline(self):
275348
return self.file.readline()
276349

277350
def _raise_if_error(self):
278-
"""Raise IOError if process is not running anymore and the exit code is nonzero."""
351+
"""Raise OSError if process is not running anymore and the exit code is nonzero."""
279352
return_code = self.process.poll()
280353
if return_code is not None and return_code != 0:
281354
message = self._stderr.read().strip()
@@ -382,7 +455,11 @@ def open_gz(file_path, mode="r", compresslevel=3, threads=1, region=None):
382455
return gzip.open(file_path, mode, compresslevel=compresslevel)
383456

384457

385-
def open_allc(file_path, mode="r", compresslevel=3, threads=1, region=None):
458+
def open_ballc(file_path, mode="r", compresslevel=None, threads=None, region=None, cmeta_path=None):
459+
return PipedBAllCReader(file_path, cmeta_path=cmeta_path, region=region, mode="r")
460+
461+
462+
def open_allc(file_path, mode="r", compresslevel=3, threads=1, region=None, cmeta_path=None):
386463
"""
387464
Open a .allc file.
388465
@@ -422,7 +499,7 @@ def open_allc(file_path, mode="r", compresslevel=3, threads=1, region=None):
422499
raise ValueError(f"mode '{mode}' not supported")
423500
if compresslevel not in range(1, 10):
424501
raise ValueError("compresslevel must be between 1 and 9")
425-
if region is not None:
502+
if (region is not None) and (not file_path.endswith(".ballc")):
426503
# unzipped file
427504
if not file_path.endswith("gz"):
428505
raise ValueError(f"File must be compressed by bgzip to use region query. File path {file_path}")
@@ -435,8 +512,10 @@ def open_allc(file_path, mode="r", compresslevel=3, threads=1, region=None):
435512
if not os.path.exists(file_path + ".tbi"):
436513
raise FileNotFoundError("region query provided, but .tbi index not found")
437514

438-
if file_path.endswith("gz"):
515+
if file_path.endswith(".gz"):
439516
return open_gz(file_path, mode, compresslevel, threads, region=region)
517+
elif file_path.endswith(".ballc"):
518+
return open_ballc(file_path, region=region, cmeta_path=cmeta_path)
440519
else:
441520
return open(file_path, mode)
442521

ALLCools/count_matrix/mcds.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -331,11 +331,17 @@ def generate_mcds(
331331
dtype = parse_dtype(dtype)
332332

333333
if isinstance(allc_table, str):
334-
allc_series = pd.read_csv(allc_table, header=None, index_col=0, squeeze=True, sep="\t")
335-
if not isinstance(allc_series, pd.Series):
336-
raise ValueError("allc_table malformed, should only have 2 columns, 1. file_uid, 2. file_path")
334+
allc_series = pd.read_csv(allc_table, header=None, index_col=0, sep="\t")
335+
if allc_series.shape[1] == 1:
336+
allc_series[2] = ""
337+
elif allc_series.shape[1] == 2:
338+
pass
339+
else:
340+
raise ValueError(
341+
"allc_table malformed, should have 2 or 3 columns, 1. file_uid, 2. file_path, 3. cmeta_path if ballc format"
342+
)
337343
else:
338-
allc_series = allc_table.dropna()
344+
allc_series = allc_table[~allc_table[1].isna()]
339345
if allc_series.index.duplicated().sum() != 0:
340346
raise ValueError("allc_table file uid have duplicates (1st column)")
341347

@@ -363,7 +369,7 @@ def generate_mcds(
363369
else:
364370
if n_matched < rna_series.size:
365371
print(f"{rna_series.size - n_matched} file_uid in RNA table do not exist in ALLC table")
366-
if n_matched < allc_series.size:
372+
if n_matched < allc_series.shape[0]:
367373
print(f"{allc_series.size - n_matched} file_uid in ALLC table do not exist in RNA table")
368374

369375
# reindex allc and rna series, this may create NaN, will be dropped
@@ -372,19 +378,19 @@ def generate_mcds(
372378
rna_series = rna_series.reindex(union_index)
373379

374380
# if allc files exceed max_per_mcds, save them into chunks
375-
if allc_series.size > max_per_mcds:
376-
mcds_n_chunk = ceil(allc_series.size / max_per_mcds)
377-
chunk_size = ceil(allc_series.size / mcds_n_chunk)
381+
if allc_series.shape[0] > max_per_mcds:
382+
mcds_n_chunk = ceil(allc_series.shape[0] / max_per_mcds)
383+
chunk_size = ceil(allc_series.shape[0] / mcds_n_chunk)
378384
allc_series_chunks = [
379-
allc_series[chunk_start : chunk_start + chunk_size]
380-
for chunk_start in range(0, allc_series.size, chunk_size)
385+
allc_series.iloc[chunk_start : chunk_start + chunk_size]
386+
for chunk_start in range(0, allc_series.shape[0], chunk_size)
381387
]
382388
if rna_table is not None:
383389
rna_series_chunks = [
384390
rna_series[chunk_start : chunk_start + chunk_size]
385391
for chunk_start in range(0, rna_series.size, chunk_size)
386392
]
387-
print(f"Number of file_uids {allc_series.size} > max_per_mcds {max_per_mcds}, ")
393+
print(f"Number of file_uids {allc_series.shape[0]} > max_per_mcds {max_per_mcds}, ")
388394
print(f"will generate MCDS in {len(allc_series_chunks)} chunks.")
389395

390396
# mcds chunks execute sequentially

0 commit comments

Comments
 (0)