API Reference
This page is split in two parts:
- a short handwritten guide to the public surface
- generated API reference from mkdocstrings
If the generated reference feels terse, read Architecture first, then Storage & Runs. The generated section reflects the code docstrings, so it is best for signatures and symbol discovery rather than architectural explanation.
Start here
Most users only need a small subset of the package:
- `BatchItem`: input item wrapper
- `BatchJob`: execution definition
- `BatchRunner`: start, resume, and run orchestration
- `Run`: refresh, wait, inspect, export, and prune
- `OpenAIProviderConfig`: built-in provider config
- `SQLiteStorage` and `PostgresStorage`: durable control-plane backends
- `CompositeItemSource`, `CsvItemSource`, `JsonlItemSource`, and `ParquetItemSource`: deterministic item streaming
Recent additions worth checking in the generated reference:
- `ArtifactPolicy`
- `RunControlState`
- `Run.pause()`, `Run.resume()`, `Run.cancel()`
- `Run.read_terminal_results(...)`
- `Run.export_terminal_results(...)`
- `CheckpointedItemSource`
- `CompositeItemSource`
- `ParquetItemSource`
These APIs are currently library-first. The CLI still focuses on CSV/JSONL operator workflows and does not yet expose pause/resume/cancel or incremental terminal-result commands.
Public package
Import from `batchor` first; that is the intended consumer surface.
batchor
Batchor: durable OpenAI Batch runner with typed Pydantic results.
This package provides a library-first API for running OpenAI Batch jobs durably, with SQLite- or PostgreSQL-backed state, resumable item sources, replayable request artifacts, and structured Pydantic outputs.
Typical usage:

```python
import batchor

runner = batchor.BatchRunner()
job = batchor.BatchJob(
    items=[batchor.BatchItem(item_id="1", payload="hello")],
    build_prompt=lambda item: batchor.PromptParts(prompt=item.payload),
    provider_config=batchor.OpenAIProviderConfig(model="gpt-4.1"),
)
run = runner.run_and_wait(job)
for result in run.results():
    print(result.output_text)
```
ArtifactStore
Bases: ABC
Abstract interface for reading, writing, and deleting run artifacts.
All paths are relative keys — the store implementation is responsible for mapping them to a concrete location (e.g. a local filesystem subtree or an object-storage bucket).
write_text(key, content, *, encoding='utf-8')
abstractmethod
Write text content to the artifact store under the given key.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative path identifying the artifact. |
| `content` | Text content to write. |
| `encoding` | Text encoding. Defaults to `'utf-8'`. |
read_text(key, *, encoding='utf-8')
abstractmethod
Read text content for the given key.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative path identifying the artifact. |
| `encoding` | Text encoding. Defaults to `'utf-8'`. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | The stored text content. |
delete(key)
abstractmethod
Delete the artifact at the given key.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative path identifying the artifact. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | `True` if the artifact was deleted, `False` if it was not found. |
stage_local_copy(key)
abstractmethod
Return a context manager that exposes a local filesystem path.
Inside the context, the returned `Path` is guaranteed to point to a readable copy of the artifact. The caller must not modify the file.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative path identifying the artifact. |

| RETURNS | DESCRIPTION |
|---|---|
| `AbstractContextManager[Path]` | A context manager yielding a `Path` to a local copy of the artifact. |
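Here is a minimal sketch of how a store that does not live on the local filesystem might satisfy the `stage_local_copy` contract. It fetches the bytes, writes them to a temporary directory, yields the path, and cleans up on exit. The `read_bytes` callable and the `stage_local_copy_from_bytes` helper are hypothetical illustrations, not part of batchor.

```python
import contextlib
import tempfile
from collections.abc import Callable, Iterator
from pathlib import Path, PurePosixPath


@contextlib.contextmanager
def stage_local_copy_from_bytes(
    key: str, read_bytes: Callable[[str], bytes]
) -> Iterator[Path]:
    """Hypothetical helper: expose remotely stored bytes as a temporary local file."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Use only the last key component as the local file name.
        local_path = Path(tmp_dir) / PurePosixPath(key).name
        local_path.write_bytes(read_bytes(key))
        # Callers may read the file but must not modify it.
        yield local_path
    # Leaving the with-block deletes the temporary directory and the copy.
```

The copy only lives as long as the `with` block, so callers should finish reading before they exit it.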
export_to_directory(key, destination_root)
abstractmethod
Copy an artifact to a local directory, mirroring the key structure.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative path identifying the artifact. |
| `destination_root` | Target directory. The artifact is written to `destination_root / key`. |

| RETURNS | DESCRIPTION |
|---|---|
| `Path` | The absolute `Path` of the exported file. |
LocalArtifactStore(root)
Bases: ArtifactStore
File-backed artifact store rooted at a single directory.
The root directory is created with restrictive permissions (0o700) on first use. Key validation prevents path traversal: keys must be relative and must not contain `..` components.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `root` | Absolute `Path` of the store's root directory. |

Initialise the store, creating the root directory if necessary.
| PARAMETER | DESCRIPTION |
|---|---|
| `root` | Path to the root directory. |
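The traversal rules above can be written as a small standalone validator: keys must be non-empty, relative, and free of `..` components. This is an illustrative sketch of those stated rules, not batchor's own code. `validate_artifact_key` and `resolve_under_root` are hypothetical names. The symlink check is an extra safeguard that the text does not mention.

```python
from pathlib import Path, PurePosixPath


def validate_artifact_key(key: str) -> PurePosixPath:
    """Reject keys that are empty, absolute, or contain '..' components."""
    path = PurePosixPath(key)
    if not key or not path.parts:
        raise ValueError("artifact key must not be empty")
    if path.is_absolute():
        raise ValueError(f"artifact key must be relative: {key!r}")
    if ".." in path.parts:
        raise ValueError(f"artifact key must not contain '..': {key!r}")
    return path


def resolve_under_root(root: Path, key: str) -> Path:
    """Map a validated key to an absolute path inside root."""
    resolved_root = root.resolve()
    candidate = (resolved_root / validate_artifact_key(key)).resolve()
    # Extra guard: a symlink inside the root could still point elsewhere.
    if not candidate.is_relative_to(resolved_root):
        raise ValueError(f"artifact key escapes the store root: {key!r}")
    return candidate
```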
write_text(key, content, *, encoding='utf-8')
Write text content to a file at `root / key`.
Parent directories are created automatically.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative artifact key (must not be absolute or contain `..`). |
| `content` | Text to write. |
| `encoding` | File encoding. Defaults to `'utf-8'`. |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If `key` is absolute, empty, or traverses outside the root. |
read_text(key, *, encoding='utf-8')
Read text content from `root / key`.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative artifact key. |
| `encoding` | File encoding. Defaults to `'utf-8'`. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | The file's text contents. |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If `key` is invalid. |
| `FileNotFoundError` | If the artifact does not exist. |
delete(key)
Delete the artifact file at `root / key`.
Empty parent directories up to (but not including) the root are removed after deletion.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative artifact key. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | `True` if the artifact was deleted, `False` if it was not found. |

| RAISES | DESCRIPTION |
|---|---|
| `IsADirectoryError` | If the resolved path is a directory. |
| `ValueError` | If `key` is invalid. |
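The delete-then-prune behaviour described above could look roughly like the sketch below. `delete_and_prune` is a hypothetical stand-in. It skips key validation for brevity. It returns `False` for a missing file, on the assumption that this matches the `ArtifactStore.delete` contract.

```python
from pathlib import Path


def delete_and_prune(root: Path, key: str) -> bool:
    """Delete root/key, then remove now-empty parent directories below root."""
    root = root.resolve()
    target = (root / key).resolve()
    if target.is_dir():
        raise IsADirectoryError(str(target))
    if not target.exists():
        return False
    target.unlink()
    parent = target.parent
    # Walk upwards, stopping at the root itself or at the first non-empty directory.
    while parent != root and not any(parent.iterdir()):
        parent.rmdir()
        parent = parent.parent
    return True
```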
stage_local_copy(key)
Return a context manager yielding the resolved local path directly.
Because this is a local store, no copy is needed; the artifact's actual path is yielded inside the context.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative artifact key. |

| RETURNS | DESCRIPTION |
|---|---|
| `AbstractContextManager[Path]` | A context manager that yields the local `Path` of the artifact. |
export_to_directory(key, destination_root)
Copy an artifact to `destination_root / key`.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative artifact key identifying the source file. |
| `destination_root` | Target directory; the artifact is written to `destination_root / key`. |

| RETURNS | DESCRIPTION |
|---|---|
| `Path` | Absolute path to the copied file. |
resolve_path(key)
Resolve an artifact key to an absolute filesystem path.
| PARAMETER | DESCRIPTION |
|---|---|
| `key` | Relative artifact key. |

| RETURNS | DESCRIPTION |
|---|---|
| `Path` | Absolute `Path` for the artifact. |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If `key` is invalid. |
ItemStatus
Bases: StrEnum
Lifecycle status for a single batch item.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `PENDING` | Item is waiting to be claimed for submission. |
| `QUEUED_LOCAL` | Item has been claimed locally but not yet submitted to the provider. |
| `SUBMITTED` | Item has been included in an active provider batch. |
| `COMPLETED` | Item reached a terminal success state. |
| `FAILED_RETRYABLE` | Item failed on this attempt but will be retried. |
| `FAILED_PERMANENT` | Item exceeded the maximum retry limit and will not be retried. |
OpenAIEndpoint
Bases: StrEnum
OpenAI API endpoint path used in Batch request lines.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `RESPONSES` | The Responses API endpoint. |
| `CHAT_COMPLETIONS` | The Chat Completions API endpoint. |
OpenAIModel
Bases: StrEnum
Known OpenAI model identifiers.
Use these constants, or pass a plain string to `OpenAIProviderConfig.model` for forward compatibility with models released after this version.
| ATTRIBUTE |
|---|
| `GPT_5_2` |
| `GPT_5_1` |
| `GPT_5` |
| `GPT_5_MINI` |
| `GPT_5_NANO` |
| `GPT_4_1` |
| `GPT_4_1_MINI` |
| `GPT_4_1_NANO` |
OpenAIReasoningEffort
Bases: StrEnum
Reasoning effort level passed to supporting OpenAI models.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `NONE` | Disable extended reasoning. |
| `MINIMAL` | Minimal reasoning tokens. |
| `LOW` | Low reasoning effort. |
| `MEDIUM` | Medium reasoning effort (balanced). |
| `HIGH` | High reasoning effort. |
| `XHIGH` | Maximum reasoning effort. |
ProviderKind
Bases: StrEnum
Stable identifier for a batch execution provider.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `OPENAI` | The built-in OpenAI Batch API provider. |
RunControlState
Bases: StrEnum
Operator-controlled state used to pause, resume, or cancel a run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `RUNNING` | Normal execution: the runner processes items. |
| `PAUSED` | Execution is suspended. |
| `CANCEL_REQUESTED` | Cancellation has been requested; the runner drains in-flight batches and marks remaining items as cancelled. |
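The three control states and the documented `resume_run` rule can be modelled as a tiny transition function. That rule is that resuming a cancelling run raises `ValueError`. `ControlState`, its string values, and `apply_control` are hypothetical. Two behaviours here are assumptions, not documented behaviour: cancellation is treated as final, and `pause` is rejected on a cancelling run.

```python
from enum import Enum


class ControlState(str, Enum):
    """Hypothetical mirror of batchor.RunControlState (string values assumed)."""

    RUNNING = "running"
    PAUSED = "paused"
    CANCEL_REQUESTED = "cancel_requested"


def apply_control(current: ControlState, action: str) -> ControlState:
    """Return the next control state for 'pause', 'resume', or 'cancel'."""
    if action == "cancel":
        # Assumption: cancellation can be requested from any state and is never undone.
        return ControlState.CANCEL_REQUESTED
    if current is ControlState.CANCEL_REQUESTED:
        # Documented for resume_run; assumed here for pause as well.
        raise ValueError(f"cannot {action} a run that is cancelling")
    if action == "pause":
        return ControlState.PAUSED
    if action == "resume":
        return ControlState.RUNNING
    raise ValueError(f"unknown control action: {action!r}")
```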
RunLifecycleStatus
Bases: StrEnum
Terminal or active lifecycle status for a batch run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `RUNNING` | Run is active and processing items. |
| `COMPLETED` | All items completed successfully. |
| `COMPLETED_WITH_FAILURES` | Run finished but at least one item reached `FAILED_PERMANENT`. |
StorageKind
Bases: StrEnum
Identifier for a state-store backend.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `SQLITE` | File-backed SQLite store (default, zero-dependency). |
| `POSTGRES` | Shared PostgreSQL control-plane store. |
| `MEMORY` | Ephemeral in-memory store (testing / single-process use). |
ModelResolutionError(module_name, qualname)
Bases: RuntimeError
Raised when a structured-output model class cannot be re-imported.
This happens when a run is resumed in a process that does not have the same Python environment or import path as the process that created it.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `module_name` | The module path stored for the structured output class. |
| `qualname` | The qualified name stored for the structured output class. |

Initialise the error with the unresolvable model location.
| PARAMETER | DESCRIPTION |
|---|---|
| `module_name` | Module path stored for the structured output class. |
| `qualname` | Qualified name stored for the structured output class. |
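Resolution works from the stored `module_name` and `qualname`, so the failure above can be reproduced with a plain `importlib` lookup. Here is a minimal sketch, assuming nested classes are resolved by walking the dotted `qualname`. `resolve_model_class` and the local `ModelResolutionError` stand-in are hypothetical. Classes defined inside functions cannot be re-imported this way either, because their `qualname` contains `<locals>`.

```python
import importlib


class ModelResolutionError(RuntimeError):
    """Stand-in for batchor.ModelResolutionError, for illustration only."""

    def __init__(self, module_name: str, qualname: str) -> None:
        super().__init__(f"cannot import {module_name}:{qualname}")
        self.module_name = module_name
        self.qualname = qualname


def resolve_model_class(module_name: str, qualname: str) -> type:
    """Re-import a class from its module path and qualified name."""
    try:
        obj = importlib.import_module(module_name)
        # Walk nested attributes, e.g. "Outer.Inner" for nested classes.
        for part in qualname.split("."):
            obj = getattr(obj, part)
    except (ImportError, AttributeError) as exc:
        raise ModelResolutionError(module_name, qualname) from exc
    if not isinstance(obj, type):
        raise ModelResolutionError(module_name, qualname)
    return obj
```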
RunNotFinishedError(run_id)
Bases: RuntimeError
Raised when a terminal-only operation is called on a still-running run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | The identifier of the run that is not yet finished. |

Initialise the error with the run identifier.
| PARAMETER | DESCRIPTION |
|---|---|
| `run_id` | Identifier of the run that is not yet in a terminal state. |
RunPausedError(run_id, control_reason=None)
Bases: RuntimeError
Raised when `Run.wait()` encounters a paused run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | The identifier of the paused run. |
| `control_reason` | Optional machine-readable reason the run is paused. |

Initialise the error with the paused run's identifier.
| PARAMETER | DESCRIPTION |
|---|---|
| `run_id` | Identifier of the run that is in a paused control state. |
| `control_reason` | Optional machine-readable pause reason. |
StructuredOutputSchemaError
Bases: ValueError
Raised when a Pydantic model produces a schema incompatible with OpenAI.
OpenAI structured output requires strict JSON Schema objects with no `anyOf` at the root and `additionalProperties: false` on every object type. This error is raised during job construction, so invalid schemas are caught before any API calls are made.
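Both rules can be checked mechanically before any request is built. The hypothetical checker below covers only the two constraints named above. OpenAI's strict mode has further requirements that it does not try to enforce, such as listing every property in `required`.

```python
from typing import Any


def find_strict_schema_problems(schema: dict[str, Any]) -> list[str]:
    """Report violations of the two rules above; an empty list means none were found."""
    problems: list[str] = []
    if "anyOf" in schema:
        problems.append("root: anyOf is not allowed at the root")

    def visit(node: Any, path: str) -> None:
        if isinstance(node, dict):
            # Every object type must explicitly forbid extra properties.
            if node.get("type") == "object" and node.get("additionalProperties") is not False:
                problems.append(f"{path}: additionalProperties must be false")
            for key, value in node.items():
                visit(value, f"{path}/{key}")
        elif isinstance(node, list):
            for index, value in enumerate(node):
                visit(value, f"{path}/{index}")

    visit(schema, "root")
    return problems
```

Because the walk recurses into every nested dict and list, nested models under `$defs` or `items` are checked as well.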
ArtifactExportResult(run_id, destination_dir, manifest_path, results_path, exported_artifact_paths)
dataclass
Result returned after exporting retained artifacts for a terminal run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | The run whose artifacts were exported. |
| `destination_dir` | Absolute path to the export root directory. |
| `manifest_path` | Absolute path to the generated manifest file. |
| `results_path` | Absolute path to the generated results file. |
| `exported_artifact_paths` | Relative artifact paths that were copied. |
ArtifactPolicy(persist_raw_output_artifacts=True)
dataclass
Controls which provider artifacts are retained after a batch completes.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `persist_raw_output_artifacts` | When `True` (the default), raw provider output artifacts are retained. |

to_payload()
Serialise the policy to a JSON-compatible dictionary.
| RETURNS | DESCRIPTION |
|---|---|
| `JSONObject` | A JSON-compatible dictionary representing the policy. |

from_payload(payload)
classmethod
Deserialise a previously persisted policy payload.
| PARAMETER | DESCRIPTION |
|---|---|
| `payload` | A dictionary previously produced by `to_payload()`. |

| RETURNS | DESCRIPTION |
|---|---|
| `ArtifactPolicy` | A reconstructed `ArtifactPolicy`. |

| RAISES | DESCRIPTION |
|---|---|
| `TypeError` | If any field in `payload` has the wrong type. |
ArtifactPruneResult(run_id, removed_artifact_paths, missing_artifact_paths, cleared_item_pointers, cleared_batch_pointers=0)
dataclass
Result returned after pruning retained artifacts for a terminal run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | The run whose artifacts were pruned. |
| `removed_artifact_paths` | Relative paths of files successfully deleted. |
| `missing_artifact_paths` | Relative paths that were recorded in state but not found on disk (already deleted or never written). |
| `cleared_item_pointers` | Number of per-item artifact pointer records cleared in state. |
| `cleared_batch_pointers` | Number of per-batch artifact pointer records cleared in state (only non-zero when raw output artifacts are pruned). |
BatchItem(item_id, payload, metadata=dict())
dataclass
Bases: Generic[PayloadT]
One logical unit of work inside a batch run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `item_id` | Caller-assigned identifier, unique within the run. Used as the basis for the provider request `custom_id`. |
| `payload` | Arbitrary caller-defined data passed to `build_prompt`. |
| `metadata` | Optional key-value pairs stored alongside the item in state. |
BatchJob(items, build_prompt, provider_config, structured_output=None, schema_name=None, chunk_policy=ChunkPolicy(), retry_policy=RetryPolicy(), batch_metadata=dict(), artifact_policy=ArtifactPolicy())
dataclass
Bases: Generic[PayloadT, ModelT]
Declarative description of a batch run.
Passed to `BatchRunner.start()` or `BatchRunner.run_and_wait()` to create or resume a durable run.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `items` | An iterable of `BatchItem` instances. |
| `build_prompt` | Callable that converts a `BatchItem` into `PromptParts`. |
| `provider_config` | Provider-specific configuration, e.g. `OpenAIProviderConfig`. |
| `structured_output` | Optional Pydantic model class used to parse and validate each item's response as structured JSON. |
| `schema_name` | Optional override for the JSON schema name sent to the provider. Defaults to a snake_case version of the model class name. |
| `chunk_policy` | Controls how pending items are split into provider batch files. |
| `retry_policy` | Controls retry behaviour for transient failures. |
| `batch_metadata` | Arbitrary string key-value pairs attached to every provider batch created for this run. |
| `artifact_policy` | Controls which raw provider artifacts are retained. |
ChunkPolicy(max_requests=50000, max_file_bytes=150 * 1024 * 1024, chars_per_token=4)
dataclass
Submission chunking limits applied before provider batches are created.
A batch of pending items is split into one or more provider batches so that each chunk stays within both `max_requests` and `max_file_bytes`; `chars_per_token` only feeds token estimation.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `max_requests` | Maximum number of request lines per provider batch file. Defaults to `50000`. |
| `max_file_bytes` | Maximum size in bytes of a single batch input file. Defaults to 150 MiB (`150 * 1024 * 1024`). |
| `chars_per_token` | Fallback characters-per-token ratio used for token estimation when `tiktoken` is unavailable. Defaults to `4`. |
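A greedy packing pass is one straightforward way to honour both size limits at once. This hypothetical sketch uses the documented defaults and counts the trailing newline of each JSONL line. batchor's real chunker may split items differently.

```python
from collections.abc import Iterable, Iterator


def chunk_request_lines(
    lines: Iterable[str],
    *,
    max_requests: int = 50_000,
    max_file_bytes: int = 150 * 1024 * 1024,
) -> Iterator[list[str]]:
    """Greedily group JSONL request lines into chunks that respect both limits."""
    chunk: list[str] = []
    chunk_bytes = 0
    for line in lines:
        # Each line is written with a trailing newline in the JSONL file.
        line_bytes = len(line.encode("utf-8")) + 1
        if line_bytes > max_file_bytes:
            raise ValueError("a single request line exceeds max_file_bytes")
        # Close the current chunk if adding this line would break either limit.
        if chunk and (len(chunk) >= max_requests or chunk_bytes + line_bytes > max_file_bytes):
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(line)
        chunk_bytes += line_bytes
    if chunk:
        yield chunk
```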
ItemFailure(error_class, message, retryable, raw_error=None)
dataclass
Structured failure payload attached to a terminal item result.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `error_class` | Short machine-readable failure category. |
| `message` | Human-readable description of the failure. |
| `retryable` | Whether the failure is eligible for retry. |
| `raw_error` | Optional raw error payload from the provider or parser, preserved verbatim for debugging. |
OpenAIEnqueueLimitConfig(enqueued_token_limit=0, target_ratio=0.7, headroom=0, max_batch_enqueued_tokens=0)
dataclass
OpenAI-specific token budgeting controls for batch submission.
When `enqueued_token_limit` is non-zero, the runner estimates the token footprint of each submitted batch and refuses to enqueue more tokens than the effective budget allows. This prevents hitting OpenAI's enqueued-token-limit API error.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `enqueued_token_limit` | Hard cap (in tokens) on the OpenAI account's enqueue limit. Defaults to `0`, which disables budgeting. |
| `target_ratio` | Fraction of the enqueue limit to target. Defaults to `0.7`. |
| `headroom` | Absolute token buffer subtracted from the limit before computing the effective budget. Defaults to `0`. |
| `max_batch_enqueued_tokens` | Optional per-batch token ceiling. Defaults to `0`. |

to_payload()
Serialise the config to a JSON-compatible dictionary.
| RETURNS | DESCRIPTION |
|---|---|
| `JSONObject` | A JSON-compatible dictionary representing the config. |

from_payload(payload)
classmethod
Deserialise a previously persisted config payload.
| PARAMETER | DESCRIPTION |
|---|---|
| `payload` | A dictionary previously produced by `to_payload()`. |

| RETURNS | DESCRIPTION |
|---|---|
| `OpenAIEnqueueLimitConfig` | A reconstructed `OpenAIEnqueueLimitConfig`. |

| RAISES | DESCRIPTION |
|---|---|
| `TypeError` | If any field in `payload` has the wrong type. |
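The budgeting rule above can be sketched as an admission check. The text only says that `headroom` is subtracted from the limit before the effective budget is computed, and that `target_ratio` is a fraction. So the formula `(limit - headroom) * target_ratio` below is an assumption. Treating a zero `max_batch_enqueued_tokens` as "no ceiling" is also an assumption. `estimate_tokens` shows the `chars_per_token` fallback applied to a compact JSON serialisation. All function names are hypothetical.

```python
import json
import math
from typing import Any


def estimate_tokens(request_line: dict[str, Any], *, chars_per_token: int = 4) -> int:
    """Fallback estimate: serialised characters divided by chars_per_token, rounded up."""
    text = json.dumps(request_line, separators=(",", ":"))
    return math.ceil(len(text) / chars_per_token)


def effective_budget(limit: int, *, target_ratio: float = 0.7, headroom: int = 0) -> int:
    """Assumed formula: (limit - headroom) * target_ratio, never below zero."""
    return max(0, math.floor((limit - headroom) * target_ratio))


def can_enqueue(
    batch_tokens: int,
    in_flight_tokens: int,
    *,
    limit: int,
    target_ratio: float = 0.7,
    headroom: int = 0,
    max_batch_tokens: int = 0,
) -> bool:
    """Decide whether a new batch fits; a zero limit or ceiling disables that check."""
    if max_batch_tokens and batch_tokens > max_batch_tokens:
        return False
    if limit == 0:
        return True
    budget = effective_budget(limit, target_ratio=target_ratio, headroom=headroom)
    return in_flight_tokens + batch_tokens <= budget
```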
OpenAIProviderConfig(model, api_key='', endpoint=OpenAIEndpoint.RESPONSES, completion_window='24h', request_timeout_sec=30, poll_interval_sec=1.0, reasoning_effort=None, enqueue_limits=OpenAIEnqueueLimitConfig())
dataclass
Bases: ProviderConfig
Configuration for the built-in OpenAI Batch provider.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `model` | OpenAI model name (e.g. `"gpt-4.1"`) or an `OpenAIModel` member. |
| `api_key` | OpenAI API key. When empty, the runner falls back to the … |
| `endpoint` | API endpoint path to use for batch requests. Defaults to `OpenAIEndpoint.RESPONSES`. |
| `completion_window` | Maximum time the OpenAI batch is allowed to run, e.g. `'24h'` (the default). |
| `request_timeout_sec` | Timeout in seconds for individual API calls. Defaults to `30`. |
| `poll_interval_sec` | Seconds to sleep between polling cycles while waiting on a run. Defaults to `1.0`. |
| `reasoning_effort` | Reasoning effort level for supporting models. Defaults to `None`. |
| `enqueue_limits` | Token-budget configuration that constrains how many tokens may be enqueued at once. |

to_public_payload()
Serialise the config without secret material.
| RETURNS | DESCRIPTION |
|---|---|
| `JSONObject` | A JSON-compatible dictionary of the config with secret material such as `api_key` omitted. |

from_payload(payload)
classmethod
Deserialise a previously persisted provider config payload.
| PARAMETER | DESCRIPTION |
|---|---|
| `payload` | A dictionary previously produced by serialising an `OpenAIProviderConfig`. |

| RETURNS | DESCRIPTION |
|---|---|
| `OpenAIProviderConfig` | A reconstructed `OpenAIProviderConfig`. |

| RAISES | DESCRIPTION |
|---|---|
| `TypeError` | If any field in `payload` has the wrong type. |
PromptParts(prompt, system_prompt=None)
dataclass
Prompt text sent to the provider, with an optional system prompt.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `prompt` | The user-turn or primary input text. |
| `system_prompt` | Optional system/instructions text. Mapped to … |
RetryPolicy(max_attempts=3, base_backoff_sec=1.0, max_backoff_sec=300.0)
dataclass
Retry limits for item-level and batch-control-plane recovery.
Applies to both individual item failures (retryable provider errors) and transient batch control-plane failures (rate limits, timeouts). Backoff uses exponential doubling capped at `max_backoff_sec`.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `max_attempts` | Total attempt budget per item before marking it `FAILED_PERMANENT`. Defaults to `3`. |
| `base_backoff_sec` | Starting backoff delay in seconds. Subsequent failures double this up to `max_backoff_sec`. Defaults to `1.0`. |
| `max_backoff_sec` | Ceiling on the computed backoff delay in seconds. Defaults to `300.0`. |
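The retry rules above, together with the `ItemStatus` values, can be sketched as two small functions. This sketch assumes the first failure waits exactly `base_backoff_sec`. It also assumes a non-retryable failure goes straight to `FAILED_PERMANENT` whatever budget remains. Function names are hypothetical, and statuses are returned as plain strings.

```python
def backoff_delay(
    failures: int, *, base_backoff_sec: float = 1.0, max_backoff_sec: float = 300.0
) -> float:
    """Delay before the next attempt: base * 2**(failures - 1), capped at the maximum."""
    if failures < 1:
        return 0.0
    # Clamp the exponent so very large failure counts cannot overflow a float.
    exponent = min(failures - 1, 64)
    return min(base_backoff_sec * 2.0**exponent, max_backoff_sec)


def status_after_failure(attempt_count: int, *, retryable: bool, max_attempts: int = 3) -> str:
    """Pick the next ItemStatus name after a failed attempt."""
    if retryable and attempt_count < max_attempts:
        return "FAILED_RETRYABLE"
    return "FAILED_PERMANENT"
```

With the defaults, successive delays run 1, 2, 4, 8, ... seconds until they hit the 300-second cap at the tenth failure.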
RunEvent(event_type, run_id, provider_kind=None, data=dict())
dataclass
Observer event emitted by the runner during lifecycle transitions.
Passed to the observer callable supplied to `BatchRunner` on each notable state change.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `event_type` | Short identifier for the event kind. |
| `run_id` | The run that produced this event. |
| `provider_kind` | Provider that triggered the event, or `None`. |
| `data` | Optional extra fields specific to the event type. |
RunSnapshot(run_id, status, control_state, total_items, completed_items, failed_items, status_counts, active_batches, backoff_remaining_sec, items, control_reason=None)
dataclass
Expanded durable run state including current terminal item payloads.
A superset of `RunSummary` that additionally includes the list of terminal item results available at query time. Returned by `Run.snapshot()`.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | Stable run identifier. |
| `status` | Current lifecycle status of the run. |
| `control_state` | Operator control state. |
| `total_items` | Total number of items registered for this run. |
| `completed_items` | Number of items in `COMPLETED` status. |
| `failed_items` | Number of failed items. |
| `status_counts` | Full per-status item count breakdown. |
| `active_batches` | Number of provider batches currently in flight. |
| `backoff_remaining_sec` | Seconds until the next submission is permitted. |
| `control_reason` | Optional machine-readable reason for the current control state. |
| `items` | All terminal item results available at query time. |
RunSummary(run_id, status, control_state, total_items, completed_items, failed_items, status_counts, active_batches, backoff_remaining_sec, control_reason=None)
dataclass
Aggregated durable run state without item payload expansion.
Returned by `Run.summary()` and `Run.refresh()`. Contains counters and lifecycle flags but not individual item results.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | Stable run identifier. |
| `status` | Current lifecycle status of the run. |
| `control_state` | Operator control state (running / paused / cancelling). |
| `total_items` | Total number of items registered for this run. |
| `completed_items` | Number of items in `COMPLETED` status. |
| `failed_items` | Number of failed items. |
| `status_counts` | Full per-status item count breakdown. |
| `active_batches` | Number of provider batches currently in flight. |
| `backoff_remaining_sec` | Seconds until the next submission attempt is permitted. |
| `control_reason` | Optional machine-readable reason for the current control state. |
StructuredItemResult(item_id, status, attempt_count, output=None, output_text=None, raw_response=None, error=None, metadata=dict())
dataclass
Bases: Generic[ModelT]
Terminal result for one structured-output item.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `item_id` | Identifier matching the originating `BatchItem`. |
| `status` | Final item status (`COMPLETED` or `FAILED_PERMANENT`). |
| `attempt_count` | Number of provider attempts consumed. |
| `output` | Validated Pydantic model instance, or `None`. |
| `output_text` | Raw text from the provider response before parsing. |
| `raw_response` | Full provider response record for debugging. |
| `error` | Populated when the item reached `FAILED_PERMANENT`. |
| `metadata` | Item metadata carried through from the source. |
TerminalResultsExportResult(run_id, destination_path, exported_count, next_after_sequence)
dataclass
Result returned after exporting terminal results to a JSONL file.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | The run whose results were exported. |
| `destination_path` | Absolute path to the JSONL file written. |
| `exported_count` | Number of result records written in this call. |
| `next_after_sequence` | Cursor value for continuing an incremental export. |
TerminalResultsPage(run_id, items, next_after_sequence)
dataclass
A page of terminal item results for cursor-based streaming.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `run_id` | The run these results belong to. |
| `items` | Terminal item results in this page. |
| `next_after_sequence` | Opaque cursor to pass as `after_sequence` on the next call. |
TextItemResult(item_id, status, attempt_count, output_text=None, raw_response=None, error=None, metadata=dict())
dataclass
Terminal result for one text-output item.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `item_id` | Identifier matching the originating `BatchItem`. |
| `status` | Final item status (`COMPLETED` or `FAILED_PERMANENT`). |
| `attempt_count` | Number of provider attempts consumed. |
| `output_text` | Extracted text from the provider response, or `None`. |
| `raw_response` | Full provider response record for debugging. |
| `error` | Populated when the item reached `FAILED_PERMANENT`. |
| `metadata` | Item metadata carried through from the source. |
BatchProvider
Bases: ABC
Abstract adapter between the batchor runtime and a batch API provider.
Each method maps to one step of the batch submission/polling lifecycle. `OpenAIBatchProvider` is the built-in implementation.
build_request_line(*, custom_id, prompt_parts, structured_output=None)
abstractmethod
Build one JSONL request line for a batch input file.
| PARAMETER | DESCRIPTION |
|---|---|
| `custom_id` | Unique identifier for this request within the batch. |
| `prompt_parts` | Prompt text and optional system prompt. |
| `structured_output` | Optional structured output schema to embed. |

| RETURNS | DESCRIPTION |
|---|---|
| `BatchRequestLine` | A `BatchRequestLine` to be serialised as a JSONL line. |
upload_input_file(input_path)
abstractmethod
Upload a prepared JSONL input file to the provider.
| PARAMETER | DESCRIPTION |
|---|---|
| `input_path` | Local path to the JSONL batch input file. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | The provider-assigned file identifier. |
delete_input_file(file_id)
abstractmethod
Best-effort deletion of an uploaded input file.
Implementations should swallow errors; this is a clean-up path.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_id` | Provider-assigned file identifier to delete. |
create_batch(*, input_file_id, metadata=None)
abstractmethod
Create a remote batch from a previously uploaded input file.
| PARAMETER | DESCRIPTION |
|---|---|
| `input_file_id` | Provider file identifier for the batch input. |
| `metadata` | Optional key-value metadata attached to the batch. |

| RETURNS | DESCRIPTION |
|---|---|
| `BatchRemoteRecord` | A `BatchRemoteRecord` with the initial remote batch state. |
get_batch(batch_id)
abstractmethod
Fetch the current remote state for one batch.
| PARAMETER | DESCRIPTION |
|---|---|
| `batch_id` | Provider-assigned batch identifier. |

| RETURNS | DESCRIPTION |
|---|---|
| `BatchRemoteRecord` | A `BatchRemoteRecord` with the current status and file identifiers. |
download_file_content(file_id)
abstractmethod
Download a provider-hosted file and return its text content.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_id` | Provider-assigned file identifier. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | The file's text content (typically JSONL). |
parse_batch_output(*, output_content, error_content)
abstractmethod
Split raw batch output/error JSONL into success and error maps.
| PARAMETER | DESCRIPTION |
|---|---|
| `output_content` | Raw text of the provider output JSONL file, or `None`. |
| `error_content` | Raw text of the provider error JSONL file, or `None`. |

The return value is a 3-tuple of the following elements, in order:
| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, JSONObject]` | Success records, keyed by `custom_id`. |
| `dict[str, JSONObject]` | Error records, keyed by `custom_id`. |
| `list[JSONObject]` | The flat list of all parsed records. |
estimate_request_tokens(request_line, *, chars_per_token)
abstractmethod
Estimate the token count for a single request line.
Used by the token-budget enforcement logic to avoid submitting batches that exceed the enqueued-token limit.
| PARAMETER | DESCRIPTION |
|---|---|
| `request_line` | The request line whose token count to estimate. |
| `chars_per_token` | Fallback ratio when `tiktoken` is unavailable. |

| RETURNS | DESCRIPTION |
|---|---|
| `int` | Estimated token count as a non-negative integer. |
ProviderConfig
Bases: ABC
Abstract configuration for a specific batch provider.
Subclasses must implement serialisation and expose the stable `provider_kind` identifier used for runtime dispatch and durable storage.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `poll_interval_sec` | Default seconds between polling cycles while waiting on a run. |

provider_kind
abstractmethod
property
Stable provider identifier used for runtime dispatch and persistence.
to_payload()
abstractmethod
Serialise provider-specific config to JSON for durable storage.
| RETURNS | DESCRIPTION |
|---|---|
| `JSONObject` | A JSON-serialisable dictionary of all configuration fields, secrets included. Use `to_public_payload()` for a secret-free variant. |

to_public_payload()
Serialise provider config without secret material for durable storage.
| RETURNS | DESCRIPTION |
|---|---|
| `JSONObject` | A JSON-serialisable dictionary safe to persist or log. The default implementation delegates to `to_payload()`; subclasses that hold credentials should override this to omit them. |
StructuredOutputSchema(name, schema)
dataclass
Validated JSON Schema for a structured output model.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `name` | Schema name sent to the provider (snake_case by convention). |
| `schema` | Strict JSON Schema object with `additionalProperties: false` on every object type. |
OpenAIBatchProvider(config, client=None)
Bases: BatchProvider
Built-in provider that adapts batchor jobs to the OpenAI Batch API.
build_request_line(*, custom_id, prompt_parts, structured_output=None)
Build one OpenAI batch JSONL request row for a logical item.
upload_input_file(input_path)
Upload a prepared local JSONL file to OpenAI and return the file ID.
delete_input_file(file_id)
Best-effort deletion of an uploaded input file.
create_batch(*, input_file_id, metadata=None)
Create an OpenAI batch from a previously uploaded input file.
get_batch(batch_id)
Fetch the current remote state for one OpenAI batch.
download_file_content(file_id)
Download a provider file and normalize it to text.
parse_jsonl(content)
staticmethod
Parse provider JSONL payloads into JSON objects.
parse_batch_output(*, output_content, error_content)
Split OpenAI output/error payloads into success and error maps.
estimate_request_tokens(request_line, *, chars_per_token)
Estimate submitted tokens for one provider request line.
ProviderRegistry()
Dispatch table mapping provider kinds to factory and loader callables.
Use `build_default_provider_registry()` to obtain a registry pre-loaded with the built-in OpenAI provider, or construct one manually to register custom providers.
register(*, kind, factory, loader)
Register a provider factory and config loader for `kind`.
| PARAMETER | DESCRIPTION |
|---|---|
| `kind` | The `ProviderKind` to register. |
| `factory` | Callable that builds a `BatchProvider` from a config. |
| `loader` | Callable that reconstructs a `ProviderConfig` from a persisted payload. |

create(config)
Create a live provider instance from `config`.
| PARAMETER | DESCRIPTION |
|---|---|
| `config` | A `ProviderConfig` instance. |

| RETURNS | DESCRIPTION |
|---|---|
| `BatchProvider` | A ready-to-use `BatchProvider`. |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If no factory is registered for the config's provider kind. |

dump_config(config, *, include_secrets=True)
Serialise `config` to a JSON object with a `provider_kind` envelope.
| PARAMETER | DESCRIPTION |
|---|---|
| `config` | Provider config to serialise. |
| `include_secrets` | When `False`, secret material is omitted. Defaults to `True`. |

| RETURNS | DESCRIPTION |
|---|---|
| `JSONObject` | A JSON object containing the serialised config and its `provider_kind`. |

load_config(payload)
Deserialise a persisted config payload produced by `dump_config()`.
| PARAMETER | DESCRIPTION |
|---|---|
| `payload` | A payload produced by `dump_config()`. |

| RETURNS | DESCRIPTION |
|---|---|
| `ProviderConfig` | A reconstructed `ProviderConfig`. |

| RAISES | DESCRIPTION |
|---|---|
| `TypeError` | If … |
| `ValueError` | If the payload's `provider_kind` is not registered. |
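The dispatch-table idea behind `ProviderRegistry` fits in a few lines. Everything below is hypothetical: `EchoConfig`, `MiniRegistry`, and the `{"provider_kind": ..., "config": ...}` envelope shape. The reference names that envelope but does not specify its layout.

```python
from collections.abc import Callable
from dataclasses import asdict, dataclass
from typing import Any


@dataclass(frozen=True)
class EchoConfig:
    """Hypothetical provider config used only for this sketch."""

    model: str
    api_key: str = ""
    provider_kind: str = "echo"


class MiniRegistry:
    """Hypothetical kind -> (factory, loader) dispatch table."""

    def __init__(self) -> None:
        self._factories: dict[str, Callable[[Any], Any]] = {}
        self._loaders: dict[str, Callable[[dict[str, Any]], Any]] = {}

    def register(self, *, kind: str, factory: Callable[[Any], Any],
                 loader: Callable[[dict[str, Any]], Any]) -> None:
        self._factories[kind] = factory
        self._loaders[kind] = loader

    def create(self, config: Any) -> Any:
        factory = self._factories.get(config.provider_kind)
        if factory is None:
            raise ValueError(f"unregistered provider kind: {config.provider_kind!r}")
        return factory(config)

    def dump_config(self, config: Any, *, include_secrets: bool = True) -> dict[str, Any]:
        fields = asdict(config)
        kind = fields.pop("provider_kind")
        if not include_secrets:
            fields.pop("api_key", None)  # drop credentials from public payloads
        return {"provider_kind": kind, "config": fields}

    def load_config(self, payload: dict[str, Any]) -> Any:
        loader = self._loaders.get(payload.get("provider_kind"))
        if loader is None:
            raise ValueError(f"unregistered provider kind: {payload.get('provider_kind')!r}")
        return loader(payload["config"])
```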
BatchRunner(*, storage=None, provider_registry=None, storage_registry=None, provider_factory=None, observer=None, sleep=None, artifact_store=None, temp_root=None)
Durable orchestrator for creating, resuming, and inspecting batch runs.
Initialize the runner facade and its internal runtime layers.
| PARAMETER | DESCRIPTION |
|---|---|
| `storage` | Storage backend instance or registered storage kind. |
| `provider_registry` | Provider registry used for config dumping and provider construction. |
| `storage_registry` | Storage registry used for named storage backends. |
| `provider_factory` | Optional provider factory override for tests. |
| `observer` | Optional run event observer callback. |
| `sleep` | Optional sleep function override for polling loops. |
| `artifact_store` | Artifact store override. |
| `temp_root` | Root directory used for the default local artifact store. |
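start(job, *, run_id=None)
Create or resume a durable run for `job`.
run_and_wait(job, *, run_id=None)
Create or resume a durable run for `job` and wait for it to finish.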
get_run(run_id)
Rehydrate an existing durable run handle from storage.
| PARAMETER | DESCRIPTION |
|---|---|
| `run_id` | Durable run identifier. |

| RETURNS | DESCRIPTION |
|---|---|
| `Run` | Rehydrated durable run handle. |
pause_run(run_id)
¶
Set a run's control state to paused.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Durable run handle with updated control state. |
resume_run(run_id)
¶
Resume a previously paused run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Durable run handle with updated control state. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the run is already cancelling. |
cancel_run(run_id)
¶
Request cancellation of an active run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Durable run handle with updated control state. |
read_terminal_results(run_id, *, after_sequence=0, limit=None)
¶
Read a page of terminal item results for cursor-based streaming.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
after_sequence
|
Terminal-result cursor from the previous page.
TYPE:
|
limit
|
Optional maximum number of items to return.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TerminalResultsPage
|
Terminal results page. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
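read_terminal_results() is meant for cursor-based streaming: pass the cursor from the previous page as after_sequence, and stop once a page comes back empty. The minimal sketch below models that consumption loop against an in-memory stand-in, so it runs without batchor installed. The Page class, its items and next_sequence fields, fake_read_terminal_results, and the "stop on an empty page" rule are hypothetical; check TerminalResultsPage in the generated reference for the real field names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Page:
    # Hypothetical stand-in for TerminalResultsPage; real field names may differ.
    items: list[dict] = field(default_factory=list)
    next_sequence: int = 0


def fake_read_terminal_results(
    rows: list[dict], *, after_sequence: int = 0, limit: int | None = None
) -> Page:
    """Return rows (sorted by sequence) whose sequence is greater than the cursor."""
    selected = [row for row in rows if row["sequence"] > after_sequence]
    if limit is not None:
        selected = selected[:limit]
    # The cursor only advances past rows that were actually returned.
    next_sequence = selected[-1]["sequence"] if selected else after_sequence
    return Page(items=selected, next_sequence=next_sequence)


def drain(rows: list[dict], page_size: int) -> list[str]:
    """Consume every terminal row page by page, tracking only the cursor."""
    cursor = 0
    seen: list[str] = []
    while True:
        page = fake_read_terminal_results(rows, after_sequence=cursor, limit=page_size)
        if not page.items:
            break
        seen.extend(row["item_id"] for row in page.items)
        cursor = page.next_sequence  # persist this value to resume after a restart
    return seen


rows = [{"sequence": n, "item_id": f"item-{n}"} for n in range(1, 6)]
print(drain(rows, page_size=2))  # ['item-1', 'item-2', 'item-3', 'item-4', 'item-5']
```

With a real runner, the call inside the loop would be runner.read_terminal_results(run_id, after_sequence=cursor, limit=page_size); only the cursor needs to survive a process restart.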
export_terminal_results(run_id, *, destination, after_sequence=0, append=True, limit=None)
¶
Export terminal item results to a JSONL file.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
destination
|
Destination JSONL path.
TYPE:
|
after_sequence
|
Terminal-result cursor from the previous page.
TYPE:
|
append
|
Whether to append to the destination file.
TYPE:
|
limit
|
Optional maximum number of items to export.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TerminalResultsExportResult
|
Terminal results export report. |
export_artifacts(run_id, *, destination_dir)
¶
Export retained run artifacts and a results manifest for a terminal run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
destination_dir
|
Directory that will receive the exported run bundle.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ArtifactExportResult
|
Artifact export report. |
prune_artifacts(run_id, *, include_raw_output_artifacts=False)
¶
Remove retained artifacts for a terminal run and clear stored pointers.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
include_raw_output_artifacts
|
Whether raw provider payloads should also be pruned.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ArtifactPruneResult
|
Artifact prune report. |
Run(*, runner, run_id, summary)
¶
Durable handle for polling, waiting on, and inspecting one batch run.
Initialize the durable run handle.
| PARAMETER | DESCRIPTION |
|---|---|
runner
|
Owning runner facade.
TYPE:
|
run_id
|
Durable run identifier.
TYPE:
|
summary
|
Cached summary for the run.
TYPE:
|
status
property
¶
Return the cached lifecycle status for the run.
| RETURNS | DESCRIPTION |
|---|---|
RunLifecycleStatus
|
Cached lifecycle status. |
control_state
property
¶
Return the cached operator control state for the run.
| RETURNS | DESCRIPTION |
|---|---|
RunControlState
|
Cached control state. |
control_reason
property
¶
Return the cached operator control reason for the run.
| RETURNS | DESCRIPTION |
|---|---|
str | None
|
Cached machine-readable control reason, if one is set. |
is_finished
property
¶
Return whether the run is in a terminal lifecycle state.
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the run is terminal. |
refresh()
¶
Perform one poll-and-submit cycle and return the updated summary.
| RETURNS | DESCRIPTION |
|---|---|
RunSummary
|
Updated run summary. |
wait(*, timeout=None, poll_interval=None)
¶
Block until the run is terminal or the optional timeout expires.
| PARAMETER | DESCRIPTION |
|---|---|
timeout
|
Optional timeout in seconds.
TYPE:
|
poll_interval
|
Optional polling interval override in seconds.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
The same run handle after it becomes terminal. |
| RAISES | DESCRIPTION |
|---|---|
RunPausedError
|
If the run is paused while waiting. |
TimeoutError
|
If the timeout elapses before the run is terminal. |
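wait() is a polling loop bounded by an optional timeout. The minimal sketch below shows that shape with the sleep function and the clock injected, in the same spirit as the sleep override that BatchRunner accepts, so the loop can be exercised without real delays. wait_until_finished and FakeClock are hypothetical helpers; the real wait() also raises RunPausedError when the run is paused, which this sketch does not model.

```python
from __future__ import annotations

import time
from typing import Callable


def wait_until_finished(
    refresh: Callable[[], bool],
    *,
    timeout: float | None = None,
    poll_interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call refresh() until it reports a terminal run; return the number of polls."""
    deadline = None if timeout is None else clock() + timeout
    polls = 0
    while True:
        polls += 1
        if refresh():  # one poll-and-submit cycle; True once the run is terminal
            return polls
        if deadline is not None and clock() >= deadline:
            raise TimeoutError(f"run not terminal after {timeout} seconds")
        sleep(poll_interval)


class FakeClock:
    """Deterministic clock whose sleep() advances time instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
states = iter([False, False, True])  # terminal on the third refresh
polls = wait_until_finished(
    lambda: next(states), timeout=10, poll_interval=2, sleep=clock.sleep, clock=clock
)
print(polls, clock.now)  # 3 4.0
```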
summary()
¶
Read the latest persisted summary for the run from storage.
| RETURNS | DESCRIPTION |
|---|---|
RunSummary
|
Latest persisted run summary. |
snapshot()
¶
Return the current summary plus expanded terminal item payloads.
| RETURNS | DESCRIPTION |
|---|---|
RunSnapshot
|
Snapshot containing the latest summary plus materialized results. |
results()
¶
Return terminal item results for the run.
| RETURNS | DESCRIPTION |
|---|---|
list[BatchResultItem]
|
Terminal item results in durable item order. |
| RAISES | DESCRIPTION |
|---|---|
RunNotFinishedError
|
If the run is not terminal yet. |
prune_artifacts(*, include_raw_output_artifacts=False)
¶
Prune retained artifacts for this terminal run.
| PARAMETER | DESCRIPTION |
|---|---|
include_raw_output_artifacts
|
Whether raw provider payloads should also be pruned.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ArtifactPruneResult
|
Artifact prune report. |
export_artifacts(destination_dir)
¶
Export retained artifacts for this terminal run.
| PARAMETER | DESCRIPTION |
|---|---|
destination_dir
|
Directory that will receive the exported run bundle.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ArtifactExportResult
|
Artifact export report. |
pause()
¶
Suspend execution of this run.
| RETURNS | DESCRIPTION |
|---|---|
RunSummary
|
Updated run summary. |
resume()
¶
Resume this run after it has been paused.
| RETURNS | DESCRIPTION |
|---|---|
RunSummary
|
Updated run summary. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the run is in |
cancel()
¶
Request cancellation of this run.
| RETURNS | DESCRIPTION |
|---|---|
RunSummary
|
Updated run summary. |
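pause(), resume(), and cancel() change the operator control state rather than the lifecycle status. The minimal sketch below encodes the one transition rule spelled out above (resuming a run that is already cancelling raises ValueError) as a small transition function. The ControlState members and apply_control are hypothetical stand-ins for RunControlState; the library may use different names and additional states, and how it treats pausing a cancelling run is not stated in this reference.

```python
from enum import Enum


class ControlState(str, Enum):
    # Hypothetical member names; RunControlState's real values may differ.
    RUNNING = "running"
    PAUSED = "paused"
    CANCEL_REQUESTED = "cancel_requested"


def apply_control(state: ControlState, action: str) -> ControlState:
    """Return the control state after an operator action."""
    if action == "pause":
        # Unspecified here: this sketch simply overwrites any previous state.
        return ControlState.PAUSED
    if action == "cancel":
        return ControlState.CANCEL_REQUESTED
    if action == "resume":
        # Documented rule: a run that is already cancelling cannot be resumed.
        if state is ControlState.CANCEL_REQUESTED:
            raise ValueError("cannot resume a run that is cancelling")
        return ControlState.RUNNING
    raise ValueError(f"unknown action: {action!r}")


state = apply_control(ControlState.RUNNING, "pause")
state = apply_control(state, "resume")
print(state.value)  # running
```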
read_terminal_results(*, after_sequence=0, limit=None)
¶
Read a page of terminal item results using cursor-based pagination.
| PARAMETER | DESCRIPTION |
|---|---|
after_sequence
|
Cursor from the previous page's
TYPE:
|
limit
|
Maximum number of results to return per call.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TerminalResultsPage
|
A TerminalResultsPage holding the returned results and the next cursor. |
export_terminal_results(destination, *, after_sequence=0, append=True, limit=None)
¶
Export terminal item results to a JSONL file.
| PARAMETER | DESCRIPTION |
|---|---|
destination
|
Path to the output JSONL file.
TYPE:
|
after_sequence
|
Cursor from a previous call.
TYPE:
|
append
|
When
TYPE:
|
limit
|
Maximum number of results to export per call.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TerminalResultsExportResult
|
A TerminalResultsExportResult with the destination path, export count, and updated cursor. |
StructuredOutputError(error_class, message, *, raw_error=None)
¶
Bases: ValueError
Raised when a provider response cannot be parsed as the expected model.
| ATTRIBUTE | DESCRIPTION |
|---|---|
error_class |
Short machine-readable error category (e.g.
|
message |
Human-readable error description.
|
raw_error |
Optional raw payload captured for debugging.
|
Initialise the error.
| PARAMETER | DESCRIPTION |
|---|---|
error_class
|
Short machine-readable error category.
TYPE:
|
message
|
Human-readable description of the parse failure.
TYPE:
|
raw_error
|
Optional raw payload (parsed JSON or raw text) for debugging.
TYPE:
|
CheckpointedItemSource
¶
Bases: ItemSource[PayloadT], Generic[PayloadT]
Item source with durable checkpoint support for mid-stream resumption.
The runner persists the checkpoint carried by each CheckpointedBatchItem to durable state. On resume, it calls iter_from_checkpoint() with the last persisted checkpoint so that iteration continues exactly where it left off.
source_identity()
abstractmethod
¶
Return a stable identity used to validate resume compatibility.
initial_checkpoint()
abstractmethod
¶
Return the checkpoint that represents the start of the source.
| RETURNS | DESCRIPTION |
|---|---|
JSONValue
|
An opaque JSONValue that can be passed to iter_from_checkpoint(). |
iter_from_checkpoint(checkpoint)
abstractmethod
¶
Yield items plus the next opaque checkpoint after each durable item.
| PARAMETER | DESCRIPTION |
|---|---|
checkpoint
|
The checkpoint at which to resume. Must be a value previously produced as a CheckpointedBatchItem's next_checkpoint.
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
CheckpointedBatchItem[PayloadT]
|
Each item paired with the checkpoint to persist after it. |
checkpoint_is_complete(checkpoint)
¶
Return whether checkpoint is known to be at the end of the source.
Implementations can override this when they can answer from cheap metadata. The default is conservative because arbitrary checkpoint payloads may not encode source length.
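The checkpoint contract above can be illustrated with the simplest strategy, a 0-based integer index, which is what ResumableItemSource is described as using for CSV and JSONL files. The minimal sketch below uses stand-in classes instead of importing batchor: Item, CheckpointedItem, and ListSource are hypothetical, and source_identity() is omitted for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator


@dataclass(frozen=True)
class Item:
    # Stand-in for batchor.BatchItem.
    item_id: str
    payload: str


@dataclass(frozen=True)
class CheckpointedItem:
    # Stand-in for CheckpointedBatchItem: the checkpoint to persist after `item`.
    next_checkpoint: int
    item: Item


class ListSource:
    """Hypothetical list-backed source whose checkpoint is a 0-based index."""

    def __init__(self, payloads: list[str]) -> None:
        self._payloads = list(payloads)

    def initial_checkpoint(self) -> int:
        return 0  # start of the source

    def iter_from_checkpoint(self, checkpoint: int) -> Iterator[CheckpointedItem]:
        for index in range(checkpoint, len(self._payloads)):
            item = Item(item_id=str(index), payload=self._payloads[index])
            # Resuming from index + 1 continues with the item after this one.
            yield CheckpointedItem(next_checkpoint=index + 1, item=item)

    def checkpoint_is_complete(self, checkpoint: int) -> bool:
        # Cheap answer: the index alone tells us whether the end was reached.
        return checkpoint >= len(self._payloads)


source = ListSource(["a", "b", "c", "d"])
checkpoint = source.initial_checkpoint()
for entry in source.iter_from_checkpoint(checkpoint):
    checkpoint = entry.next_checkpoint  # the runner would persist this durably
    if entry.item.item_id == "1":
        break  # simulate a crash after two items

resumed = [entry.item.payload for entry in source.iter_from_checkpoint(checkpoint)]
print(checkpoint, resumed)  # 2 ['c', 'd']
```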
ItemSource
¶
Bases: ABC, Generic[PayloadT]
Abstract iterable source of BatchItem objects.
Implement this class when your source does not need resume support. For durable checkpointing, use CheckpointedItemSource instead.
CompositeItemSource(sources)
¶
Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]
Compose ordered checkpointed sources into one logical deterministic source.
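One natural way to checkpoint a composed stream is a pair recording which source is active and the position inside it. The minimal sketch below uses that layout with plain lists as the inner sources. The (source_index, inner_index) checkpoint shape and iter_composite are assumptions for illustration; CompositeItemSource's real checkpoint format is not documented here.

```python
from __future__ import annotations

from typing import Iterator, Sequence, Tuple

Checkpoint = Tuple[int, int]  # hypothetical layout: (source_index, inner_index)


def iter_composite(
    sources: Sequence[Sequence[str]], checkpoint: Checkpoint = (0, 0)
) -> Iterator[tuple[Checkpoint, str]]:
    """Yield (next_checkpoint, payload) across sources in their given order."""
    start_source, start_inner = checkpoint
    for source_index in range(start_source, len(sources)):
        source = sources[source_index]
        # Only the source we resume into starts mid-stream; later ones start at 0.
        inner_start = start_inner if source_index == start_source else 0
        for inner_index in range(inner_start, len(source)):
            yield (source_index, inner_index + 1), source[inner_index]


sources = [["a", "b"], ["c"], ["d", "e"]]
full = list(iter_composite(sources))
# Resume from the checkpoint recorded after "b", the last item of the first source.
resumed = [payload for _, payload in iter_composite(sources, full[1][0])]
print(resumed)  # ['c', 'd', 'e']
```

Keeping the sources in a fixed order is what makes the composed stream deterministic: the same checkpoint always maps back to the same next item.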
CsvItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')
¶
Bases: ResumableItemSource[PayloadT], Generic[PayloadT]
Stream BatchItem values from a CSV file with durable resume support.
JsonlItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')
¶
Bases: ResumableItemSource[PayloadT], Generic[PayloadT]
Stream BatchItem values from a JSONL file with durable resume support.
ParquetItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, columns=None)
¶
Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]
Stream BatchItem values from a Parquet file.
Uses pyarrow to read row groups lazily, checkpointing at the
(row_group_index, row_index_within_group) level to support efficient
resumption within large files.
| ATTRIBUTE | DESCRIPTION |
|---|---|
path |
Path to the Parquet file.
|
item_id_from_row |
Callable that derives a unique item ID from a row.
|
payload_from_row |
Callable that converts a row to the item payload.
|
metadata_from_row |
Optional callable for per-item metadata.
|
columns |
Optional list of column names to read (projection push-down).
|
checkpoint_is_complete(checkpoint)
¶
Return whether a Parquet checkpoint points beyond the last row group.
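A two-level (row_group_index, row_index_within_group) checkpoint lets a resume seek straight to one row group instead of rescanning the file, and completeness can be decided from row-group metadata alone. The minimal sketch below walks a list of row-group sizes instead of a real file, so it needs no pyarrow. iter_rows and the choice to normalise a finished group to (next_group, 0) are assumptions about one reasonable design, not the library's code.

```python
from __future__ import annotations

from typing import Iterator, Sequence, Tuple

Checkpoint = Tuple[int, int]  # (row_group_index, row_index_within_group)


def iter_rows(
    row_group_sizes: Sequence[int], checkpoint: Checkpoint = (0, 0)
) -> Iterator[tuple[Checkpoint, Checkpoint]]:
    """Yield (position, next_checkpoint) for every row at or after checkpoint."""
    group, row = checkpoint
    while group < len(row_group_sizes):
        if row >= row_group_sizes[group]:
            group, row = group + 1, 0  # skip to the start of the next row group
            continue
        next_checkpoint = (group, row + 1)
        if row + 1 >= row_group_sizes[group]:
            next_checkpoint = (group + 1, 0)  # finished this row group
        yield (group, row), next_checkpoint
        group, row = next_checkpoint


def checkpoint_is_complete(row_group_sizes: Sequence[int], checkpoint: Checkpoint) -> bool:
    # Metadata-only check: a checkpoint beyond the last row group means "done".
    return checkpoint[0] >= len(row_group_sizes)


sizes = [2, 0, 1]  # three row groups; the middle one is empty
rows = list(iter_rows(sizes))
print([pos for pos, _ in rows], rows[-1][1])  # [(0, 0), (0, 1), (2, 0)] (3, 0)
```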
StorageRegistry()
¶
Dispatch table mapping storage kinds to factory callables.
Use build_default_storage_registry() to obtain a registry pre-loaded with the SQLite, Postgres, and in-memory backends, or construct one manually to register custom backends.
register(*, kind, factory)
¶
Register a storage factory for kind.
| PARAMETER | DESCRIPTION |
|---|---|
kind
|
The :class:
TYPE:
|
factory
|
Zero-argument callable that returns a StateStore instance.
TYPE:
|
create(kind)
¶
Create a storage backend of the given kind.
| PARAMETER | DESCRIPTION |
|---|---|
kind
|
A :class:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
StateStore
|
A ready-to-use StateStore instance. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If kind is not registered or is not a valid StorageKind. |
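StorageRegistry is a dispatch table from storage kind to a zero-argument factory. The minimal sketch below reproduces that pattern, including both ValueError cases listed above. Kind and Registry are hypothetical stand-ins for StorageKind and StorageRegistry, and accepting plain strings in create() is an assumption.

```python
from __future__ import annotations

from enum import Enum
from typing import Callable


class Kind(str, Enum):
    # Hypothetical stand-in for StorageKind.
    MEMORY = "memory"
    SQLITE = "sqlite"


class Registry:
    """Minimal dispatch table: storage kind -> zero-argument factory."""

    def __init__(self) -> None:
        self._factories: dict[Kind, Callable[[], object]] = {}

    def register(self, *, kind: Kind, factory: Callable[[], object]) -> None:
        self._factories[Kind(kind)] = factory

    def create(self, kind: Kind | str) -> object:
        try:
            resolved = Kind(kind)  # assumption: plain strings such as "memory" are accepted
        except ValueError:
            raise ValueError(f"not a valid storage kind: {kind!r}") from None
        factory = self._factories.get(resolved)
        if factory is None:
            raise ValueError(f"storage kind not registered: {resolved.value}")
        return factory()


registry = Registry()
registry.register(kind=Kind.MEMORY, factory=dict)  # any zero-argument callable works
store = registry.create("memory")
print(type(store).__name__)  # dict
```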
SQLiteStorage(*, name='default', path=None, now=None, engine=None, provider_registry=None)
¶
Bases: SQLiteResultsMixin, SQLiteLifecycleMixin, SQLiteQueryMixin, StateStore
SQLite-backed StateStore.
The default state store for batchor runs. Uses a single SQLite database file with WAL journal mode for concurrent reads and a companion artifact directory for JSONL files.
| ATTRIBUTE | DESCRIPTION |
|---|---|
path |
Absolute path to the SQLite database file.
|
engine |
SQLAlchemy engine used for all database operations.
|
provider_registry |
Registry used to deserialise provider configs on run resume.
|
Initialise or open a SQLite storage backend.
| PARAMETER | DESCRIPTION |
|---|---|
name
|
Logical name used to derive the default file path
(
TYPE:
|
path
|
Explicit path to the database file.
TYPE:
|
now
|
Optional clock override for testing. Defaults to
TYPE:
|
engine
|
Optional pre-built SQLAlchemy engine. When provided, WAL and busy-timeout pragmas are not configured automatically.
TYPE:
|
provider_registry
|
Registry for deserialising provider configs on run resume. Defaults to the built-in registry.
TYPE:
|
default_path(name)
staticmethod
¶
Return the default database file path for a given store name.
| PARAMETER | DESCRIPTION |
|---|---|
name
|
Logical store name. Whitespace-only names fall back to
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Path
|
|
close()
¶
Dispose the SQLAlchemy connection pool and release resources.
MemoryStateStore(*, now=None)
¶
Bases: StateStore
Ephemeral in-memory implementation of StateStore.
All state is held in plain Python dictionaries. No persistence occurs; data is lost when the store is garbage-collected or the process exits.
Thread safety: individual method calls are not locked. Do not share a single instance across threads without external synchronisation.
StateStore
¶
Bases: ABC
Abstract interface for durable run state storage.
All methods are synchronous and transactional within the scope of a single call. Implementations must be safe to use from multiple threads provided callers do not share a connection outside a method call.
The two primary implementations are:
- SQLiteStorage: file-backed, zero-dependency (default).
- PostgresStorage: shared Postgres control plane.
has_run(*, run_id)
abstractmethod
¶
Return True if a run with run_id exists in storage.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier to check.
TYPE:
|
create_run(*, run_id, config, items)
abstractmethod
¶
Create a new run record in storage.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Unique identifier for the new run.
TYPE:
|
config
|
Serialised run configuration.
TYPE:
|
items
|
Optional initial batch of materialized items to insert.
TYPE:
|
append_items(*, run_id, items)
abstractmethod
¶
Append additional items to an existing run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Identifier of the run to extend.
TYPE:
|
items
|
Materialized items to append.
TYPE:
|
append_items_with_ingest_checkpoint(*, run_id, items, next_item_index, checkpoint_payload=None, ingestion_complete)
¶
Append materialized items and advance the ingest checkpoint together.
Durable stores should override this so that the item rows and the checkpoint update are written in a single transaction. The default implementation keeps custom stores source-compatible but is not crash-atomic.
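The point of the combined call is crash-atomicity: either the new item rows and the advanced checkpoint are both stored, or neither is. With Python's sqlite3 module, wrapping both statements in `with conn:` gives that behaviour, because the connection context manager commits on success and rolls back on an exception. The minimal sketch below demonstrates this; the table layout and append_with_checkpoint are hypothetical and are not batchor's real schema.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE items (run_id TEXT, item_index INTEGER, payload TEXT);
    CREATE TABLE ingest (run_id TEXT PRIMARY KEY, next_item_index INTEGER,
                         checkpoint TEXT, complete INTEGER);
    INSERT INTO ingest VALUES ('run-1', 0, '0', 0);
    """
)


def append_with_checkpoint(conn, run_id, payloads, next_item_index, checkpoint, complete):
    """Insert item rows and advance the ingest checkpoint in one transaction."""
    first_index = next_item_index - len(payloads)
    with conn:  # commits on success, rolls back if anything below raises
        conn.executemany(
            "INSERT INTO items VALUES (?, ?, ?)",
            [(run_id, first_index + i, p) for i, p in enumerate(payloads)],
        )
        conn.execute(
            "UPDATE ingest SET next_item_index = ?, checkpoint = ?, complete = ?"
            " WHERE run_id = ?",
            (next_item_index, json.dumps(checkpoint), int(complete), run_id),
        )


append_with_checkpoint(conn, "run-1", ["a", "b"], 2, 2, False)
try:
    # object() is not JSON-serialisable, so the checkpoint update fails mid-transaction.
    append_with_checkpoint(conn, "run-1", ["c"], 3, object(), True)
except TypeError:
    pass
item_count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
next_index = conn.execute("SELECT next_item_index FROM ingest").fetchone()[0]
print(item_count, next_index)  # 2 2
```

The failed second call leaves no orphaned "c" row behind, which is exactly the property the non-atomic default cannot guarantee.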
set_ingest_checkpoint(*, run_id, checkpoint)
abstractmethod
¶
Persist the initial ingest checkpoint for a checkpointed source.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
checkpoint
|
Checkpoint representing the start-of-source state.
TYPE:
|
get_ingest_checkpoint(*, run_id)
abstractmethod
¶
Return the current ingest checkpoint, or None if not set.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
update_ingest_checkpoint(*, run_id, next_item_index, checkpoint_payload=None, ingestion_complete)
abstractmethod
¶
Advance the ingest checkpoint after a batch of items is ingested.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
next_item_index
|
0-based index of the next item to ingest.
TYPE:
|
checkpoint_payload
|
Updated source-specific checkpoint payload.
TYPE:
|
ingestion_complete
|
TYPE:
|
get_run_config(*, run_id)
abstractmethod
¶
Return the persisted configuration for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
get_run_control_state(*, run_id)
abstractmethod
¶
Return the current operator control state for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
set_run_control_state(*, run_id, control_state, control_reason=None)
abstractmethod
¶
Persist a new operator control state for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
control_state
|
The new control state to set.
TYPE:
|
control_reason
|
Optional machine-readable reason for the control
state. Cleared by callers when returning to
TYPE:
|
claim_items_for_submission(*, run_id, max_attempts, limit=None)
abstractmethod
¶
Atomically claim a batch of pending items for the submission cycle.
Items are moved from PENDING to QUEUED_LOCAL. Only items
whose attempt count is less than max_attempts are eligible.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
max_attempts
|
Maximum attempts per item.
TYPE:
|
limit
|
Maximum number of items to claim.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[ClaimedItem]
|
List of claimed items (may be empty if none are pending). |
release_items_to_pending(*, run_id, item_ids)
abstractmethod
¶
Return locally queued items to the PENDING state.
Called when a submission cycle is interrupted before all claimed items could be submitted.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
item_ids
|
Identifiers of items to release.
TYPE:
|
requeue_local_items(*, run_id)
abstractmethod
¶
Re-queue any items stuck in QUEUED_LOCAL back to PENDING.
Called when a run handle is rehydrated to recover items claimed by a previous process that terminated unexpectedly.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
Number of items re-queued. |
record_request_artifacts(*, run_id, pointers)
abstractmethod
¶
Persist per-item request artifact pointers.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
pointers
|
Artifact pointer records to persist.
TYPE:
|
get_request_artifact_paths(*, run_id)
abstractmethod
¶
Return all distinct request artifact paths recorded for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
clear_request_artifact_pointers(*, run_id, artifact_paths)
abstractmethod
¶
Clear per-item request artifact pointers for the given paths.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
artifact_paths
|
Relative artifact paths to clear.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
Number of pointer records cleared. |
record_batch_artifacts(*, run_id, pointers)
abstractmethod
¶
Persist batch output and error artifact pointers.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
pointers
|
Batch artifact pointer records to persist.
TYPE:
|
get_artifact_inventory(*, run_id)
abstractmethod
¶
Return the complete artifact inventory for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
clear_batch_artifact_pointers(*, run_id, artifact_paths)
abstractmethod
¶
Clear batch artifact pointers for the given paths.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
artifact_paths
|
Relative artifact paths to clear.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
Number of pointer records cleared. |
mark_artifacts_exported(*, run_id, export_root)
abstractmethod
¶
Record that artifacts have been exported to export_root.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
export_root
|
Absolute path to the export directory.
TYPE:
|
register_batch(*, run_id, local_batch_id, provider_batch_id, status, custom_ids)
abstractmethod
¶
Record a newly submitted provider batch.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
local_batch_id
|
Locally generated batch identifier.
TYPE:
|
provider_batch_id
|
Provider-assigned batch identifier.
TYPE:
|
status
|
Initial batch status string.
TYPE:
|
custom_ids
|
TYPE:
|
mark_items_submitted(*, run_id, provider_batch_id, submissions)
abstractmethod
¶
Transition items from QUEUED_LOCAL to SUBMITTED.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
provider_batch_id
|
Batch that now owns these items.
TYPE:
|
submissions
|
Submission records including
TYPE:
|
update_batch_status(*, run_id, provider_batch_id, status, output_file_id=None, error_file_id=None)
abstractmethod
¶
Persist the latest polled status for a provider batch.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
provider_batch_id
|
Batch to update.
TYPE:
|
status
|
New status string from the provider.
TYPE:
|
output_file_id
|
Provider file ID for output, if now available.
TYPE:
|
error_file_id
|
Provider file ID for errors, if now available.
TYPE:
|
get_active_batches(*, run_id)
abstractmethod
¶
Return all in-flight (non-terminal) batch records for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
get_submitted_custom_ids_for_batch(*, run_id, provider_batch_id)
abstractmethod
¶
Return all custom_id values submitted in provider_batch_id.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
provider_batch_id
|
Batch to query.
TYPE:
|
mark_items_completed(*, run_id, completions)
abstractmethod
¶
Transition items to COMPLETED and persist their results.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
completions
|
Completed item records to persist.
TYPE:
|
mark_items_failed(*, run_id, failures, max_attempts)
abstractmethod
¶
Transition items after a provider-level failure.
Items whose attempt count reaches max_attempts are moved to
FAILED_PERMANENT; others are moved to FAILED_RETRYABLE and
will be re-queued on the next submission cycle.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
failures
|
Failure records identified by
TYPE:
|
max_attempts
|
Attempt ceiling from the retry policy.
TYPE:
|
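claim_items_for_submission() and mark_items_failed() together implement per-item retry accounting against max_attempts. The minimal in-memory sketch below follows the rules stated above: only items under the attempt ceiling are claimed, and a failure becomes FAILED_PERMANENT once the ceiling is reached. Two details are assumptions not stated in this reference: that FAILED_RETRYABLE items are claimable just like PENDING ones, and that the attempt counter is incremented on each failure.

```python
from __future__ import annotations

from dataclasses import dataclass

# Assumption: retryable failures are re-claimed alongside pending items.
CLAIMABLE = {"PENDING", "FAILED_RETRYABLE"}


@dataclass
class ItemState:
    item_id: str
    status: str = "PENDING"
    attempts: int = 0


def claim(items: list[ItemState], max_attempts: int, limit: int | None = None) -> list[ItemState]:
    """Move eligible items to QUEUED_LOCAL, respecting the attempt ceiling."""
    claimed: list[ItemState] = []
    for item in items:
        if limit is not None and len(claimed) >= limit:
            break
        if item.status in CLAIMABLE and item.attempts < max_attempts:
            item.status = "QUEUED_LOCAL"
            claimed.append(item)
    return claimed


def fail(item: ItemState, max_attempts: int) -> None:
    """Record one failed attempt and pick the retryable or permanent state."""
    item.attempts += 1  # assumption: each provider failure consumes one attempt
    item.status = "FAILED_PERMANENT" if item.attempts >= max_attempts else "FAILED_RETRYABLE"


item = ItemState("a")
history = []
for _ in range(3):  # three submission cycles
    for claimed_item in claim([item], max_attempts=2):
        fail(claimed_item, max_attempts=2)
    history.append((item.status, item.attempts))
print(history)
# [('FAILED_RETRYABLE', 1), ('FAILED_PERMANENT', 2), ('FAILED_PERMANENT', 2)]
```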
mark_queued_items_failed(*, run_id, failures, max_attempts)
abstractmethod
¶
Fail items that are still in the local queue (pre-submission).
Used for token-budget rejections where items never received a
custom_id.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
failures
|
Failure records identified by
TYPE:
|
max_attempts
|
Attempt ceiling from the retry policy.
TYPE:
|
reset_batch_items_to_pending(*, run_id, provider_batch_id, error)
abstractmethod
¶
Reset all items in a failed batch to pending for retry.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
provider_batch_id
|
The batch that failed or was cancelled.
TYPE:
|
error
|
Failure details to record on each item.
TYPE:
|
get_active_submitted_token_estimate(*, run_id)
abstractmethod
¶
Return the total token estimate for all currently submitted items.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
record_batch_retry_failure(*, run_id, error_class, base_delay_sec, max_delay_sec)
abstractmethod
¶
Record a batch control-plane failure and advance the backoff state.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
error_class
|
Short error category from the failure.
TYPE:
|
base_delay_sec
|
Base backoff delay.
TYPE:
|
max_delay_sec
|
Maximum backoff delay.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Updated
|
class:
TYPE:
|
clear_batch_retry_backoff(*, run_id)
abstractmethod
¶
Reset the control-plane backoff state after a successful operation.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
get_batch_retry_backoff_remaining_sec(*, run_id)
abstractmethod
¶
Return the seconds remaining until the next retry is permitted.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
float
|
Remaining backoff in seconds, or |
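The backoff methods take a base and a maximum delay, which points to capped exponential backoff. The minimal sketch below shows one common formula, base_delay_sec * 2**(n - 1) capped at max_delay_sec for the n-th consecutive failure, plus a remaining-time helper. The doubling factor, the absence of jitter, and returning 0.0 when no backoff is active are assumptions; this reference does not state batchor's exact formula.

```python
from __future__ import annotations


def backoff_delay(consecutive_failures: int, base_delay_sec: float, max_delay_sec: float) -> float:
    """Capped exponential backoff: base * 2**(n - 1), never above the maximum."""
    if consecutive_failures <= 0:
        return 0.0
    return min(max_delay_sec, base_delay_sec * 2 ** (consecutive_failures - 1))


def remaining_sec(next_retry_at: float | None, now: float) -> float:
    """Seconds until the next retry is allowed; 0.0 when no backoff is active."""
    if next_retry_at is None:
        return 0.0
    return max(0.0, next_retry_at - now)


delays = [backoff_delay(n, base_delay_sec=5.0, max_delay_sec=60.0) for n in range(1, 6)]
print(delays)  # [5.0, 10.0, 20.0, 40.0, 60.0]
```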
get_run_summary(*, run_id)
abstractmethod
¶
Compute and return the aggregated summary for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
get_item_records(*, run_id)
abstractmethod
¶
Return all item records for a run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
get_terminal_item_records(*, run_id, after_sequence, limit=None)
abstractmethod
¶
Return terminal item records using cursor-based pagination.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
after_sequence
|
Return only records whose
TYPE:
|
limit
|
Maximum number of records to return.
TYPE:
|
mark_nonterminal_items_cancelled(*, run_id, error)
abstractmethod
¶
Move all non-terminal items to FAILED_PERMANENT with error.
Called when a cancellation is finalised.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Run identifier.
TYPE:
|
error
|
Cancellation failure details to attach to each item.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
Number of items that were cancelled. |
build_default_provider_registry()
¶
Create a ProviderRegistry pre-loaded with the OpenAI provider.
| RETURNS | DESCRIPTION |
|---|---|
ProviderRegistry
|
A ProviderRegistry with the OpenAI provider registered. |
default_schema_name(model)
¶
Derive a snake_case schema name from a Pydantic model class.
Leading underscores are stripped before conversion. Double underscores produced by the conversion are collapsed.
| PARAMETER | DESCRIPTION |
|---|---|
model
|
The Pydantic model class.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
str
|
A snake_case string suitable for use as an OpenAI schema name. |
Example
>>> default_schema_name(MyOutputModel)
'my_output_model'
model_output_schema(model, *, schema_name=None)
¶
Derive a validated strict JSON Schema from a Pydantic model.
Calls validate_strict_json_schema() to ensure the schema is compatible with OpenAI's structured output API before returning it.
| PARAMETER | DESCRIPTION |
|---|---|
model
|
Pydantic model class to derive the schema from.
TYPE:
|
schema_name
|
Optional explicit schema name. When
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[str, JSONObject]
|
A 2-tuple (name, schema), where name is the resolved schema name and schema is the strict JSON Schema object. |
| RAISES | DESCRIPTION |
|---|---|
StructuredOutputSchemaError
|
If the schema is not compatible with OpenAI's strict structured output requirements. |
build_default_storage_registry(*, provider_registry=None)
¶
Create a StorageRegistry pre-loaded with all built-in backends.
Registers:
- StorageKind.SQLITE → SQLiteStorage (default store)
- StorageKind.POSTGRES → PostgresStorage
- StorageKind.MEMORY → MemoryStateStore
| PARAMETER | DESCRIPTION |
|---|---|
provider_registry
|
Optional provider registry forwarded to backends that need it for config round-tripping on resume.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
StorageRegistry
|
A StorageRegistry with all built-in backends registered. |
Runner¶
BatchRunner is the main entrypoint for the Python API, and Run is the durable handle returned by start() or get_run().
batchor.runtime.runner
¶
BatchRunner public facade over the internal runtime execution layers.
BatchRunner(*, storage=None, provider_registry=None, storage_registry=None, provider_factory=None, observer=None, sleep=None, artifact_store=None, temp_root=None)
¶
Durable orchestrator for creating, resuming, and inspecting batch runs.
Initialize the runner facade and its internal runtime layers.
| PARAMETER | DESCRIPTION |
|---|---|
storage
|
Storage backend instance or registered storage kind.
TYPE:
|
provider_registry
|
Provider registry used for config dumping and provider construction.
TYPE:
|
storage_registry
|
Storage registry used for named storage backends.
TYPE:
|
provider_factory
|
Optional provider factory override for tests.
TYPE:
|
observer
|
Optional run event observer callback.
TYPE:
|
sleep
|
Optional sleep function override for polling loops.
TYPE:
|
artifact_store
|
Artifact store override.
TYPE:
|
temp_root
|
Root used for the default local artifact store.
TYPE:
|
start(job, *, run_id=None)
¶
run_and_wait(job, *, run_id=None)
¶
get_run(run_id)
¶
Rehydrate an existing durable run handle from storage.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Rehydrated durable run handle. |
pause_run(run_id)
¶
Set a run's control state to paused.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Durable run handle with updated control state. |
resume_run(run_id)
¶
Resume a previously paused run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Durable run handle with updated control state. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the run is already cancelling. |
cancel_run(run_id)
¶
Request cancellation of an active run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Run
|
Durable run handle with updated control state. |
read_terminal_results(run_id, *, after_sequence=0, limit=None)
¶
Read a page of terminal item results for cursor-based streaming.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
after_sequence
|
Terminal-result cursor from the previous page.
TYPE:
|
limit
|
Optional maximum number of items to return.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TerminalResultsPage
|
Terminal results page. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
export_terminal_results(run_id, *, destination, after_sequence=0, append=True, limit=None)
¶
Export terminal item results to a JSONL file.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
destination
|
Destination JSONL path.
TYPE:
|
after_sequence
|
Terminal-result cursor from the previous page.
TYPE:
|
append
|
Whether to append to the destination file.
TYPE:
|
limit
|
Optional maximum number of items to export.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TerminalResultsExportResult
|
Terminal results export report. |
export_artifacts(run_id, *, destination_dir)
¶
Export retained run artifacts and a results manifest for a terminal run.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
destination_dir
|
Directory that will receive the exported run bundle.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ArtifactExportResult
|
Artifact export report. |
prune_artifacts(run_id, *, include_raw_output_artifacts=False)
¶
Remove retained artifacts for a terminal run and clear stored pointers.
| PARAMETER | DESCRIPTION |
|---|---|
run_id
|
Durable run identifier.
TYPE:
|
include_raw_output_artifacts
|
Whether raw provider payloads should also be pruned.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ArtifactPruneResult
|
Artifact prune report. |
OpenAI provider¶
This is the built-in provider implementation. Most consumers only need OpenAIProviderConfig, but the lower-level provider class is documented here for extension work and tests.
batchor.providers.openai
¶
OpenAI Batch API provider implementation.
Adapts batchor's generic batch model to the OpenAI Batch API. Supports both
the /v1/responses (default) and /v1/chat/completions endpoints, with
optional structured JSON output via OpenAI's json_schema response format.
OpenAIBatchProvider(config, client=None)
¶
Bases: BatchProvider
Built-in provider that adapts batchor jobs to the OpenAI Batch API.
build_request_line(*, custom_id, prompt_parts, structured_output=None)
¶
Build one OpenAI batch JSONL request row for a logical item.
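Each row of an OpenAI Batch input file is a JSON object with custom_id, method, url, and body fields. The minimal sketch below builds such a row for the /v1/responses endpoint, optionally attaching a strict json_schema text format for structured output. It only approximates what build_request_line() is described as producing; request_line is a hypothetical helper, and the library may shape the body differently (for example, for system prompts or for the /v1/chat/completions endpoint).

```python
from __future__ import annotations

import json


def request_line(
    custom_id: str,
    model: str,
    prompt: str,
    *,
    schema_name: str | None = None,
    schema: dict | None = None,
) -> str:
    """Build one JSONL row for the OpenAI Batch API's /v1/responses endpoint."""
    body: dict = {"model": model, "input": prompt}
    if schema is not None:
        # Responses API structured output: a strict JSON Schema text format.
        body["text"] = {
            "format": {"type": "json_schema", "name": schema_name, "schema": schema, "strict": True}
        }
    row = {"custom_id": custom_id, "method": "POST", "url": "/v1/responses", "body": body}
    return json.dumps(row, separators=(",", ":"))


schema = {
    "type": "object",
    "properties": {"label": {"type": "string"}},
    "required": ["label"],
    "additionalProperties": False,
}
line = request_line("item-1", "gpt-4.1", "Classify: hello", schema_name="label_output", schema=schema)
print(json.loads(line)["url"])  # /v1/responses
```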
upload_input_file(input_path)
¶
Upload a prepared local JSONL file to OpenAI and return the file id.
delete_input_file(file_id)
¶
Best-effort deletion of an uploaded input file.
create_batch(*, input_file_id, metadata=None)
¶
Create an OpenAI batch from a previously uploaded input file.
get_batch(batch_id)
¶
Fetch the current remote state for one OpenAI batch.
download_file_content(file_id)
¶
Download a provider file and normalize it to text.
parse_jsonl(content)
staticmethod
¶
Parse provider JSONL payloads into JSON objects.
parse_batch_output(*, output_content, error_content)
¶
Split OpenAI output/error payloads into success and error maps.
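OpenAI Batch output and error files are JSONL with one row per request, each carrying the request's custom_id plus a response (with status_code and body) and/or an error. The minimal sketch below splits both files into a success map and an error map keyed by custom_id. Treating any non-200 status or non-null error as a failure is an assumption about the split rule, and split_output is a hypothetical helper rather than parse_batch_output() itself.

```python
from __future__ import annotations

import json


def split_output(output_content: str, error_content: str | None) -> tuple[dict, dict]:
    """Map custom_id -> response body for successes and custom_id -> error details."""
    successes: dict[str, dict] = {}
    errors: dict[str, dict] = {}
    for content in (output_content, error_content or ""):
        for raw in content.splitlines():
            if not raw.strip():
                continue  # tolerate blank lines
            row = json.loads(raw)
            response = row.get("response") or {}
            if row.get("error") is None and response.get("status_code") == 200:
                successes[row["custom_id"]] = response.get("body", {})
            else:
                errors[row["custom_id"]] = row.get("error") or response
    return successes, errors


output = '{"custom_id": "a", "response": {"status_code": 200, "body": {"id": "resp_1"}}, "error": null}\n'
error_file = '{"custom_id": "b", "response": {"status_code": 400, "body": {}}, "error": null}\n'
ok, failed = split_output(output, error_file)
print(sorted(ok), sorted(failed))  # ['a'] ['b']
```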
estimate_request_tokens(request_line, *, chars_per_token)
¶
Estimate submitted tokens for one provider request line.
resolve_openai_api_key(config)
¶
Resolve credentials from explicit config first, then the environment.
Sources¶
These sources support durable resume through source fingerprints and checkpoints.
batchor.sources.base
¶
Abstract base classes for item source adapters.
Item sources are responsible for streaming BatchItem objects to the runner. The hierarchy provides three levels of capability:
- ItemSource: a plain iterable (no resume support).
- CheckpointedItemSource: supports durable checkpoints for resuming mid-stream after a process restart.
- ResumableItemSource: a convenience subclass of CheckpointedItemSource that uses a monotonic integer index as the checkpoint (suitable for ordered, seekable sources such as CSV and JSONL files).
SourceIdentity(source_kind, source_ref, source_fingerprint)
dataclass
Stable identity for a checkpointed source, used to validate resumability.
When a run is resumed, the runner compares the stored identity against the current source's identity to detect mismatches (e.g. the source file was replaced).
| ATTRIBUTE | DESCRIPTION |
|---|---|
| source_kind | Short identifier for the source type (e.g. …). |
| source_ref | Human-readable reference to the source (e.g. absolute file path). |
| source_fingerprint | Fingerprint of the source (e.g. a SHA-256 digest of file size and mtime) used to detect source mutations. |
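To make the resume check concrete, the sketch below uses a local stand-in dataclass with the same three fields. The fingerprint format (`size:mtime_ns` hashed with SHA-256) and the helper names `file_identity` and `check_resumable` are assumptions. batchor's exact fingerprint recipe may differ.

```python
import hashlib
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Identity:
    """Local stand-in mirroring SourceIdentity's three fields."""

    source_kind: str
    source_ref: str
    source_fingerprint: str


def file_identity(path: str, kind: str) -> Identity:
    st = os.stat(path)
    # Hash size and mtime: cheap to compute, and both change when the file is rewritten.
    digest = hashlib.sha256(f"{st.st_size}:{st.st_mtime_ns}".encode()).hexdigest()
    return Identity(kind, os.path.abspath(path), digest)


def check_resumable(stored: Identity, current: Identity) -> None:
    """Refuse to resume if any identity field differs from the stored one."""
    if stored != current:
        raise RuntimeError(f"source changed since the run started: {stored.source_ref}")
```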
IndexedBatchItem(item_index, item)
dataclass
Bases: Generic[PayloadT]
A BatchItem paired with its 0-based source index.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| item_index | 0-based position of this item within the source. |
| item | The batch item. |
CheckpointedBatchItem(next_checkpoint, item)
dataclass
Bases: Generic[PayloadT]
A BatchItem paired with an opaque resume checkpoint.
The checkpoint is an opaque JSONValue (batchor.core.types.JSONValue) that, when passed back to CheckpointedItemSource.iter_from_checkpoint, resumes iteration at the item after this one.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| next_checkpoint | Checkpoint to persist after processing this item. |
| item | The batch item. |
ItemSource
Bases: ABC, Generic[PayloadT]
Abstract iterable source of BatchItem objects.
Implement this class when your source does not need resume support. For durable checkpointing, use CheckpointedItemSource instead.
CheckpointedItemSource
Bases: ItemSource[PayloadT], Generic[PayloadT]
Item source with durable checkpoint support for mid-stream resumption.
The runner persists the checkpoint returned by each CheckpointedBatchItem to state. On resume, it calls iter_from_checkpoint with the last persisted checkpoint so iteration continues exactly where it left off.
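The persist-then-resume contract can be sketched with a toy in-memory source. `ListSource`, `process`, and the `{"offset": n}` checkpoint shape are hypothetical. The point is that the checkpoint stays opaque to the runner, which only stores it and hands it back.

```python
from typing import Iterator, Optional

Checkpoint = dict[str, int]  # opaque to the runner; only the source interprets it


class ListSource:
    """Hypothetical checkpointed source over an in-memory list."""

    def __init__(self, items: list[str]) -> None:
        self.items = items

    def initial_checkpoint(self) -> Checkpoint:
        return {"offset": 0}

    def iter_from_checkpoint(self, checkpoint: Checkpoint) -> Iterator[tuple[Checkpoint, str]]:
        for offset in range(checkpoint["offset"], len(self.items)):
            # The checkpoint yielded with an item resumes at the item after it.
            yield {"offset": offset + 1}, self.items[offset]


def process(source: ListSource, state: dict, stop_after: Optional[int] = None) -> list[str]:
    """Consume items, persisting each item's checkpoint as soon as it is handled."""
    done: list[str] = []
    checkpoint = state.get("checkpoint", source.initial_checkpoint())
    for next_checkpoint, item in source.iter_from_checkpoint(checkpoint):
        done.append(item)
        state["checkpoint"] = next_checkpoint  # a real store would write this durably
        if stop_after is not None and len(done) >= stop_after:
            break  # simulate the process dying mid-stream
    return done
```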
source_identity()
abstractmethod
Return a stable identity used to validate resume compatibility.
initial_checkpoint()
abstractmethod
Return the checkpoint that represents the start of the source.
| RETURNS | DESCRIPTION |
|---|---|
| JSONValue | An opaque JSONValue checkpoint to pass to iter_from_checkpoint. |
iter_from_checkpoint(checkpoint)
abstractmethod
Yield items plus the next opaque checkpoint after each durable item.
| PARAMETER | DESCRIPTION |
|---|---|
| checkpoint | The checkpoint at which to resume. Must be a value previously returned by a CheckpointedBatchItem (its next_checkpoint) or by initial_checkpoint(). |

| YIELDS | DESCRIPTION |
|---|---|
| CheckpointedBatchItem[PayloadT] | Each item paired with the checkpoint that resumes after it. |
checkpoint_is_complete(checkpoint)
Return whether checkpoint is known to be at the end of the source.
Implementations can override this when they can answer from cheap metadata. The default is conservative because arbitrary checkpoint payloads may not encode source length.
ResumableItemSource
Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]
Checkpointed source where the checkpoint is a 0-based integer index.
Suitable for ordered seekable sources (CSV, JSONL) where iteration can efficiently skip ahead to a given row index. The checkpoint is simply the index of the next item to yield.
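For index-based sources, iter_from_checkpoint can be derived from iter_from, because the checkpoint after the item at index i is simply i + 1. The sketch below uses a hypothetical in-memory class, with plain tuples standing in for IndexedBatchItem and CheckpointedBatchItem. The `checkpoint_is_complete` override is an assumed example of answering from cheap metadata (the row count).

```python
from typing import Iterator


class IndexedListSource:
    """Hypothetical index-checkpointed source, mirroring the ResumableItemSource idea."""

    def __init__(self, rows: list[str]) -> None:
        self.rows = rows

    def initial_checkpoint(self) -> int:
        return 0  # index of the next item to yield

    def iter_from(self, item_index: int) -> Iterator[tuple[int, str]]:
        # A seekable source can start directly at item_index.
        for index in range(item_index, len(self.rows)):
            yield index, self.rows[index]

    def iter_from_checkpoint(self, checkpoint: int) -> Iterator[tuple[int, str]]:
        # The checkpoint is the next index, so it is derived from iter_from.
        for index, row in self.iter_from(int(checkpoint)):
            yield index + 1, row

    def checkpoint_is_complete(self, checkpoint: int) -> bool:
        # Cheap metadata (the row count) answers this exactly.
        return int(checkpoint) >= len(self.rows)
```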
iter_from(item_index)
abstractmethod
Yield items beginning at the given 0-based source item index.
| PARAMETER | DESCRIPTION |
|---|---|
| item_index | 0-based index of the first item to yield. Items before this index are skipped. |

| YIELDS | DESCRIPTION |
|---|---|
| IndexedBatchItem[PayloadT] | Each item paired with its 0-based source index. |
batchor.sources.composite
CompositeItemSource(sources)
Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]
Compose ordered checkpointed sources into one logical deterministic source.
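One plausible way to compose checkpointed sources is a two-part checkpoint: which child source is active, plus the position within it. The sketch below assumes that `(source_index, index_within_source)` shape and uses plain lists as the children. CompositeItemSource's real checkpoint is opaque and may instead nest each child's own checkpoint. A JSON-backed store would also persist the pair as a list rather than a tuple.

```python
from typing import Iterator, Sequence


def iter_composite(
    sources: Sequence[Sequence[str]],
    checkpoint: tuple[int, int] = (0, 0),
) -> Iterator[tuple[tuple[int, int], str]]:
    """Chain ordered sources; checkpoint = (source_index, index_within_source)."""
    source_index, inner = checkpoint
    for s in range(source_index, len(sources)):
        # Only the first child resumes mid-way; later children start from 0.
        start = inner if s == source_index else 0
        items = sources[s]
        for i in range(start, len(items)):
            # Checkpoint after this item: the next position in the same child.
            yield (s, i + 1), items[i]
```

Because children are visited in a fixed order, the same checkpoint always resumes at the same item, which keeps the composed stream deterministic.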
batchor.sources.files
File-backed item source implementations (CSV, JSONL, Parquet).
Each source wraps a local file and streams BatchItem objects row by row. All three implementations support durable resumption:
- CsvItemSource and JsonlItemSource use a monotonic row index as the checkpoint (via batchor.sources.base.ResumableItemSource).
- ParquetItemSource uses a (row_group_index, row_index_within_group) checkpoint to resume efficiently within large Parquet files.
Every emitted item carries batchor_lineage metadata recording the source path, row index, and (where applicable) a primary key and partition ID.
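Here is a sketch of index-based CSV resumption with lineage tagging, using only the standard library. It returns plain dicts rather than BatchItem objects. The keys `source_path` and `row_index` inside `batchor_lineage` are assumed names. The two callables mirror the `item_id_from_row` and `payload_from_row` parameters of CsvItemSource.

```python
import csv
from itertools import islice
from typing import Any, Callable, Iterator, TextIO


def iter_csv_from(
    fh: TextIO,
    item_index: int,
    *,
    source_path: str,
    item_id_from_row: Callable[[dict[str, str]], str],
    payload_from_row: Callable[[dict[str, str]], Any],
) -> Iterator[dict[str, Any]]:
    """Yield item dicts from data row item_index onward, tagging lineage metadata."""
    reader = csv.DictReader(fh)  # consumes the header row itself
    # Skip data rows that were already processed before the checkpoint.
    for row_index, row in enumerate(islice(reader, item_index, None), start=item_index):
        yield {
            "item_id": item_id_from_row(row),
            "payload": payload_from_row(row),
            "metadata": {"batchor_lineage": {"source_path": source_path, "row_index": row_index}},
        }
```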
CsvItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')
Bases: ResumableItemSource[PayloadT], Generic[PayloadT]
Stream BatchItem values from a CSV file with durable resume support.
JsonlItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, encoding='utf-8')
Bases: ResumableItemSource[PayloadT], Generic[PayloadT]
Stream BatchItem values from a JSONL file with durable resume support.
ParquetItemSource(path, *, item_id_from_row, payload_from_row, metadata_from_row=None, columns=None)
Bases: CheckpointedItemSource[PayloadT], Generic[PayloadT]
Stream BatchItem values from a Parquet file.
Uses pyarrow to read row groups lazily, checkpointing at the (row_group_index, row_index_within_group) level to support efficient resumption within large files.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| path | Path to the Parquet file. |
| item_id_from_row | Callable that derives a unique item ID from a row. |
| payload_from_row | Callable that converts a row to the item payload. |
| metadata_from_row | Optional callable for per-item metadata. |
| columns | Optional list of column names to read (projection push-down). |
checkpoint_is_complete(checkpoint)
Return whether a Parquet checkpoint points beyond the last row group.
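The row-group checkpoint arithmetic can be sketched without pyarrow by working from a list of row-group sizes. In practice, these sizes would come from the file metadata, e.g. `pyarrow.parquet.ParquetFile(path).metadata.row_group(i).num_rows`. One assumption is that the checkpoint rolls over to `(next_group, 0)` after the last row of a group, so the final checkpoint lands beyond the last row group. That is consistent with the checkpoint_is_complete description above. The function names are hypothetical.

```python
from typing import Iterator, Sequence

Pos = tuple[int, int]  # (row_group_index, row_index_within_group)


def remaining_rows(row_group_sizes: Sequence[int], checkpoint: Pos = (0, 0)) -> Iterator[tuple[Pos, Pos]]:
    """Yield (position, next_checkpoint) for each row from checkpoint onward."""
    group, row = checkpoint
    while group < len(row_group_sizes):
        if row >= row_group_sizes[group]:
            group, row = group + 1, 0  # skip exhausted or empty row groups
            continue
        # Roll over to the next group after the last row of this one.
        nxt = (group, row + 1) if row + 1 < row_group_sizes[group] else (group + 1, 0)
        yield (group, row), nxt
        group, row = nxt


def checkpoint_is_complete(row_group_sizes: Sequence[int], checkpoint: Pos) -> bool:
    """True when the checkpoint points beyond the last row group."""
    return checkpoint[0] >= len(row_group_sizes)
```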
Storage backends
The generated reference below describes the built-in storage entrypoints. The state-store protocol and detailed storage lifecycle are explained in Storage & Runs.
batchor.storage.sqlite
Public re-export module for the SQLite storage backend.
Import SQLiteStorage from here (or from the top-level batchor package) rather than from batchor.storage.sqlite_store directly.
SQLiteStorage(*, name='default', path=None, now=None, engine=None, provider_registry=None)
Bases: SQLiteResultsMixin, SQLiteLifecycleMixin, SQLiteQueryMixin, StateStore
SQLite-backed StateStore.
The default state store for batchor runs. Uses a single SQLite database file with WAL journal mode for concurrent reads and a companion artifact directory for JSONL files.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| path | Absolute path to the SQLite database file. |
| engine | SQLAlchemy engine used for all database operations. |
| provider_registry | Registry used to deserialise provider configs on run resume. |
Initialise or open a SQLite storage backend.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Logical name used to derive the default file path (…). |
| path | Explicit path to the database file. |
| now | Optional clock override for testing. Defaults to … |
| engine | Optional pre-built SQLAlchemy engine. When provided, WAL and busy-timeout pragmas are not configured automatically. |
| provider_registry | Registry for deserialising provider configs on run resume. Defaults to the built-in registry. |
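When you pass a pre-built `engine`, the automatic WAL and busy-timeout setup is skipped, so you may want to apply equivalent pragmas yourself. The sketch below uses the standard-library `sqlite3` module. The function name and the 5000 ms default are illustrative values, not batchor's settings. With SQLAlchemy, the same statements are typically issued from a `"connect"` event listener.

```python
import sqlite3


def open_with_pragmas(path: str, busy_timeout_ms: int = 5000) -> sqlite3.Connection:
    """Open a SQLite file with WAL journaling and a busy timeout (values are illustrative)."""
    conn = sqlite3.connect(path)
    # WAL lets readers proceed while a single writer appends to the log.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        raise RuntimeError(f"could not enable WAL, got {mode!r}")
    # Wait up to busy_timeout_ms for a lock instead of failing immediately.
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    return conn
```

WAL mode is persistent in the database file, while busy_timeout is per connection, so the second pragma has to be reapplied on every new connection.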
default_path(name)
staticmethod
Return the default database file path for a given store name.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Logical store name. Whitespace-only names fall back to … |

| RETURNS | DESCRIPTION |
|---|---|
| Path | The default database file path for name. |
close()
Dispose the SQLAlchemy connection pool and release resources.