Skip to content

fix(spark): Replace mapInArrow with foreachPartition for materialization#6370

Draft
abhijeet-dhumal wants to merge 7 commits intofeast-dev:masterfrom
abhijeet-dhumal:fix/spark-vector-store-materialization
Draft

fix(spark): Replace mapInArrow with foreachPartition for materialization#6370
abhijeet-dhumal wants to merge 7 commits intofeast-dev:masterfrom
abhijeet-dhumal:fix/spark-vector-store-materialization

Conversation

@abhijeet-dhumal
Copy link
Copy Markdown
Contributor

@abhijeet-dhumal abhijeet-dhumal commented May 6, 2026

What this PR does / why we need it

Fixes materialization failures when using BatchFeatureView + SparkComputeEngine with vector stores and Spark 3.5+.

1. Replace mapInArrow with foreachPartition (main fix)

Spark 3.5 inserts WindowGroupLimitExec upstream of MapInArrowExec when UDFs use Window operations, routing the Python worker through the wrong serialiser:

AttributeError: 'list' object has no attribute 'dtype'

foreachPartition uses pickle — no Arrow UDF bridge — so the mismatch cannot occur.

2. Re-apply session configs on reuse

SparkSession.getOrCreate() silently drops new spark.sql.* / spark.hadoop.* overrides when a session already exists, causing S3 access failures. Now explicitly re-applied after getOrCreate().

3. Fix map_in_pandas dummy yield

Changed pd.DataFrame([pd.Series(range(1, 2))])pd.DataFrame({"status": [0]}) to match the declared return schema.

4. Remove redundant _apply_bfv_transformations

Superseded by SparkFeatureBuilder DAG pipeline (#6357) which handles BFV transformations for both materialization and historical retrieval.

5. Add spark_embed utility

Helper for @batch_feature_view UDFs that generate embeddings via SentenceTransformer. Uses localCheckpoint(eager=True) to sever Python lineage before downstream writes.

Which issue(s) this PR fixes

Fixes BatchFeatureView materialization with vector stores (Milvus/Redis), Spark 3.5+ Window operations, and K8s-mode session config drift.

Testing Strategy

  • Unit tests: session config forwarding, map_in_pandas schema, empty partition handling, S3 event log dir
  • Manual: Materialized BatchFeatureView with PySpark + RAPIDS GPU on K8s (4 executors), 26M+ keys to Redis

Checks

  • Tests passing
  • Commits signed off
  • PR title follows conventional commits

…config forwarding for vector store materialization

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@abhijeet-dhumal abhijeet-dhumal force-pushed the fix/spark-vector-store-materialization branch from 60fe5cf to 2cee78e Compare May 6, 2026 08:57
@abhijeet-dhumal abhijeet-dhumal marked this pull request as ready for review May 6, 2026 09:09
@abhijeet-dhumal abhijeet-dhumal requested a review from a team as a code owner May 6, 2026 09:10
@abhijeet-dhumal abhijeet-dhumal changed the title fix(spark): replace mapInArrow with foreachPartition and fix session … fix(spark): Replace mapInArrow with foreachPartition for materialization May 6, 2026
abhijeet-dhumal and others added 4 commits May 6, 2026 14:45
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
SparkSource previously required exactly one of table/query/path.
This relaxes the constraint to allow query + path together:
- query: used for reading raw data during materialization
- path: used for offline write-back (offline=True) and as
  pre-computed read source in get_historical_features

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
@abhijeet-dhumal abhijeet-dhumal marked this pull request as draft May 7, 2026 08:01
abhijeet-dhumal and others added 2 commits May 7, 2026 13:32
Co-authored-by: Cursor <cursoragent@cursor.com>
…MemoryError/OOMKill on large feature views

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant