From f45c6d7b9386583021dbaffb19270e594747d540 Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Wed, 20 May 2026 20:06:08 -0500 Subject: [PATCH 1/2] docs: dedicated how-to page for staged insert `staged_insert1` was buried in `use-object-storage.md` (subsection "Write Directly to Object Storage") with no nav entry, so readers scanning the How-To sidebar couldn't find it and `insert-data.md` didn't link to it. Promote it to its own how-to: - New `src/how-to/staged-insert.md` covering overview, atomicity guarantees, full API (`rec`, `store`, `open`, `fs`), Zarr/HDF5/ streaming patterns, limitations, and troubleshooting. - Add nav entry under Object Storage in `mkdocs.yaml`. - Replace the section in `use-object-storage.md` with a short pointer and link to the new page. - Cross-link from `insert-data.md` (inline after "Insert with Blobs" and in See Also). --- mkdocs.yaml | 1 + src/how-to/insert-data.md | 3 + src/how-to/staged-insert.md | 190 +++++++++++++++++++++++++++++++ src/how-to/use-object-storage.md | 43 +------ 4 files changed, 198 insertions(+), 39 deletions(-) create mode 100644 src/how-to/staged-insert.md diff --git a/mkdocs.yaml b/mkdocs.yaml index 89e7ca9c..7428be7e 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -80,6 +80,7 @@ nav: - Overview: how-to/object-storage-overview.md - Choose Storage Type: how-to/choose-storage-type.md - Use Object Storage: how-to/use-object-storage.md + - Staged Insert: how-to/staged-insert.md - Use NPY Codec: how-to/use-npy-codec.md - Use Plugin Codecs: how-to/use-plugin-codecs.md - Create Custom Codecs: how-to/create-custom-codec.md diff --git a/src/how-to/insert-data.md b/src/how-to/insert-data.md index dd68ed97..e46da3e2 100644 --- a/src/how-to/insert-data.md +++ b/src/how-to/insert-data.md @@ -108,6 +108,8 @@ ImageData.insert1({ }) ``` +For multi-GB arrays, Zarr stores, HDF5 files, or any object too large to hold in memory before insert, use [Staged Insert](staged-insert.md) instead — it writes directly to object storage and commits the row atomically. + ## Insert Options Summary | Option | Default | Description | @@ -146,6 +148,7 @@ if Subject.validate(rows): ## See Also +- [Staged Insert](staged-insert.md) — Atomic insert for large objects written directly to object storage - [Master-Part Tables](master-part.ipynb) — Atomic insertion of master and parts - [Define Tables](define-tables.md) — Table definition syntax - [Delete Data](delete-data.md) — Removing data from tables diff --git a/src/how-to/staged-insert.md b/src/how-to/staged-insert.md new file mode 100644 index 00000000..5e916e75 --- /dev/null +++ b/src/how-to/staged-insert.md @@ -0,0 +1,190 @@ +# Staged Insert + +Write large objects directly to object storage as part of an atomic insert. + +## Overview + +`staged_insert1` is a context manager for inserting rows whose object-typed fields are too large to copy through local storage first. It writes directly to the destination object store while the row is being built, then finalizes the database insert when the block exits cleanly. If an exception is raised inside the block, the staged objects are cleaned up and no row is inserted. + +This pattern is the right choice when: + +- Objects are large (multi-GB arrays, long recordings, image stacks) +- You want to stream or write in chunks rather than buffer in memory +- You want all-or-nothing semantics across object storage and the database + +It is only available for object-typed fields (`<...@>` syntax) and codecs that support direct storage handles — primarily `` (Zarr / HDF5 / multi-file) and `` written via a file handle. For ordinary inserts of small or in-memory objects, use [`insert` / `insert1`](insert-data.md). + +## Quick Start + +```python +import zarr +import datajoint as dj + +schema = dj.Schema('imaging') + +@schema +class ImagingSession(dj.Manual): + definition = """ + subject_id : int32 + session_id : int32 + --- + n_frames : int32 + frame_rate : float32 + frames : + """ + +with ImagingSession.staged_insert1 as staged: + # 1. Set primary key values first + staged.rec['subject_id'] = 1 + staged.rec['session_id'] = 1 + + # 2. Get a storage handle for the object field + store = staged.store('frames', '.zarr') + + # 3. Write directly to object storage (no local copy) + z = zarr.open(store, mode='w', shape=(1000, 512, 512), + chunks=(10, 512, 512), dtype='int32') + for i in range(1000): + z[i] = acquire_frame() + + # 4. Set remaining record attributes + staged.rec['n_frames'] = 1000 + staged.rec['frame_rate'] = 30.0 + +# On clean exit: metadata is computed, row is inserted +# On exception: staged objects are removed, no row is inserted +``` + +## How It Works + +Inside the `with` block, the row is a draft — `staged.rec` collects attribute values, and `staged.store(field, ext)` / `staged.open(field, ext)` return handles that write directly to the destination object store. + +When the block exits without an exception, DataJoint: + +1. Computes object metadata (size, manifest, content hash for hash-addressed codecs) from the staged objects. +2. Inserts the row into the database with the populated metadata. + +When the block raises, DataJoint: + +1. Removes any objects that were written inside the block. +2. Skips the database insert. + +This gives the same atomicity guarantee as an ordinary `insert1` — readers never see a row whose object data is partial. + +## API Reference + +### `Table.staged_insert1` + +```python +with Table.staged_insert1 as staged: + ... +``` + +Context manager property on every `dj.Table` subclass. Yields a `StagedInsert` object scoped to one row. + +### `staged.rec` + +A dict for the row's attribute values. Set primary key fields **before** calling `staged.store()` or `staged.open()` — the storage path is derived from the primary key. + +```python +staged.rec['subject_id'] = 1 +staged.rec['session_id'] = 1 +``` + +### `staged.store(field, ext='')` + +Returns an `fsspec.FSMap` for an object field. Suitable for Zarr, xarray, or any library that takes a mapping-style store. + +```python +store = staged.store('frames', '.zarr') +z = zarr.open(store, mode='w', shape=..., dtype=...) +``` + +### `staged.open(field, ext='', mode='wb')` + +Returns a file-like object for an object field. Suitable for HDF5, raw binary, or any library that takes a file handle. + +```python +with staged.open('recording', '.h5') as f: + h5py.File(f, mode='w').create_dataset('data', data=arr) +``` + +### `staged.fs` + +The underlying `fsspec.AbstractFileSystem` for advanced operations (listing, deleting, custom paths). Most users won't need this. + +## Patterns + +### Zarr arrays + +```python +with Recording.staged_insert1 as staged: + staged.rec['recording_id'] = recording_id + z = zarr.open(staged.store('frames', '.zarr'), mode='w', + shape=(n_frames, h, w), chunks=(1, h, w), dtype='uint16') + for i, frame in enumerate(stream): + z[i] = frame +``` + +### HDF5 files + +```python +import h5py + +with Recording.staged_insert1 as staged: + staged.rec['recording_id'] = recording_id + with staged.open('raw', '.h5') as f: + with h5py.File(f, 'w') as h5: + h5.create_dataset('signal', data=signal, chunks=True) + h5.attrs['fs'] = sampling_rate +``` + +### Streaming from an instrument + +Set the primary key, get the handle, then write as data arrives. The block exits — and the row commits — only after the stream is fully captured: + +```python +with ImagingSession.staged_insert1 as staged: + staged.rec['subject_id'] = subject_id + staged.rec['session_id'] = session_id + + z = zarr.open(staged.store('frames', '.zarr'), mode='w', ...) + for i in range(n_frames): + z[i] = camera.grab() + + staged.rec['n_frames'] = n_frames +``` + +If the camera errors out mid-stream, the partial Zarr is removed and the row is not inserted. + +## Error Handling and Atomicity + +A `staged_insert1` block is atomic across object storage and the database: + +- **Object storage**: anything written via `staged.store()` / `staged.open()` is staged under a path derived from the primary key. On exception inside the block, those staged objects are removed. +- **Database**: the row is only inserted on clean exit. + +If the database insert itself fails on exit (e.g., duplicate primary key), the staged objects are also removed. + +## Limitations + +- Only one row per block — use a loop of `with` blocks for many rows, or use the standard `insert` for batches that fit in memory. +- The block must set all primary key fields before calling `store()` or `open()`. +- Requires `stores.default` configured, or a named store referenced by the field's type spec. + +## Troubleshooting + +### `Storage is not configured` + +Set `stores.default` and `stores.` in `datajoint.json` or via `dj.config`. See [Configure Object Storage](configure-storage.md). + +### `Primary key not set` when calling `staged.store()` + +Set primary key attributes on `staged.rec` before calling `staged.store()` or `staged.open()`. The object path depends on the primary key. + +## See Also + +- [Insert Data](insert-data.md) — Standard insert for ordinary rows +- [Use Object Storage](use-object-storage.md) — Object-augmented schemas and storage types +- [Configure Object Storage](configure-storage.md) — Store configuration +- [Use the `` Codec](use-npy-codec.md) — NumPy array storage with lazy fetch diff --git a/src/how-to/use-object-storage.md b/src/how-to/use-object-storage.md index af714209..d3753462 100644 --- a/src/how-to/use-object-storage.md +++ b/src/how-to/use-object-storage.md @@ -157,52 +157,17 @@ Use schema-addressed storage for: ## Write Directly to Object Storage -For large datasets like multi-GB imaging recordings, avoid intermediate copies by writing directly to object storage with `staged_insert1`: +For multi-GB imaging recordings, Zarr arrays, HDF5 files, or any object too large to round-trip through local storage, use [Staged Insert](staged-insert.md). It writes directly to the destination object store inside a context manager and commits the database row atomically on clean exit: ```python -import zarr - -@schema -class ImagingSession(dj.Manual): - definition = """ - subject_id : int32 - session_id : int32 - --- - n_frames : int32 - frame_rate : float32 - frames : - """ - -# Write Zarr directly to object storage with ImagingSession.staged_insert1 as staged: - # 1. Set primary key values first staged.rec['subject_id'] = 1 staged.rec['session_id'] = 1 - - # 2. Get storage handle - store = staged.store('frames', '.zarr') - - # 3. Write directly (no local copy) - z = zarr.open(store, mode='w', shape=(1000, 512, 512), - chunks=(10, 512, 512), dtype='int32') - for i in range(1000): - z[i] = acquire_frame() # Write frame-by-frame - - # 4. Set remaining attributes - staged.rec['n_frames'] = 1000 - staged.rec['frame_rate'] = 30.0 - -# Record inserted with computed metadata on successful exit + z = zarr.open(staged.store('frames', '.zarr'), mode='w', ...) + ... ``` -The `staged_insert1` context manager: - -- Writes directly to the object store (no intermediate files) -- Computes metadata (size, manifest) automatically on exit -- Cleans up storage if an error occurs (atomic) -- Requires primary key values before calling `store()` or `open()` - -Use `staged.store(field, ext)` for FSMap access (Zarr), or `staged.open(field, ext)` for file-like access. +See [Staged Insert](staged-insert.md) for the full API, atomicity guarantees, and patterns for Zarr, HDF5, and streaming sources. ## Attachments From e0f4de01b76e09000e2becbc555ee87043987bbc Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Thu, 21 May 2026 09:07:22 -0500 Subject: [PATCH 2/2] docs: address review feedback on staged-insert page From @MilagrosMarin's review on #175: - Drop the inaccurate ' written via a file handle' claim. staged_insert.py:100-101 explicitly rejects anything except codec name == 'object', so only is supported. Note the actual error behavior instead. - Drop 'content hash for hash-addressed codecs' from the metadata list. _compute_metadata always sets hash: None for both directory and single-file branches; no hash is ever computed. - Mention the named-store form '' alongside '' in the Table.staged_insert1 API reference. - Add a Limitations bullet noting that cleanup catches Exception not BaseException, so KeyboardInterrupt mid-write can leave staged objects behind; point to the garbage-collection how-to. --- src/how-to/staged-insert.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/how-to/staged-insert.md b/src/how-to/staged-insert.md index 5e916e75..cc50182a 100644 --- a/src/how-to/staged-insert.md +++ b/src/how-to/staged-insert.md @@ -12,7 +12,7 @@ This pattern is the right choice when: - You want to stream or write in chunks rather than buffer in memory - You want all-or-nothing semantics across object storage and the database -It is only available for object-typed fields (`<...@>` syntax) and codecs that support direct storage handles — primarily `` (Zarr / HDF5 / multi-file) and `` written via a file handle. For ordinary inserts of small or in-memory objects, use [`insert` / `insert1`](insert-data.md). +It is only available for `` fields — the schema-addressed codec used for Zarr arrays, HDF5 files, and other multi-file objects. Attempting `staged.store()` or `staged.open()` on a field of any other type raises `DataJointError`. For ordinary inserts of small or in-memory objects, use [`insert` / `insert1`](insert-data.md). ## Quick Start @@ -61,7 +61,7 @@ Inside the `with` block, the row is a draft — `staged.rec` collects attribute When the block exits without an exception, DataJoint: -1. Computes object metadata (size, manifest, content hash for hash-addressed codecs) from the staged objects. +1. Computes object metadata (size, manifest) from the staged objects. 2. Inserts the row into the database with the populated metadata. When the block raises, DataJoint: @@ -80,7 +80,7 @@ with Table.staged_insert1 as staged: ... ``` -Context manager property on every `dj.Table` subclass. Yields a `StagedInsert` object scoped to one row. +Context manager property on every `dj.Table` subclass. Yields a `StagedInsert` object scoped to one row. Writes go to the store referenced by the field's type spec — `` uses `stores.default`, and `` uses the named store. ### `staged.rec` @@ -171,6 +171,7 @@ If the database insert itself fails on exit (e.g., duplicate primary key), the s - Only one row per block — use a loop of `with` blocks for many rows, or use the standard `insert` for batches that fit in memory. - The block must set all primary key fields before calling `store()` or `open()`. - Requires `stores.default` configured, or a named store referenced by the field's type spec. +- Cleanup only runs for ordinary exceptions. `KeyboardInterrupt` (Ctrl+C) and other `BaseException` subclasses bypass the cleanup path, so a process killed mid-write may leave staged objects behind. Run the garbage collector to reclaim them — see [Clean Up Storage](garbage-collection.md). ## Troubleshooting