Skip to content

Commit 0d2f958

Browse files
shoyer and Xarray-Beam authors
authored and committed
zarr_chunks_per_shard
PiperOrigin-RevId: 813445392
1 parent 9257375 commit 0d2f958

3 files changed

Lines changed: 129 additions & 3 deletions

File tree

docs/high-level.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@
166166
},
167167
"cell_type": "markdown",
168168
"source": [
169-
"Weather/climate datasets are typically generated and stored in pancake chunks, but pencil chunks are more useful for most analytics queries, which requires large histories of weather at a single location. Intermediate \"compromise\" chunks can sometimes be a good idea, although if performance and flexibility are critical it may be worth storing multiple copies of your data in different formats.\n",
169+
"Weather/climate datasets are typically generated and stored in pancake chunks, but pencil chunks are more useful for most analytics queries, which requires large histories of weather at a single location. Intermediate \"compromise\" chunks can sometimes be a good idea, although if performance and flexibility are critical it may be worth storing multiple copies of your data in different formats. Using Zarr v3's sharding feature to group smaller chunks into shards can also help mitigate the challenges of picking an optimal chunk size.\n",
170170
"\n",
171171
"Using the right chunks is *absolutely essential* for efficient operations with Xarray-Beam and Zarr. For example, reading data from a single location across all times (a \"pencil\" query) is extremely inefficient for a dataset stored in \"pancake\" chunks -- it would require loading the entire dataset from disk!\n",
172172
"\n",

xarray_beam/_src/dataset.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,12 +257,70 @@ def _check_shards_or_chunks(
257257
def to_zarr(
258258
self,
259259
path: str,
260+
*,
261+
zarr_chunks_per_shard: Mapping[str, int] | None = None,
260262
zarr_chunks: Mapping[str, int] | None = None,
261263
zarr_shards: Mapping[str, int] | None = None,
262264
zarr_format: int | None = None,
263265
) -> beam.PTransform:
264-
"""Write to a Zarr file."""
265-
if zarr_chunks is None:
266+
"""Write this dataset to a Zarr file.
267+
268+
The extensive options for controlling chunking and sharding are intended for
269+
power users:
270+
271+
* If you are happy with the existing chunk sizes of your dataset
272+
and just want to write it to disk, you can omit all of them.
273+
* Consider specifying only ``zarr_chunks_per_shard`` to allow for more
274+
flexible efficient reading of data from disk. This allows for dividing
275+
dataset chunks into much smaller Zarr chunks on disk, with each chunk
276+
stored in a single Zarr shard.
277+
278+
Args:
279+
path: path to write to.
280+
zarr_chunks_per_shard: If provided, write this dataset into Zarr shards,
281+
each with at most this many Zarr chunks per shard (requires Zarr v3).
282+
zarr_chunks: Explicit chunk sizes to use for storing data in Zarr, as an
283+
alternative to specifying ``zarr_chunks_per_shard``. Zarr chunk sizes
284+
must evenly divide the existing chunk sizes of this dataset.
285+
zarr_shards: Explicit shards to use for storing data in Zarr, which must
286+
evenly divide the existing chunk sizes of this dataset, and be even
287+
multiples of chunk sizes. Requires Zarr v3. By default, Zarr sharding is
288+
not used unless ``zarr_chunks_per_shard`` is provided, in which case
289+
Zarr shards default to the chunk sizes of this dataset.
290+
zarr_format: optional integer specifying the explicit Zarr format to use.
291+
Defaults to Zarr v3 if using shards, or the default format for your
292+
installed version of Zarr.
293+
294+
Returns:
295+
Beam PTransform that writes the dataset to a Zarr file.
296+
"""
297+
if zarr_chunks_per_shard is not None:
298+
if zarr_chunks is not None:
299+
raise ValueError(
300+
'cannot supply both zarr_chunks_per_shard and zarr_chunks'
301+
)
302+
if zarr_shards is None:
303+
zarr_shards = {}
304+
zarr_shards = {**self.chunks, **zarr_shards}
305+
zarr_chunks = {}
306+
for dim, existing_chunk_size in zarr_shards.items():
307+
multiple = zarr_chunks_per_shard.get(dim)
308+
if multiple is None:
309+
raise ValueError(
310+
f'cannot write a dataset with chunks {self.chunks} to Zarr with '
311+
f'{zarr_chunks_per_shard=}, which does not contain a value for '
312+
f'dimension {dim!r}'
313+
)
314+
zarr_chunks[dim], remainder = divmod(
315+
existing_chunk_size, multiple
316+
)
317+
if remainder != 0:
318+
raise ValueError(
319+
f'cannot write a dataset with chunks {self.chunks} to Zarr with '
320+
f'{zarr_chunks_per_shard=}, which do not evenly divide into '
321+
'chunks'
322+
)
323+
elif zarr_chunks is None:
266324
if zarr_shards is not None:
267325
raise ValueError('cannot supply zarr_shards without zarr_chunks')
268326
zarr_chunks = {}
@@ -274,6 +332,9 @@ def to_zarr(
274332
else:
275333
self._check_shards_or_chunks(zarr_chunks, 'chunks')
276334

335+
if zarr_shards is not None and zarr_format is None:
336+
zarr_format = 3 # required for shards
337+
277338
return self.ptransform | _get_label('to_zarr') >> zarr.ChunksToZarr(
278339
path,
279340
self.template,

xarray_beam/_src/dataset_test.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,71 @@ def test_to_zarr_shards(self):
177177
temp_dir, zarr_chunks={'x': 3}, zarr_shards={'x': 9}, zarr_format=3
178178
)
179179

180+
def test_to_zarr_chunks_per_shard(self):
181+
temp_dir = self.create_tempdir().full_path
182+
ds = xarray.Dataset({'foo': ('x', np.arange(12))})
183+
beam_ds = xbeam.Dataset.from_xarray(ds, {'x': 6})
184+
185+
with self.subTest('simple'):
186+
with beam.Pipeline() as p:
187+
p |= beam_ds.to_zarr(
188+
temp_dir, zarr_chunks_per_shard={'x': 2}
189+
)
190+
opened, chunks = xbeam.open_zarr(temp_dir)
191+
xarray.testing.assert_identical(ds, opened)
192+
self.assertEqual(chunks, {'x': 3})
193+
self.assertEqual(opened['foo'].encoding['chunks'], (3,))
194+
self.assertEqual(opened['foo'].encoding['shards'], (6,))
195+
196+
with self.subTest('explicit_shards'):
197+
temp_dir = self.create_tempdir().full_path
198+
ds = xarray.Dataset({'foo': ('x', np.arange(24))})
199+
beam_ds = xbeam.Dataset.from_xarray(ds, {'x': 12})
200+
with beam.Pipeline() as p:
201+
p |= beam_ds.to_zarr(
202+
temp_dir,
203+
zarr_chunks_per_shard={'x': 2},
204+
zarr_shards={'x': 6},
205+
)
206+
opened, chunks = xbeam.open_zarr(temp_dir)
207+
xarray.testing.assert_identical(ds, opened)
208+
self.assertEqual(chunks, {'x': 3})
209+
self.assertEqual(opened['foo'].encoding['chunks'], (3,))
210+
self.assertEqual(opened['foo'].encoding['shards'], (6,))
211+
212+
with self.subTest('chunks_and_chunks_per_shard_error'):
213+
ds = xarray.Dataset({'foo': ('x', np.arange(12))})
214+
beam_ds = xbeam.Dataset.from_xarray(ds, {'x': 6})
215+
with self.assertRaisesWithLiteralMatch(
216+
ValueError,
217+
'cannot supply both zarr_chunks_per_shard and zarr_chunks',
218+
):
219+
beam_ds.to_zarr(
220+
temp_dir, zarr_chunks_per_shard={'x': 2}, zarr_chunks={'x': 3}
221+
)
222+
223+
with self.subTest('missing_dim_error'):
224+
ds = xarray.Dataset({'foo': ('x', np.arange(12))})
225+
beam_ds = xbeam.Dataset.from_xarray(ds, {'x': 6})
226+
with self.assertRaisesWithLiteralMatch(
227+
ValueError,
228+
"cannot write a dataset with chunks {'x': 6} to Zarr with "
229+
"zarr_chunks_per_shard={'y': 2}, which does not contain a value for "
230+
"dimension 'x'",
231+
):
232+
beam_ds.to_zarr(temp_dir, zarr_chunks_per_shard={'y': 2})
233+
234+
with self.subTest('uneven_division_error'):
235+
ds = xarray.Dataset({'foo': ('x', np.arange(12))})
236+
beam_ds = xbeam.Dataset.from_xarray(ds, {'x': 6})
237+
with self.assertRaisesWithLiteralMatch(
238+
ValueError,
239+
"cannot write a dataset with chunks {'x': 6} to Zarr with "
240+
"zarr_chunks_per_shard={'x': 5}, which do not evenly divide into "
241+
'chunks',
242+
):
243+
beam_ds.to_zarr(temp_dir, zarr_chunks_per_shard={'x': 5})
244+
180245
def test_to_zarr_default_chunks(self):
181246
temp_dir = self.create_tempdir().full_path
182247
ds = xarray.Dataset({'foo': (('x', 'y'), np.arange(20).reshape(10, 2))})

0 commit comments

Comments
 (0)