Tags: linkedin/iceberg-python
Fix deepcopy for `And`, `Or`, and `Not` expressions (#79)

This PR addresses failures when `copy.deepcopy()` is applied to PyIceberg boolean expression trees under Pydantic v2. It targets two cases in particular: compound expressions (`And`/`Or`/`Not`) whose `__new__` signatures require arguments, and `Singleton`-based expressions that must not be reconstructed.
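The two failure modes named above can be illustrated with a minimal plain-Python sketch. It assumes hypothetical `Singleton`, `AlwaysTrue`, and `And` classes rather than PyIceberg's actual Pydantic-based ones, and it is not necessarily how #79 implements the fix. `__getnewargs__` tells `copy.deepcopy` which arguments to pass to `__new__` when rebuilding a compound node, and `__deepcopy__` returning `self` keeps a singleton from being reconstructed. `Or` and `Not` would follow the same pattern.

```python
import copy
from typing import Any


class Singleton:
    """Hypothetical base class: at most one instance per subclass."""

    _instances: dict[type, "Singleton"] = {}

    def __new__(cls) -> "Singleton":
        if cls not in Singleton._instances:
            Singleton._instances[cls] = super().__new__(cls)
        return Singleton._instances[cls]

    def __deepcopy__(self, memo: dict[int, Any]) -> "Singleton":
        # A singleton must never be reconstructed; its "copy" is the instance itself.
        return self


class AlwaysTrue(Singleton):
    """Hypothetical singleton expression."""


class And:
    """Hypothetical compound expression whose __new__ requires its operands."""

    left: Any
    right: Any

    def __new__(cls, left: Any, right: Any) -> Any:
        # Constructor-time simplification: And(AlwaysTrue(), x) is just x.
        if isinstance(left, AlwaysTrue):
            return right
        if isinstance(right, AlwaysTrue):
            return left
        obj = super().__new__(cls)
        obj.left = left
        obj.right = right
        return obj

    def __getnewargs__(self) -> tuple[Any, Any]:
        # copy/pickle rebuild the object via cls.__new__(cls, *these_args).
        # Without this hook they call cls.__new__(cls) and raise TypeError.
        return (self.left, self.right)
```

With these hooks, `copy.deepcopy(And(And("x", "y"), "z"))` produces a new tree with fresh `And` nodes, while any `AlwaysTrue` reached during the copy is returned unchanged.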
feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads (#44)

> Backport of apache/iceberg-python#3046

## Summary

Addresses #3036: `ArrowScan.to_record_batches()` uses `executor.map` + `list()`, which eagerly materializes all record batches of each file into memory and causes OOM on large tables.

This PR adds a new `order` parameter to `to_arrow_batch_reader()` with two implementations:

- `TaskOrder` (default) preserves the existing behavior: batches are grouped by file in task submission order, and each file is fully materialized before proceeding to the next.
- `ArrivalOrder` yields batches as they are produced across files, without materializing entire files into memory. It accepts three sub-parameters:
  - `concurrent_streams: int`: number of files to read concurrently (default: 8). A per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)` bounds concurrency.
  - `batch_size: int | None`: number of rows per batch passed to PyArrow's `ds.Scanner` (default: PyArrow's built-in 131,072).
  - `max_buffered_batches: int`: size of the bounded queue between the producers and the consumer (default: 16), providing backpressure to cap memory usage.

## Problem

The current implementation materializes all batches from each file via `list()` inside `executor.map`, which runs up to `min(32, cpu_count + 4)` files in parallel. For large files, this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.

## Solution

### Before: OOM on large tables

```python
# Default TaskOrder: each file's batches are fully materialized before being yielded
batches = table.scan().to_arrow_batch_reader()
```

### After: bounded memory, tunable parallelism

```python
from pyiceberg.table import ArrivalOrder

# Read 4 files concurrently and yield batches of 10,000 rows as they arrive
batches = table.scan().to_arrow_batch_reader(
    order=ArrivalOrder(concurrent_streams=4, batch_size=10000),
)
```

Default behavior is unchanged: `TaskOrder` preserves the existing `executor.map` + `list()` path for backwards compatibility.

## Architecture

When `order=ArrivalOrder(...)`, batches flow through `_bounded_concurrent_batches`:

1. All file tasks are submitted to a per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)`.
2. Workers push batches into a bounded `Queue(maxsize=max_buffered_batches)`; when it is full, workers block (backpressure).
3. The consumer yields batches from the queue via a blocking `queue.get()`.
4. A sentinel value signals completion, so there is no timeout-based polling.
5. On early termination (the consumer stops), a cancel event is set and the queue is drained until the sentinel to unblock all stuck workers.
6. The executor context manager handles deterministic shutdown.

`to_record_batches` is refactored into helpers: `_prepare_tasks_and_deletes`, `_iter_batches_arrival`, `_iter_batches_materialized`, and `_apply_limit`.

## Ordering semantics

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
| `ArrivalOrder(concurrent_streams=1)` | Grouped by file, sequential | Row order |
| `ArrivalOrder(concurrent_streams>1)` | **Interleaved** (no grouping guarantee) | Row order within each file |

## Benchmark results

32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch size 131,072 (PyArrow default):

| Config | Throughput (rows/s) | TTFR (ms) | Peak Arrow memory |
|---|---|---|---|
| default (TaskOrder) | 190,250,192 | 73.4 | 642.2 MB |
| ArrivalOrder(cs=1) | 59,317,085 | 27.7 | 10.3 MB |
| ArrivalOrder(cs=2) | 105,414,909 | 28.8 | 42.0 MB |
| ArrivalOrder(cs=4) | 175,840,782 | 28.4 | 105.5 MB |
| ArrivalOrder(cs=8) | 211,922,538 | 32.3 | 271.7 MB |
| ArrivalOrder(cs=16) | 209,011,424 | 45.0 | 473.3 MB |

*TTFR = time to first record; cs = `concurrent_streams`*

## Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark.

## Are there any user-facing changes?

Yes. There is a new `order` parameter on `DataScan.to_arrow_batch_reader()`:

- `order: ScanOrder | None` controls batch ordering. Pass `TaskOrder()` (default) or `ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)`.

New public classes `TaskOrder` and `ArrivalOrder` (subclasses of `ScanOrder`) are exported from `pyiceberg.table`. All parameters are optional with backwards-compatible defaults, so existing code is unaffected.

Documentation is updated in `mkdocs/docs/api.md` with usage examples, ordering semantics, and a configuration guidance table.
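The bounded producer/consumer flow described in the Architecture section of #44 can be sketched in plain Python. This is a minimal sketch, not the code from the PR: `bounded_concurrent_batches` and `_Failure` are hypothetical names, each file task is assumed to be a zero-argument callable returning an iterable of batches, and real tasks would yield `pyarrow.RecordBatch` objects rather than arbitrary values. Error forwarding through the queue is also an assumption of this sketch.

```python
import queue
import threading
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from typing import Any

_SENTINEL = object()  # posted exactly once, after every producer has finished


class _Failure:
    """Hypothetical wrapper so a producer exception can travel through the queue."""

    def __init__(self, exc: Exception) -> None:
        self.exc = exc


def bounded_concurrent_batches(
    tasks: list[Callable[[], Iterable[Any]]],
    concurrent_streams: int = 8,
    max_buffered_batches: int = 16,
) -> Iterator[Any]:
    """Yield batches from all tasks in arrival order, buffering at most max_buffered_batches."""
    if not tasks:
        return
    buffer: queue.Queue[Any] = queue.Queue(maxsize=max_buffered_batches)
    cancel = threading.Event()
    remaining = len(tasks)
    lock = threading.Lock()

    def produce(task: Callable[[], Iterable[Any]]) -> None:
        nonlocal remaining
        try:
            if not cancel.is_set():
                for batch in task():
                    buffer.put(batch)  # blocks while the queue is full (backpressure)
                    if cancel.is_set():
                        break  # consumer stopped early; abandon this task
        except Exception as exc:
            buffer.put(_Failure(exc))  # forward the error to the consumer
        finally:
            with lock:
                remaining -= 1
                is_last = remaining == 0
            if is_last:
                buffer.put(_SENTINEL)

    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
        for task in tasks:
            executor.submit(produce, task)
        finished = False
        try:
            while True:
                item = buffer.get()  # blocking get: no timeout-based polling
                if item is _SENTINEL:
                    finished = True
                    return
                if isinstance(item, _Failure):
                    raise item.exc
                yield item
        finally:
            if not finished:
                # Early exit or error: tell producers to stop, then drain so any
                # producer blocked in put() can finish and the last one posts the sentinel.
                cancel.set()
                while buffer.get() is not _SENTINEL:
                    pass
```

With `concurrent_streams=1`, the single worker runs tasks in submission order, which is why that configuration keeps batches grouped by file. With more workers, items from different tasks interleave in the queue while each task's own items stay in order, matching the ordering table above.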
Add JFrog publish workflow for pyiceberg (#45)

# Rationale for this change

Publish the pyiceberg package to the same JFrog Artifactory PyPI repository (`openhouse-pypi`) used by the OpenHouse data loader. The package is renamed from `pyiceberg` to `li-pyiceberg` to distinguish it from the upstream Apache release and to follow LinkedIn's `li*` naming convention. The Python import (`import pyiceberg`) is unchanged.

## How it works

1. **tag**: reads the version from `pyproject.toml` as the initial version, then auto-increments the patch version via git tags on each push to `li-0.11` (e.g. `v0.11.1`, `v0.11.2`, ...).
2. **build**: reuses `pypi-build-artifacts.yml` to build the sdist and multi-platform wheels (Linux, Linux ARM, Windows, macOS Intel, macOS ARM) across Python 3.10–3.13 via cibuildwheel. Each wheel is validated by running `pytest tests/avro/test_decoder.py`.
3. **publish**: downloads all build artifacts and publishes them to JFrog using `pypa/gh-action-pypi-publish`.

Jobs are sequenced via `needs:`, so publish waits for all matrix builds to complete.

This does not publish to PyPI or TestPyPI. The existing upstream workflows (`nightly-pypi-build.yml`, `python-release.yml`) are gated by `github.repository == 'apache/iceberg-python'` and will not run in this fork.

Requires the `JFROG_USERNAME` and `JFROG_PYPI_API_TOKEN` secrets to be configured in this repo.

## Are these changes tested?

Will be validated on the first push to `li-0.11` after the secrets are configured.

## Are there any user-facing changes?

No.
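The version logic of the **tag** step might look like the following Python sketch. The PR does not show the workflow's implementation, so `next_release_tag` is hypothetical, and two behaviors are assumptions: when no `vMAJOR.MINOR.N` tag exists yet, the `pyproject.toml` version is used unchanged, and the computed patch never falls below the `pyproject.toml` patch number.

```python
import re
from collections.abc import Iterable

# Leading MAJOR.MINOR.PATCH of a pyproject.toml version (suffixes like ".dev0" are ignored)
_VERSION = re.compile(r"^(\d+)\.(\d+)\.(\d+)")


def next_release_tag(pyproject_version: str, existing_tags: Iterable[str]) -> str:
    """Return the next vMAJOR.MINOR.PATCH tag for the release line in pyproject.toml."""
    match = _VERSION.match(pyproject_version)
    if match is None:
        raise ValueError(f"unsupported version: {pyproject_version!r}")
    major, minor, patch = (int(part) for part in match.groups())
    # Only tags on the same MAJOR.MINOR line count, e.g. v0.11.N for 0.11.x.
    line = re.compile(rf"^v{major}\.{minor}\.(\d+)$")
    patches = [int(m.group(1)) for tag in existing_tags if (m := line.match(tag))]
    if patches:
        # Bump past the highest existing patch, but never below pyproject.toml's patch.
        patch = max(patch, max(patches) + 1)
    return f"v{major}.{minor}.{patch}"
```

In a workflow, `existing_tags` could come from `git tag --list 'v0.11.*'` after fetching tags, and the returned value would be pushed as the new tag before the build job runs.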