
API Reference

This page is split into two parts:

  • a short handwritten guide to the public surface
  • generated API reference from mkdocstrings

If the generated reference feels terse, read Architecture first, then Storage & Runs. The generated section reflects the code docstrings, so it is best for signatures and symbol discovery rather than architectural explanation.

Start here

Most users only need a small subset of the package:

  • BatchItem: input item wrapper
  • BatchJob: execution definition
  • BatchRunner: start, resume, and run orchestration
  • Run: refresh, wait, inspect, export, and prune
  • OpenAIProviderConfig: built-in provider config
  • SQLiteStorage and PostgresStorage: durable control-plane backends
  • CompositeItemSource, CsvItemSource, JsonlItemSource, and ParquetItemSource: deterministic item streaming

Recent additions worth checking in the generated reference:

  • ArtifactPolicy
  • RunControlState
  • Run.pause(), Run.resume(), Run.cancel()
  • Run.read_terminal_results(...)
  • Run.export_terminal_results(...)
  • CheckpointedItemSource
  • CompositeItemSource
  • ParquetItemSource

These APIs are currently library-first. The CLI still focuses on CSV/JSONL operator workflows and does not yet expose pause/resume/cancel or incremental terminal-result commands.

Public package

Use imports from batchor first. That is the intended consumer surface.

batchor

Batchor: durable OpenAI Batch runner with typed Pydantic results.

This package provides a library-first API for running OpenAI Batch jobs durably, with SQLite-backed state, resumable item sources, replayable request artifacts, and structured Pydantic outputs.

Typical usage:

import batchor

runner = batchor.BatchRunner()
job = batchor.BatchJob(
    items=[batchor.BatchItem(item_id="1", payload="hello")],
    build_prompt=lambda item: batchor.PromptParts(prompt=item.payload),
    provider_config=batchor.OpenAIProviderConfig(model="gpt-4.1"),
)
run = runner.run_and_wait(job)
for result in run.results():
    print(result.output_text)

ArtifactStore

Bases: ABC

Abstract interface for reading, writing, and deleting run artifacts.

All paths are relative keys — the store implementation is responsible for mapping them to a concrete location (e.g. a local filesystem subtree or an object-storage bucket).

write_text(key, content, *, encoding='utf-8') abstractmethod

Write text content to the artifact store under the given key.

PARAMETER DESCRIPTION
key

Relative path identifying the artifact.

TYPE: str

content

Text content to write.

TYPE: str

encoding

Text encoding. Defaults to "utf-8".

TYPE: str DEFAULT: 'utf-8'

read_text(key, *, encoding='utf-8') abstractmethod

Read text content for the given key.

PARAMETER DESCRIPTION
key

Relative path identifying the artifact.

TYPE: str

encoding

Text encoding. Defaults to "utf-8".

TYPE: str DEFAULT: 'utf-8'

RETURNS DESCRIPTION
str

The stored text content.

delete(key) abstractmethod

Delete the artifact at the given key.

PARAMETER DESCRIPTION
key

Relative path identifying the artifact.

TYPE: str

RETURNS DESCRIPTION
bool

True if the artifact existed and was deleted; False if it was not found.

stage_local_copy(key) abstractmethod

Return a context manager that exposes a local filesystem path.

Inside the context, the returned :class:~pathlib.Path is guaranteed to point to a readable copy of the artifact. The caller must not modify the file.

PARAMETER DESCRIPTION
key

Relative path identifying the artifact.

TYPE: str

RETURNS DESCRIPTION
AbstractContextManager[Path]

A context manager yielding a :class:~pathlib.Path to a local copy of the artifact.
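For remote stores, stage_local_copy is typically implemented by downloading the artifact into a temporary directory that lives exactly as long as the context. A minimal self-contained sketch of that pattern (not the batchor implementation; fetch_text is a hypothetical stand-in for the store's download call):

```python
import contextlib
import tempfile
from pathlib import Path


@contextlib.contextmanager
def stage_local_copy(fetch_text, key: str):
    """Download a remote artifact into a temp file for the context's duration."""
    with tempfile.TemporaryDirectory() as tmp:
        local = Path(tmp) / Path(key).name
        local.write_text(fetch_text(key), encoding="utf-8")
        # The path is only valid inside the with-block; callers must not modify it.
        yield local


# Stub fetcher standing in for an object-storage download:
with stage_local_copy(lambda k: f"contents of {k}", "runs/r1/output.jsonl") as path:
    text = path.read_text(encoding="utf-8")
```

The temporary copy is removed automatically when the context exits, which is why the contract says the yielded path is only readable inside the context.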

export_to_directory(key, destination_root) abstractmethod

Copy an artifact to a local directory, mirroring the key structure.

PARAMETER DESCRIPTION
key

Relative path identifying the artifact.

TYPE: str

destination_root

Target directory. The artifact is written to destination_root / key, creating parent directories as needed.

TYPE: str | Path

RETURNS DESCRIPTION
Path

The absolute :class:~pathlib.Path where the artifact was written.

LocalArtifactStore(root)

Bases: ArtifactStore

File-backed artifact store rooted at a single directory.

The store is created with restrictive permissions (0o700) on first use. Key validation prevents path traversal: keys must be relative and must not contain .. components.

ATTRIBUTE DESCRIPTION
root

Absolute :class:~pathlib.Path to the root directory.

Initialise the store, creating the root directory if necessary.

PARAMETER DESCRIPTION
root

Path to the root directory. ~ is expanded and the path is resolved to an absolute form.

TYPE: str | Path

write_text(key, content, *, encoding='utf-8')

Write text content to a file at root / key.

Parent directories are created automatically.

PARAMETER DESCRIPTION
key

Relative artifact key (must not be absolute or contain ..).

TYPE: str

content

Text to write.

TYPE: str

encoding

File encoding. Defaults to "utf-8".

TYPE: str DEFAULT: 'utf-8'

RAISES DESCRIPTION
ValueError

If key is absolute, empty, or traverses outside the root.

read_text(key, *, encoding='utf-8')

Read text content from root / key.

PARAMETER DESCRIPTION
key

Relative artifact key.

TYPE: str

encoding

File encoding. Defaults to "utf-8".

TYPE: str DEFAULT: 'utf-8'

RETURNS DESCRIPTION
str

The file's text contents.

RAISES DESCRIPTION
ValueError

If key is invalid.

FileNotFoundError

If the artifact does not exist.

delete(key)

Delete the artifact file at root / key.

Empty parent directories up to (but not including) the root are removed after deletion.

PARAMETER DESCRIPTION
key

Relative artifact key.

TYPE: str

RETURNS DESCRIPTION
bool

True if the file was found and deleted; False if not found.

RAISES DESCRIPTION
IsADirectoryError

If the resolved path is a directory.

ValueError

If key is invalid.

stage_local_copy(key)

Return a context manager yielding the resolved local path directly.

Because this is a local store, no copy is needed; the artifact's actual path is returned inside the context.

PARAMETER DESCRIPTION
key

Relative artifact key.

TYPE: str

RETURNS DESCRIPTION
AbstractContextManager[Path]

A context manager that yields the local :class:~pathlib.Path.

export_to_directory(key, destination_root)

Copy an artifact to destination_root / key.

PARAMETER DESCRIPTION
key

Relative artifact key identifying the source file.

TYPE: str

destination_root

Target directory; the artifact is written to destination_root / key with parent directories created as needed.

TYPE: str | Path

RETURNS DESCRIPTION
Path

Absolute path to the copied file.

resolve_path(key)

Resolve an artifact key to an absolute filesystem path.

PARAMETER DESCRIPTION
key

Relative artifact key.

TYPE: str

RETURNS DESCRIPTION
Path

Absolute :class:~pathlib.Path under the store root.

RAISES DESCRIPTION
ValueError

If key is invalid.
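The key-validation contract above (relative keys only, no .. components) can be illustrated with a small stand-alone helper. This is a sketch of the documented rule, not the library's own code; resolve_key is a hypothetical name:

```python
from pathlib import Path, PurePosixPath


def resolve_key(root: Path, key: str) -> Path:
    """Reject absolute keys and '..' components before mapping key under root."""
    parts = PurePosixPath(key)
    if not key or parts.is_absolute() or ".." in parts.parts:
        raise ValueError(f"invalid artifact key: {key!r}")
    return (root / parts).resolve()


root = Path("/tmp/artifacts")
ok = resolve_key(root, "runs/r1/manifest.json")

escaped = True
try:
    resolve_key(root, "../etc/passwd")  # traversal attempt is rejected
except ValueError:
    escaped = False
```

Validating before joining is what prevents a crafted key from escaping the store root.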

ItemStatus

Bases: StrEnum

Lifecycle status for a single batch item.

ATTRIBUTE DESCRIPTION
PENDING

Item is waiting to be claimed for submission.

QUEUED_LOCAL

Item has been claimed locally but not yet submitted to the provider.

SUBMITTED

Item has been included in an active provider batch.

COMPLETED

Item reached a terminal success state.

FAILED_RETRYABLE

Item failed on this attempt but will be retried.

FAILED_PERMANENT

Item exceeded the maximum retry limit and will not be retried.

OpenAIEndpoint

Bases: StrEnum

OpenAI API endpoint path used in Batch request lines.

ATTRIBUTE DESCRIPTION
RESPONSES

The /v1/responses endpoint (default).

CHAT_COMPLETIONS

The /v1/chat/completions endpoint.

OpenAIModel

Bases: StrEnum

Known OpenAI model identifiers.

Use these constants or pass a plain string to :attr:~batchor.OpenAIProviderConfig.model for forward-compatibility with models released after this version.

ATTRIBUTE DESCRIPTION
GPT_5_2

gpt-5.2

GPT_5_1

gpt-5.1

GPT_5

gpt-5

GPT_5_MINI

gpt-5-mini

GPT_5_NANO

gpt-5-nano

GPT_4_1

gpt-4.1

GPT_4_1_MINI

gpt-4.1-mini

GPT_4_1_NANO

gpt-4.1-nano

OpenAIReasoningEffort

Bases: StrEnum

Reasoning effort level passed to supporting OpenAI models.

ATTRIBUTE DESCRIPTION
NONE

Disable extended reasoning.

MINIMAL

Minimal reasoning tokens.

LOW

Low reasoning effort.

MEDIUM

Medium reasoning effort (balanced).

HIGH

High reasoning effort.

XHIGH

Maximum reasoning effort.

ProviderKind

Bases: StrEnum

Stable identifier for a batch execution provider.

ATTRIBUTE DESCRIPTION
OPENAI

The built-in OpenAI Batch API provider.

RunControlState

Bases: StrEnum

Operator-controlled state used to pause, resume, or cancel a run.

ATTRIBUTE DESCRIPTION
RUNNING

Normal execution — the runner processes items.

PAUSED

Execution is suspended; refresh() returns immediately without polling or submitting.

CANCEL_REQUESTED

Cancellation has been requested; the runner drains in-flight batches and marks remaining items as cancelled.

RunLifecycleStatus

Bases: StrEnum

Terminal or active lifecycle status for a batch run.

ATTRIBUTE DESCRIPTION
RUNNING

Run is active and processing items.

COMPLETED

All items completed successfully.

COMPLETED_WITH_FAILURES

Run finished but at least one item reached FAILED_PERMANENT status.

StorageKind

Bases: StrEnum

Identifier for a state-store backend.

ATTRIBUTE DESCRIPTION
SQLITE

File-backed SQLite store (default, zero-dependency).

POSTGRES

Shared PostgreSQL control-plane store.

MEMORY

Ephemeral in-memory store (testing / single-process use).

ModelResolutionError(module_name, qualname)

Bases: RuntimeError

Raised when a structured-output model class cannot be re-imported.

This happens when a run is resumed in a process that does not have the same Python environment or import path as the process that created it.

ATTRIBUTE DESCRIPTION
module_name

The __module__ of the model class.

qualname

The __qualname__ of the model class.

Initialise the error with the unresolvable model location.

PARAMETER DESCRIPTION
module_name

Module path stored for the structured output class.

TYPE: str

qualname

Qualified name stored for the structured output class.

TYPE: str

RunNotFinishedError(run_id)

Bases: RuntimeError

Raised when a terminal-only operation is called on a still-running run.

ATTRIBUTE DESCRIPTION
run_id

The identifier of the run that is not yet finished.

Initialise the error with the run identifier.

PARAMETER DESCRIPTION
run_id

Identifier of the run that is not yet in a terminal state.

TYPE: str

RunPausedError(run_id)

Bases: RuntimeError

Raised when :meth:~batchor.Run.wait encounters a paused run.

ATTRIBUTE DESCRIPTION
run_id

The identifier of the paused run.

Initialise the error with the paused run's identifier.

PARAMETER DESCRIPTION
run_id

Identifier of the run that is in a paused control state.

TYPE: str

StructuredOutputSchemaError

Bases: ValueError

Raised when a Pydantic model produces a schema incompatible with OpenAI.

OpenAI structured output requires strict JSON Schema objects with no anyOf at the root and additionalProperties: false on every object type. This error is raised during job construction so invalid schemas are caught before any API calls are made.
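The two strict-schema rules can be checked with a short recursive walk. The following is an illustrative pre-flight check, not batchor's actual validator; strict_schema_problems is a hypothetical helper:

```python
def strict_schema_problems(schema: dict) -> list[str]:
    """Collect violations of the two strict-schema rules described above."""
    problems: list[str] = []
    if "anyOf" in schema:
        problems.append("#: anyOf at schema root")

    def walk(node, path):
        if isinstance(node, dict):
            # Every object type must pin additionalProperties to false.
            if node.get("type") == "object" and node.get("additionalProperties") is not False:
                problems.append(f"{path}: additionalProperties must be false")
            for key, value in node.items():
                walk(value, f"{path}/{key}")
        elif isinstance(node, list):
            for index, value in enumerate(node):
                walk(value, f"{path}/{index}")

    walk(schema, "#")
    return problems


good = {"type": "object", "additionalProperties": False,
        "properties": {"name": {"type": "string"}}}
bad = {"type": "object", "properties": {"name": {"type": "string"}}}
```

Running the check against good yields no problems, while bad is flagged at the root for the missing additionalProperties: false.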

ArtifactExportResult(run_id, destination_dir, manifest_path, results_path, exported_artifact_paths) dataclass

Result returned after exporting retained artifacts for a terminal run.

ATTRIBUTE DESCRIPTION
run_id

The run whose artifacts were exported.

TYPE: str

destination_dir

Absolute path to the export root directory (<destination>/<run_id>/).

TYPE: str

manifest_path

Absolute path to the generated manifest.json file.

TYPE: str

results_path

Absolute path to the generated results.jsonl file.

TYPE: str

exported_artifact_paths

Relative artifact paths that were copied.

TYPE: list[str]

ArtifactPolicy(persist_raw_output_artifacts=True) dataclass

Controls which provider artifacts are retained after a batch completes.

ATTRIBUTE DESCRIPTION
persist_raw_output_artifacts

When True (default), the raw provider output and error JSONL files are written to the artifact store and their paths recorded in state. Set to False to skip retention and reduce disk usage.

TYPE: bool

to_payload()

Serialise the policy to a JSON-compatible dictionary.

RETURNS DESCRIPTION
JSONObject

A JSONObject suitable for durable storage.

from_payload(payload) classmethod

Deserialise a previously persisted policy payload.

PARAMETER DESCRIPTION
payload

A JSONObject produced by :meth:to_payload.

TYPE: JSONObject

RETURNS DESCRIPTION
ArtifactPolicy

A reconstructed :class:ArtifactPolicy instance.

RAISES DESCRIPTION
TypeError

If persist_raw_output_artifacts is not a bool.

ArtifactPruneResult(run_id, removed_artifact_paths, missing_artifact_paths, cleared_item_pointers, cleared_batch_pointers=0) dataclass

Result returned after pruning retained artifacts for a terminal run.

ATTRIBUTE DESCRIPTION
run_id

The run whose artifacts were pruned.

TYPE: str

removed_artifact_paths

Relative paths of files successfully deleted.

TYPE: list[str]

missing_artifact_paths

Relative paths that were recorded in state but not found on disk (already deleted or never written).

TYPE: list[str]

cleared_item_pointers

Number of per-item artifact pointer records cleared in state.

TYPE: int

cleared_batch_pointers

Number of per-batch artifact pointer records cleared in state (only non-zero when raw output artifacts are pruned).

TYPE: int

BatchItem(item_id, payload, metadata=dict()) dataclass

Bases: Generic[PayloadT]

One logical unit of work inside a batch run.

ATTRIBUTE DESCRIPTION
item_id

Caller-assigned identifier, unique within the run. Used as the basis for the custom_id in provider request lines and for correlating results back to items.

TYPE: str

payload

Arbitrary caller-defined data passed to build_prompt.

TYPE: PayloadT

metadata

Optional key-value pairs stored alongside the item in state. The batchor_lineage key is reserved for source tracing when items originate from a file-backed :class:~batchor.ItemSource.

TYPE: JSONObject

BatchJob(items, build_prompt, provider_config, structured_output=None, schema_name=None, chunk_policy=ChunkPolicy(), retry_policy=RetryPolicy(), batch_metadata=dict(), artifact_policy=ArtifactPolicy()) dataclass

Bases: Generic[PayloadT, ModelT]

Declarative description of a batch run.

Passed to :meth:~batchor.BatchRunner.start or :meth:~batchor.BatchRunner.run_and_wait to create or resume a durable run.

ATTRIBUTE DESCRIPTION
items

An iterable of :class:BatchItem objects or an :class:~batchor.ItemSource that streams them.

TYPE: BatchItems[PayloadT]

build_prompt

Callable that converts a :class:BatchItem to a :class:PromptParts (or a plain string for simple cases).

TYPE: PromptBuilder[PayloadT]

provider_config

Provider-specific configuration, e.g. :class:OpenAIProviderConfig.

TYPE: ProviderConfig

structured_output

Optional Pydantic model class used to parse and validate each item's response as structured JSON.

TYPE: type[ModelT] | None

schema_name

Optional override for the JSON schema name sent to the provider. Defaults to a snake_case version of the model class name.

TYPE: str | None

chunk_policy

Controls how pending items are split into provider batch files.

TYPE: ChunkPolicy

retry_policy

Controls retry behaviour for transient failures.

TYPE: RetryPolicy

batch_metadata

Arbitrary string key-value pairs attached to every provider batch created for this run.

TYPE: dict[str, str]

artifact_policy

Controls which raw provider artifacts are retained.

TYPE: ArtifactPolicy

ChunkPolicy(max_requests=50000, max_file_bytes=150 * 1024 * 1024, chars_per_token=4) dataclass

Submission chunking limits applied before provider batches are created.

A batch of pending items is split into one or more provider batches such that each chunk respects all three limits simultaneously.

ATTRIBUTE DESCRIPTION
max_requests

Maximum number of request lines per provider batch file. Defaults to 50_000 (OpenAI's maximum).

TYPE: int

max_file_bytes

Maximum size in bytes of a single batch input file. Defaults to 150 MiB (OpenAI's maximum).

TYPE: int

chars_per_token

Fallback characters-per-token ratio used for token estimation when tiktoken is unavailable.

TYPE: int
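Greedy packing under these caps can be sketched as follows. This covers two of the three limits (request count and file bytes); the token-estimate limit is omitted for brevity, and chunk_request_lines is an illustrative helper, not the library's chunker:

```python
import json


def chunk_request_lines(lines, max_requests=50_000, max_file_bytes=150 * 1024 * 1024):
    """Greedily pack JSONL request lines so every chunk respects both caps."""
    chunks, current, current_bytes = [], [], 0
    for line in lines:
        encoded = len(json.dumps(line).encode("utf-8")) + 1  # +1 for the newline
        # Flush the current chunk before either limit would be exceeded.
        if current and (len(current) >= max_requests
                        or current_bytes + encoded > max_file_bytes):
            chunks.append(current)
            current, current_bytes = [], 0
        current.append(line)
        current_bytes += encoded
    if current:
        chunks.append(current)
    return chunks


lines = [{"custom_id": str(i), "body": "x" * 10} for i in range(5)]
small = chunk_request_lines(lines, max_requests=2)
```

With max_requests=2, five items split into chunks of 2, 2, and 1, each below both caps.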

ItemFailure(error_class, message, retryable, raw_error=None) dataclass

Structured failure payload attached to a terminal item result.

ATTRIBUTE DESCRIPTION
error_class

Short machine-readable failure category, e.g. "provider_item_error" or "structured_output_validation_failed".

TYPE: str

message

Human-readable description of the failure.

TYPE: str

retryable

True if the failure consumed an attempt and the item will be retried; False if the item is immediately permanent.

TYPE: bool

raw_error

Optional raw error payload from the provider or parser, preserved verbatim for debugging.

TYPE: JSONValue | None

OpenAIEnqueueLimitConfig(enqueued_token_limit=0, target_ratio=0.7, headroom=0, max_batch_enqueued_tokens=0) dataclass

OpenAI-specific token budgeting controls for batch submission.

When enqueued_token_limit is non-zero, the runner estimates the token footprint of each submitted batch and refuses to enqueue more tokens than the effective budget allows. This prevents hitting the OpenAI enqueued-token-limit API error.

ATTRIBUTE DESCRIPTION
enqueued_token_limit

Hard cap (in tokens) on the OpenAI account's enqueue limit. 0 disables token-budget enforcement.

TYPE: int

target_ratio

Fraction of enqueued_token_limit to use as the effective inflight budget. Defaults to 0.7 (70 %).

TYPE: float

headroom

Absolute token buffer subtracted from the limit before computing the effective budget. Defaults to 0.

TYPE: int

max_batch_enqueued_tokens

Optional per-batch token ceiling. 0 means no per-batch limit beyond the inflight budget.

TYPE: int

to_payload()

Serialise the config to a JSON-compatible dictionary.

RETURNS DESCRIPTION
JSONObject

A JSONObject suitable for durable storage.

from_payload(payload) classmethod

Deserialise a previously persisted config payload.

PARAMETER DESCRIPTION
payload

A JSONObject produced by :meth:to_payload.

TYPE: JSONObject

RETURNS DESCRIPTION
OpenAIEnqueueLimitConfig

A reconstructed :class:OpenAIEnqueueLimitConfig instance.

RAISES DESCRIPTION
TypeError

If any field has the wrong type in payload.
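The interplay of enqueued_token_limit, target_ratio, and headroom suggests a budget rule like the sketch below. The exact arithmetic is internal to the runner; this is inferred from the attribute descriptions above and both function names are hypothetical:

```python
def effective_budget(enqueued_token_limit: int,
                     target_ratio: float = 0.7,
                     headroom: int = 0) -> int:
    """Inferred effective inflight budget: ratio of the limit minus headroom."""
    if enqueued_token_limit == 0:
        return 0  # 0 disables token-budget enforcement
    return max(0, round(enqueued_token_limit * target_ratio) - headroom)


def can_enqueue(batch_tokens: int, inflight_tokens: int, budget: int) -> bool:
    """A batch is admitted only if it fits under the remaining budget."""
    return budget == 0 or inflight_tokens + batch_tokens <= budget


budget = effective_budget(1_000_000)                       # 700_000 tokens
tight = effective_budget(1_000_000, headroom=100_000)      # 600_000 tokens
```

Under this rule a 50,000-token batch would be refused once 680,000 tokens are already in flight against a 700,000-token budget.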

OpenAIProviderConfig(model, api_key='', endpoint=OpenAIEndpoint.RESPONSES, completion_window='24h', request_timeout_sec=30, poll_interval_sec=1.0, reasoning_effort=None, enqueue_limits=OpenAIEnqueueLimitConfig()) dataclass

Bases: ProviderConfig

Configuration for the built-in OpenAI Batch provider.

ATTRIBUTE DESCRIPTION
model

OpenAI model name (e.g. "gpt-4.1" or an :class:~batchor.OpenAIModel constant).

TYPE: OpenAIModelName

api_key

OpenAI API key. When empty the runner falls back to the OPENAI_API_KEY environment variable.

TYPE: str

endpoint

API endpoint path to use for batch requests.

TYPE: OpenAIEndpoint

completion_window

Maximum time the OpenAI batch is allowed to run, e.g. "24h".

TYPE: str

request_timeout_sec

Timeout in seconds for individual API calls.

TYPE: int

poll_interval_sec

Seconds to sleep between polling cycles when :meth:~batchor.Run.wait is used without a custom interval.

TYPE: float

reasoning_effort

Reasoning effort level for supporting models. None omits the field from the request.

TYPE: OpenAIReasoningLevel | None

enqueue_limits

Token-budget configuration that constrains how many tokens may be enqueued at once.

TYPE: OpenAIEnqueueLimitConfig

to_public_payload()

Serialise the config without secret material.

RETURNS DESCRIPTION
JSONObject

A JSONObject identical to :meth:to_payload but with api_key removed, safe to persist or log.

from_payload(payload) classmethod

Deserialise a previously persisted provider config payload.

PARAMETER DESCRIPTION
payload

A JSONObject produced by :meth:to_payload or :meth:to_public_payload.

TYPE: JSONObject

RETURNS DESCRIPTION
OpenAIProviderConfig

A reconstructed :class:OpenAIProviderConfig instance.

RAISES DESCRIPTION
TypeError

If any field has the wrong type in payload.

PromptParts(prompt, system_prompt=None) dataclass

Prompt text sent to the provider, with an optional system prompt.

ATTRIBUTE DESCRIPTION
prompt

The user-turn or primary input text.

TYPE: str

system_prompt

Optional system/instructions text. Mapped to instructions for the Responses endpoint and to a system message for the Chat Completions endpoint.

TYPE: str | None

RetryPolicy(max_attempts=3, base_backoff_sec=1.0, max_backoff_sec=300.0) dataclass

Retry limits for item-level and batch-control-plane recovery.

Applies to both individual item failures (retryable provider errors) and transient batch control-plane failures (rate limits, timeouts). Backoff uses exponential doubling capped at max_backoff_sec.

ATTRIBUTE DESCRIPTION
max_attempts

Total attempt budget per item before marking it FAILED_PERMANENT. Defaults to 3.

TYPE: int

base_backoff_sec

Starting backoff delay in seconds. Subsequent failures double this up to max_backoff_sec.

TYPE: float

max_backoff_sec

Ceiling on the computed backoff delay in seconds.

TYPE: float
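The documented backoff schedule (exponential doubling from base_backoff_sec, capped at max_backoff_sec) works out as below. Whether the very first retry waits exactly base_backoff_sec is an implementation detail; this sketch assumes it does:

```python
def backoff_delay(failure_count: int,
                  base_backoff_sec: float = 1.0,
                  max_backoff_sec: float = 300.0) -> float:
    """Delay before the next attempt: doubles per failure, capped at the ceiling."""
    return min(max_backoff_sec, base_backoff_sec * (2 ** (failure_count - 1)))


delays = [backoff_delay(n) for n in range(1, 11)]
```

With the defaults the sequence runs 1, 2, 4, 8, ... seconds until it hits the 300-second ceiling around the ninth consecutive failure.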

RunEvent(event_type, run_id, provider_kind=None, data=dict()) dataclass

Observer event emitted by the runner during lifecycle transitions.

Passed to the observer callable supplied to :class:~batchor.BatchRunner on each notable state change.

ATTRIBUTE DESCRIPTION
event_type

Short identifier for the event kind, e.g. "batch_submitted", "items_completed", "run_resumed".

TYPE: str

run_id

The run that produced this event.

TYPE: str

provider_kind

Provider that triggered the event, or None for storage-only events.

TYPE: ProviderKind | None

data

Optional extra fields specific to the event type.

TYPE: JSONObject

RunSnapshot(run_id, status, control_state, total_items, completed_items, failed_items, status_counts, active_batches, backoff_remaining_sec, items) dataclass

Expanded durable run state including current terminal item payloads.

A superset of :class:RunSummary that additionally includes the list of terminal item results available at query time. Returned by :meth:~batchor.Run.snapshot.

ATTRIBUTE DESCRIPTION
run_id

Stable run identifier.

TYPE: str

status

Current lifecycle status of the run.

TYPE: RunLifecycleStatus

control_state

Operator control state.

TYPE: RunControlState

total_items

Total number of items registered for this run.

TYPE: int

completed_items

Number of items in COMPLETED status.

TYPE: int

failed_items

Number of items in FAILED_PERMANENT status.

TYPE: int

status_counts

Full per-status item count breakdown.

TYPE: dict[ItemStatus, int]

active_batches

Number of provider batches currently in-flight.

TYPE: int

backoff_remaining_sec

Seconds until the next submission is permitted.

TYPE: float

items

All terminal item results available at query time.

TYPE: list[BatchResultItem]

RunSummary(run_id, status, control_state, total_items, completed_items, failed_items, status_counts, active_batches, backoff_remaining_sec) dataclass

Aggregated durable run state without item payload expansion.

Returned by :meth:~batchor.Run.summary and :meth:~batchor.Run.refresh. Contains counters and lifecycle flags but not individual item results.

ATTRIBUTE DESCRIPTION
run_id

Stable run identifier.

TYPE: str

status

Current lifecycle status of the run.

TYPE: RunLifecycleStatus

control_state

Operator control state (running / paused / cancelling).

TYPE: RunControlState

total_items

Total number of items registered for this run.

TYPE: int

completed_items

Number of items in COMPLETED status.

TYPE: int

failed_items

Number of items in FAILED_PERMANENT status.

TYPE: int

status_counts

Full per-status item count breakdown.

TYPE: dict[ItemStatus, int]

active_batches

Number of provider batches currently in-flight.

TYPE: int

backoff_remaining_sec

Seconds until the next submission attempt is permitted (0.0 when not in backoff).

TYPE: float

StructuredItemResult(item_id, status, attempt_count, output=None, output_text=None, raw_response=None, error=None, metadata=dict()) dataclass

Bases: Generic[ModelT]

Terminal result for one structured-output item.

ATTRIBUTE DESCRIPTION
item_id

Identifier matching the originating :class:BatchItem.

TYPE: str

status

Final item status (COMPLETED or FAILED_PERMANENT).

TYPE: ItemStatus

attempt_count

Number of provider attempts consumed.

TYPE: int

output

Validated Pydantic model instance, or None on failure.

TYPE: ModelT | None

output_text

Raw text from the provider response before parsing.

TYPE: str | None

raw_response

Full provider response record for debugging.

TYPE: JSONObject | None

error

Populated when the item reached FAILED_PERMANENT.

TYPE: ItemFailure | None

metadata

Item metadata carried through from the source.

TYPE: JSONObject

TerminalResultsExportResult(run_id, destination_path, exported_count, next_after_sequence) dataclass

Result returned after exporting terminal results to a JSONL file.

ATTRIBUTE DESCRIPTION
run_id

The run whose results were exported.

TYPE: str

destination_path

Absolute path to the JSONL file written.

TYPE: str

exported_count

Number of result records written in this call.

TYPE: int

next_after_sequence

Cursor value for continuing an incremental export.

TYPE: int

TerminalResultsPage(run_id, items, next_after_sequence) dataclass

A page of terminal item results for cursor-based streaming.

ATTRIBUTE DESCRIPTION
run_id

The run these results belong to.

TYPE: str

items

Terminal item results in this page.

TYPE: list[BatchResultItem]

next_after_sequence

Opaque cursor to pass as after_sequence in the next call to retrieve the following page.

TYPE: int
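The cursor contract above supports a simple drain loop: fetch a page, emit its items, and feed next_after_sequence back in until a page comes back empty. The sketch below uses a stub pager standing in for the real read call (whose exact signature is documented in the generated reference, not assumed here):

```python
def iter_terminal_results(fetch_page, after_sequence: int = 0):
    """Drain pages by feeding next_after_sequence back as the cursor."""
    while True:
        page = fetch_page(after_sequence)
        if not page["items"]:
            return  # an empty page means the stream is exhausted for now
        yield from page["items"]
        after_sequence = page["next_after_sequence"]


# Stub pager over five fake terminal results, two per page:
data = [{"item_id": str(i)} for i in range(5)]


def fetch_page(after, size=2):
    items = data[after:after + size]
    return {"items": items, "next_after_sequence": after + len(items)}


collected = list(iter_terminal_results(fetch_page))
```

Persisting the last cursor between calls is what makes incremental export resumable: a later invocation picks up exactly where the previous one stopped.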

TextItemResult(item_id, status, attempt_count, output_text=None, raw_response=None, error=None, metadata=dict()) dataclass

Terminal result for one text-output item.

ATTRIBUTE DESCRIPTION
item_id

Identifier matching the originating :class:BatchItem.

TYPE: str

status

Final item status (COMPLETED or FAILED_PERMANENT).

TYPE: ItemStatus

attempt_count

Number of provider attempts consumed.

TYPE: int

output_text

Extracted text from the provider response, or None on failure.

TYPE: str | None

raw_response

Full provider response record for debugging.

TYPE: JSONObject | None

error

Populated when the item reached FAILED_PERMANENT.

TYPE: ItemFailure | None

metadata

Item metadata carried through from the source.

TYPE: JSONObject

BatchProvider

Bases: ABC

Abstract adapter between the batchor runtime and a batch API provider.

Each method maps to one step of the batch submission/polling lifecycle. The :class:~batchor.OpenAIBatchProvider is the built-in implementation.

build_request_line(*, custom_id, prompt_parts, structured_output=None) abstractmethod

Build one JSONL request line for a batch input file.

PARAMETER DESCRIPTION
custom_id

Unique identifier for this request within the batch.

TYPE: str

prompt_parts

Prompt text and optional system prompt.

TYPE: PromptParts

structured_output

Optional structured output schema to embed.

TYPE: StructuredOutputSchema | None DEFAULT: None

RETURNS DESCRIPTION
BatchRequestLine

A :class:~batchor.core.types.BatchRequestLine dict ready to be serialised as a JSONL line.

upload_input_file(input_path) abstractmethod

Upload a prepared JSONL input file to the provider.

PARAMETER DESCRIPTION
input_path

Local path to the JSONL batch input file.

TYPE: str | Path

RETURNS DESCRIPTION
str

The provider-assigned file identifier.

delete_input_file(file_id) abstractmethod

Best-effort deletion of an uploaded input file.

Implementations should swallow errors — this is a clean-up path.

PARAMETER DESCRIPTION
file_id

Provider-assigned file identifier to delete.

TYPE: str

create_batch(*, input_file_id, metadata=None) abstractmethod

Create a remote batch from a previously uploaded input file.

PARAMETER DESCRIPTION
input_file_id

Provider file identifier for the batch input.

TYPE: str

metadata

Optional key-value metadata attached to the batch.

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
BatchRemoteRecord

A :class:~batchor.core.types.BatchRemoteRecord reflecting the initial remote batch state.

get_batch(batch_id) abstractmethod

Fetch the current remote state for one batch.

PARAMETER DESCRIPTION
batch_id

Provider-assigned batch identifier.

TYPE: str

RETURNS DESCRIPTION
BatchRemoteRecord

A :class:~batchor.core.types.BatchRemoteRecord with the current status and file identifiers.

download_file_content(file_id) abstractmethod

Download a provider-hosted file and return its text content.

PARAMETER DESCRIPTION
file_id

Provider-assigned file identifier.

TYPE: str

RETURNS DESCRIPTION
str

The file's text content (typically JSONL).

parse_batch_output(*, output_content, error_content) abstractmethod

Split raw batch output/error JSONL into success and error maps.

PARAMETER DESCRIPTION
output_content

Raw text of the provider output JSONL file, or None if unavailable.

TYPE: str | None

error_content

Raw text of the provider error JSONL file, or None if unavailable.

TYPE: str | None

RETURNS DESCRIPTION
tuple[dict[str, JSONObject], dict[str, JSONObject], list[JSONObject]]

A 3-tuple (successes, errors, all_records) where successes and errors are dicts keyed by custom_id and all_records is the flat list of all parsed records.

estimate_request_tokens(request_line, *, chars_per_token) abstractmethod

Estimate the token count for a single request line.

Used by the token-budget enforcement logic to avoid submitting batches that exceed the enqueued-token limit.

PARAMETER DESCRIPTION
request_line

The request line whose token count to estimate.

TYPE: BatchRequestLine

chars_per_token

Fallback ratio when tiktoken is unavailable.

TYPE: int

RETURNS DESCRIPTION
int

Estimated token count as a non-negative integer.
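When tiktoken is unavailable, the chars_per_token ratio implies a simple character-count fallback along these lines. This is an illustrative heuristic matching the documented fallback, not the provider's exact implementation:

```python
import json
import math


def estimate_request_tokens(request_line: dict, *, chars_per_token: int = 4) -> int:
    """Character-count fallback: serialised length divided by the ratio."""
    serialised = json.dumps(request_line, separators=(",", ":"))
    return max(0, math.ceil(len(serialised) / chars_per_token))


line = {"custom_id": "item-1",
        "body": {"model": "gpt-4.1", "input": "hello world"}}
tokens = estimate_request_tokens(line, chars_per_token=4)
```

The estimate deliberately rounds up, since under-counting would risk exceeding the enqueued-token budget the estimate exists to protect.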

ProviderConfig

Bases: ABC

Abstract configuration for a specific batch provider.

Sub-classes must implement serialisation and expose the stable :attr:provider_kind identifier used for runtime dispatch and durable storage.

ATTRIBUTE DESCRIPTION
poll_interval_sec

Default seconds between polling cycles when :meth:~batchor.Run.wait is used.

TYPE: float

provider_kind abstractmethod property

Stable provider identifier used for runtime dispatch and persistence.

to_payload() abstractmethod

Serialise provider-specific config to JSON for durable storage.

RETURNS DESCRIPTION
JSONObject

A JSON-serialisable dictionary including all configuration fields,

JSONObject

including secrets. Use :meth:to_public_payload to strip secrets.

to_public_payload()

Serialise provider config without secret material for durable storage.

RETURNS DESCRIPTION
JSONObject

A JSON-serialisable dictionary safe to persist or log. The default implementation delegates to :meth:to_payload; sub-classes should override this to omit credentials.

StructuredOutputSchema(name, schema) dataclass

Validated JSON Schema for a structured output model.

ATTRIBUTE DESCRIPTION
name

Schema name sent to the provider (snake_case by convention).

TYPE: str

schema

Strict JSON Schema object with additionalProperties: false on all nested objects.

TYPE: JSONObject

OpenAIBatchProvider(config, client=None)

Bases: BatchProvider

Built-in provider that adapts batchor jobs to the OpenAI Batch API.

build_request_line(*, custom_id, prompt_parts, structured_output=None)

Build one OpenAI batch JSONL request row for a logical item.

upload_input_file(input_path)

Upload a prepared local JSONL file to OpenAI and return the file id.

delete_input_file(file_id)

Best-effort deletion of an uploaded input file.

create_batch(*, input_file_id, metadata=None)

Create an OpenAI batch from a previously uploaded input file.

get_batch(batch_id)

Fetch the current remote state for one OpenAI batch.

download_file_content(file_id)

Download a provider file and normalize it to text.

parse_jsonl(content) staticmethod

Parse provider JSONL payloads into JSON objects.

parse_batch_output(*, output_content, error_content)

Split OpenAI output/error payloads into success and error maps.

estimate_request_tokens(request_line, *, chars_per_token)

Estimate submitted tokens for one provider request line.

ProviderRegistry()

Dispatch table mapping provider kinds to factory and loader callables.

Use :func:build_default_provider_registry to obtain a registry pre-loaded with the built-in OpenAI provider, or construct one manually to register custom providers.

register(*, kind, factory, loader)

Register a provider factory and config loader for kind.

PARAMETER DESCRIPTION
kind

The :class:~batchor.ProviderKind this registration covers.

TYPE: ProviderKind

factory

Callable (config) -> BatchProvider that constructs a live provider instance from its config.

TYPE: ProviderFactory

loader

Callable (payload) -> ProviderConfig that deserialises a persisted config payload produced by :meth:~batchor.ProviderConfig.to_payload.

TYPE: ProviderConfigLoader

create(config)

Create a live provider instance from config.

PARAMETER DESCRIPTION
config

A :class:~batchor.ProviderConfig instance whose provider_kind has been registered.

TYPE: ProviderConfig

RETURNS DESCRIPTION
BatchProvider

A ready-to-use :class:~batchor.BatchProvider.

RAISES DESCRIPTION
ValueError

If config.provider_kind is not registered.

dump_config(config, *, include_secrets=True)

Serialise config to a JSON object with a provider_kind envelope.

PARAMETER DESCRIPTION
config

Provider config to serialise.

TYPE: ProviderConfig

include_secrets

When True (default), uses :meth:~batchor.ProviderConfig.to_payload. When False, uses :meth:~batchor.ProviderConfig.to_public_payload.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
JSONObject

A JSONObject with "provider_kind" and "config" keys.

load_config(payload)

Deserialise a persisted config payload produced by :meth:dump_config.

PARAMETER DESCRIPTION
payload

A JSONObject with "provider_kind" and "config" keys as produced by :meth:dump_config.

TYPE: JSONObject

RETURNS DESCRIPTION
ProviderConfig

A reconstructed :class:~batchor.ProviderConfig instance.

RAISES DESCRIPTION
TypeError

If provider_kind or config have incorrect types.

ValueError

If the provider_kind is not registered.
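The factory/loader dispatch that ProviderRegistry describes can be sketched as a plain dictionary-based registry. The class and method names below mirror the documented surface but the implementation is illustrative, not the real one:

```python
from typing import Any, Callable


class MiniRegistry:
    """Toy dispatch table: provider kind -> (factory, loader)."""

    def __init__(self) -> None:
        self._entries: dict[str, tuple[Callable, Callable]] = {}

    def register(self, *, kind: str, factory: Callable, loader: Callable) -> None:
        self._entries[kind] = (factory, loader)

    def dump_config(self, kind: str, config: dict) -> dict:
        # Envelope with "provider_kind" and "config" keys, as documented above.
        return {"provider_kind": kind, "config": config}

    def load_config(self, payload: dict) -> Any:
        kind = payload["provider_kind"]
        if kind not in self._entries:
            raise ValueError(f"unregistered provider kind: {kind}")
        _factory, loader = self._entries[kind]
        return loader(payload["config"])
```

The round-trip property is the important part: whatever `dump_config` produces, `load_config` must be able to reconstruct, keyed only by `provider_kind`.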

BatchRunner(*, storage=None, provider_registry=None, storage_registry=None, provider_factory=None, observer=None, sleep=None, artifact_store=None, temp_root=None)

Durable orchestrator for creating, resuming, and inspecting batch runs.

Initialize the runner facade and its internal runtime layers.

PARAMETER DESCRIPTION
storage

Storage backend instance or registered storage kind.

TYPE: str | StateStore | None DEFAULT: None

provider_registry

Provider registry used for config dumping and provider construction.

TYPE: ProviderRegistry | None DEFAULT: None

storage_registry

Storage registry used for named storage backends.

TYPE: StorageRegistry | None DEFAULT: None

provider_factory

Optional provider factory override for tests.

TYPE: Callable[[Any], BatchProvider] | None DEFAULT: None

observer

Optional run event observer callback.

TYPE: Callable[[RunEvent], None] | None DEFAULT: None

sleep

Optional sleep function override for polling loops.

TYPE: Callable[[float], None] | None DEFAULT: None

artifact_store

Artifact store override.

TYPE: ArtifactStore | None DEFAULT: None

temp_root

Root used for the default local artifact store.

TYPE: str | Path | None DEFAULT: None

start(job, *, run_id=None)

Create or resume a durable run for the given job.

PARAMETER DESCRIPTION
job

Declarative batch job to start or resume.

TYPE: BatchJob[Any, BaseModel]

run_id

Optional durable run identifier.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Run

Durable run handle for the run.

run_and_wait(job, *, run_id=None)

Start a run and block until it reaches a terminal state.

PARAMETER DESCRIPTION
job

Declarative batch job to start.

TYPE: BatchJob[Any, BaseModel]

run_id

Optional durable run identifier.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Run

Durable run handle after it becomes terminal.

get_run(run_id)

Rehydrate an existing durable run handle from storage.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Rehydrated durable run handle.

pause_run(run_id)

Set a run's control state to paused.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Durable run handle with updated control state.

resume_run(run_id)

Resume a previously paused run.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Durable run handle with updated control state.

RAISES DESCRIPTION
ValueError

If the run is already cancelling.

cancel_run(run_id)

Request cancellation of an active run.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Durable run handle with updated control state.

read_terminal_results(run_id, *, after_sequence=0, limit=None)

Read a page of terminal item results for cursor-based streaming.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

after_sequence

Terminal-result cursor from the previous page.

TYPE: int DEFAULT: 0

limit

Optional maximum number of items to return.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
TerminalResultsPage

Terminal results page.

RAISES DESCRIPTION
ValueError

If after_sequence is negative.

export_terminal_results(run_id, *, destination, after_sequence=0, append=True, limit=None)

Export terminal item results to a JSONL file.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

destination

Destination JSONL path.

TYPE: str | Path

after_sequence

Terminal-result cursor from the previous page.

TYPE: int DEFAULT: 0

append

Whether to append to the destination file.

TYPE: bool DEFAULT: True

limit

Optional maximum number of items to export.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
TerminalResultsExportResult

Terminal results export report.

export_artifacts(run_id, *, destination_dir)

Export retained run artifacts and a results manifest for a terminal run.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

destination_dir

Directory that will receive the exported run bundle.

TYPE: str | Path

RETURNS DESCRIPTION
ArtifactExportResult

Artifact export report.

prune_artifacts(run_id, *, include_raw_output_artifacts=False)

Remove retained artifacts for a terminal run and clear stored pointers.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

include_raw_output_artifacts

Whether raw provider payloads should also be pruned.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
ArtifactPruneResult

Artifact prune report.

Run(*, runner, run_id, summary)

Durable handle for polling, waiting on, and inspecting one batch run.

Initialize the durable run handle.

PARAMETER DESCRIPTION
runner

Owning runner facade.

TYPE: BatchRunner

run_id

Durable run identifier.

TYPE: str

summary

Cached summary for the run.

TYPE: RunSummary

status property

Return the cached lifecycle status for the run.

RETURNS DESCRIPTION
RunLifecycleStatus

Cached lifecycle status.

control_state property

Return the cached operator control state for the run.

RETURNS DESCRIPTION
RunControlState

Cached control state.

is_finished property

Return whether the run is in a terminal lifecycle state.

RETURNS DESCRIPTION
bool

True if the run is terminal.

refresh()

Perform one poll-and-submit cycle and return the updated summary.

RETURNS DESCRIPTION
RunSummary

Updated run summary.

wait(*, timeout=None, poll_interval=None)

Block until the run is terminal or the optional timeout expires.

PARAMETER DESCRIPTION
timeout

Optional timeout in seconds.

TYPE: float | None DEFAULT: None

poll_interval

Optional polling interval override in seconds.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Run

The same run handle after it becomes terminal.

RAISES DESCRIPTION
RunPausedError

If the run is paused while waiting.

TimeoutError

If the timeout elapses before the run is terminal.
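The wait contract (poll until terminal, raise TimeoutError once the budget is exhausted) can be sketched without the real classes. `refresh` and `sleep` are injectable stand-ins, mirroring the `sleep` override that BatchRunner accepts:

```python
import time


def wait_until_terminal(refresh, *, timeout=None, poll_interval=1.0, sleep=time.sleep):
    """Poll `refresh()` until it reports a terminal state.

    `refresh` is any callable returning an object with an `is_finished`
    attribute. This is an illustrative sketch, not Run.wait itself.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        summary = refresh()
        if summary.is_finished:
            return summary
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("run did not reach a terminal state in time")
        sleep(poll_interval)
```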

summary()

Read the latest persisted summary for the run from storage.

RETURNS DESCRIPTION
RunSummary

Latest persisted run summary.

snapshot()

Return the current summary plus expanded terminal item payloads.

RETURNS DESCRIPTION
RunSnapshot

Snapshot containing the latest summary plus materialized results.

results()

Return terminal item results for the run.

RETURNS DESCRIPTION
list[BatchResultItem]

Terminal item results in durable item order.

RAISES DESCRIPTION
RunNotFinishedError

If the run is not terminal yet.

prune_artifacts(*, include_raw_output_artifacts=False)

Prune retained artifacts for this terminal run.

PARAMETER DESCRIPTION
include_raw_output_artifacts

Whether raw provider payloads should also be pruned.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
ArtifactPruneResult

Artifact prune report.

export_artifacts(destination_dir)

Export retained artifacts for this terminal run.

PARAMETER DESCRIPTION
destination_dir

Directory that will receive the exported run bundle.

TYPE: str

RETURNS DESCRIPTION
ArtifactExportResult

Artifact export report.

pause()

Suspend execution of this run.

RETURNS DESCRIPTION
RunSummary

Updated :class:~batchor.RunSummary with control state PAUSED.

resume()

Resume this run after it has been paused.

RETURNS DESCRIPTION
RunSummary

Updated :class:~batchor.RunSummary with control state RUNNING.

RAISES DESCRIPTION
ValueError

If the run is in CANCEL_REQUESTED state.

cancel()

Request cancellation of this run.

RETURNS DESCRIPTION
RunSummary

Updated :class:~batchor.RunSummary with control state CANCEL_REQUESTED.

read_terminal_results(*, after_sequence=0, limit=None)

Read a page of terminal item results using cursor-based pagination.

PARAMETER DESCRIPTION
after_sequence

Cursor from the previous page's next_after_sequence. Pass 0 to start from the beginning.

TYPE: int DEFAULT: 0

limit

Maximum number of results to return per call.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
TerminalResultsPage

A :class:~batchor.TerminalResultsPage with items and an updated cursor.

export_terminal_results(destination, *, after_sequence=0, append=True, limit=None)

Export terminal item results to a JSONL file.

PARAMETER DESCRIPTION
destination

Path to the output JSONL file.

TYPE: str

after_sequence

Cursor from a previous call.

TYPE: int DEFAULT: 0

append

When True (default), the file is opened in append mode.

TYPE: bool DEFAULT: True

limit

Maximum number of results to export per call.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
TerminalResultsExportResult

A :class:~batchor.TerminalResultsExportResult with the file path, export count, and updated cursor.
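The `after_sequence` cursor loop these methods describe can be sketched against any paginated reader. The `Page` class and `read_page` stub below are illustrative stand-ins, not the real TerminalResultsPage:

```python
from dataclasses import dataclass


@dataclass
class Page:
    """Stand-in for a terminal-results page."""

    items: list
    next_after_sequence: int


def read_page(records, *, after_sequence=0, limit=2):
    """Stub reader: records are (sequence, payload) pairs in order."""
    selected = [r for r in records if r[0] > after_sequence][:limit]
    cursor = selected[-1][0] if selected else after_sequence
    return Page(items=selected, next_after_sequence=cursor)


def drain(records):
    """Consume every page by threading the cursor through each call."""
    cursor, out = 0, []
    while True:
        page = read_page(records, after_sequence=cursor)
        if not page.items:
            return out
        out.extend(payload for _seq, payload in page.items)
        cursor = page.next_after_sequence
```

Passing `after_sequence=0` starts from the beginning; persisting the returned cursor between calls is what makes the stream resumable across processes.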

StructuredOutputError(error_class, message, *, raw_error=None)

Bases: ValueError

Raised when a provider response cannot be parsed as the expected model.

ATTRIBUTE DESCRIPTION
error_class

Short machine-readable error category (e.g. "invalid_json", "structured_output_validation_failed").

message

Human-readable error description.

raw_error

Optional raw payload captured for debugging.

Initialise the error.

PARAMETER DESCRIPTION
error_class

Short machine-readable error category.

TYPE: str

message

Human-readable description of the parse failure.

TYPE: str

raw_error

Optional raw payload (parsed JSON or raw text) for debugging.

TYPE: JSONValue | None DEFAULT: None

CheckpointedItemSource

Bases: ItemSource[PayloadT], Generic[PayloadT]

Item source with durable checkpoint support for mid-stream resumption.

The runner persists the checkpoint returned by each :class:CheckpointedBatchItem to state. On resume it calls :meth:iter_from_checkpoint with the last persisted checkpoint so iteration continues exactly where it left off.

source_identity() abstractmethod

Return a stable identity used to validate resume compatibility.

initial_checkpoint() abstractmethod

Return the checkpoint that represents the start of the source.

RETURNS DESCRIPTION
JSONValue

An opaque :data:~batchor.core.types.JSONValue accepted by :meth:iter_from_checkpoint to begin iteration from the first item.

iter_from_checkpoint(checkpoint) abstractmethod

Yield items plus the next opaque checkpoint after each durable item.

PARAMETER DESCRIPTION
checkpoint

The checkpoint at which to resume. Must be a value previously returned by a :class:CheckpointedBatchItem.

TYPE: JSONValue

YIELDS DESCRIPTION
CheckpointedBatchItem[PayloadT]

:class:CheckpointedBatchItem instances in source order.
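The checkpoint contract (yield each item together with the cursor needed to resume after it) can be sketched with a list-backed source and integer checkpoints. The class name and the `(item, next_checkpoint)` tuple shape are illustrative assumptions:

```python
from typing import Iterator


class ListCheckpointedSource:
    """Toy checkpointed source: the checkpoint is the next list index."""

    def __init__(self, payloads: list) -> None:
        self._payloads = payloads

    def source_identity(self) -> str:
        # Stable identity so a resumed run can verify it is reading
        # the same source it started with.
        return f"list:{len(self._payloads)}"

    def initial_checkpoint(self) -> int:
        return 0

    def iter_from_checkpoint(self, checkpoint: int) -> Iterator[tuple]:
        # Yield (item, next_checkpoint) pairs; persisting next_checkpoint
        # after each durable item lets a restart resume mid-stream.
        for index in range(checkpoint, len(self._payloads)):
            yield self._payloads[index], index + 1
```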

ItemSource

Bases: ABC, Generic[PayloadT]

Abstract iterable source of :class:~batchor.BatchItem objects.

Implement this class when your source does not need resume support. For durable checkpointing use :class:CheckpointedItemSource instead.

CompositeItemSource(sources)

Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]

Compose ordered checkpointed sources into one logical deterministic source.

CsvItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')

Bases: ResumableItemSource[PayloadT], Generic[PayloadT]

Stream BatchItem values from a CSV file with durable resume support.

JsonlItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')

Bases: ResumableItemSource[PayloadT], Generic[PayloadT]

Stream BatchItem values from a JSONL file with durable resume support.

ParquetItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, columns=None)

Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]

Stream :class:~batchor.BatchItem values from a Parquet file.

Uses pyarrow to read row groups lazily, checkpointing at the (row_group_index, row_index_within_group) level to support efficient resumption within large files.

ATTRIBUTE DESCRIPTION
path

Path to the Parquet file.

item_id_from_row

Callable that derives a unique item ID from a row.

payload_from_row

Callable that converts a row to the item payload.

metadata_from_row

Optional callable for per-item metadata.

columns

Optional list of column names to read (projection push-down).

StorageRegistry()

Dispatch table mapping storage kinds to factory callables.

Use :func:build_default_storage_registry to obtain a registry pre-loaded with SQLite, Postgres, and in-memory backends, or construct one manually to register custom backends.

register(*, kind, factory)

Register a storage factory for kind.

PARAMETER DESCRIPTION
kind

The :class:~batchor.StorageKind this factory handles.

TYPE: StorageKind

factory

Zero-argument callable that returns a :class:~batchor.StateStore instance.

TYPE: StorageFactory

create(kind)

Create a storage backend of the given kind.

PARAMETER DESCRIPTION
kind

A :class:~batchor.StorageKind value or equivalent string.

TYPE: StorageKind | str

RETURNS DESCRIPTION
StateStore

A ready-to-use :class:~batchor.StateStore.

RAISES DESCRIPTION
ValueError

If kind is not registered or is not a valid :class:~batchor.StorageKind.

SQLiteStorage(*, name='default', path=None, now=None, engine=None, provider_registry=None)

Bases: SQLiteResultsMixin, SQLiteLifecycleMixin, SQLiteQueryMixin, StateStore

SQLite-backed :class:~batchor.StateStore.

The default state store for batchor runs. Uses a single SQLite database file with WAL journal mode for concurrent reads and a companion artifact directory for JSONL files.

ATTRIBUTE DESCRIPTION
path

Absolute path to the SQLite database file.

engine

SQLAlchemy engine used for all database operations.

provider_registry

Registry used to deserialise provider configs on run resume.

Initialise or open a SQLite storage backend.

PARAMETER DESCRIPTION
name

Logical name used to derive the default file path (~/.batchor/<name>.sqlite3). Ignored when path is provided.

TYPE: str DEFAULT: 'default'

path

Explicit path to the database file. ~ is expanded.

TYPE: str | Path | None DEFAULT: None

now

Optional clock override for testing. Defaults to datetime.now(timezone.utc).

TYPE: Callable[[], datetime] | None DEFAULT: None

engine

Optional pre-built SQLAlchemy engine. When provided, WAL and busy-timeout pragmas are not configured automatically.

TYPE: Engine | None DEFAULT: None

provider_registry

Registry for deserialising provider configs on run resume. Defaults to the built-in registry.

TYPE: ProviderRegistry | None DEFAULT: None

default_path(name) staticmethod

Return the default database file path for a given store name.

PARAMETER DESCRIPTION
name

Logical store name. Whitespace-only names fall back to "default".

TYPE: str

RETURNS DESCRIPTION
Path

~/.batchor/<name>.sqlite3

close()

Dispose the SQLAlchemy connection pool and release resources.
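The default-path rule described above (~/.batchor/<name>.sqlite3, with whitespace-only names falling back to "default") can be sketched as follows; whether surrounding whitespace is trimmed from non-empty names is an assumption here:

```python
from pathlib import Path


def default_sqlite_path(name: str) -> Path:
    """Sketch of the documented default-path rule (not the real method)."""
    clean = name.strip() or "default"
    return Path.home() / ".batchor" / f"{clean}.sqlite3"
```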

MemoryStateStore(*, now=None)

Bases: StateStore

Ephemeral in-memory implementation of :class:~batchor.StateStore.

All state is held in plain Python dictionaries. No persistence occurs; data is lost when the store is garbage-collected or the process exits.

Thread safety: individual method calls are not locked. Do not share a single instance across threads without external synchronisation.

StateStore

Bases: ABC

Abstract interface for durable run state storage.

All methods are synchronous and transactional within the scope of a single call. Implementations must be safe to use from multiple threads provided callers do not share a connection outside a method call.

The two primary implementations are:

  • :class:~batchor.SQLiteStorage — file-backed, zero-dependency (default).
  • :class:~batchor.PostgresStorage — shared Postgres control plane.

has_run(*, run_id) abstractmethod

Return True if a run with run_id exists in storage.

PARAMETER DESCRIPTION
run_id

Run identifier to check.

TYPE: str

create_run(*, run_id, config, items) abstractmethod

Create a new run record in storage.

PARAMETER DESCRIPTION
run_id

Unique identifier for the new run.

TYPE: str

config

Serialised run configuration.

TYPE: PersistedRunConfig

items

Optional initial batch of materialized items to insert.

TYPE: list[MaterializedItem]

append_items(*, run_id, items) abstractmethod

Append additional items to an existing run.

PARAMETER DESCRIPTION
run_id

Identifier of the run to extend.

TYPE: str

items

Materialized items to append.

TYPE: list[MaterializedItem]

set_ingest_checkpoint(*, run_id, checkpoint) abstractmethod

Persist the initial ingest checkpoint for a checkpointed source.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

checkpoint

Checkpoint representing the start-of-source state.

TYPE: IngestCheckpoint

get_ingest_checkpoint(*, run_id) abstractmethod

Return the current ingest checkpoint, or None if not set.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

update_ingest_checkpoint(*, run_id, next_item_index, checkpoint_payload=None, ingestion_complete) abstractmethod

Advance the ingest checkpoint after a batch of items is ingested.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

next_item_index

0-based index of the next item to ingest.

TYPE: int

checkpoint_payload

Updated source-specific checkpoint payload.

TYPE: JSONValue | None DEFAULT: None

ingestion_complete

True once all items have been ingested.

TYPE: bool

get_run_config(*, run_id) abstractmethod

Return the persisted configuration for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

get_run_control_state(*, run_id) abstractmethod

Return the current operator control state for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

set_run_control_state(*, run_id, control_state) abstractmethod

Persist a new operator control state for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

control_state

The new control state to set.

TYPE: RunControlState

claim_items_for_submission(*, run_id, max_attempts, limit=None) abstractmethod

Atomically claim a batch of pending items for the submission cycle.

Items are moved from PENDING to QUEUED_LOCAL. Only items whose attempt count is less than max_attempts are eligible.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

max_attempts

Maximum attempts per item.

TYPE: int

limit

Maximum number of items to claim.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[ClaimedItem]

List of claimed items (may be empty if none are pending).

release_items_to_pending(*, run_id, item_ids) abstractmethod

Return locally-queued items to PENDING state.

Called when a submission cycle is interrupted before all claimed items could be submitted.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

item_ids

Identifiers of items to release.

TYPE: list[str]

requeue_local_items(*, run_id) abstractmethod

Re-queue any items stuck in QUEUED_LOCAL back to PENDING.

Called when a run handle is rehydrated to recover items claimed by a previous process that terminated unexpectedly.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

RETURNS DESCRIPTION
int

Number of items re-queued.

record_request_artifacts(*, run_id, pointers) abstractmethod

Persist per-item request artifact pointers.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

pointers

Artifact pointer records to persist.

TYPE: list[RequestArtifactPointer]

get_request_artifact_paths(*, run_id) abstractmethod

Return all distinct request artifact paths recorded for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

clear_request_artifact_pointers(*, run_id, artifact_paths) abstractmethod

Clear per-item request artifact pointers for the given paths.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

artifact_paths

Relative artifact paths to clear.

TYPE: list[str]

RETURNS DESCRIPTION
int

Number of pointer records cleared.

record_batch_artifacts(*, run_id, pointers) abstractmethod

Persist batch output and error artifact pointers.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

pointers

Batch artifact pointer records to persist.

TYPE: list[BatchArtifactPointer]

get_artifact_inventory(*, run_id) abstractmethod

Return the complete artifact inventory for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

clear_batch_artifact_pointers(*, run_id, artifact_paths) abstractmethod

Clear batch artifact pointers for the given paths.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

artifact_paths

Relative artifact paths to clear.

TYPE: list[str]

RETURNS DESCRIPTION
int

Number of pointer records cleared.

mark_artifacts_exported(*, run_id, export_root) abstractmethod

Record that artifacts have been exported to export_root.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

export_root

Absolute path to the export directory.

TYPE: str

register_batch(*, run_id, local_batch_id, provider_batch_id, status, custom_ids) abstractmethod

Record a newly submitted provider batch.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

local_batch_id

Locally generated batch identifier.

TYPE: str

provider_batch_id

Provider-assigned batch identifier.

TYPE: str

status

Initial batch status string.

TYPE: str

custom_ids

custom_id values for all items in this batch.

TYPE: list[str]

mark_items_submitted(*, run_id, provider_batch_id, submissions) abstractmethod

Transition items from QUEUED_LOCAL to SUBMITTED.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

provider_batch_id

Batch that now owns these items.

TYPE: str

submissions

Submission records including custom_id and token estimates.

TYPE: list[PreparedSubmission]

update_batch_status(*, run_id, provider_batch_id, status, output_file_id=None, error_file_id=None) abstractmethod

Persist the latest polled status for a provider batch.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

provider_batch_id

Batch to update.

TYPE: str

status

New status string from the provider.

TYPE: str

output_file_id

Provider file ID for output, if now available.

TYPE: str | None DEFAULT: None

error_file_id

Provider file ID for errors, if now available.

TYPE: str | None DEFAULT: None

get_active_batches(*, run_id) abstractmethod

Return all in-flight (non-terminal) batch records for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

get_submitted_custom_ids_for_batch(*, run_id, provider_batch_id) abstractmethod

Return all custom_id values submitted in provider_batch_id.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

provider_batch_id

Batch to query.

TYPE: str

mark_items_completed(*, run_id, completions) abstractmethod

Transition items to COMPLETED and persist their results.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

completions

Completed item records to persist.

TYPE: list[CompletedItemRecord]

mark_items_failed(*, run_id, failures, max_attempts) abstractmethod

Transition items after a provider-level failure.

Items whose attempt count reaches max_attempts are moved to FAILED_PERMANENT; others are moved to FAILED_RETRYABLE and will be re-queued on the next submission cycle.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

failures

Failure records identified by custom_id.

TYPE: list[ItemFailureRecord]

max_attempts

Attempt ceiling from the retry policy.

TYPE: int
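The attempt-ceiling rule (FAILED_PERMANENT once the attempt count reaches max_attempts, otherwise FAILED_RETRYABLE) reduces to a small decision function. The state names are copied from the text; the function itself is illustrative:

```python
def failure_state(attempts: int, max_attempts: int) -> str:
    """Classify a failed item after its attempt counter has been bumped."""
    if attempts >= max_attempts:
        return "FAILED_PERMANENT"
    return "FAILED_RETRYABLE"  # re-queued on the next submission cycle
```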

mark_queued_items_failed(*, run_id, failures, max_attempts) abstractmethod

Fail items that are still in the local queue (pre-submission).

Used for token-budget rejections where items never received a custom_id.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

failures

Failure records identified by item_id.

TYPE: list[QueuedItemFailureRecord]

max_attempts

Attempt ceiling from the retry policy.

TYPE: int

reset_batch_items_to_pending(*, run_id, provider_batch_id, error) abstractmethod

Reset all items in a failed batch to pending for retry.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

provider_batch_id

The batch that failed or was cancelled.

TYPE: str

error

Failure details to record on each item.

TYPE: ItemFailure

get_active_submitted_token_estimate(*, run_id) abstractmethod

Return the total token estimate for all currently submitted items.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

record_batch_retry_failure(*, run_id, error_class, base_delay_sec, max_delay_sec) abstractmethod

Record a batch control-plane failure and advance the backoff state.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

error_class

Short error category from the failure.

TYPE: str

base_delay_sec

Base backoff delay.

TYPE: float

max_delay_sec

Maximum backoff delay.

TYPE: float

RETURNS DESCRIPTION
RetryBackoffState

Updated :class:RetryBackoffState.
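The base/max delay pair suggests capped exponential backoff; the exact formula is not documented here, so the following is only an assumed sketch:

```python
def backoff_delay_sec(
    consecutive_failures: int, *, base_delay_sec: float, max_delay_sec: float
) -> float:
    """Assumed capped exponential backoff: base * 2^(n-1), clamped to max."""
    if consecutive_failures <= 0:
        return 0.0
    return min(max_delay_sec, base_delay_sec * 2 ** (consecutive_failures - 1))
```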

clear_batch_retry_backoff(*, run_id) abstractmethod

Reset the control-plane backoff state after a successful operation.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

get_batch_retry_backoff_remaining_sec(*, run_id) abstractmethod

Return the seconds remaining until the next retry is permitted.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

RETURNS DESCRIPTION
float

Remaining backoff in seconds, or 0.0 if no backoff is active.

get_run_summary(*, run_id) abstractmethod

Compute and return the aggregated summary for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

get_item_records(*, run_id) abstractmethod

Return all item records for a run.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

get_terminal_item_records(*, run_id, after_sequence, limit=None) abstractmethod

Return terminal item records using cursor-based pagination.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

after_sequence

Return only records whose terminal_result_sequence is greater than this value.

TYPE: int

limit

Maximum number of records to return.

TYPE: int | None DEFAULT: None

mark_nonterminal_items_cancelled(*, run_id, error) abstractmethod

Move all non-terminal items to FAILED_PERMANENT with error.

Called when a cancellation is finalised.

PARAMETER DESCRIPTION
run_id

Run identifier.

TYPE: str

error

Cancellation failure details to attach to each item.

TYPE: ItemFailure

RETURNS DESCRIPTION
int

Number of items that were cancelled.

build_default_provider_registry()

Create a :class:ProviderRegistry pre-loaded with the OpenAI provider.

RETURNS DESCRIPTION
ProviderRegistry

A :class:ProviderRegistry with :attr:~batchor.ProviderKind.OPENAI registered.

default_schema_name(model)

Derive a snake_case schema name from a Pydantic model class.

Leading underscores are stripped before conversion. Double underscores produced by the conversion are collapsed.

PARAMETER DESCRIPTION
model

The Pydantic model class.

TYPE: type[BaseModel]

RETURNS DESCRIPTION
str

A snake_case string suitable for use as an OpenAI schema name.

Example

>>> default_schema_name(MyOutputModel)
'my_output_model'
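The CamelCase-to-snake_case conversion (leading underscores stripped, doubled underscores collapsed) can be sketched as below. This is a plausible reconstruction of the documented rules, not the actual implementation:

```python
import re


def to_schema_name(class_name: str) -> str:
    """Sketch of the documented conversion rules for default_schema_name."""
    name = class_name.lstrip("_")
    # Insert underscores at CamelCase boundaries, then lowercase.
    name = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    name = re.sub(r"__+", "_", name)  # collapse doubled underscores
    return name.lower()
```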

model_output_schema(model, *, schema_name=None)

Derive a validated strict JSON Schema from a Pydantic model.

Calls :func:validate_strict_json_schema to ensure the schema is compatible with OpenAI's structured output API before returning it.

PARAMETER DESCRIPTION
model

Pydantic model class to derive the schema from.

TYPE: type[BaseModel]

schema_name

Optional explicit schema name. When None the name is derived via :func:default_schema_name.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[str, JSONObject]

A 2-tuple (schema_name, schema) where schema_name is the resolved name and schema is the strict JSON Schema object.

RAISES DESCRIPTION
StructuredOutputSchemaError

If the schema is not compatible with OpenAI's strict structured output requirements.

build_default_storage_registry(*, provider_registry=None)

Create a :class:StorageRegistry pre-loaded with all built-in backends.

Registers:

  • StorageKind.SQLITE → :class:~batchor.SQLiteStorage (default store)
  • StorageKind.POSTGRES → :class:~batchor.PostgresStorage
  • StorageKind.MEMORY → :class:~batchor.MemoryStateStore
PARAMETER DESCRIPTION
provider_registry

Optional provider registry forwarded to backends that need it for config round-tripping on resume.

TYPE: ProviderRegistry | None DEFAULT: None

RETURNS DESCRIPTION
StorageRegistry

A :class:StorageRegistry with all built-in backends registered.

Runner

BatchRunner is the main entrypoint for the Python API, and Run is the durable handle returned by start() or get_run().

batchor.runtime.runner

BatchRunner public facade over the internal runtime execution layers.

BatchRunner(*, storage=None, provider_registry=None, storage_registry=None, provider_factory=None, observer=None, sleep=None, artifact_store=None, temp_root=None)

Durable orchestrator for creating, resuming, and inspecting batch runs.

Initialize the runner facade and its internal runtime layers.

PARAMETER DESCRIPTION
storage

Storage backend instance or registered storage kind.

TYPE: str | StateStore | None DEFAULT: None

provider_registry

Provider registry used for config dumping and provider construction.

TYPE: ProviderRegistry | None DEFAULT: None

storage_registry

Storage registry used for named storage backends.

TYPE: StorageRegistry | None DEFAULT: None

provider_factory

Optional provider factory override for tests.

TYPE: Callable[[Any], BatchProvider] | None DEFAULT: None

observer

Optional run event observer callback.

TYPE: Callable[[RunEvent], None] | None DEFAULT: None

sleep

Optional sleep function override for polling loops.

TYPE: Callable[[float], None] | None DEFAULT: None

artifact_store

Artifact store override.

TYPE: ArtifactStore | None DEFAULT: None

temp_root

Root used for the default local artifact store.

TYPE: str | Path | None DEFAULT: None

start(job, *, run_id=None)

Create or resume a durable run for the given job.

PARAMETER DESCRIPTION
job

Declarative batch job to start or resume.

TYPE: BatchJob[Any, BaseModel]

run_id

Optional durable run identifier.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Run

Durable run handle for the run.

run_and_wait(job, *, run_id=None)

Start a run and block until it reaches a terminal state.

PARAMETER DESCRIPTION
job

Declarative batch job to start.

TYPE: BatchJob[Any, BaseModel]

run_id

Optional durable run identifier.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Run

Durable run handle after it becomes terminal.

get_run(run_id)

Rehydrate an existing durable run handle from storage.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Rehydrated durable run handle.

pause_run(run_id)

Set a run's control state to paused.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Durable run handle with updated control state.

resume_run(run_id)

Resume a previously paused run.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Durable run handle with updated control state.

RAISES DESCRIPTION
ValueError

If the run is already cancelling.

cancel_run(run_id)

Request cancellation of an active run.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

RETURNS DESCRIPTION
Run

Durable run handle with updated control state.

read_terminal_results(run_id, *, after_sequence=0, limit=None)

Read a page of terminal item results for cursor-based streaming.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

after_sequence

Terminal-result cursor from the previous page.

TYPE: int DEFAULT: 0

limit

Optional maximum number of items to return.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
TerminalResultsPage

Terminal results page.

RAISES DESCRIPTION
ValueError

If after_sequence is negative.

export_terminal_results(run_id, *, destination, after_sequence=0, append=True, limit=None)

Export terminal item results to a JSONL file.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

destination

Destination JSONL path.

TYPE: str | Path

after_sequence

Terminal-result cursor from the previous page.

TYPE: int DEFAULT: 0

append

Whether to append to the destination file.

TYPE: bool DEFAULT: True

limit

Optional maximum number of items to export.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
TerminalResultsExportResult

Terminal results export report.

export_artifacts(run_id, *, destination_dir)

Export retained run artifacts and a results manifest for a terminal run.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

destination_dir

Directory that will receive the exported run bundle.

TYPE: str | Path

RETURNS DESCRIPTION
ArtifactExportResult

Artifact export report.

prune_artifacts(run_id, *, include_raw_output_artifacts=False)

Remove retained artifacts for a terminal run and clear stored pointers.

PARAMETER DESCRIPTION
run_id

Durable run identifier.

TYPE: str

include_raw_output_artifacts

Whether raw provider payloads should also be pruned.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
ArtifactPruneResult

Artifact prune report.

OpenAI provider

This is the built-in provider implementation. Most consumers only need OpenAIProviderConfig, but the lower-level provider class is documented here for extension work and tests.

batchor.providers.openai

OpenAI Batch API provider implementation.

Adapts batchor's generic batch model to the OpenAI Batch API. Supports both the /v1/responses (default) and /v1/chat/completions endpoints, with optional structured JSON output via OpenAI's json_schema response format.

OpenAIBatchProvider(config, client=None)

Bases: BatchProvider

Built-in provider that adapts batchor jobs to the OpenAI Batch API.

build_request_line(*, custom_id, prompt_parts, structured_output=None)

Build one OpenAI batch JSONL request row for a logical item.
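The JSONL line shape follows OpenAI's published batch input format: one JSON object per row with `custom_id`, `method`, `url`, and `body` fields. The helper below is an illustrative sketch of that shape, not batchor's actual `build_request_line` implementation.

```python
# Shape of one OpenAI Batch API request line (JSONL). The field names
# follow OpenAI's documented batch input format; the helper itself is
# an illustrative sketch, not batchor's implementation.
import json


def build_request_line(custom_id: str, model: str, prompt: str,
                       url: str = "/v1/chat/completions") -> str:
    body = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    return json.dumps({
        "custom_id": custom_id,  # echoed back in the batch output line
        "method": "POST",
        "url": url,
        "body": body,
    })


line = build_request_line("item-1", "gpt-4.1", "hello")
row = json.loads(line)
```

The `custom_id` is echoed in the corresponding output line, which is how a runner matches provider results back to logical items.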

upload_input_file(input_path)

Upload a prepared local JSONL file to OpenAI and return the file id.

delete_input_file(file_id)

Best-effort deletion of an uploaded input file.

create_batch(*, input_file_id, metadata=None)

Create an OpenAI batch from a previously uploaded input file.

get_batch(batch_id)

Fetch the current remote state for one OpenAI batch.

download_file_content(file_id)

Download a provider file and normalize it to text.

parse_jsonl(content) staticmethod

Parse provider JSONL payloads into JSON objects.

parse_batch_output(*, output_content, error_content)

Split OpenAI output/error payloads into success and error maps.

estimate_request_tokens(request_line, *, chars_per_token)

Estimate submitted tokens for one provider request line.

resolve_openai_api_key(config)

Resolve credentials from explicit config first, then the environment.

Sources

These sources support durable resume through source fingerprints and checkpoints.

batchor.sources.base

Abstract base classes for item source adapters.

Item sources are responsible for streaming :class:~batchor.BatchItem objects to the runner. The hierarchy provides three levels of capability:

  • :class:ItemSource — a plain iterable (no resume support).
  • :class:CheckpointedItemSource — supports durable checkpoints for resuming mid-stream after a process restart.
  • :class:ResumableItemSource — a convenience sub-class of :class:CheckpointedItemSource that uses a monotonic integer index as the checkpoint (suitable for ordered, seekable sources like CSV and JSONL files).

SourceIdentity(source_kind, source_ref, source_fingerprint) dataclass

Stable identity for a checkpointed source, used to validate resumability.

When a run is resumed the runner compares the stored identity against the current source's identity to detect mismatches (e.g. the source file was replaced).

ATTRIBUTE DESCRIPTION
source_kind

Short identifier for the source type (e.g. "csv").

TYPE: str

source_ref

Human-readable reference to the source (e.g. absolute file path).

TYPE: str

source_fingerprint

Content fingerprint (e.g. SHA-256 of size + mtime) used to detect source mutations.

TYPE: str
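One plausible fingerprint scheme matching the description above is to hash the file's size and modification time, so any mutation of the source invalidates resume. This is an illustrative sketch under that assumption, not batchor's actual fingerprint implementation.

```python
# Illustrative fingerprint: SHA-256 over size + mtime, so a replaced or
# edited source file yields a different SourceIdentity fingerprint.
# Not batchor's actual implementation.
import hashlib
import os
import tempfile


def file_fingerprint(path: str) -> str:
    st = os.stat(path)
    raw = f"{st.st_size}:{st.st_mtime_ns}".encode()
    return hashlib.sha256(raw).hexdigest()


with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("item_id,payload\n1,hello\n")
    path = f.name

fp_before = file_fingerprint(path)
fp_again = file_fingerprint(path)  # unchanged file → same fingerprint
```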

IndexedBatchItem(item_index, item) dataclass

Bases: Generic[PayloadT]

A :class:~batchor.BatchItem paired with its 0-based source index.

ATTRIBUTE DESCRIPTION
item_index

0-based position of this item within the source.

TYPE: int

item

The batch item.

TYPE: BatchItem[PayloadT]

CheckpointedBatchItem(next_checkpoint, item) dataclass

Bases: Generic[PayloadT]

A :class:~batchor.BatchItem paired with an opaque resume checkpoint.

The checkpoint is an opaque :data:~batchor.core.types.JSONValue that, when passed back to :meth:~batchor.CheckpointedItemSource.iter_from_checkpoint, resumes iteration at the item after this one.

ATTRIBUTE DESCRIPTION
next_checkpoint

Checkpoint to persist after processing this item.

TYPE: JSONValue

item

The batch item.

TYPE: BatchItem[PayloadT]

ItemSource

Bases: ABC, Generic[PayloadT]

Abstract iterable source of :class:~batchor.BatchItem objects.

Implement this class when your source does not need resume support. For durable checkpointing use :class:CheckpointedItemSource instead.

CheckpointedItemSource

Bases: ItemSource[PayloadT], Generic[PayloadT]

Item source with durable checkpoint support for mid-stream resumption.

The runner persists the checkpoint returned by each :class:CheckpointedBatchItem to state. On resume it calls :meth:iter_from_checkpoint with the last persisted checkpoint so iteration continues exactly where it left off.

source_identity() abstractmethod

Return a stable identity used to validate resume compatibility.

initial_checkpoint() abstractmethod

Return the checkpoint that represents the start of the source.

RETURNS DESCRIPTION
JSONValue

An opaque :data:~batchor.core.types.JSONValue accepted by :meth:iter_from_checkpoint to begin iteration from the first item.

iter_from_checkpoint(checkpoint) abstractmethod

Yield items plus the next opaque checkpoint after each durable item.

PARAMETER DESCRIPTION
checkpoint

The checkpoint at which to resume. Must be a value previously returned by a :class:CheckpointedBatchItem.

TYPE: JSONValue

YIELDS DESCRIPTION
CheckpointedBatchItem[PayloadT]

:class:CheckpointedBatchItem instances in source order.
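The contract can be exercised with a minimal in-memory stand-in: each yielded pair carries the checkpoint to persist after that item, and feeding a persisted checkpoint back resumes at the item after it. Names mirror the docs; the `ListSource` class is illustrative, not part of batchor.

```python
# Minimal in-memory stand-in for the CheckpointedItemSource contract.
# ListSource is illustrative; the checkpoint here is an int, standing in
# for the opaque JSONValue of the real API.
from dataclasses import dataclass
from typing import Iterator


@dataclass
class CheckpointedBatchItem:
    next_checkpoint: int  # persist this after processing the item
    item: str


class ListSource:
    def __init__(self, items: list[str]) -> None:
        self._items = items

    def initial_checkpoint(self) -> int:
        return 0

    def iter_from_checkpoint(self, checkpoint: int) -> Iterator[CheckpointedBatchItem]:
        for index in range(checkpoint, len(self._items)):
            # next_checkpoint points at the item AFTER this one
            yield CheckpointedBatchItem(index + 1, self._items[index])


source = ListSource(["a", "b", "c", "d"])

# Process two items, then "crash" and resume from the persisted checkpoint.
stream = source.iter_from_checkpoint(source.initial_checkpoint())
persisted = next(stream).next_checkpoint
persisted = next(stream).next_checkpoint
resumed = [c.item for c in source.iter_from_checkpoint(persisted)]
```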

ResumableItemSource

Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]

Checkpointed source where the checkpoint is a 0-based integer index.

Suitable for ordered seekable sources (CSV, JSONL) where iteration can efficiently skip ahead to a given row index. The checkpoint is simply the index of the next item to yield.

iter_from(item_index) abstractmethod

Yield items beginning at the given 0-based source item index.

PARAMETER DESCRIPTION
item_index

0-based index of the first item to yield. Items before this index are skipped.

TYPE: int

YIELDS DESCRIPTION
IndexedBatchItem[PayloadT]

:class:IndexedBatchItem instances starting from item_index.
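For a row-oriented source, the integer-index checkpoint reduces resume to skipping ahead by row count. A sketch of that skip-ahead behaviour for a CSV-like source, using `itertools.islice`; the `iter_from` function here is illustrative, not batchor's internals.

```python
# ResumableItemSource reduces the checkpoint to a 0-based row index.
# Sketch of skip-ahead resume for a CSV-like source; illustrative only.
import csv
import io
from itertools import islice

CSV_TEXT = "item_id,payload\n1,alpha\n2,beta\n3,gamma\n"


def iter_from(item_index: int):
    reader = csv.DictReader(io.StringIO(CSV_TEXT))
    # Skip rows before item_index; yield (index, row) pairs from there.
    for index, row in islice(enumerate(reader), item_index, None):
        yield index, row


# Resuming at index 1 skips "alpha" and replays nothing.
resumed = [(i, row["payload"]) for i, row in iter_from(1)]
```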

batchor.sources.composite

CompositeItemSource(sources)

Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]

Compose ordered checkpointed sources into one logical deterministic source.
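One way such composition can work is to make the composite checkpoint a pair: the index of the active child source plus that child's own checkpoint. The sketch below is illustrative of that scheme, not batchor's actual composite implementation.

```python
# Illustrative composite checkpoint: (source_index, inner_checkpoint).
# Iteration walks the child sources in order; resuming skips completed
# children and re-enters the active one at its own checkpoint.
def iter_composite(sources: list[list[str]], checkpoint: tuple[int, int] = (0, 0)):
    source_index, inner = checkpoint
    for s in range(source_index, len(sources)):
        start = inner if s == source_index else 0
        for i in range(start, len(sources[s])):
            # Composite checkpoint to persist after this item.
            yield (s, i + 1), sources[s][i]


sources = [["a", "b"], ["c"], ["d", "e"]]
items = [item for _, item in iter_composite(sources)]

# Resume from the checkpoint recorded after "c", i.e. (1, 1).
resumed = [item for _, item in iter_composite(sources, (1, 1))]
```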

batchor.sources.files

File-backed item source implementations (CSV, JSONL, Parquet).

Each source wraps a local file and streams :class:~batchor.BatchItem objects row by row. All three implementations support durable resumption:

  • :class:CsvItemSource and :class:JsonlItemSource use a monotonic row index as the checkpoint (via :class:~batchor.sources.base.ResumableItemSource).
  • :class:ParquetItemSource uses a (row_group_index, row_index_within_group) checkpoint to resume efficiently within large Parquet files.

Every emitted item carries batchor_lineage metadata recording the source path, row index, and (where applicable) a primary key and partition ID.

CsvItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')

Bases: ResumableItemSource[PayloadT], Generic[PayloadT]

Stream BatchItem values from a CSV file with durable resume support.

JsonlItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')

Bases: ResumableItemSource[PayloadT], Generic[PayloadT]

Stream BatchItem values from a JSONL file with durable resume support.

ParquetItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, columns=None)

Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]

Stream :class:~batchor.BatchItem values from a Parquet file.

Uses pyarrow to read row groups lazily, checkpointing at the (row_group_index, row_index_within_group) level to support efficient resumption within large files.

ATTRIBUTE DESCRIPTION
path

Path to the Parquet file.

item_id_from_row

Callable that derives a unique item ID from a row.

payload_from_row

Callable that converts a row to the item payload.

metadata_from_row

Optional callable for per-item metadata.

columns

Optional list of column names to read (projection push-down).
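The two-level Parquet checkpoint advances per row within a row group and rolls over to the next group at a group boundary. The pure arithmetic below sketches that behaviour given the row counts of each group; the real source reads groups lazily via pyarrow, and `advance` is an illustrative helper, not batchor code.

```python
# How a (row_group_index, row_index_within_group) checkpoint advances,
# given each Parquet row group's row count. Illustrative sketch.
def advance(checkpoint: tuple[int, int], group_sizes: list[int]) -> tuple[int, int]:
    """Return the checkpoint after consuming one row at `checkpoint`."""
    group, row = checkpoint
    if row + 1 < group_sizes[group]:
        return (group, row + 1)  # next row in the same group
    return (group + 1, 0)        # first row of the next group


sizes = [3, 2]  # two row groups: 3 rows, then 2
cp = (0, 0)
visited = []
for _ in range(sum(sizes)):
    visited.append(cp)
    cp = advance(cp, sizes)
```

Resuming mid-file then means seeking directly to the checkpointed row group and skipping only rows within that group, rather than re-reading the whole file.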

Storage backends

The generated reference below describes the built-in storage entrypoints. The state-store protocol and detailed storage lifecycle are explained in Storage & Runs.

batchor.storage.sqlite

Public re-export module for the SQLite storage backend.

Import :class:~batchor.SQLiteStorage from here (or from the top-level :mod:batchor package) rather than from :mod:batchor.storage.sqlite_store directly.

SQLiteStorage(*, name='default', path=None, now=None, engine=None, provider_registry=None)

Bases: SQLiteResultsMixin, SQLiteLifecycleMixin, SQLiteQueryMixin, StateStore

SQLite-backed :class:~batchor.StateStore.

The default state store for batchor runs. Uses a single SQLite database file with WAL journal mode for concurrent reads and a companion artifact directory for JSONL files.

ATTRIBUTE DESCRIPTION
path

Absolute path to the SQLite database file.

engine

SQLAlchemy engine used for all database operations.

provider_registry

Registry used to deserialise provider configs on run resume.

Initialise or open a SQLite storage backend.

PARAMETER DESCRIPTION
name

Logical name used to derive the default file path (~/.batchor/<name>.sqlite3). Ignored when path is provided.

TYPE: str DEFAULT: 'default'

path

Explicit path to the database file. ~ is expanded.

TYPE: str | Path | None DEFAULT: None

now

Optional clock override for testing. Defaults to datetime.now(timezone.utc).

TYPE: Callable[[], datetime] | None DEFAULT: None

engine

Optional pre-built SQLAlchemy engine. When provided, WAL and busy-timeout pragmas are not configured automatically.

TYPE: Engine | None DEFAULT: None

provider_registry

Registry for deserialising provider configs on run resume. Defaults to the built-in registry.

TYPE: ProviderRegistry | None DEFAULT: None

default_path(name) staticmethod

Return the default database file path for a given store name.

PARAMETER DESCRIPTION
name

Logical store name. Whitespace-only names fall back to "default".

TYPE: str

RETURNS DESCRIPTION
Path

~/.batchor/<name>.sqlite3
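The path rule documented above can be restated as a few lines of `pathlib`. This is an illustrative reimplementation of the stated behaviour (including the whitespace-only fallback), not batchor's code.

```python
# Sketch of SQLiteStorage.default_path: a logical name maps to
# ~/.batchor/<name>.sqlite3, with whitespace-only names falling back
# to "default". Illustrative reimplementation.
from pathlib import Path


def default_path(name: str) -> Path:
    resolved = name.strip() or "default"
    return Path.home() / ".batchor" / f"{resolved}.sqlite3"


p = default_path("jobs")
fallback = default_path("   ")
```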

close()

Dispose the SQLAlchemy connection pool and release resources.

batchor.storage.postgres