Add support for logical and physical codecs #1541
Open
timsaucer wants to merge 10 commits into
Conversation
Introduces a single composable codec layer that every serializer reads from the session, replacing the hardcoded `DefaultLogicalExtensionCodec` / `DefaultPhysicalExtensionCodec` calls scattered across `PyLogicalPlan`, `PyExecutionPlan`, and the Rust-wrapped Python provider plumbing.

Key changes:

* New `PythonLogicalCodec` and `PythonPhysicalCodec` (`crates/core/src/codec.rs`) wrap any inner `LogicalExtensionCodec` / `PhysicalExtensionCodec`. Both share a `DFPYUDF1` magic-prefix path for in-band cloudpickle encoding of Python scalar UDFs, so an `ExecutionPlan` / `PhysicalExpr` referencing a Python `ScalarUDF` round-trips through either layer. The magic-prefix registry table (`DFPYUDF1` in use; `DFPYUDA1` / `DFPYUDW1` / `DFPYPE1` reserved) is documented in the module header.
* `PySessionContext` stores `Arc<PythonLogicalCodec>` and `Arc<PythonPhysicalCodec>` directly. FFI wrappers are built on demand via `ffi_logical_codec()` / `ffi_physical_codec()` for capsule export and downstream `RustWrappedPy*` consumers. Adds a `__datafusion_physical_extension_codec__` getter and a `with_physical_extension_codec` setter (symmetric with the logical pair).
* `PyLogicalPlan.to_proto` / `from_proto` are renamed to `to_bytes` / `from_bytes` and now read the session's logical codec. `to_proto` / `from_proto` survive as deprecated thin wrappers that emit `DeprecationWarning`.
* `PyExecutionPlan` gains the same `to_bytes` / `from_bytes` rename and deprecated aliases, plus a `__datafusion_execution_plan__` capsule getter and `from_pycapsule` (ported from poc_ffi_query_planner).
* New `PyPhysicalExpr` class with `to_bytes` / `from_bytes` / `from_pycapsule` / `__datafusion_physical_expr__`. `from_bytes` takes an input pyarrow Schema for column-reference resolution.
* `datafusion-python-util` gains `from_pycapsule!` / `try_from_pycapsule!` macros plus `physical_codec_from_pycapsule`, `task_context_from_pycapsule`, and `create_physical_extension_capsule` (ported from poc_ffi_query_planner).
* `PythonFunctionScalarUDF` exposes the `func()`, `input_fields()`, `return_field()`, `volatility()`, and `from_parts()` accessors needed by the codec.

Python wrapper updates: `LogicalPlan` / `ExecutionPlan` add `to_bytes` / `from_bytes` and deprecate `to_proto` / `from_proto`; `ExecutionPlan` adds the capsule getter and `from_pycapsule`; a new `PhysicalExpr` wrapper class is exported from the top-level package; `SessionContext` exposes the physical codec capsule and setter.

Test coverage in `python/tests/test_plans.py`: round-trip via the new API, deprecation warnings on the old API, capsule protocol getters, and session-routed codec on both layers.

The `PyLogicalPlan` PyCapsule protocol is intentionally not added — `datafusion-ffi` does not expose an `FFI_LogicalPlan`, so there is no stable cross-crate shape to publish. Round-tripping a `LogicalPlan` goes through `to_bytes` / `from_bytes` only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
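The in-band magic-prefix framing described above can be illustrated in plain Python. This is a sketch only: the real implementation is Rust and uses cloudpickle, and the `encode_udf` / `try_decode_udf` helpers here are hypothetical names introduced for illustration.

```python
# Illustration of the DFPYUDF1 magic-prefix framing. Only the framing idea is
# mirrored here; payload layout and helper names are assumptions.
import pickle

PY_SCALAR_UDF_MAGIC = b"DFPYUDF1"

def encode_udf(payload: object) -> bytes:
    # Prefix the pickled payload so the decoder can tell a Python scalar UDF
    # apart from any other extension bytes sharing the wire format.
    return PY_SCALAR_UDF_MAGIC + pickle.dumps(payload)

def try_decode_udf(buf: bytes):
    # Match the magic prefix; on a miss, return None to signal "not ours,
    # delegate to the inner codec".
    if buf.startswith(PY_SCALAR_UDF_MAGIC):
        return pickle.loads(buf[len(PY_SCALAR_UDF_MAGIC):])
    return None

assert try_decode_udf(encode_udf({"name": "my_udf"})) == {"name": "my_udf"}
assert try_decode_udf(b"DFPYUDA1...") is None  # reserved prefix, not handled here
```

A fixed-width magic marker keeps the dispatch decision to a single prefix comparison, which is why one registry table in the module header can reserve the sibling prefixes up front.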
Adds four downstream-crate fixtures in `datafusion-ffi-example` so the new PR1 surface can be tested with the same FFI-handoff pattern used for table providers, UDFs, etc. Existing tests prove the API exists; these tests prove it composes with code that lives in another crate.

New Rust types in `examples/datafusion-ffi-example/src/`:

* `MyLogicalExtensionCodec` — delegates to `DefaultLogicalExtensionCodec` and bumps atomic counters on the UDF encode/decode entry points. Exported via `__datafusion_logical_extension_codec__`. Installed onto a session with `ctx.with_logical_extension_codec(my_codec)`.
* `MyPhysicalExtensionCodec` — mirror for `PhysicalExtensionCodec`.
* `MyExecutionPlan` — wraps a one-column `EmptyExec` and exposes `__datafusion_execution_plan__`. Lets the receiver consume an `ExecutionPlan` capsule that did not originate in datafusion-python.
* `MyPhysicalExpr` — wraps `Literal(Int32(42))` and exposes `__datafusion_physical_expr__`. Same FFI handoff for physical expressions.

New tests:

* `_test_logical_extension_codec.py` — the codec installs cleanly, the session re-exports its capsule, and `try_encode_udf` fires on the user codec when serializing a plan that references a `ScalarUDF`. The decode counterpart is a round-trip check rather than a counter assertion: when the UDF is in the receiver's function registry, `parse_expr` resolves by name before consulting the codec.
* `_test_physical_extension_codec.py` — symmetric.
* `_test_execution_plan.py` — parametrized over typed-class vs raw-capsule input; verifies `ExecutionPlan.from_pycapsule` consumes the downstream capsule.
* `_test_physical_expr.py` — same for `PhysicalExpr.from_pycapsule`.

API changes forced by the new tests:

* `PyLogicalPlan.to_bytes`, `PyExecutionPlan.to_bytes`, and `PyPhysicalExpr.to_bytes` now accept an optional `ctx` parameter. When supplied, encoding routes through the session's installed codec instead of a fresh default. `ctx=None` preserves the previous default-codec behavior used by the deprecated `to_proto` shims.
* The util `from_pycapsule!` / `try_from_pycapsule!` macros now validate the capsule name via `pointer_checked(Some(c"..."))` rather than `pointer_checked(None)`. The latter rejects named capsules outright with CPython's "incorrect name" error.
* `SessionContext.with_logical_extension_codec` and `with_physical_extension_codec` now wrap the returned internal context in `SessionContext` so the result has the full Python surface. The pre-existing logical setter was returning a raw internal object that lacked `sql()` and friends.

`examples/datafusion-ffi-example/Cargo.toml` gains `datafusion` and `datafusion-proto` workspace dependencies for the new Rust impls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
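The capsule-name rule behind the macro fix can be seen directly against CPython's capsule API: `PyCapsule_IsValid` only succeeds when the queried name matches the name the capsule was created with, so checking against a NULL name rejects every named capsule. The capsule name string below is a placeholder, not necessarily the exact name datafusion-python exports.

```python
# Demonstrates CPython's capsule-name matching via ctypes. A named capsule
# checked with a NULL name is invalid, mirroring why pointer_checked(None)
# rejected the named codec capsules.
import ctypes

PyCapsule_New = ctypes.pythonapi.PyCapsule_New
PyCapsule_New.restype = ctypes.py_object
PyCapsule_New.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]

PyCapsule_IsValid = ctypes.pythonapi.PyCapsule_IsValid
PyCapsule_IsValid.restype = ctypes.c_int
PyCapsule_IsValid.argtypes = [ctypes.py_object, ctypes.c_char_p]

NAME = b"datafusion_logical_extension_codec"  # placeholder capsule name
cap = PyCapsule_New(ctypes.c_void_p(1), NAME, None)

assert PyCapsule_IsValid(cap, NAME) == 1      # matching name: accepted
assert PyCapsule_IsValid(cap, None) == 0      # NULL name vs named capsule: rejected
assert PyCapsule_IsValid(cap, b"wrong") == 0  # wrong name: rejected
```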
Review feedback pass. PR1 is now strictly the composable codec layer + session routing + class-method serialization API. Anything that touches actual Python UDF inline encoding or Python expression wrapping moves to PR2 alongside the pickle work.

Dropped:

* `encode_python_scalar_udf` / `decode_python_scalar_udf` helpers from `crates/core/src/codec.rs`, along with the cloudpickle and pyarrow imports. The wrapper codecs now delegate every method to `inner`. The `DFPYUDF1` magic constant is kept (marked `dead_code` for now) as a reservation so PR2 has a single definition site.
* `udf.rs` reverted to its pre-PR1 shape. The codec no longer needs the `func()` / `input_fields()` / `volatility()` / `from_parts()` accessors. They will be re-added by PR2 when scalar-UDF inlining lands.
* The `PyPhysicalExpr` class + Python wrapper + `__init__` export + `MyPhysicalExpr` FFI fixture + `_test_physical_expr.py`. There is no consumer in the PR1 or PR2 plan documents; symmetry with `PyExecutionPlan` is not enough to justify the surface area.
* Rust-side `PyLogicalPlan::to_proto` / `from_proto` and `PyExecutionPlan::to_proto` / `from_proto` deprecated wrappers. The deprecation lives entirely in the Python wrapper layer, which emits `DeprecationWarning` and forwards to `to_bytes` / `from_bytes`. Less Rust duplication.
* `PythonLogicalCodec::with_default_inner` / `PythonPhysicalCodec::with_default_inner` — redundant with `impl Default`. Logic moved into `Default::default`.
* `PySessionContext::default_logical_codec` / `default_physical_codec` helpers. Inlined as `Arc::new(PythonLogicalCodec::default())` at the three call sites.

Tests (root: 1076, FFI example: 36) are all green after the cuts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous doc-block framed `PythonLogicalCodec` / `PythonPhysicalCodec` in terms of "PR1 delegates, PR2 will add encoding" — useful for review, useless for someone reading the code later. Reframed in terms of what the codecs exist to do: encode Python-side plan references (pure-Python UDFs, etc.) into the proto wire format so plans can cross process boundaries without the receiver having to pre-register every callable. The wrappers sit at the top of the session's codec stack and delegate non-Python encoding to a composable inner codec.

The magic-prefix registry table loses the "reserved" column. The doc still notes that the in-module impls currently delegate and that encoder/decoder hooks land alongside the corresponding Python-side serialization work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Forward every `LogicalExtensionCodec` / `PhysicalExtensionCodec` method to `inner`. `PythonLogicalCodec` previously only overrode the four required methods on the trait plus the scalar-UDF pair, so the default trait impls (returning "LogicalExtensionCodec is not provided") shadowed any downstream FFI codec for file formats, aggregate UDFs, and window UDFs. A user installing their own codec via `SessionContext.with_logical_extension_codec(...)` would silently lose access to its `try_*_file_format`, `try_*_udaf`, and `try_*_udwf` implementations.

Forward every trait method to `inner` so the user-installed codec is fully reachable. The same change applies on the physical side, including `try_*_expr`, `try_*_udaf`, and `try_*_udwf` — the corresponding Python-aware paths can layer on later by intercepting before delegation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
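The shadowing bug and its fix can be modeled in a few lines of plain Python. The real types are Rust trait impls; the class and method names here loosely mirror the trait and are illustrative only.

```python
# Pure-Python model of the partial-override bug: a wrapper that forwards only
# some methods inherits error-raising defaults that hide the user codec.
class CodecBase:
    # Default trait impls: unimplemented hooks raise, like the
    # "LogicalExtensionCodec is not provided" errors in datafusion-proto.
    def try_encode_udf(self, udf):
        raise NotImplementedError
    def try_encode_udaf(self, udaf):
        raise NotImplementedError

class UserCodec(CodecBase):
    """Downstream FFI codec that *does* implement the aggregate hook."""
    def try_encode_udf(self, udf):
        return b"udf:" + udf.encode()
    def try_encode_udaf(self, udaf):
        return b"udaf:" + udaf.encode()

class BuggyWrapper(CodecBase):
    """Pre-fix shape: only the UDF method forwards; try_encode_udaf falls
    back to the base 'not provided' default, shadowing UserCodec's impl."""
    def __init__(self, inner):
        self.inner = inner
    def try_encode_udf(self, udf):
        return self.inner.try_encode_udf(udf)

class FixedWrapper(BuggyWrapper):
    """Post-fix shape: every method forwards to inner."""
    def try_encode_udaf(self, udaf):
        return self.inner.try_encode_udaf(udaf)

user = UserCodec()
try:
    BuggyWrapper(user).try_encode_udaf("my_agg")
    reachable = True
except NotImplementedError:
    reachable = False
assert not reachable  # the user's udaf hook was silently unreachable
assert FixedWrapper(user).try_encode_udaf("my_agg") == b"udaf:my_agg"
```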
timsaucer commented on May 14, 2026

Comment on lines +95 to +96
```rust
#[allow(dead_code)]
pub(crate) const PY_SCALAR_UDF_MAGIC: &[u8] = b"DFPYUDF1";
```
Member, Author
We are allowing dead code so that we have this ready to go as the preferred format. The immediate next PR will use this.
The previous docstrings claimed the tests verify "PythonLogicalCodec delegates non-Python UDFs to the inner codec." That's forward-looking — the codecs currently delegate every UDF unconditionally, so the test would behave identically for Python and non-Python UDFs.

Rewritten to describe what the test actually proves: the dispatch chain `PyLogicalPlan.to_bytes -> session.logical_codec -> PythonLogicalCodec -> FFI -> user impl` (and its physical mirror) forwards correctly, observable via the user codec's atomic counter incrementing after one encode pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
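The counter-based observability the test relies on can be sketched in plain Python. The real `MyLogicalExtensionCodec` is a Rust type with an atomic counter; every name below is illustrative, not the datafusion-python API.

```python
# Sketch: a user codec counts encode calls, proving the session-routed
# dispatch chain actually reached it after one encode pass.
class InnerCodec:
    def try_encode_udf(self, name: str) -> bytes:
        return name.encode()

class CountingUserCodec(InnerCodec):
    """Downstream codec mimicking the FFI fixture's atomic counter."""
    def __init__(self):
        self.encode_calls = 0
    def try_encode_udf(self, name: str) -> bytes:
        self.encode_calls += 1
        return super().try_encode_udf(name)

class SessionCodecWrapper:
    """Stand-in for the session-installed PythonLogicalCodec wrapper,
    forwarding every call to the user codec."""
    def __init__(self, inner):
        self.inner = inner
    def try_encode_udf(self, name: str) -> bytes:
        return self.inner.try_encode_udf(name)

user = CountingUserCodec()
session_codec = SessionCodecWrapper(user)
session_codec.try_encode_udf("my_udf")  # one encode pass through the chain
assert user.encode_calls == 1
```

The decode side cannot be asserted the same way because, as the commit notes, name resolution in the receiver's function registry short-circuits before the codec is consulted.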
`MyExecutionPlan` was a one-column `EmptyExec` stub useful only as a capsule-handoff target. Promoted to a minimal reference impl that a downstream Rust crate can copy when exposing a custom `ExecutionPlan` to datafusion-python: configurable `num_rows`, produces a single batch of sequential `Int32` values under column `value`, wrapped in `DataSourceExec` via `MemorySourceConfig::try_new_exec`. A header comment explains the typical use case (remote backend, streaming source, synthetic data generator) and the `__datafusion_execution_plan__` capsule shape downstream crates should follow.

The test asserts the schema-bearing plan survives the FFI hop: a `DataSourceExec` arrives with the expected partitioning and no children. Schema details are not surfaced through the FFI display path (only the wrapping `ForeignExecutionPlan` name + inner plan name appear), so the test does not assert the column name.

The `to_bytes` round-trip of an FFI-imported plan is not exercised: encoding requires a physical codec that knows how to serialize `ForeignExecutionPlan`, which the default codec does not. A downstream user round-tripping such a plan must install their own codec via `with_physical_extension_codec`. Documented in the test file rather than asserted on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`PyExecutionPlan::from_pycapsule` and the matching `__datafusion_execution_plan__` exporter have no consumer in this repo, on the POC `poc_ffi_query_planner` branch, or on any sibling branch (`testing/datafusion-distributed`, `testing/ffi-library-marker`, `tmp/ffi-with-codecs`). The pair was wired up speculatively for an FFI plan handoff that no Python code path actually performs today.

Drop the whole capsule round-trip for `ExecutionPlan`:

* Rust `PyExecutionPlan::from_pycapsule` and `__datafusion_execution_plan__`.
* Python `ExecutionPlan.from_pycapsule` and `__datafusion_execution_plan__` wrappers.
* The `MyExecutionPlan` FFI fixture + `_test_execution_plan.py` + lib.rs registration. These were solely test fixtures for the dropped path.
* `test_execution_plan_pycapsule_protocol` in `python/tests/test_plans.py`.

`PyExecutionPlan.to_bytes` / `from_bytes` survive — they encode through the session's physical codec and have real coverage. The capsule round-trip can be re-added when a concrete consumer (distributed worker, bridge library) lands.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors `PyLogicalPlan` / `PyExecutionPlan`: encode through the session's installed `LogicalExtensionCodec` (or a default-inner `PythonLogicalCodec` when no `ctx` is supplied), decode against the session's function registry + codec via `parse_expr`. The Rust side calls `datafusion_proto::logical_plan::to_proto::serialize_expr` and `from_proto::parse_expr`. The Python wrapper threads an optional `SessionContext` through.

Tests cover the session-routed round-trip and the no-ctx default-codec encode path. This adds a third consumer of `session.logical_codec()` alongside `PyLogicalPlan` and the codec dispatch tests in the FFI example, broadening coverage of the codec stack.

This is the last piece of the PR1 codec surface — follow-up pickle work (`Expr.__reduce__`, worker-scoped context, multiprocessing) can build on this without bundling the byte-level serialization API.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Member, Author

I'm not sure what's wrong with the CI runner, but it's clearly unrelated to this PR.
Member, Author

@ntjohnson1 This is a precursor for the pickling approach I'm recommending. I can open the second PR so you can see how it will fit in.
Which issue does this PR close?
Closes #1181
Rationale for this change
We have support for upstream FFI logical and physical codecs, but we need to make this available in datafusion-python. We also need to add support for serializing and deserializing Python UDFs.
What changes are included in this PR?
Are there any user-facing changes?
Some methods are deprecated and now point to the new signatures. Otherwise this PR is additive only.