diff --git a/mkdocs.yaml b/mkdocs.yaml
index 89e7ca9c..7428be7e 100644
--- a/mkdocs.yaml
+++ b/mkdocs.yaml
@@ -80,6 +80,7 @@ nav:
       - Overview: how-to/object-storage-overview.md
       - Choose Storage Type: how-to/choose-storage-type.md
       - Use Object Storage: how-to/use-object-storage.md
+      - Staged Insert: how-to/staged-insert.md
       - Use NPY Codec: how-to/use-npy-codec.md
       - Use Plugin Codecs: how-to/use-plugin-codecs.md
       - Create Custom Codecs: how-to/create-custom-codec.md
diff --git a/src/how-to/insert-data.md b/src/how-to/insert-data.md
index dd68ed97..e46da3e2 100644
--- a/src/how-to/insert-data.md
+++ b/src/how-to/insert-data.md
@@ -108,6 +108,8 @@ ImageData.insert1({
 })
 ```
 
+For multi-GB arrays, Zarr stores, HDF5 files, or any object too large to hold in memory before insert, use [Staged Insert](staged-insert.md) instead — it writes directly to object storage and commits the row atomically.
+
 ## Insert Options Summary
 
 | Option | Default | Description |
@@ -146,6 +148,7 @@ if Subject.validate(rows):
 
 ## See Also
 
+- [Staged Insert](staged-insert.md) — Atomic insert for large objects written directly to object storage
 - [Master-Part Tables](master-part.ipynb) — Atomic insertion of master and parts
 - [Define Tables](define-tables.md) — Table definition syntax
 - [Delete Data](delete-data.md) — Removing data from tables
diff --git a/src/how-to/staged-insert.md b/src/how-to/staged-insert.md
new file mode 100644
index 00000000..cc50182a
--- /dev/null
+++ b/src/how-to/staged-insert.md
@@ -0,0 +1,191 @@
+# Staged Insert
+
+Write large objects directly to object storage as part of an atomic insert.
+
+## Overview
+
+`staged_insert1` is a context manager for inserting rows whose object-typed fields are too large to copy through local storage first. It writes directly to the destination object store while the row is being built, then finalizes the database insert when the block exits cleanly. If an exception is raised inside the block, the staged objects are cleaned up and no row is inserted.
+
+This pattern is the right choice when:
+
+- Objects are large (multi-GB arrays, long recordings, image stacks)
+- You want to stream or write in chunks rather than buffer in memory
+- You want all-or-nothing semantics across object storage and the database
+
+It is only available for `` fields — the schema-addressed codec used for Zarr arrays, HDF5 files, and other multi-file objects. Attempting `staged.store()` or `staged.open()` on a field of any other type raises `DataJointError`. For ordinary inserts of small or in-memory objects, use [`insert` / `insert1`](insert-data.md).
+
+## Quick Start
+
+```python
+import zarr
+import datajoint as dj
+
+schema = dj.Schema('imaging')
+
+@schema
+class ImagingSession(dj.Manual):
+    definition = """
+    subject_id : int32
+    session_id : int32
+    ---
+    n_frames : int32
+    frame_rate : float32
+    frames :
+    """
+
+with ImagingSession.staged_insert1 as staged:
+    # 1. Set primary key values first
+    staged.rec['subject_id'] = 1
+    staged.rec['session_id'] = 1
+
+    # 2. Get a storage handle for the object field
+    store = staged.store('frames', '.zarr')
+
+    # 3. Write directly to object storage (no local copy)
+    z = zarr.open(store, mode='w', shape=(1000, 512, 512),
+                  chunks=(10, 512, 512), dtype='int32')
+    for i in range(1000):
+        z[i] = acquire_frame()
+
+    # 4. Set remaining record attributes
+    staged.rec['n_frames'] = 1000
+    staged.rec['frame_rate'] = 30.0
+
+# On clean exit: metadata is computed, row is inserted
+# On exception: staged objects are removed, no row is inserted
+```
+
+## How It Works
+
+Inside the `with` block, the row is a draft — `staged.rec` collects attribute values, and `staged.store(field, ext)` / `staged.open(field, ext)` return handles that write directly to the destination object store.
+
+When the block exits without an exception, DataJoint:
+
+1. Computes object metadata (size, manifest) from the staged objects.
+2. Inserts the row into the database with the populated metadata.
+
+When the block raises, DataJoint:
+
+1. Removes any objects that were written inside the block.
+2. Skips the database insert.
+
+This gives the same atomicity guarantee as an ordinary `insert1` — readers never see a row whose object data is partial.
+
+## API Reference
+
+### `Table.staged_insert1`
+
+```python
+with Table.staged_insert1 as staged:
+    ...
+```
+
+Context manager property on every `dj.Table` subclass. Yields a `StagedInsert` object scoped to one row. Writes go to the store referenced by the field's type spec — `` uses `stores.default`, and `` uses the named store.
+
+### `staged.rec`
+
+A dict for the row's attribute values. Set primary key fields **before** calling `staged.store()` or `staged.open()` — the storage path is derived from the primary key.
+
+```python
+staged.rec['subject_id'] = 1
+staged.rec['session_id'] = 1
+```
+
+### `staged.store(field, ext='')`
+
+Returns an `fsspec.FSMap` for an object field. Suitable for Zarr, xarray, or any library that takes a mapping-style store.
+
+```python
+store = staged.store('frames', '.zarr')
+z = zarr.open(store, mode='w', shape=..., dtype=...)
+```
+
+### `staged.open(field, ext='', mode='wb')`
+
+Returns a file-like object for an object field. Suitable for HDF5, raw binary, or any library that takes a file handle.
+
+```python
+with staged.open('recording', '.h5') as f:
+    h5py.File(f, mode='w').create_dataset('data', data=arr)
+```
+
+### `staged.fs`
+
+The underlying `fsspec.AbstractFileSystem` for advanced operations (listing, deleting, custom paths). Most users won't need this.
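Editorial sketch (not part of the patch): the commit-or-clean-up flow described under "How It Works" can be illustrated with the standard library alone. This is not DataJoint's implementation. `staged_row`, the `rows` list, and the temporary directory are hypothetical stand-ins for the context manager, the database table, and the object store, and the key-derived directory name is an assumed layout.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def staged_row(rows, store_root, key):
    """Hypothetical stand-in for a staged insert (not DataJoint's code)."""
    rec = dict(key)
    # Derive the object path from the primary key (assumed layout).
    name = "_".join(f"{k}={v}" for k, v in sorted(key.items()))
    target = Path(store_root) / name
    target.mkdir(parents=True)
    try:
        yield rec, target
        # Clean exit: compute metadata from the staged files, then "insert".
        rec["size"] = sum(p.stat().st_size for p in target.rglob("*") if p.is_file())
        rows.append(rec)
    except Exception:
        # Failure in the block or during the insert: remove staged objects.
        shutil.rmtree(target, ignore_errors=True)
        raise


rows = []                      # stands in for the database table
root = tempfile.mkdtemp()      # stands in for the object store

# Clean exit: the object stays and the row is committed.
with staged_row(rows, root, {"subject_id": 1, "session_id": 1}) as (rec, path):
    (path / "frames.bin").write_bytes(b"\x00" * 1024)
    rec["n_frames"] = 4

# Exception inside the block: the object is removed and no row is added.
try:
    with staged_row(rows, root, {"subject_id": 1, "session_id": 2}) as (rec, path):
        (path / "frames.bin").write_bytes(b"\x00" * 10)
        raise RuntimeError("camera error")
except RuntimeError:
    pass

committed = len(rows)
leftover = sorted(p.name for p in Path(root).iterdir())
shutil.rmtree(root)            # tidy up the temporary "store"
```

The sketch catches `Exception` rather than `BaseException`, so an interrupt such as Ctrl+C would skip the cleanup branch, which matches the caveat listed under Limitations.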
+
+## Patterns
+
+### Zarr arrays
+
+```python
+with Recording.staged_insert1 as staged:
+    staged.rec['recording_id'] = recording_id
+    z = zarr.open(staged.store('frames', '.zarr'), mode='w',
+                  shape=(n_frames, h, w), chunks=(1, h, w), dtype='uint16')
+    for i, frame in enumerate(stream):
+        z[i] = frame
+```
+
+### HDF5 files
+
+```python
+import h5py
+
+with Recording.staged_insert1 as staged:
+    staged.rec['recording_id'] = recording_id
+    with staged.open('raw', '.h5') as f:
+        with h5py.File(f, 'w') as h5:
+            h5.create_dataset('signal', data=signal, chunks=True)
+            h5.attrs['fs'] = sampling_rate
+```
+
+### Streaming from an instrument
+
+Set the primary key, get the handle, then write as data arrives. The block exits — and the row commits — only after the stream is fully captured:
+
+```python
+with ImagingSession.staged_insert1 as staged:
+    staged.rec['subject_id'] = subject_id
+    staged.rec['session_id'] = session_id
+
+    z = zarr.open(staged.store('frames', '.zarr'), mode='w', ...)
+    for i in range(n_frames):
+        z[i] = camera.grab()
+
+    staged.rec['n_frames'] = n_frames
+```
+
+If the camera errors out mid-stream, the partial Zarr is removed and the row is not inserted.
+
+## Error Handling and Atomicity
+
+A `staged_insert1` block is atomic across object storage and the database:
+
+- **Object storage**: anything written via `staged.store()` / `staged.open()` is staged under a path derived from the primary key. On exception inside the block, those staged objects are removed.
+- **Database**: the row is only inserted on clean exit.
+
+If the database insert itself fails on exit (e.g., duplicate primary key), the staged objects are also removed.
+
+## Limitations
+
+- Only one row per block — use a loop of `with` blocks for many rows, or use the standard `insert` for batches that fit in memory.
+- The block must set all primary key fields before calling `store()` or `open()`.
+- Requires `stores.default` configured, or a named store referenced by the field's type spec.
+- Cleanup only runs for ordinary exceptions. `KeyboardInterrupt` (Ctrl+C) and other `BaseException` subclasses bypass the cleanup path, so a process killed mid-write may leave staged objects behind. Run the garbage collector to reclaim them — see [Clean Up Storage](garbage-collection.md).
+
+## Troubleshooting
+
+### `Storage is not configured`
+
+Set `stores.default` and `stores.` in `datajoint.json` or via `dj.config`. See [Configure Object Storage](configure-storage.md).
+
+### `Primary key not set` when calling `staged.store()`
+
+Set primary key attributes on `staged.rec` before calling `staged.store()` or `staged.open()`. The object path depends on the primary key.
+
+## See Also
+
+- [Insert Data](insert-data.md) — Standard insert for ordinary rows
+- [Use Object Storage](use-object-storage.md) — Object-augmented schemas and storage types
+- [Configure Object Storage](configure-storage.md) — Store configuration
+- [Use the `` Codec](use-npy-codec.md) — NumPy array storage with lazy fetch
diff --git a/src/how-to/use-object-storage.md b/src/how-to/use-object-storage.md
index af714209..d3753462 100644
--- a/src/how-to/use-object-storage.md
+++ b/src/how-to/use-object-storage.md
@@ -157,52 +157,17 @@ Use schema-addressed storage for:
 
 ## Write Directly to Object Storage
 
-For large datasets like multi-GB imaging recordings, avoid intermediate copies by writing directly to object storage with `staged_insert1`:
+For multi-GB imaging recordings, Zarr arrays, HDF5 files, or any object too large to round-trip through local storage, use [Staged Insert](staged-insert.md). It writes directly to the destination object store inside a context manager and commits the database row atomically on clean exit:
 
 ```python
-import zarr
-
-@schema
-class ImagingSession(dj.Manual):
-    definition = """
-    subject_id : int32
-    session_id : int32
-    ---
-    n_frames : int32
-    frame_rate : float32
-    frames :
-    """
-
-# Write Zarr directly to object storage
 with ImagingSession.staged_insert1 as staged:
-    # 1. Set primary key values first
     staged.rec['subject_id'] = 1
     staged.rec['session_id'] = 1
-
-    # 2. Get storage handle
-    store = staged.store('frames', '.zarr')
-
-    # 3. Write directly (no local copy)
-    z = zarr.open(store, mode='w', shape=(1000, 512, 512),
-                  chunks=(10, 512, 512), dtype='int32')
-    for i in range(1000):
-        z[i] = acquire_frame()  # Write frame-by-frame
-
-    # 4. Set remaining attributes
-    staged.rec['n_frames'] = 1000
-    staged.rec['frame_rate'] = 30.0
-
-# Record inserted with computed metadata on successful exit
+    z = zarr.open(staged.store('frames', '.zarr'), mode='w', ...)
+    ...
 ```
 
-The `staged_insert1` context manager:
-
-- Writes directly to the object store (no intermediate files)
-- Computes metadata (size, manifest) automatically on exit
-- Cleans up storage if an error occurs (atomic)
-- Requires primary key values before calling `store()` or `open()`
-
-Use `staged.store(field, ext)` for FSMap access (Zarr), or `staged.open(field, ext)` for file-like access.
+See [Staged Insert](staged-insert.md) for the full API, atomicity guarantees, and patterns for Zarr, HDF5, and streaming sources.
 
 ## Attachments
 