Skip to content

Commit a1c92a5

Browse files
shoyer and Xarray-Beam authors
authored and committed
Update xarray_beam._src.rechunker_algorithm to Google style
PiperOrigin-RevId: 810063295
1 parent a9465c8 commit a1c92a5

1 file changed

Lines changed: 45 additions & 49 deletions

File tree

xarray_beam/_src/rechunker_algorithm.py

Lines changed: 45 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
"""
2727
from collections.abc import Sequence
2828
import logging
29-
from math import ceil, floor, lcm, prod
29+
import math
3030
import warnings
3131

3232
import numpy as np
3333

3434
logger = logging.getLogger(__name__)
3535

36+
# pylint: disable=g-doc-return-or-yield
37+
3638

3739
def consolidate_chunks(
3840
shape: Sequence[int],
@@ -45,22 +47,18 @@ def consolidate_chunks(
4547
4648
Consolidation starts on the highest axis and proceeds towards axis 0.
4749
48-
Parameters
49-
----------
50-
shape : Tuple
51-
Array shape
52-
chunks : Tuple
53-
Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
54-
max_mem : Int
55-
Maximum permissible chunk memory size, measured in units of itemsize
56-
chunk_limits : Tuple, optional
57-
Maximum size of each chunk along each axis. If None, don't consolidate
58-
axis. If -1, no limit.
59-
60-
Returns
61-
-------
62-
new_chunks : tuple
63-
The new chunks, size guaranteed to be <= mam_mem
50+
Args:
51+
shape: Array shape.
52+
chunks: Original chunk shape (must be in form (5, 10, 20), no irregular
53+
chunks).
54+
itemsize: Number of bytes per array element.
55+
max_mem: Maximum permissible chunk memory size, measured in units of
56+
itemsize.
57+
chunk_limits: Maximum size of each chunk along each axis. If None, don't
58+
consolidate axis. If -1, no limit.
59+
60+
Returns:
61+
The new chunks, size guaranteed to be <= max_mem.
6462
"""
6563

