# Spec: Parquet Dump/Restore for Subgraph Data

## Problem

Subgraph entity data lives exclusively in PostgreSQL. There's no way to export a subgraph's data for backup, migration between environments, or sharing. We need a file-based dump/restore mechanism.

## Goals

1. **Dump** a subgraph's entity data to parquet files
2. **Restore** a subgraph from parquet files
3. **Incremental append** -- add newly arrived data to an existing dump without rewriting it
4. Long-term: make dumping an **ongoing process** (not just a one-off CLI operation)

## Non-goals (for now)

- S3/GCS output
- Schema evolution / migration between different schema versions

## Data Model Recap

Each subgraph deployment has:
- A PostgreSQL schema (e.g., `sgd123`)
- One table per entity type, with columns:
  - `vid` (bigserial) -- row version ID, primary key
  - `block_range` (int4range) for mutable entities OR `block$` (int) for immutable
  - `causality_region` (int, optional) -- for offchain data sources
  - Data columns matching GraphQL fields (text, int, bigint, numeric, bytea, bool, timestamptz, arrays, enums)
- `data_sources$` table -- dynamic data sources created at runtime (defined in `dynds/private.rs`, separate from Layout):
  - `vid` (int, identity PK), `block_range` (int4range), `causality_region` (int)
  - `manifest_idx` (int), `parent` (int, self-ref FK), `id` (bytea)
  - `param` (bytea, nullable), `context` (jsonb, nullable), `done_at` (int, nullable)
  - Note: `parent` and `id` exist in the DDL but are currently unused by `insert()`, `load()`, and `copy_to()`. The dump should include them for completeness (they may contain data in older deployments).
- `poi2$` table -- Proof of Indexing data (a **mutable** entity table in the Layout, i.e. it uses `block_range`, not `block$`; has `digest` (bytea), `id` (text), optionally `block_time$` (int8); `has_causality_region: false`). Conditionally created when `catalog.use_poi` is true. Excluded from some Layout operations like `find_changes()`.
- Metadata in the `subgraphs.subgraph_manifest`, `subgraphs.deployment`, and `subgraphs.head` tables

## Dump Format

### Directory layout

```
<dump-dir>/
  metadata.json            -- deployment metadata + per-table state
  schema.graphql           -- raw GraphQL schema text
  subgraph.yaml            -- raw subgraph manifest YAML (optional)
  <EntityType>/
    chunk_000000.parquet   -- rows ordered by vid
    chunk_000001.parquet   -- incremental append
    ...
  data_sources$/
    chunk_000000.parquet   -- dynamic data sources
```

One directory of parquet chunk files per entity type (each entity type has a different columnar schema). The `data_sources$` table is dumped alongside entity tables. The `poi2$` table, when present, is a regular entity table in the `Layout` (conditionally created when `catalog.use_poi` is true) and appears as any other entity type directory. Incremental dumps produce new chunk files rather than rewriting existing ones.

The GraphQL schema and subgraph manifest YAML are stored as separate files rather than embedded in `metadata.json`. This matches the existing dump code in `dump.rs` and keeps the files human-readable and diffable.

### Parquet schema per entity type

System columns (always present):
- `vid` -> Int64
- Immutable entities: `block$` -> Int32
- Mutable entities: `block_range_start` -> Int32, `block_range_end` -> Int32 (nullable; null = unbounded/current)
- `causality_region` -> Int32 (only if table has it)

Data columns mapped from `ColumnType` (all 10 variants defined in `relational.rs:1342`):

| ColumnType               | Arrow DataType             | Notes                         |
|--------------------------|----------------------------|-------------------------------|
| Boolean                  | Boolean                    |                               |
| Int                      | Int32                      |                               |
| Int8                     | Int64                      |                               |
| Bytes                    | Binary                     | Raw bytes                     |
| BigInt                   | Utf8                       | Arbitrary precision as string |
| BigDecimal               | Utf8                       | Arbitrary precision as string |
| Timestamp                | TimestampMicrosecond(None) | Matches `Value::Timestamp`    |
| String                   | Utf8                       |                               |
| Enum(EnumType)           | Utf8                       | String value of enum variant  |
| TSVector(FulltextConfig) | **Skip**                   | Generated; rebuild on restore |

**List/array columns:** `List(T)` is not a `ColumnType` variant. Whether a column is an array is determined by `Column.is_list()` (which delegates to the GraphQL `field_type`). A `[String]` field has `column_type: ColumnType::String` with a list-typed `field_type`. For the Arrow mapping, check `column.is_list()` and wrap the mapped base type in `List<T>`. In `OidValue`, arrays have separate variants (`StringArray`, `BytesArray`, `BoolArray`, `Ints`, `Int8Array`, `BigDecimalArray`, `TimestampArray`).

Nullability follows the GraphQL schema (non-null fields -> non-nullable Arrow columns).
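The mapping above can be sketched as a single match plus a list check. The enums below are simplified stand-ins for graph-node's `ColumnType` and Arrow's `DataType` (the real types carry payloads such as `EnumType`, `FulltextConfig`, and timezone info), so this is illustrative rather than the actual API:

```rust
// Simplified stand-in for graph-node's ColumnType (real variants carry payloads).
#[derive(Debug, PartialEq)]
enum ColumnType {
    Boolean,
    Int,
    Int8,
    Bytes,
    BigInt,
    BigDecimal,
    Timestamp,
    String,
    Enum,     // real variant: Enum(EnumType)
    TSVector, // real variant: TSVector(FulltextConfig)
}

// Simplified stand-in for arrow's DataType.
#[derive(Debug, PartialEq)]
enum DataType {
    Boolean,
    Int32,
    Int64,
    Binary,
    Utf8,
    TimestampMicrosecond,
    List(Box<DataType>),
}

/// Maps a column to its parquet/Arrow type. `None` means "skip the column"
/// (tsvector columns are generated and rebuilt on restore). `is_list`
/// mirrors `Column.is_list()`: array-ness lives on the GraphQL field type,
/// not on `ColumnType`, so the base type gets wrapped in a List here.
fn arrow_type(col: &ColumnType, is_list: bool) -> Option<DataType> {
    let base = match col {
        ColumnType::Boolean => DataType::Boolean,
        ColumnType::Int => DataType::Int32,
        ColumnType::Int8 => DataType::Int64,
        ColumnType::Bytes => DataType::Binary,
        // Arbitrary-precision values travel as strings
        ColumnType::BigInt | ColumnType::BigDecimal => DataType::Utf8,
        ColumnType::Timestamp => DataType::TimestampMicrosecond,
        ColumnType::String | ColumnType::Enum => DataType::Utf8,
        ColumnType::TSVector => return None,
    };
    Some(if is_list {
        DataType::List(Box::new(base))
    } else {
        base
    })
}
```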

### metadata.json

Contains everything needed to reconstruct the deployment's table structure, plus diagnostic information (health, indexes) captured at dump time. The GraphQL schema and manifest YAML are stored in separate files (`schema.graphql`, `subgraph.yaml`), not embedded here.

The struct backing this file is `Metadata` (evolved from the existing `Control` struct in `dump.rs`).

```json
{
  "version": 1,
  "deployment": "Qm...",
  "network": "mainnet",

  "manifest": {
    "spec_version": "1.0.0",
    "description": "Optional subgraph description",
    "repository": "https://github.com/...",
    "features": ["..."],
    "entities_with_causality_region": ["EntityType1"],
    "history_blocks": 2147483647
  },

  "earliest_block_number": 12345,
  "start_block": { "number": 12345, "hash": "0xabc..." },
  "head_block": { "number": 99999, "hash": "0xdef..." },
  "entity_count": 150000,

  "graft_base": null,
  "graft_block": null,
  "debug_fork": null,

  "health": {
    "failed": false,
    "health": "healthy",
    "fatal_error": null,
    "non_fatal_errors": []
  },

  "indexes": {
    "token": [
      "CREATE INDEX CONCURRENTLY IF NOT EXISTS attr_0_0_id ON sgd.token USING btree (id)"
    ]
  },

  "tables": {
    "Token": {
      "immutable": true,
      "has_causality_region": false,
      "chunks": [
        { "file": "Token/chunk_000000.parquet", "min_vid": 0, "max_vid": 50000, "row_count": 50000 }
      ],
      "max_vid": 50000
    },
    "data_sources$": {
      "immutable": false,
      "has_causality_region": true,
      "chunks": [
        { "file": "data_sources$/chunk_000000.parquet", "min_vid": 0, "max_vid": 100, "row_count": 100 }
      ],
      "max_vid": 100
    }
  }
}
```

**Field sources:**

| Field | Source | Code path |
|-------|--------|-----------|
| `manifest.*` | `subgraphs.subgraph_manifest` | `SubgraphManifestEntity` via `deployment_entity()` in `detail.rs` |
| `start_block` | `subgraphs.subgraph_manifest` | `start_block_number`, `start_block_hash` columns; available via `StoredSubgraphManifest` in `detail.rs:542-543`, assembled into `SubgraphDeploymentEntity.start_block` |
| `earliest_block_number` | `subgraphs.deployment` | `SubgraphDeploymentEntity.earliest_block_number` |
| `graft_base`, `graft_block` | `subgraphs.deployment` | `SubgraphDeploymentEntity.graft_base`, `.graft_block` |
| `debug_fork` | `subgraphs.deployment` | `SubgraphDeploymentEntity.debug_fork` |
| `head_block` | `subgraphs.head` | `SubgraphDeploymentEntity.latest_block` |
| `entity_count` | `subgraphs.head` | `DeploymentDetail.entity_count` (i64 in DB, usize in Rust) |
| `health.*` | `subgraphs.deployment` + `subgraph_error` | `SubgraphDeploymentEntity.{failed, health, fatal_error, non_fatal_errors}` |
| `indexes` | `pg_indexes` catalog | `IndexList::load()` → `CreateIndex::to_sql()` (existing code in `dump.rs:163-179`) |
| `network` | `deployment_schemas` | `Site.network` |
| `tables.*` | `Layout.tables` | `Table.{immutable, has_causality_region}` |

**Notes:**
- `use_bytea_prefix` is not stored in the dump. It is hardcoded to `true` in `create_deployment` (deployment.rs:1302) and will always be set to `true` on restore.
- `health` and `indexes` are point-in-time diagnostic snapshots. They are not used during restore (a restored deployment starts healthy; indexes are auto-created by `Layout::create_relational_schema()`). They are included for inspection and debugging.
- `indexes` are serialized as SQL strings using `CreateIndex::with_nsp("sgd")` + `to_sql(true, true)`, producing `CREATE INDEX CONCURRENTLY IF NOT EXISTS` statements with a normalized `sgd` namespace.
- The `manifest` fields mirror the existing `Manifest` struct in `dump.rs` (derived from `SubgraphManifestEntity`). The `schema` and `raw_yaml` fields of `SubgraphManifestEntity` are written to separate files instead.
- The `poi2$` table, when present, is a regular mutable entity table in `Layout.tables` and appears in the `tables` map like any other entity. It does not need special handling.

The raw GraphQL schema (in `schema.graphql`) is sufficient to reconstruct the full relational layout via `InputSchema::parse(spec_version, schema, deployment_hash)` → `Layout::new()`. The `InputSchema::parse()` call requires `manifest.spec_version` for version-specific parsing logic.

## Dump Process

**Existing code:** There is already a metadata-only dump in `store/postgres/src/relational/dump.rs` (`Layout::dump()`) that writes `control.json`, `schema.graphql`, and `subgraph.yaml`. It is called via `DeploymentStore::dump()` (deployment_store.rs:901), which loads `Layout` + `IndexList` and passes both to `Layout::dump()`. The connection is `AsyncPgConnection` via `pool.get_permitted()`. The new parquet dump extends this to include entity data.

The existing `Control` struct is renamed to `Metadata` and extended with the fields described above. The existing `Manifest`, `BlockPtr`, `Health`, and `Error` structs in `dump.rs` are reused and extended.

1. Resolve the deployment (by name, hash, or sgdN)
2. Read deployment metadata from `subgraph_manifest` + `deployment` + `head` tables (via `deployment_entity()` in `detail.rs`)
3. Write `schema.graphql` and `subgraph.yaml` (existing behavior)
4. For each entity type table in `Layout.tables` (sorted by name for determinism; includes `poi2$` when present):
   a. Query rows in vid order, batched (adaptive sizing like `VidBatcher`)
   b. Convert PG rows directly to Arrow `RecordBatch` (no JSON intermediate)
   c. Write batches to parquet file
   d. Record chunk info (file path, min_vid, max_vid, row_count)
5. Dump `data_sources$` table (fixed schema, same batch approach; include all DDL columns: `vid`, `block_range`, `causality_region`, `manifest_idx`, `parent`, `id`, `param`, `context`, `done_at`). Note: `parent` and `id` are in the DDL (`private.rs:68-69`) but not in the `DataSourcesTable` struct — dumping them requires raw SQL or extending the struct.
6. Write `metadata.json` atomically (write to tmp file, rename)
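Step 4a's adaptive sizing can be sketched as a feedback loop: pick a vid span, time the batch, then scale the next span toward a target duration. The real `VidBatcher` also consults PG histogram statistics; the `Batcher` struct below, its constants, and the clamping policy are all hypothetical, shown only to illustrate the feedback idea:

```rust
use std::time::Duration;

/// Sketch of adaptive vid batching: each call to `next_range` hands out the
/// next inclusive vid range; `observe` scales the span so a batch takes
/// roughly `target`. Constants and clamps here are illustrative.
struct Batcher {
    next_vid: i64,
    max_vid: i64,
    span: i64, // vids per batch
    target: Duration,
}

impl Batcher {
    fn new(min_vid: i64, max_vid: i64) -> Self {
        Batcher {
            next_vid: min_vid,
            max_vid,
            span: 10_000,
            target: Duration::from_secs(1),
        }
    }

    /// Inclusive vid range for the next batch, or None when the table is done.
    fn next_range(&mut self) -> Option<(i64, i64)> {
        if self.next_vid > self.max_vid {
            return None;
        }
        let end = (self.next_vid + self.span - 1).min(self.max_vid);
        let range = (self.next_vid, end);
        self.next_vid = end + 1;
        Some(range)
    }

    /// Feed back how long the batch took; scale the span toward the target,
    /// clamped so a single outlier batch can't collapse or explode progress.
    fn observe(&mut self, elapsed: Duration) {
        let ratio = self.target.as_secs_f64() / elapsed.as_secs_f64().max(1e-6);
        let scaled = (self.span as f64 * ratio.clamp(0.5, 2.0)) as i64;
        self.span = scaled.clamp(100, 1_000_000);
    }
}
```

The clamp on the scaling ratio (here 0.5x–2x per step) is the important design choice: it keeps the batch size stable when query latency is noisy.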

### Incremental append

- Read existing `metadata.json` to get `max_vid` per entity type
- Query rows with `vid > max_vid`
- Write as new chunk files (`chunk_000001.parquet`, etc.)
- Update metadata atomically
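The two bookkeeping pieces here -- naming the next chunk file and updating `metadata.json` atomically -- might look like the std-only sketch below. Both helper names are hypothetical; the atomicity relies on `rename` within a single filesystem being atomic on POSIX, which is why the temp file lives in the dump directory itself:

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Chunk files are numbered with six zero-padded digits, matching the
/// `chunk_000000.parquet` naming in the directory layout.
fn chunk_file(index: u32) -> String {
    format!("chunk_{:06}.parquet", index)
}

/// Write `metadata.json` atomically: write the full JSON to a temp file in
/// the same directory, flush it to disk, then rename over the old file, so
/// a crashed dump never leaves a half-written metadata file behind.
fn write_metadata_atomically(dir: &Path, json: &str) -> std::io::Result<()> {
    let tmp = dir.join("metadata.json.tmp");
    let fin = dir.join("metadata.json");
    let mut f = fs::File::create(&tmp)?;
    f.write_all(json.as_bytes())?;
    f.sync_all()?; // ensure bytes hit disk before the rename
    fs::rename(&tmp, &fin)?;
    Ok(())
}
```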

## Restore Process

1. Read `metadata.json` and `schema.graphql`
2. Parse schema via `InputSchema::parse(manifest.spec_version, schema_text, deployment_hash)`
3. Create a `Site` entry in `deployment_schemas` (needed for the deployment to be discoverable)
4. Create deployment via `create_deployment(conn, site, DeploymentCreate { .. })` -- this populates three tables:
   - `subgraphs.head` (block pointers, entity count -- initially null/0)
   - `subgraphs.deployment` (deployment hash, earliest_block, graft info, health)
   - `subgraphs.subgraph_manifest` (schema from file, features/spec_version/etc. from metadata)
5. Create tables via `Layout::create_relational_schema()` -- this generates DDL from the parsed schema and creates all entity tables with default indexes
6. Restore `data_sources$` table via DDL from `DataSourcesTable::new().as_ddl()` + batch-insert
7. For each entity type (including `poi2$` if present), read all parquet chunks in order, batch-insert into PG
8. Reset vid sequences to `max_vid + 1` for all entity tables and `data_sources$`
9. Update `subgraphs.head` with `head_block.number`, `head_block.hash`, and `entity_count` from dump metadata
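Step 8 could use `setval` with `pg_get_serial_sequence`, which resolves the sequence behind both `bigserial` and identity columns, so the sequence name never has to be guessed; after `setval(seq, max_vid)` the next `nextval` returns `max_vid + 1`. The helper below is a hypothetical sketch that only builds the SQL string; the quoting matters because entity table names can contain `$` (e.g. `data_sources$`, `poi2$`):

```rust
/// Sketch of the vid-sequence reset SQL for one restored table. Assumes the
/// schema namespace (e.g. "sgd123") and table name come from the restore
/// context, and `max_vid` from the dump's metadata.json. Hypothetical helper,
/// not graph-node API.
fn reset_vid_sequence_sql(namespace: &str, table: &str, max_vid: i64) -> String {
    format!(
        // setval(seq, n) makes the next nextval() return n + 1
        "SELECT setval(pg_get_serial_sequence('\"{}\".\"{}\"', 'vid'), {})",
        namespace, table, max_vid
    )
}
```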

## PG Read Strategy: OidValue-based Dynamic Columns

Use the existing `dsl::Table::select_cols()` + `DynamicRow<OidValue>` pattern (see `store/postgres/src/relational/dsl.rs` and `store/postgres/src/relational/value.rs`). This already solves typed extraction for dynamic schemas through the connection pool:

1. `select_cols()` builds a typed SELECT using `DynamicSelectClause` for any set of columns
2. Results are `DynamicRow<OidValue>`, where `OidValue` dispatches on the PG OID at runtime
3. `OidValue` covers all needed types: String, Bytes, Bool, Int, Int8, BigDecimal, Timestamp, plus array variants
4. Convert `OidValue` -> Arrow `ArrayBuilder` (analogous to the existing `OidValue` -> `Entity` conversion in `FromOidRow`)

No JSON, no separate connection. The existing `DeploymentStore::dump()` already uses `AsyncPgConnection` via `pool.get_permitted()`.

**Block range handling:** Add an `OidValue::Int4Range(Bound<i32>, Bound<i32>)` variant (OID 3904). Diesel already has a `FromSql<Range<Integer>, Pg>` impl for `(Bound<i32>, Bound<i32>)` which parses the binary format. This is roughly 15 lines of code in `value.rs`, plus fixing the `BLOCK_RANGE_COL` placeholder in `dsl.rs:46-49` (currently `ColumnType::Bytes`, with the comment "we can't deserialize in4range"). This resolves the existing TODO at `dsl.rs` line 294.
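Once the range arrives as a pair of `Bound<i32>`, flattening it into the `block_range_start` / `block_range_end` parquet columns is a small normalization step. A sketch (the function name is hypothetical; Postgres canonicalizes discrete ranges to inclusive-lower/exclusive-upper, but the other bound kinds are normalized anyway for safety):

```rust
use std::ops::Bound;

/// Flatten an int4range, as diesel yields it, into the canonical
/// `[start, end)` pair used for the block_range_start / block_range_end
/// parquet columns. `None` for the upper bound means unbounded, i.e. the
/// row version is still current.
fn flatten_block_range(lower: Bound<i32>, upper: Bound<i32>) -> (i32, Option<i32>) {
    let start = match lower {
        Bound::Included(n) => n,
        Bound::Excluded(n) => n + 1, // normalize to inclusive lower
        Bound::Unbounded => 0,
    };
    let end = match upper {
        Bound::Excluded(n) => Some(n),
        Bound::Included(n) => Some(n + 1), // normalize to exclusive upper
        Bound::Unbounded => None,
    };
    (start, end)
}
```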

**Key existing code:**
- `dsl::Table::select_cols()` (`store/postgres/src/relational/dsl.rs:305`)
- `OidValue` enum and `FromSql<Any, Pg>` impl (`store/postgres/src/relational/value.rs:33`)
- `FromOidRow` trait for result deserialization (`value.rs:206`)
- `selected_columns()` for building the column list with system columns (`dsl.rs:246`)

### Vid continuity on restore

Preserve original vid values. Needed for incremental consistency and simpler to implement. Reset the vid sequence to `max_vid + 1` after restore.

## Where Code Lives

- `store/postgres/src/relational/dump.rs` -- **Existing** metadata-only dump. Contains the `Manifest`, `BlockPtr`, `Error`, `Health`, and `Control` structs. `Control` will be renamed to `Metadata` and extended; the other helper structs are reused. Called via `DeploymentStore::dump()` → `Layout::dump()`.
- `store/postgres/src/parquet/` -- New module for parquet read/write/schema mapping
- `node/src/manager/commands/dump.rs` -- **Existing** CLI command skeleton (resolves deployment, calls `SubgraphStore::dump()`)
- `node/src/manager/commands/restore.rs` -- New CLI command for restore
- Expose via `command_support` in `store/postgres/src/lib.rs`

## Dependencies to Add

- `parquet = "=57.3.0"` (same version as existing `arrow`) to workspace and `store/postgres`
- `arrow` workspace dep to `store/postgres`

## Existing Code to Reuse

- `Layout`, `Table`, `Column`, `ColumnType` (`store/postgres/src/relational.rs`) -- schema introspection; `poi2$` is in the Layout as a mutable entity table (conditionally, when `catalog.use_poi` is true)
- `Column.is_list()` (`relational.rs:1574`) -- determines if a column is an array type (delegates to GraphQL `field_type.is_list()`)
- `DataSourcesTable` (`store/postgres/src/dynds/private.rs`) -- `data_sources$` DDL via the `as_ddl()` method; note that `parent` and `id` columns are in the DDL but not in the struct's typed fields
- `VidBatcher` (`store/postgres/src/vid_batcher.rs`) -- adaptive batch iteration using PG histogram statistics
- `copy.rs` pattern -- progress reporting, batch operation lifecycle; already handles copying both entity tables and `data_sources$`
- `InputSchema::parse(spec_version, raw, id)` (`graph/src/schema/input/mod.rs:965`) -- schema reconstruction from text
- `DeploymentSearch` (`node/src/manager/deployment.rs`) -- CLI deployment resolution (supports name, Qm hash, sgdN namespace)
- `create_deployment` (`store/postgres/src/deployment.rs:1224`) -- populates the `head`, `deployment`, and `subgraph_manifest` tables
- `DeploymentCreate` + `SubgraphManifestEntity` (`graph/src/data/subgraph/schema.rs:103`) -- structs needed by `create_deployment`
- `deployment_entity()` (`store/postgres/src/detail.rs`) -- reads deployment metadata into `SubgraphDeploymentEntity`; note that `start_block_*` is in the DB table (`StoredSubgraphManifest`) but only partially exposed through `SubgraphDeploymentEntity`
- `IndexList::load()` + `CreateIndex::to_sql()` (`store/postgres/src/relational/index.rs`) -- loads and serializes indexes
- Existing `dump.rs` (`store/postgres/src/relational/dump.rs`) -- metadata serialization types: `Manifest`, `BlockPtr`, `Health`, `Error`, `Control` (to be renamed `Metadata`)

## Implementation Order

1. Schema mapping + metadata types (foundation, unit-testable)
2. Parquet writer (dump from PG) + graphman `dump` command
3. Incremental append support
4. Parquet reader (restore to PG) + graphman `restore` command
5. Ongoing dump integration (run as part of graph-node, not just CLI)