Tags: linkedin/iceberg-python

v0.11.5

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Fix deepcopy for And, Or, and Not expressions (#79)

This PR addresses failures when `copy.deepcopy()` is applied to PyIceberg boolean expression trees under Pydantic v2, particularly for compound expressions (`And`/`Or`/`Not`) with `__new__` signatures requiring arguments, and for `Singleton`-based expressions that must not be reconstructed.
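The underlying failure mode can be reproduced without Pydantic. The sketch below uses simplified, hypothetical stand-ins (`Singleton`, `AlwaysTrue`, `And` as plain Python classes, not PyIceberg's Pydantic models), and the actual fix in #79 may be implemented differently. It shows the two ingredients. First, `copy.deepcopy` falls back to the object's `__reduce_ex__`, which for ordinary classes rebuilds the object by calling `cls.__new__(cls, *args)`, taking `args` from `__getnewargs__` (an empty tuple if that method is absent). Second, a `__deepcopy__` that returns `self` keeps a singleton unique.

```python
import copy


class Singleton:
    """One shared instance per subclass (simplified stand-in)."""

    _instances: dict = {}

    def __new__(cls):
        if cls not in cls._instances:
            cls._instances[cls] = super().__new__(cls)
        return cls._instances[cls]

    # Copies must not construct a second instance.
    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self


class AlwaysTrue(Singleton):
    pass


class And:
    """Compound node whose __new__ requires both operands."""

    def __new__(cls, left, right):
        obj = super().__new__(cls)
        obj.left, obj.right = left, right
        return obj

    def __getnewargs__(self):
        # deepcopy rebuilds via cls.__new__(cls, *these_args); without this
        # method the args are empty and And.__new__ raises TypeError.
        return (self.left, self.right)


expr = And(AlwaysTrue(), And(AlwaysTrue(), AlwaysTrue()))
clone = copy.deepcopy(expr)
print(clone is expr, clone.right is expr.right, clone.left is AlwaysTrue())  # False False True
```

Removing `__getnewargs__` from `And` makes the same `deepcopy` call fail with `TypeError`, which is the class of error the PR describes for compound expressions.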

v0.11.4

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Merge pull request #53 from sumedhsakdeo/ssakdeo/fix-ci-flakes

fix: Fix flaky test and pin REST catalog image to unblock CI

Test-only fixes.

v0.11.3

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads (#44)

> Backport of apache/iceberg-python#3046

## Summary

  Addresses #3036: `ArrowScan.to_record_batches()` uses `executor.map` + `list()`, which eagerly materializes all record batches of each file in memory, causing OOM on large tables.

  This PR adds a new `order` parameter to `to_arrow_batch_reader()` with two implementations:

  - `TaskOrder` (default) — preserves existing behavior: batches grouped by file in task submission order, each file fully materialized before proceeding to the next.
  - `ArrivalOrder` — yields batches as they are produced across files without materializing entire files into memory. Accepts three sub-parameters:
    - `concurrent_streams: int` — number of files to read concurrently (default: 8). A per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)` bounds concurrency.
    - `batch_size: int | None` — number of rows per batch passed to PyArrow's ds.Scanner (default: PyArrow's built-in 131,072).
    - `max_buffered_batches: int` — size of the bounded queue between producers and consumer (default: 16), providing backpressure to cap memory usage.
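To make the parameter list concrete, here is a minimal sketch of how such option classes could be modeled, assuming frozen dataclasses. The defaults mirror the ones listed above. The validation in `__post_init__` is an illustrative assumption (for instance, `queue.Queue(maxsize=0)` means an unbounded queue in Python, which would defeat the backpressure). The real definitions in `pyiceberg.table` may differ.

```python
from dataclasses import dataclass
from typing import Optional


class ScanOrder:
    """Base class for batch-ordering strategies (sketch)."""


@dataclass(frozen=True)
class TaskOrder(ScanOrder):
    """Default: batches grouped by file, in task submission order."""


@dataclass(frozen=True)
class ArrivalOrder(ScanOrder):
    """Yield batches as they arrive from up to `concurrent_streams` files."""

    concurrent_streams: int = 8
    batch_size: Optional[int] = None  # None -> PyArrow's default (131,072 rows)
    max_buffered_batches: int = 16

    def __post_init__(self) -> None:
        # Hypothetical checks; a maxsize of 0 would make queue.Queue unbounded.
        if self.concurrent_streams < 1:
            raise ValueError("concurrent_streams must be >= 1")
        if self.batch_size is not None and self.batch_size < 1:
            raise ValueError("batch_size must be >= 1 or None")
        if self.max_buffered_batches < 1:
            raise ValueError("max_buffered_batches must be >= 1")


print(ArrivalOrder(concurrent_streams=4, batch_size=10000))
```

Frozen dataclasses give value equality and immutability, so an order object can be shared between scans without one caller mutating another's settings.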

## Problem

  The current implementation materializes all batches from each file via `list()` inside `executor.map`, which runs up to `min(32, cpu_count + 4)` files in parallel (Python's default `ThreadPoolExecutor` worker count). For large files, this means all batches from ~20 files (e.g. on a 16-core machine, where `min(32, 16 + 4) = 20`) are held in memory simultaneously before any of them are yielded to the consumer.
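The arithmetic behind that estimate can be sketched as follows. `default_max_workers` reproduces Python's documented `ThreadPoolExecutor` default since 3.8. `eager_batches_in_flight` is a hypothetical helper that assumes batches are cut at exactly `batch_size` rows (in PyArrow `batch_size` is only a maximum, so real files may yield more, smaller batches). It also ignores finished results waiting to be consumed, so real peaks can be higher.

```python
import math
import os
from typing import Optional


def default_max_workers(cpu_count: Optional[int] = None) -> int:
    """ThreadPoolExecutor's default max_workers since Python 3.8: min(32, cpus + 4)."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1
    return min(32, cpu_count + 4)


def eager_batches_in_flight(
    num_files: int, rows_per_file: int, batch_size: int, cpu_count: Optional[int] = None
) -> int:
    """Hypothetical estimate of batches held at once when each worker list()s a whole file."""
    files_in_flight = min(num_files, default_max_workers(cpu_count))
    batches_per_file = math.ceil(rows_per_file / batch_size)
    return files_in_flight * batches_per_file


# The benchmark shape from this PR on a hypothetical 16-core machine:
print(eager_batches_in_flight(32, 500_000, 131_072, cpu_count=16))  # 80
```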

## Solution

  ### Before: OOM on large tables
  ```python
  batches = table.scan().to_arrow_batch_reader()
  ```

  ### After: bounded memory, tunable parallelism
  ```python
  from pyiceberg.table import ArrivalOrder

  batches = table.scan().to_arrow_batch_reader(
      order=ArrivalOrder(concurrent_streams=4, batch_size=10000),
  )
  ```

  Default behavior is unchanged — `TaskOrder` preserves the existing executor.map + list() path for backwards compatibility.

## Architecture

  When `order=ArrivalOrder(...)`, batches flow through `_bounded_concurrent_batches`:

  1. All file tasks are submitted to a per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)`
  2. Workers push batches into a bounded `Queue(maxsize=max_buffered_batches)` — when full, workers block (backpressure)
  3. The consumer yields batches from the queue via blocking `queue.get()`
  4. A sentinel value signals completion — no timeout-based polling
  5. On early termination (consumer stops), a cancel event is set and the queue is drained until the sentinel to unblock all stuck workers
  6. The executor context manager handles deterministic shutdown
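The six steps above can be sketched in plain Python. This is a simplified, hypothetical stand-in for `_bounded_concurrent_batches`, not the PR's code. `read_batches` is an assumed callable that lazily yields one file's batches. The "last worker to finish enqueues the sentinel" counter is one possible way to emit a single completion signal. Worker exceptions are forwarded through the queue so the consumer re-raises them.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, Sequence, TypeVar

F = TypeVar("F")  # a file/task descriptor
T = TypeVar("T")  # a record batch

_SENTINEL = object()  # enqueued exactly once, after every worker has finished


class _WorkerError:
    """Carries a worker's exception to the consumer thread."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


def bounded_concurrent_batches(
    files: Sequence[F],
    read_batches: Callable[[F], Iterable[T]],
    concurrent_streams: int = 8,
    max_buffered_batches: int = 16,
) -> Iterator[T]:
    """Yield batches from all files in arrival order with bounded buffering (sketch)."""
    if not files:
        return  # no worker would ever enqueue the sentinel
    q: "queue.Queue[object]" = queue.Queue(maxsize=max_buffered_batches)
    cancel = threading.Event()
    lock = threading.Lock()
    remaining = len(files)

    def worker(f: F) -> None:
        nonlocal remaining
        try:
            for batch in read_batches(f):
                if cancel.is_set():
                    break  # consumer stopped early: stop producing
                q.put(batch)  # step 2: blocks while the queue is full (backpressure)
        except BaseException as exc:
            q.put(_WorkerError(exc))
        finally:
            with lock:
                remaining -= 1
                is_last = remaining == 0
            if is_last:
                q.put(_SENTINEL)

    # Step 6: the context manager waits for all workers on exit.
    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
        for f in files:
            executor.submit(worker, f)  # step 1
        finished = False
        try:
            while True:
                item = q.get()  # step 3: blocking get, no timeout polling
                if item is _SENTINEL:
                    finished = True  # step 4
                    return
                if isinstance(item, _WorkerError):
                    raise item.exc
                yield item
        finally:
            if not finished:
                # Step 5: signal cancellation, then drain so no worker stays blocked in put().
                cancel.set()
                while q.get() is not _SENTINEL:
                    pass
```

With `concurrent_streams=1` the single worker processes files in submission order, which is why that configuration keeps batches grouped by file. With more streams, each file's batches still arrive in order because one worker reads each file sequentially.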

  Refactored `to_record_batches` into helpers: `_prepare_tasks_and_deletes`, `_iter_batches_arrival`, `_iter_batches_materialized`, `_apply_limit`.
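To illustrate the kind of logic a helper such as `_apply_limit` encapsulates, here is a hypothetical generic version. The PR does not describe the helper's internals. This version truncates a stream of batches to a row limit and closes the upstream iterator once the limit is reached, so an upstream producer sees early termination (step 5 above). Lists stand in for record batches. With PyArrow, `batch.slice(0, remaining)` would replace `batch[:remaining]`.

```python
from typing import Iterable, Iterator, Optional, Sequence, TypeVar

T = TypeVar("T")


def apply_limit(batches: Iterable[Sequence[T]], limit: Optional[int]) -> Iterator[Sequence[T]]:
    """Yield batches until `limit` rows have been produced (hypothetical helper)."""
    it = iter(batches)
    try:
        if limit is not None and limit <= 0:
            return
        remaining = limit
        for batch in it:
            if remaining is None or len(batch) < remaining:
                yield batch
                if remaining is not None:
                    remaining -= len(batch)
            else:
                # This batch reaches the limit: emit only the rows still needed.
                yield batch[:remaining]
                return
    finally:
        # Close the upstream generator (if it is one) so its producers can shut down.
        close = getattr(it, "close", None)
        if close is not None:
            close()


print(list(apply_limit([[1, 2, 3], [4, 5, 6], [7, 8]], 4)))  # [[1, 2, 3], [4]]
```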

## Ordering semantics

  | Configuration | File ordering | Within-file ordering |
  |---|---|---|
  | `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
  | `ArrivalOrder(concurrent_streams=1)` | Grouped by file, sequential | Row order |
  | `ArrivalOrder(concurrent_streams>1)` | **Interleaved** (no grouping guarantee) | Row order within each file |
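The two guarantees in this table can be checked mechanically. The helpers below are hypothetical test utilities that assume each batch is tagged with a `(file_id, batch_index)` pair. `within_file_order_preserved` should hold for every configuration. `grouped_by_file` is only guaranteed for `TaskOrder()` and `ArrivalOrder(concurrent_streams=1)`.

```python
from typing import Dict, Hashable, Optional, Sequence, Set, Tuple

# Each batch is tagged as (file_id, batch_index_within_file); illustrative only.
TaggedBatch = Tuple[Hashable, int]


def within_file_order_preserved(batches: Sequence[TaggedBatch]) -> bool:
    """True if every file's batch indices appear in strictly increasing order."""
    last_index: Dict[Hashable, int] = {}
    for file_id, index in batches:
        if index <= last_index.get(file_id, -1):
            return False
        last_index[file_id] = index
    return True


def grouped_by_file(batches: Sequence[TaggedBatch]) -> bool:
    """True if no file's batches resume after another file's batches began."""
    finished: Set[Hashable] = set()
    current: Optional[Hashable] = None
    for file_id, _ in batches:
        if file_id != current:
            if file_id in finished:
                return False
            if current is not None:
                finished.add(current)
            current = file_id
    return True


interleaved = [("a", 0), ("b", 0), ("a", 1), ("b", 1)]
print(within_file_order_preserved(interleaved), grouped_by_file(interleaved))  # True False
```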

## Benchmark results

 32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):

  | Config | Throughput (rows/s) | TTFR (ms) | Peak Arrow Memory |
  |---|---|---|---|
  | default (TaskOrder) | 190,250,192 | 73.4 | 642.2 MB |
  | ArrivalOrder(cs=1) | 59,317,085 | 27.7 | 10.3 MB |
  | ArrivalOrder(cs=2) | 105,414,909 | 28.8 | 42.0 MB |
  | ArrivalOrder(cs=4) | 175,840,782 | 28.4 | 105.5 MB |
  | ArrivalOrder(cs=8) | 211,922,538 | 32.3 | 271.7 MB |
  | ArrivalOrder(cs=16) | 209,011,424 | 45.0 | 473.3 MB |

  *TTFR = Time to First Record, cs = concurrent_streams*
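The table also implies end-to-end scan times and trade-offs that are easy to derive. The sketch below only recomputes figures from the table (16,000,000 rows in total), assuming throughput is averaged over the whole scan. It adds no new measurements.

```python
from typing import Tuple

TOTAL_ROWS = 32 * 500_000  # 32 files x 500K rows

# (throughput in rows/s, peak Arrow memory in MB), copied from the table above
RESULTS = {
    "default (TaskOrder)": (190_250_192, 642.2),
    "ArrivalOrder(cs=1)": (59_317_085, 10.3),
    "ArrivalOrder(cs=2)": (105_414_909, 42.0),
    "ArrivalOrder(cs=4)": (175_840_782, 105.5),
    "ArrivalOrder(cs=8)": (211_922_538, 271.7),
    "ArrivalOrder(cs=16)": (209_011_424, 473.3),
}


def scan_ms(throughput: float) -> float:
    """Implied wall time for the full scan, in milliseconds."""
    return TOTAL_ROWS / throughput * 1000


def relative_to_default(config: str) -> Tuple[float, float]:
    """(throughput ratio, peak-memory ratio) versus the TaskOrder default."""
    base_tp, base_mem = RESULTS["default (TaskOrder)"]
    tp, mem = RESULTS[config]
    return tp / base_tp, mem / base_mem


for name, (tp, _mem) in RESULTS.items():
    tp_ratio, mem_ratio = relative_to_default(name)
    print(f"{name:22} {scan_ms(tp):7.1f} ms  {tp_ratio:5.2f}x throughput  {mem_ratio:6.1%} memory")
```

For example, `cs=8` works out to roughly 75.5 ms versus 84.1 ms for the default, about 1.11x the throughput at about 42% of the peak memory. Beyond `cs=8`, throughput no longer improves while memory keeps rising.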

## Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark.

## Are there any user-facing changes?

 Yes. New `order` parameter on `DataScan.to_arrow_batch_reader()`:

  - `order: ScanOrder | None` — controls batch ordering. Pass `TaskOrder()` (default) or `ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)`.

  New public classes `TaskOrder` and `ArrivalOrder` (subclasses of `ScanOrder`) exported from `pyiceberg.table`.

  All parameters are optional with backwards-compatible defaults. Existing code is unaffected.

  Documentation updated in `mkdocs/docs/api.md` with usage examples, ordering semantics, and configuration guidance table.

v0.11.2

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Add JFrog publish workflow for pyiceberg (#45)

# Rationale for this change

Publish the pyiceberg package to the same JFrog Artifactory PyPI repository (`openhouse-pypi`) used by the OpenHouse data loader.

The package is renamed from `pyiceberg` to `li-pyiceberg` to distinguish it from the upstream Apache release and follow LinkedIn's `li*` naming convention. The Python import (`import pyiceberg`) is unchanged.

## How it works

1. **tag** — reads the version from `pyproject.toml` as the initial version, then auto-increments the patch via git tags on each push to `li-0.11` (e.g. `v0.11.1`, `v0.11.2`, ...)
2. **build** — reuses `pypi-build-artifacts.yml` to build sdist and multi-platform wheels (Linux, Linux ARM, Windows, macOS Intel, macOS ARM) across Python 3.10–3.13 via cibuildwheel. Each wheel is validated by running `pytest tests/avro/test_decoder.py`.
3. **publish** — downloads all build artifacts and publishes to JFrog using `pypa/gh-action-pypi-publish`. Jobs are sequenced via `needs:` so publish waits for all matrix builds to complete.
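Step 1's version logic can be sketched as a small pure function. This is a hypothetical reconstruction, not the workflow's actual script. It assumes plain `MAJOR.MINOR.PATCH` versions and `vMAJOR.MINOR.PATCH` tags. When no tag exists for that minor line it uses the `pyproject.toml` version; otherwise it bumps the highest existing patch.

```python
import re
from typing import Iterable

# Assumed tag format: vMAJOR.MINOR.PATCH (e.g. v0.11.2)
_TAG_RE = re.compile(r"^v(\d+)\.(\d+)\.(\d+)$")


def next_release_tag(pyproject_version: str, existing_tags: Iterable[str]) -> str:
    """Hypothetical: pick the next tag for the MAJOR.MINOR line in pyproject.toml."""
    major, minor, patch = (int(part) for part in pyproject_version.split("."))
    patches = [
        int(m.group(3))
        for m in (_TAG_RE.match(tag) for tag in existing_tags)
        if m and (int(m.group(1)), int(m.group(2))) == (major, minor)
    ]
    if not patches:
        # First release on this line: use the pyproject.toml version as-is.
        return f"v{major}.{minor}.{patch}"
    # Otherwise bump the highest existing patch (never going below pyproject's).
    return f"v{major}.{minor}.{max(patch, max(patches) + 1)}"


print(next_release_tag("0.11.1", ["v0.11.1", "v0.11.2", "v0.11.3", "v0.11.4"]))  # v0.11.5
```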

This does not publish to PyPI or TestPyPI. The existing upstream workflows (`nightly-pypi-build.yml`, `python-release.yml`) are gated by `github.repository == 'apache/iceberg-python'` and will not run in this fork.

Requires `JFROG_USERNAME` and `JFROG_PYPI_API_TOKEN` secrets to be configured in this repo.

## Are these changes tested?

Will be validated on the first push to `li-0.11` after secrets are configured.

## Are there any user-facing changes?

No.