Type Aliases
Messages
Messages = str | list[ChatMessage]
The primary message type. Either a plain string (completion mode) or a list of chat messages (chat mode).
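Both forms, as a quick sketch:
prompt_completion: Messages = "The capital of France is"  # completion mode
prompt_chat: Messages = [  # chat mode
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"},
]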
ChatMessage
ChatMessage = ChatCompletionMessageParam # from openai.types.chat
OpenAI’s chat message type with role, content, and optional tool_calls / tool_call_id fields.
SystemMessage
class SystemMessage:
role: Literal["system"] = "system"
content: MessageContent
@classmethod
def from_path(cls, path: str | Path) -> "SystemMessage": ...
Provider-agnostic system message type. Use vf.SystemMessage.from_path(...) to load a system prompt from a UTF-8 text file while preserving the file contents verbatim.
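For example, with a hypothetical prompts/system.txt file:
system_message = vf.SystemMessage.from_path("prompts/system.txt")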
Info
Info = dict[str, Any]
Arbitrary metadata dictionary from dataset rows.
SamplingArgs
SamplingArgs = dict[str, Any]
Generation parameters passed to the inference server (e.g., temperature, top_p, max_tokens).
RewardFunc
IndividualRewardFunc = Callable[..., float | Awaitable[float]]
GroupRewardFunc = Callable[..., list[float] | Awaitable[list[float]]]
RewardFunc = IndividualRewardFunc | GroupRewardFunc
Individual reward functions operate on single rollouts. Group reward functions operate on all rollouts for an example together (useful for relative scoring).
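Hypothetical examples of each shape:
def exact_match(completion, answer="", **kwargs) -> float:
    # Individual: one rollout in, one score out.
    return 1.0 if str(completion).strip() == answer.strip() else 0.0

def relative_length(completions, answers, **kwargs) -> list[float]:
    # Group: all rollouts for one example in, one score per rollout out.
    lengths = [len(str(c)) for c in completions]
    longest = max(lengths, default=1) or 1
    return [length / longest for length in lengths]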
ClientType
ClientType = Literal[
"openai_completions",
"openai_chat_completions",
"openai_chat_completions_token",
"openai_responses",
"renderer",
"anthropic_messages",
"nemorl_chat_completions",
]
Selects which Client implementation to use. Set via ClientConfig.client_type.
Data Types
State
class State(dict):
INPUT_FIELDS = ["prompt", "answer", "info", "example_id"]
A dict subclass that tracks rollout information. Accessing keys in INPUT_FIELDS automatically forwards to the nested input object.
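A sketch of the forwarding (key names per INPUT_FIELDS above):
state["prompt"]   # forwards to state["input"]["prompt"]
state["answer"]   # forwards to state["input"]["answer"]
state["model"]    # ordinary key, set at initialization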
Fields set during initialization:
| Field | Type | Description |
|---|---|---|
| input | RolloutInput | Nested input data |
| client | Client | Client instance |
| model | str | Model name |
| sampling_args | SamplingArgs \| None | Generation parameters |
| is_completed | bool | Whether rollout has ended |
| is_truncated | bool | Whether generation was truncated |
| tool_defs | list[Tool] \| None | Available tool definitions |
| trajectory | list[TrajectoryStep] | Multi-turn trajectory |
| trajectory_id | str | UUID for this rollout |
| timing | RolloutTiming | Timing information |
Fields set after scoring:
| Field | Type | Description |
|---|---|---|
| completion | Messages \| None | Final completion |
| reward | float \| None | Final reward |
| advantage | float \| None | Advantage over group mean |
| metrics | dict[str, float] \| None | Per-function metrics |
| stop_condition | str \| None | Name of triggered stop condition |
| error | Error \| None | Error if rollout failed |
class RolloutInput(TypedDict):
prompt: Messages # Required
example_id: int # Required
answer: str # Optional
info: Info # Optional
RolloutOutput
class RolloutOutput(dict):
# Required fields
example_id: int
prompt: Messages | None
completion: Messages | None
reward: float
timing: RolloutTiming
is_completed: bool
is_truncated: bool
metrics: dict[str, float]
# Optional fields
answer: str
info: Info
error: str | None
stop_condition: str | None
token_usage: TokenUsage
trajectory: list[TrajectoryStep]
tool_defs: list[Tool] | None
Serialized output from a rollout. This is a dict subclass that provides typed access to known fields while supporting arbitrary additional fields from state_columns. All values must be JSON-serializable. Used in GenerateOutputs and for saving results to disk.
TrajectoryStep
class TrajectoryStep(TypedDict):
prompt: Messages
completion: Messages
response: Response
tokens: TrajectoryStepTokens | None
reward: float | None
advantage: float | None
is_truncated: bool
trajectory_id: str
extras: dict[str, Any]
A single turn in a multi-turn rollout.
TrajectoryStepTokens
class TrajectoryStepTokens(TypedDict):
prompt_ids: list[int]
prompt_mask: list[int]
completion_ids: list[int]
completion_mask: list[int]
completion_logprobs: list[float]
overlong_prompt: bool
is_truncated: bool
routed_experts: list[list[list[int]]] | None # [seq_len, layers, topk] to enable router replay
Token-level data for training.
TimeSpan
class TimeSpan(CustomBaseModel):
"""A timed span. duration = end - start."""
start: float = 0.0 # Unix timestamp (seconds since epoch)
end: float = 0.0 # Unix timestamp (seconds since epoch)
# duration: float (computed_field)
TimeSpans
class TimeSpans(CustomBaseModel):
"""A list of TimeSpan with aggregate duration (sum)."""
spans: list[TimeSpan] = []
# duration: float (computed_field)
RolloutTiming
class RolloutTiming(CustomBaseModel):
"""Rollout-level timing. All values in seconds."""
start_time: float # wall-clock at rollout start
setup: TimeSpan = TimeSpan() # setup_state() span
generation: TimeSpan = TimeSpan() # full generation phase
scoring: TimeSpan = TimeSpan() # rubric.score_*() span
model: TimeSpans = TimeSpans() # all model-call spans
env: TimeSpans = TimeSpans() # all env-response spans
# total, overhead: float (computed_fields)
Derivations:
total = scoring.end - generation.start
overhead = total - setup.duration - model.duration - env.duration - scoring.duration
generation.start is stamped at the top of the rollout (before setup_state), so total covers the entire rollout including setup, generation loop, finalize, and scoring. overhead captures any time not attributed to the named phases.
TokenUsage
class TokenUsage(TypedDict, total=False):
input_tokens: float
output_tokens: float
final_input_tokens: float
final_output_tokens: float
| Field | Description |
|---|---|
| input_tokens | Sum of prompt tokens across all turns. Shared context is counted each time it appears in a prompt. |
| output_tokens | Sum of completion tokens across all turns. |
| final_input_tokens | Non-completion tokens in the final turn’s context (system prompts, user messages, tool results, etc.). |
| final_output_tokens | Completion tokens in the final turn’s context. Equals output_tokens for single-turn rollouts. |
In a single-turn rollout, input_tokens == final_input_tokens and output_tokens == final_output_tokens. In a multi-turn rollout, input_tokens > final_input_tokens because earlier turns’ prompts are counted again.
The final_* metrics assume a single, continuously extended trajectory. Non-linear trajectories (multi-agent, context summarization, history rewriting) are not accounted for.
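A worked example with hypothetical token counts: turn 1 sends a 100-token prompt and receives 20 completion tokens; turn 2 re-sends that context plus a 10-token tool result (a 130-token prompt) and receives 30 completion tokens. Then input_tokens = 100 + 130 = 230, output_tokens = 20 + 30 = 50, final_input_tokens = 110 (the non-completion tokens in the final context), and final_output_tokens = 50 (the completion tokens in the final context).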
GenerateOutputs
class GenerateOutputs(TypedDict):
outputs: list[RolloutOutput]
metadata: GenerateMetadata
Output from Environment.generate(). Contains a list of RolloutOutput objects (one per rollout) and generation metadata. Each RolloutOutput is a serialized, JSON-compatible dict containing the rollout’s prompt, completion, answer, reward, metrics, timing, and other per-rollout data.
class VersionInfo(TypedDict):
vf_version: str
vf_commit: str | None
env_version: str | None
env_commit: str | None
class GenerateMetadata(TypedDict):
env_id: str
env_args: dict
model: str
base_url: str
num_examples: int
rollouts_per_example: int
sampling_args: SamplingArgs
date: str
time_ms: float
avg_reward: float
avg_metrics: dict[str, float]
avg_error: float
pass_at_k: dict[str, float]
pass_all_k: dict[str, float]
pass_threshold: float
usage: TokenUsage | None
version_info: VersionInfo
state_columns: list[str]
path_to_save: Path
tools: list[Tool] | None
base_url is always serialized as a string. For multi-endpoint runs (e.g., using ClientConfig.endpoint_configs), it is stored as a comma-separated list of URLs.
version_info captures the verifiers framework version/commit and the environment package version/commit at generation time. Populated automatically by GenerateOutputsBuilder.
RolloutScore / RolloutScores
class RolloutScore(TypedDict):
reward: float
metrics: dict[str, float]
class RolloutScores(TypedDict):
reward: list[float]
metrics: dict[str, list[float]]
Per-rollout scoring results from a Rubric: RolloutScore holds the reward and metrics for one rollout; RolloutScores holds parallel lists across a batch of rollouts.
Classes
Environment Classes
Environment
class Environment(ABC):
def __init__(
self,
dataset: Dataset | None = None,
eval_dataset: Dataset | None = None,
system_prompt: str | None = None,
few_shot: list[ChatMessage] | None = None,
parser: Parser | None = None,
rubric: Rubric | None = None,
sampling_args: SamplingArgs | None = None,
message_type: MessageType = "chat",
max_workers: int = 512,
env_id: str | None = None,
env_args: dict | None = None,
max_seq_len: int | None = None,
score_rollouts: bool = True,
pass_threshold: float = 0.5,
**kwargs,
): ...
Abstract base class for all environments.
Generation methods:
| Method | Returns | Description |
|---|---|---|
| generate(inputs, client, model, ...) | GenerateOutputs | Run rollouts asynchronously. client accepts Client \| ClientConfig. |
| generate_sync(inputs, client, ...) | GenerateOutputs | Synchronous wrapper |
| evaluate(client, model, ...) | GenerateOutputs | Evaluate on eval_dataset |
| evaluate_sync(client, model, ...) | GenerateOutputs | Synchronous evaluation |
Dataset methods:
| Method | Returns | Description |
|---|---|---|
| get_dataset(n=-1, seed=None) | Dataset | Get training dataset (optionally first n, shuffled) |
| get_eval_dataset(n=-1, seed=None) | Dataset | Get evaluation dataset |
| make_dataset(...) | Dataset | Static method to create dataset from inputs |
Rollout methods (used internally or by subclasses):
| Method | Returns | Description |
|---|---|---|
| rollout(input, client, model, sampling_args) | State | Abstract: run single rollout |
| init_state(input, client, model, sampling_args) | State | Create initial state from input |
| get_model_response(state, prompt, ...) | Response | Get model response for prompt |
| is_completed(state) | bool | Check all stop conditions |
| run_rollout(sem, input, client, model, sampling_args) | State | Run rollout with semaphore |
| run_group(group_inputs, client, model, ...) | list[State] | Generate and score one group |
Configuration methods:
| Method | Description |
|---|---|
| set_kwargs(**kwargs) | Set attributes using setter methods when available |
| set_concurrency(concurrency) | Set concurrency and scale all registered thread-pool executors to match |
| add_rubric(rubric) | Add or merge rubric |
| set_max_seq_len(max_seq_len) | Set maximum sequence length |
| set_score_rollouts(bool) | Enable/disable scoring |
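A minimal evaluation sketch, assuming an installed environment, default Prime credentials, and that evaluate_sync accepts the same client forms as generate (model name hypothetical):
import verifiers as vf

env = vf.load_environment("primeintellect/gsm8k")
outputs = env.evaluate_sync(client=vf.ClientConfig(), model="my-model")
print(outputs["metadata"]["avg_reward"])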
SingleTurnEnv
Single-response Q&A tasks. Inherits from Environment.
MultiTurnEnv
class MultiTurnEnv(Environment):
def __init__(
self,
max_turns: int = -1,
timeout_seconds: float | None = None,
**kwargs,
): ...
Multi-turn interactions. Subclasses must implement env_response.
Abstract method:
async def env_response(self, messages: Messages, state: State, **kwargs) -> Messages:
"""Generate environment feedback after model turn."""
Built-in stop conditions: has_error, prompt_too_long, max_turns_reached, timeout_reached, max_total_completion_tokens_reached, has_final_env_response
Hooks:
| Method | Description |
|---|---|
| setup_state(state) | Initialize per-rollout state |
| get_prompt_messages(state) | Customize prompt construction |
| render_completion(state) | Customize completion rendering |
| add_trajectory_step(state, step) | Customize trajectory handling |
| set_max_total_completion_tokens(int) | Set maximum total completion tokens |
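A minimal subclass sketch (hypothetical guessing task):
class GuessingEnv(vf.MultiTurnEnv):
    async def env_response(self, messages, state, **kwargs):
        # Compare the model's latest message against the reference answer.
        guess = str(messages[-1].get("content", "")).strip()
        feedback = "Correct!" if guess == state["answer"] else "Wrong, try again."
        return [{"role": "user", "content": feedback}]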
ToolEnv
class ToolEnv(MultiTurnEnv):
def __init__(
self,
tools: list[Callable] | None = None,
max_turns: int = 10,
error_formatter: Callable[[Exception], str] = lambda e: f"{e}",
stop_errors: list[type[Exception]] | None = None,
**kwargs,
): ...
Tool calling with stateless Python functions. Automatically converts functions to OpenAI tool format.
Built-in stop condition: no_tools_called (ends when model responds without tool calls)
Methods:
| Method | Description |
|---|---|
| add_tool(tool) | Add a tool at runtime |
| remove_tool(tool) | Remove a tool at runtime |
| call_tool(name, args, id) | Override to customize tool execution |
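A sketch of a stateless tool (hypothetical function):
def add(a: int, b: int) -> int:
    """Add two integers."""  # signature and docstring become the tool schema
    return a + b

env = vf.ToolEnv(tools=[add], max_turns=5)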
StatefulToolEnv
Extends ToolEnv for tools requiring per-rollout state. Override setup_state and update_tool_args to inject state into tool arguments.
SandboxEnv
class SandboxEnv(StatefulToolEnv):
def __init__(
self,
sandbox_name: str = "sandbox-env",
docker_image: str = "python:3.11-slim",
start_command: str = "tail -f /dev/null",
cpu_cores: int = 1,
memory_gb: int = 2,
disk_size_gb: int = 5,
gpu_count: int = 0,
timeout_minutes: int = 60,
timeout_per_command_seconds: int = 30,
environment_vars: dict[str, str] | None = None,
team_id: str | None = None,
advanced_configs: AdvancedConfigs | None = None,
labels: list[str] | None = None,
**kwargs,
): ...
Sandboxed container execution using Prime Sandboxes.
Key parameters:
| Parameter | Type | Description |
|---|---|---|
| sandbox_name | str | Name prefix for sandbox instances |
| docker_image | str | Docker image to use for the sandbox |
| cpu_cores | int | Number of CPU cores |
| memory_gb | int | Memory allocation in GB |
| disk_size_gb | int | Disk size in GB |
| gpu_count | int | Number of GPUs |
| timeout_minutes | int | Sandbox timeout in minutes |
| timeout_per_command_seconds | int | Per-command execution timeout |
| environment_vars | dict[str, str] \| None | Environment variables to set in sandbox |
| labels | list[str] \| None | Labels for sandbox categorization and filtering |
PythonEnv
Persistent Python REPL in sandbox. Extends SandboxEnv.
OpenEnvEnv
class OpenEnvEnv(MultiTurnEnv):
def __init__(
self,
openenv_project: str | Path,
num_train_examples: int = 100,
num_eval_examples: int = 50,
seed: int = 0,
prompt_renderer: Callable[..., ChatMessages] | None = None,
max_turns: int = -1,
rubric: Rubric | None = None,
**kwargs,
): ...
OpenEnv integration that runs OpenEnv projects in Prime Sandboxes using a prebuilt image manifest (.build.json), supports both gym and MCP contracts, and requires a prompt_renderer to convert observations into chat messages.
SWEDebugEnv
class SWEDebugEnv(SandboxMixin, MultiTurnEnv):
def __init__(
self,
taskset: SandboxTaskSet,
dataset: Any = None,
*,
run_setup: bool = True,
debug_step: Literal["none", "gold_patch", "command", "script"] = "gold_patch",
run_tests: bool = True,
debug_command: str | None = None,
debug_script: str | None = None,
debug_script_path: str | None = None,
debug_timeout: int | None = None,
test_timeout: int = 900,
cpu_cores: int | None = None,
memory_gb: int | None = None,
disk_size_gb: int | None = None,
labels: list[str] | None = None,
timeout_seconds: float = 1800.0,
output_tail_chars: int = 2000,
**sandbox_kwargs,
): ...
No-agent debugger for SWE-style SandboxTaskSet instances. It creates the task sandbox, optionally runs task setup, runs one debug step (none, gold_patch, command, or script), and optionally runs tests and scores the result.
EnvGroup
env_group = vf.EnvGroup(
envs=[env1, env2, env3],
env_names=["math", "code", "qa"] # optional
)
Combines multiple environments for mixed-task training. Combined datasets use
info["env_id"] as internal routing metadata; it is not a top-level input,
state, or output field.
v1 Taskset/Harness Classes
The v1 API is exposed as verifiers.v1 and documented in
BYO Harness. Its core unit is:
state = await harness.run(task, state=None)
Taskset and Env wrap that runner for datasets, evals, and training.
Task
class Task(dict):
def freeze(self) -> Task: ...
Immutable, JSON-serializable input data. A task is usually created by a
Taskset, but can be run directly through a standalone Harness.
Common top-level fields:
| Field | Description |
|---|---|
| prompt | User/developer/tool messages for the rollout. Must not contain system messages. |
| system_prompt | Per-task system messages or string. |
| answer | Reference answer or target data. Stays on task, not state. |
| info | Serializable metadata. |
| max_turns | Per-task base-loop turn limit. |
| tools | Tool visibility: {"show": [...]} or {"hide": [...]}. |
| toolsets | Toolset visibility or rollout-local toolset config. |
| sandbox | Per-task sandbox overrides for sandboxed programs. |
| program | Task-owned files, dirs, env, setup, artifacts, bindings, and command args. |
task.runtime is not public schema. Runtime metadata belongs on State.
State
class State(dict):
@classmethod
def for_task(cls, task: Mapping[str, Any], ...) -> State: ...
def stop(self, condition: str = "state_done") -> None: ...
def get_model(self) -> str: ...
def get_client(self, api: str = "chat_completions", *, sync: bool = False) -> object: ...
def get_endpoint_config(self, api: str = "chat_completions") -> dict[str, str]: ...
def get_tools(self) -> dict[str, Callable[..., Awaitable[object]]]: ...
def get_max_turns(self, default: int) -> int: ...
def finalize(self) -> State: ...
Mutable rollout output. State starts from a task and accumulates trajectory,
completion, metrics, reward, timing, artifacts, errors, and user-defined
serializable fields.
Framework-managed fields such as is_completed, stop_condition,
is_truncated, and error cannot be written directly. Use state.stop(...) or
raise vf.Error subclasses.
State.for_task(...) can borrow selected active runtime handles from another
state:
child_state = state.for_task(child_task, borrow=["model", "sandbox"], tools="bash")
child_state = await child_harness.run(child_task, child_state)
Borrowed handles are process-local and stripped before state crosses the
serialization boundary.
Taskset
class Taskset:
def __init__(
source=None,
eval_source=None,
taskset_id: str | None = None,
system_prompt=None,
user=None,
toolsets=(),
stops=(),
setups=(),
updates=(),
metrics=(),
rewards=(),
advantages=(),
cleanups=(),
config: TasksetConfig | Mapping[str, object] | None = None,
): ...
def rows() -> list[dict[str, Any]]: ...
def eval_rows() -> list[dict[str, Any]]: ...
def task(row: Mapping[str, Any]) -> Task: ...
def to_task(value: Mapping[str, Any] | Task | str) -> Task: ...
async def init_group(task: Task, num_rollouts: int) -> tuple[list[Task], list[State]]: ...
def get_dataset() -> Dataset: ...
def get_eval_dataset() -> Dataset: ...
Packages task rows and task-owned behavior. source and eval_source may be
iterables or zero-argument loaders. Loaders should close over resolved config
instead of accepting runtime kwargs.
Harness
class Harness:
def __init__(
program=None,
system_prompt=None,
user=None,
sandbox=None,
client=None,
model: str | None = None,
sampling_args: SamplingArgs | None = None,
max_turns: int | None = None,
toolsets=None,
stops=None,
setups=None,
updates=None,
metrics=None,
rewards=None,
advantages=None,
cleanups=None,
config: HarnessConfig | Mapping[str, object] | None = None,
): ...
async def run(task: Task | Mapping[str, Any], state: State | None = None) -> State: ...
async def score_group(tasks: list[Task], states: list[State]) -> list[State]: ...
async def cleanup_group(tasks: list[Task], states: list[State]) -> None: ...
async def teardown() -> None: ...
Runs one task. All model calls go through the v1 interception endpoint so
trajectory capture, sampling args, tool forwarding, and protocol translation use
one path across local Python, sandboxed Python, command programs, and the base
tool loop.
program forms:
| Form | Meaning |
|---|---|
| None | Default endpoint-backed tool loop. |
| callable | In-process Python program with task, state. |
| {"base": true, ...} | Explicit default loop, usually with sandbox options. |
| {"fn": "pkg.module:run", ...} | Importable Python program. |
| {"command": ["cmd", "arg"], ...} | Local or sandboxed command. |
Sandboxed program.fn refs resolve their owning local package from the resolved
module root: single-file modules use pyproject.toml in the same directory as
the module file, and package modules use pyproject.toml inside the package
directory. v1 uploads and installs that package in the program sandbox. Package
dependencies come from normal [project.dependencies].
Env
class Env(vf.Environment):
def __init__(taskset: Taskset, harness: Harness | None = None): ...
Adapter that makes a v1 taskset/harness pair usable by eval and training
workers. If harness is omitted, Env uses the base Harness.
Toolset
class Toolset:
def __init__(
tools=(),
show=None,
hide=None,
bindings=None,
objects=None,
write: bool = False,
scope: Literal["rollout", "group", "global"] = "rollout",
sandbox=None,
stops=(),
setups=(),
updates=(),
metrics=(),
rewards=(),
advantages=(),
cleanups=(),
config: ToolsetConfig | Mapping[str, object] | None = None,
): ...
MCPTool
class MCPTool:
def __init__(command: str, args=None, env=None, cwd: str | None = None): ...
Toolsets package callable tools, MCP servers, private dependency factories, and
hidden bindings. objects.* bindings are private to the owning toolset/user and
are not directly accessible from state.
v1 Config Models
TasksetConfig.from_toml(path, section=None)
HarnessConfig.from_toml(path, section=None)
ToolsetConfig(...)
SandboxConfig(...)
UserConfig(...)
MCPToolConfig(...)
v1 config models are Pydantic models. Constructors accept config objects or
plain mappings; TOML config uses "module:object" refs for Python callables and
loaders. Unknown fields fail validation.
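For example, loading a harness config from TOML (hypothetical file and section; assumes HarnessConfig is importable from verifiers.v1):
from verifiers.v1 import HarnessConfig

# config.toml:
#   [harness]
#   model = "my-model"
#   max_turns = 5
cfg = HarnessConfig.from_toml("config.toml", section="harness")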
Parser Classes
Parser
class Parser:
def __init__(self, extract_fn: Callable[[str], str] = lambda x: x): ...
def parse(self, text: str) -> Any: ...
def parse_answer(self, completion: Messages) -> str | None: ...
def get_format_reward_func(self) -> Callable: ...
Base parser. Default behavior returns text as-is.
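For example, a parser that keeps only the last line of the output (hypothetical extract_fn; assumes parse applies extract_fn):
parser = vf.Parser(extract_fn=lambda text: text.strip().splitlines()[-1])
parser.parse("Reasoning...\n42")  # -> "42"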
XMLParser
class XMLParser(Parser):
def __init__(
self,
fields: list[str | tuple[str, ...]],
answer_field: str = "answer",
extract_fn: Callable[[str], str] = lambda x: x,
): ...
Extracts structured fields from XML-tagged output.
parser = vf.XMLParser(fields=["reasoning", "answer"])
# Parses: <reasoning>...</reasoning><answer>...</answer>
# With alternatives:
parser = vf.XMLParser(fields=["reasoning", ("code", "answer")])
# Accepts either <code> or <answer> for second field
Methods:
| Method | Returns | Description |
|---|---|---|
| parse(text) | SimpleNamespace | Parse XML into object with field attributes |
| parse_answer(completion) | str \| None | Extract answer field from completion |
| get_format_str() | str | Get format description string |
| get_fields() | list[str] | Get canonical field names |
| format(**kwargs) | str | Format kwargs into XML string |
ThinkParser
class ThinkParser(Parser):
def __init__(self, extract_fn: Callable[[str], str] = lambda x: x): ...
Extracts the content after the closing </think> tag. Use for models that always emit <think> blocks when the API does not separate reasoning from the final answer automatically.
MaybeThinkParser
Handles optional <think> tags (for models that may or may not think).
Rubric Classes
Rubric
class Rubric:
def __init__(
self,
funcs: list[RewardFunc] | None = None,
weights: list[float] | None = None,
parser: Parser | None = None,
): ...
Combines multiple reward functions with weights. Default weight is 1.0. Functions with weight=0.0 are tracked as metrics only.
Methods:
| Method | Description |
|---|---|
| add_reward_func(func, weight=1.0) | Add a reward function |
| add_metric(func, weight=0.0) | Add a metric (no reward contribution) |
| add_class_object(name, obj) | Add object accessible in reward functions |
Reward function signature:
def my_reward(
completion: Messages,
answer: str = "",
prompt: Messages | None = None,
state: State | None = None,
parser: Parser | None = None, # if rubric has parser
info: Info | None = None,
**kwargs
) -> float:
...
Group reward function signature:
def my_group_reward(
completions: list[Messages],
answers: list[str],
states: list[State],
# ... plural versions of individual args
**kwargs
) -> list[float]:
...
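Putting both together (hypothetical functions):
def exact_match(completion, answer="", **kwargs) -> float:
    return 1.0 if str(completion).strip() == answer.strip() else 0.0

def completion_length(completion, **kwargs) -> float:
    return float(len(str(completion)))

rubric = vf.Rubric(
    funcs=[exact_match, completion_length],
    weights=[1.0, 0.0],  # weight 0.0: tracked as a metric, no reward contribution
)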
JudgeRubric
LLM-as-judge evaluation.
MathRubric
Math-specific evaluation using math-verify.
RubricGroup
Combines rubrics for EnvGroup.
Client Classes
Client
class Client(ABC, Generic[ClientT, MessagesT, ResponseT, ToolT]):
def __init__(self, client_or_config: ClientT | ClientConfig) -> None: ...
@property
def client(self) -> ClientT: ...
async def get_response(
self,
prompt: Messages,
model: str,
sampling_args: SamplingArgs,
tools: list[Tool] | None = None,
**kwargs,
) -> Response: ...
async def close(self) -> None: ...
Abstract base class for all model clients. Wraps a provider-specific SDK client and translates between provider-agnostic vf types (Messages, Tool, Response) and provider-native formats. The client property exposes the underlying SDK client (e.g., AsyncOpenAI, AsyncAnthropic).
get_response() is the main public method — it converts the prompt and tools to the native format, calls the provider API, validates the response, and converts it back to a vf.Response. Errors are wrapped in vf.ModelError unless they are already vf.Error or authentication errors.
Abstract methods (for subclass implementors):
| Method | Description |
|---|---|
| setup_client(config) | Create the native SDK client from ClientConfig |
| to_native_prompt(messages) | Convert Messages → native prompt format + extra kwargs |
| to_native_tool(tool) | Convert Tool → native tool format |
| get_native_response(prompt, model, ...) | Call the provider API |
| raise_from_native_response(response) | Raise ModelError for invalid responses |
| from_native_response(response) | Convert native response → vf.Response |
| close() | Close the underlying SDK client |
Built-in Client Implementations
| Class | client_type | SDK Client | Description |
|---|---|---|---|
| OpenAIChatCompletionsClient | "openai_chat_completions" | AsyncOpenAI | Chat Completions API (default) |
| OpenAICompletionsClient | "openai_completions" | AsyncOpenAI | Legacy Completions API |
| OpenAIChatCompletionsTokenClient | "openai_chat_completions_token" | AsyncOpenAI | Custom vLLM token route (/v1/chat/completions/tokens) — server-side templating + token IDs returned alongside content |
| OpenAIResponsesClient | "openai_responses" | AsyncOpenAI | OpenAI Responses API |
| RendererClient | "renderer" | AsyncOpenAI | Renderer-backed token-in generate client (client-side tokenization via the renderers package) |
| AnthropicMessagesClient | "anthropic_messages" | AsyncAnthropic | Anthropic Messages API |
| NeMoRLChatCompletionsClient | "nemorl_chat_completions" | AsyncOpenAI | NeMo-RL Chat Completions variant |
All built-in clients are available as vf.OpenAIChatCompletionsClient, vf.AnthropicMessagesClient, etc. RendererClient requires the optional renderers package; install it with uv add "verifiers[renderers]" before importing vf.RendererClient or using client_type="renderer".
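Constructing and calling a client directly, as a sketch (inside an async function; model name hypothetical):
client = vf.OpenAIChatCompletionsClient(vf.ClientConfig())
response = await client.get_response(
    prompt=[{"role": "user", "content": "Hello!"}],
    model="my-model",
    sampling_args={"temperature": 0.7, "max_tokens": 64},
)
print(response.message.content)
await client.close()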
Response
class Response(BaseModel):
id: str
created: int
model: str
usage: Usage | None
message: ResponseMessage
class ResponseMessage(BaseModel):
content: str | None
reasoning_content: str | None
finish_reason: Literal["stop", "length", "tool_calls"] | None
is_truncated: bool | None
tokens: ResponseTokens | None
tool_calls: list[ToolCall] | None
Provider-agnostic model response. All Client implementations return Response from get_response().
Tool
class Tool(BaseModel):
name: str
description: str
parameters: dict[str, object]
strict: bool | None = None
Provider-agnostic tool definition. Environments define tools using this type; each Client converts them to its native format via to_native_tool().
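A hypothetical definition:
tool = vf.Tool(
    name="get_weather",
    description="Look up the current weather for a city.",
    parameters={  # JSON Schema for the tool arguments
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
)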
Configuration Types
v1 Config
class Config(BaseModel):
def __init__(self, config: object | None = None, /, **data: object): ...
@classmethod
def from_config(cls, config: object | None = None, /, **data: object) -> Self: ...
@classmethod
def from_toml(
cls, path: str | Path, section: str | Iterable[str] | None = None
) -> Self: ...
class EnvConfig(Config):
taskset: object | None = None
harness: object | None = None
class TasksetConfig(Config):
taskset_id: str | None = None
system_prompt: object | None = None
source: object | None = None
eval_source: object | None = None
user: object | None = None
class HarnessConfig(Config):
program: object | None = None
system_prompt: object | None = None
sandbox: SandboxConfig | None = None
model: str | None = None
sampling_args: dict[str, object] = {}
max_turns: int = 10
EnvConfig is the typed v1 loader envelope. TOML [env.taskset] and
[env.harness] sections flow to config.taskset and config.harness;
environment-specific named args flow through [env.args].
Config subclasses accept a positional source config plus direct keyword
overrides. The source object is positional-only so subclasses can define a real
field named config.
ClientConfig
class ClientConfig(BaseModel):
client_idx: int = 0
client_type: ClientType = "openai_chat_completions"
preserve_all_thinking: bool = False
preserve_thinking_between_tool_calls: bool = False
api_key_var: str = "PRIME_API_KEY"
api_base_url: str = "https://api.pinference.ai/api/v1"
endpoint_configs: list[EndpointClientConfig] = []
timeout: float = 3600.0
connect_timeout: float = 5.0
max_connections: int = 28000
max_keepalive_connections: int = 28000
max_retries: int = 10
extra_headers: dict[str, str] = {}
extra_headers_from_state: dict[str, str] = {}
extra_headers_from_state maps HTTP header names to state field names. For each inference request, the header value is dynamically read from the rollout state dict. For example, {"X-Session-ID": "example_id"} adds an X-Session-ID header with the value of state["example_id"], enabling sticky routing at the inference router level.
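For example (hypothetical header names):
config = vf.ClientConfig(
    extra_headers={"X-Org": "demo-team"},                     # static, sent on every request
    extra_headers_from_state={"X-Session-ID": "example_id"},  # read from state per request
)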
client_type selects which Client implementation to instantiate (see Client Classes). Use endpoint_configs for multi-endpoint round-robin. In grouped scoring mode, groups are distributed round-robin across endpoint configs.
preserve_all_thinking and preserve_thinking_between_tool_calls are forwarded to the underlying renderer when client_type == "renderer". They control whether past-assistant reasoning_content is re-emitted on subsequent renders — preserve_all_thinking keeps every past-assistant turn’s thinking, and preserve_thinking_between_tool_calls keeps thinking only inside the in-flight assistant→tool→…→assistant block after the most recent user turn (when that block contains at least one tool response). Both default to False (template default applies).
When api_key_var is "PRIME_API_KEY" (the default), credentials are loaded with the following precedence:
- API key: PRIME_API_KEY env var > ~/.prime/config.json > "EMPTY"
- Team ID: PRIME_TEAM_ID env var > ~/.prime/config.json > not set
This allows seamless use after running prime login.
EndpointClientConfig
class EndpointClientConfig(BaseModel):
client_idx: int = 0
api_key_var: str = "PRIME_API_KEY"
api_base_url: str = "https://api.pinference.ai/api/v1"
timeout: float = 3600.0
max_connections: int = 28000
max_keepalive_connections: int = 28000
max_retries: int = 10
extra_headers: dict[str, str] = {}
Leaf endpoint configuration used inside ClientConfig.endpoint_configs. Has the same fields as ClientConfig except endpoint_configs itself, preventing recursive nesting.
EvalConfig
class EvalConfig(BaseModel):
env_id: str
env_args: dict
env_dir_path: str
endpoint_id: str | None = None
model: str
client_config: ClientConfig
sampling_args: SamplingArgs
num_examples: int
rollouts_per_example: int
max_concurrent: int
independent_scoring: bool = False
extra_env_kwargs: dict = {}
max_retries: int = 0
verbose: bool = False
state_columns: list[str] | None = None
save_results: bool = False
resume_path: Path | None = None
save_to_hf_hub: bool = False
hf_hub_dataset_name: str | None = None
Endpoint
Endpoint = TypedDict(
"Endpoint",
{
"key": str,
"url": str,
"model": str,
"api_client_type": NotRequired[ClientType],
"extra_headers": NotRequired[dict[str, str]],
},
)
Endpoints = dict[str, list[Endpoint]]
Endpoints maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list.
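A hypothetical registry with one endpoint id and a single variant:
endpoints: Endpoints = {
    "my-endpoint": [
        {
            "key": "PRIME_API_KEY",
            "url": "https://api.pinference.ai/api/v1",
            "model": "my-model",
        },
    ],
}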
Prime CLI Plugin
Verifiers exposes a plugin contract consumed by prime for command execution.
PRIME_PLUGIN_API_VERSION
PRIME_PLUGIN_API_VERSION = 1
API version for compatibility checks between prime and verifiers.
PrimeCLIPlugin
@dataclass(frozen=True)
class PrimeCLIPlugin:
api_version: int = PRIME_PLUGIN_API_VERSION
eval_module: str = "verifiers.cli.commands.eval"
gepa_module: str = "verifiers.cli.commands.gepa"
install_module: str = "verifiers.cli.commands.install"
init_module: str = "verifiers.cli.commands.init"
setup_module: str = "verifiers.cli.commands.setup"
build_module: str = "verifiers.cli.commands.build"
def build_module_command(
self, module_name: str, args: Sequence[str] | None = None
) -> list[str]:
...
build_module_command returns a subprocess command list for python -m <module> ....
get_plugin
def get_plugin() -> PrimeCLIPlugin:
...
Returns the plugin instance consumed by prime.
Decorators
@vf.stop
@vf.stop
async def my_condition(self, state: State) -> bool:
"""Return True to end the rollout."""
...
@vf.stop(priority=10) # Higher priority runs first
async def early_check(self, state: State) -> bool:
...
Mark a method as a stop condition. All stop conditions are checked by is_completed().
@vf.cleanup
@vf.cleanup
async def my_cleanup(self, state: State) -> None:
"""Called after each rollout completes."""
...
@vf.cleanup(priority=10)
async def early_cleanup(self, state: State) -> None:
...
Mark a method as a rollout cleanup handler. Cleanup methods should be idempotent—safe to call multiple times—and handle errors gracefully to ensure cleanup completes even when resources are in unexpected states.
@vf.teardown
@vf.teardown
async def my_teardown(self) -> None:
"""Called when environment is destroyed."""
...
@vf.teardown(priority=10)
async def early_teardown(self) -> None:
...
Mark a method as an environment teardown handler.
Utility Functions
Data Utilities
vf.load_example_dataset(name: str) -> Dataset
Load a built-in example dataset.
vf.extract_boxed_answer(text: str, strict: bool = False) -> str
Extract answer from LaTeX \boxed{} format. When strict=True, returns "" if no \boxed{} is found (used by MathRubric to avoid scoring unformatted responses). When strict=False (default), returns the original text as a passthrough.
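Behavior sketch:
vf.extract_boxed_answer(r"The answer is \boxed{42}.")  # "42"
vf.extract_boxed_answer("no box", strict=True)         # ""
vf.extract_boxed_answer("no box")                      # "no box" (passthrough)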
vf.extract_hash_answer(text: str) -> str | None
Extract answer after #### marker (GSM8K format).
Environment Utilities
vf.load_environment(env_id: str, **kwargs) -> Environment
Load an environment by ID (e.g., "primeintellect/gsm8k").
Configuration Utilities
vf.ensure_keys(keys: list[str]) -> None
Validate that required environment variables are set. Raises MissingKeyError (a ValueError subclass) with a clear message listing all missing keys and instructions for setting them.
class MissingKeyError(ValueError):
keys: list[str] # list of missing key names
Example:
def load_environment(api_key_var: str = "OPENAI_API_KEY") -> vf.Environment:
vf.ensure_keys([api_key_var])
# now safe to use os.environ[api_key_var]
...
Logging Utilities
vf.print_prompt_completions_sample(outputs: GenerateOutputs, n: int = 3)
Pretty-print sample rollouts.
vf.setup_logging(level: str = "INFO")
Configure verifiers logging. Set VF_LOG_LEVEL env var to change default.
vf.log_level(level: str | int)
Context manager to temporarily set the verifiers logger to a new log level. Useful for temporarily adjusting verbosity during specific operations.
with vf.log_level("DEBUG"):
# verifiers logs at DEBUG level here
...
# reverts to previous level
vf.quiet_verifiers()
Context manager to temporarily silence verifiers logging by setting the WARNING level. Shorthand for vf.log_level("WARNING").
with vf.quiet_verifiers():
# verifiers logging is quieted here
outputs = env.generate(...)
# logging restored