6664
ndim = len(shape)
@@ -82,7 +80,7 @@ def consolidate_chunks(
8280
else:
8381
raise ValueError(f"Invalid chunk_limits {chunk_limits}.")
8482

85-
chunk_mem = itemsize * prod(chunks)
83+
chunk_mem = itemsize * math.prod(chunks)
8684
if chunk_mem > max_mem:
8785
raise ValueError(f"chunk_mem {chunk_mem} > max_mem {max_mem}")
8886
headroom = max_mem / chunk_mem
@@ -95,7 +93,7 @@ def consolidate_chunks(
9593
upper_bound = min(shape[n_axis], chunk_limit_per_axis[n_axis])
9694
# try to just increase the chunk to the upper bound
9795
new_chunks[n_axis] = upper_bound
98-
chunk_mem = itemsize * prod(new_chunks)
96+
chunk_mem = itemsize * math.prod(new_chunks)
9997
upper_bound_headroom = max_mem / chunk_mem
10098
if upper_bound_headroom > 1:
10199
# ok it worked
@@ -105,9 +103,10 @@ def consolidate_chunks(
105103
# nope, that was too much
106104
# instead increase it by an integer multiple
107105
larger_chunk = int(chunks[n_axis] * int(headroom))
108-
# not sure the min check is needed any more; it safeguards against making it too big
106+
# not sure the min check is needed any more; it safeguards against making
107+
# it too big
109108
new_chunks[n_axis] = min(larger_chunk, upper_bound)
110-
chunk_mem = itemsize * prod(new_chunks)
109+
chunk_mem = itemsize * math.prod(new_chunks)
111110
headroom = max_mem / chunk_mem
112111

113112
logger.debug(f" axis {n_axis}, {chunks[n_axis]} -> {new_chunks[n_axis]}")
@@ -165,7 +164,7 @@ def calculate_stage_chunks(
165164
cannot be stored in Zarr arrays.
166165
"""
167166
approx_stages = np.geomspace(read_chunks, write_chunks, num=stage_count + 1)
168-
return [tuple(floor(c) for c in stage) for stage in approx_stages[1:-1]]
167+
return [tuple(math.floor(c) for c in stage) for stage in approx_stages[1:-1]]
169168

170169

171170
def _count_intermediate_chunks(
@@ -191,12 +190,14 @@ def _count_intermediate_chunks(
191190
>>> _count_intermediate_chunks(5, 7, 20)
192191
6
193192
"""
194-
multiple = lcm(source_chunk, target_chunk)
193+
multiple = math.lcm(source_chunk, target_chunk)
195194
splits_per_lcm = multiple // source_chunk + multiple // target_chunk - 1
196195
lcm_count, remainder = divmod(size, multiple)
197196
if remainder:
198197
splits_in_remainder = (
199-
ceil(remainder / source_chunk) + ceil(remainder / target_chunk) - 1
198+
math.ceil(remainder / source_chunk)
199+
+ math.ceil(remainder / target_chunk)
200+
- 1
200201
)
201202
else:
202203
splits_in_remainder = 0
@@ -207,7 +208,9 @@ def calculate_single_stage_io_ops(
207208
shape: Sequence[int], in_chunks: Sequence[int], out_chunks: Sequence[int]
208209
) -> int:
209210
"""Count the number of read/write operations required for rechunking."""
210-
return prod(map(_count_intermediate_chunks, in_chunks, out_chunks, shape))
211+
return math.prod(
212+
map(_count_intermediate_chunks, in_chunks, out_chunks, shape)
213+
)
211214

212215

213216
# not a tight upper bound, but ensures that the loop in
@@ -244,8 +247,8 @@ def multistage_rechunking_plan(
244247
if len(target_chunks) != ndim:
245248
raise ValueError(f"target_chunks {target_chunks} must have length {ndim}")
246249

247-
source_chunk_mem = itemsize * prod(source_chunks)
248-
target_chunk_mem = itemsize * prod(target_chunks)
250+
source_chunk_mem = itemsize * math.prod(source_chunks)
251+
target_chunk_mem = itemsize * math.prod(target_chunks)
249252

250253
if source_chunk_mem > max_mem:
251254
raise ValueError(
@@ -309,7 +312,7 @@ def multistage_rechunking_plan(
309312
]
310313
plan = list(zip(pre_chunks, int_chunks, post_chunks))
311314

312-
int_mem = min(itemsize * prod(chunks) for chunks in int_chunks)
315+
int_mem = min(itemsize * math.prod(chunks) for chunks in int_chunks)
313316
if int_mem >= min_mem:
314317
return plan # success!
315318

@@ -357,27 +360,20 @@ def rechunking_plan(
357360
) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...]]:
358361
"""Calculate a plan for rechunking arrays.
359362
360-
Parameters
361-
----------
362-
shape : Tuple
363-
Array shape
364-
source_chunks : Tuple
365-
Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
366-
target_chunks : Tuple
367-
Target chunk shape (must be in form (5, 10, 20), no irregular chunks)
368-
itemsize: int
369-
Number of bytes used to represent a single array element
370-
max_mem : int
371-
Maximum permissible chunk memory size, measured in units of itemsize
372-
consolidate_reads: bool, optional
373-
Whether to apply read chunk consolidation
374-
consolidate_writes: bool, optional
375-
Whether to apply write chunk consolidation
376-
377-
Returns
378-
-------
379-
new_chunks : tuple
380-
The new chunks, size guaranteed to be <= mam_mem
363+
Args:
364+
shape: Array shape.
365+
source_chunks: Original chunk shape (must be in form (5, 10, 20), no
366+
irregular chunks).
367+
target_chunks: Target chunk shape (must be in form (5, 10, 20), no
368+
irregular chunks).
369+
itemsize: Number of bytes used to represent a single array element.
370+
max_mem: Maximum permissible chunk memory size, measured in units of
371+
itemsize.
372+
consolidate_reads: Whether to apply read chunk consolidation.
373+
consolidate_writes: Whether to apply write chunk consolidation.
374+
375+
Returns:
376+
The new chunks, size guaranteed to be <= max_mem.
381377
"""
382378
(stage,) = multistage_rechunking_plan(
383379
shape,

0 commit comments

Comments
 (0)