Documentation Index

Fetch the complete documentation index at: https://docs.primeintellect.ai/llms.txt

Use this file to discover all available pages before exploring further.

Type Aliases

Messages

Messages = str | list[ChatMessage]
The primary message type. Either a plain string (completion mode) or a list of chat messages (chat mode).

ChatMessage

ChatMessage = ChatCompletionMessageParam  # from openai.types.chat
OpenAI’s chat message type with role, content, and optional tool_calls / tool_call_id fields.

SystemMessage

class SystemMessage:
    role: Literal["system"] = "system"
    content: MessageContent

    @classmethod
    def from_path(cls, path: str | Path) -> "SystemMessage": ...
Provider-agnostic system message type. Use vf.SystemMessage.from_path(...) to load a system prompt from a UTF-8 text file while preserving the file contents verbatim.

Info

Info = dict[str, Any]
Arbitrary metadata dictionary from dataset rows.

SamplingArgs

SamplingArgs = dict[str, Any]
Generation parameters passed to the inference server (e.g., temperature, top_p, max_tokens).

RewardFunc

IndividualRewardFunc = Callable[..., float | Awaitable[float]]
GroupRewardFunc = Callable[..., list[float] | Awaitable[list[float]]]
RewardFunc = IndividualRewardFunc | GroupRewardFunc
Individual reward functions operate on single rollouts. Group reward functions operate on all rollouts for an example together (useful for relative scoring).
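The distinction can be sketched with two plain Python functions (illustrative only, not part of the library; for simplicity, completions here are plain strings rather than chat-message lists):

```python
# Individual: scores one rollout against its reference answer.
def exact_match_reward(completion: str, answer: str = "", **kwargs) -> float:
    """1.0 if the completion matches the reference answer, else 0.0."""
    return 1.0 if completion.strip() == answer.strip() else 0.0

# Group: scores all rollouts for one example together, enabling
# relative criteria such as "reward only the longest completion".
def best_of_group_reward(
    completions: list[str], answers: list[str], **kwargs
) -> list[float]:
    best = max(range(len(completions)), key=lambda i: len(completions[i]))
    return [1.0 if i == best else 0.0 for i in range(len(completions))]
```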

ClientType

ClientType = Literal[
    "openai_completions",
    "openai_chat_completions",
    "openai_chat_completions_token",
    "openai_responses",
    "renderer",
    "anthropic_messages",
    "nemorl_chat_completions",
]
Selects which Client implementation to use. Set via ClientConfig.client_type.

Data Types

State

class State(dict):
    INPUT_FIELDS = ["prompt", "answer", "info", "example_id"]
A dict subclass that tracks rollout information. Accessing keys in INPUT_FIELDS automatically forwards to the nested input object. Fields set during initialization:
  • input (RolloutInput): Nested input data
  • client (Client): Client instance
  • model (str): Model name
  • sampling_args (SamplingArgs | None): Generation parameters
  • is_completed (bool): Whether rollout has ended
  • is_truncated (bool): Whether generation was truncated
  • tool_defs (list[Tool] | None): Available tool definitions
  • trajectory (list[TrajectoryStep]): Multi-turn trajectory
  • trajectory_id (str): UUID for this rollout
  • timing (RolloutTiming): Timing information
Fields set after scoring:
  • completion (Messages | None): Final completion
  • reward (float | None): Final reward
  • advantage (float | None): Advantage over group mean
  • metrics (dict[str, float] | None): Per-function metrics
  • stop_condition (str | None): Name of triggered stop condition
  • error (Error | None): Error if rollout failed

RolloutInput

class RolloutInput(TypedDict):
    prompt: Messages        # Required
    example_id: int         # Required
    answer: str             # Optional
    info: Info              # Optional

RolloutOutput

class RolloutOutput(dict):
    # Required fields
    example_id: int
    prompt: Messages | None
    completion: Messages | None
    reward: float
    timing: RolloutTiming
    is_completed: bool
    is_truncated: bool
    metrics: dict[str, float]
    # Optional fields
    answer: str
    info: Info
    error: str | None
    stop_condition: str | None
    token_usage: TokenUsage
    trajectory: list[TrajectoryStep]
    tool_defs: list[Tool] | None
Serialized output from a rollout. This is a dict subclass that provides typed access to known fields while supporting arbitrary additional fields from state_columns. All values must be JSON-serializable. Used in GenerateOutputs and for saving results to disk.

TrajectoryStep

class TrajectoryStep(TypedDict):
    prompt: Messages
    completion: Messages
    response: Response
    tokens: TrajectoryStepTokens | None
    reward: float | None
    advantage: float | None
    is_truncated: bool
    trajectory_id: str
    extras: dict[str, Any]
A single turn in a multi-turn rollout.

TrajectoryStepTokens

class TrajectoryStepTokens(TypedDict):
    prompt_ids: list[int]
    prompt_mask: list[int]
    completion_ids: list[int]
    completion_mask: list[int]
    completion_logprobs: list[float]
    overlong_prompt: bool
    is_truncated: bool
    routed_experts: list[list[list[int]]] | None  # [seq_len, layers, topk] to enable router replay
Token-level data for training.

TimeSpan

class TimeSpan(CustomBaseModel):
    """A timed span. duration = end - start."""
    start: float = 0.0   # Unix timestamp (seconds since epoch)
    end: float = 0.0     # Unix timestamp (seconds since epoch)
    # duration: float    (computed_field)

TimeSpans

class TimeSpans(CustomBaseModel):
    """A list of TimeSpan with aggregate duration (sum)."""
    spans: list[TimeSpan] = []
    # duration: float    (computed_field)

RolloutTiming

class RolloutTiming(CustomBaseModel):
    """Rollout-level timing. All values in seconds."""
    start_time: float                       # wall-clock at rollout start
    setup: TimeSpan = TimeSpan()            # setup_state() span
    generation: TimeSpan = TimeSpan()       # full generation phase
    scoring: TimeSpan = TimeSpan()          # rubric.score_*() span
    model: TimeSpans = TimeSpans()          # all model-call spans
    env: TimeSpans = TimeSpans()            # all env-response spans
    # total, overhead: float                (computed_fields)
Derivations:
  • total = scoring.end - generation.start
  • overhead = total - setup.duration - model.duration - env.duration - scoring.duration
generation.start is stamped at the top of the rollout (before setup_state), so total covers the entire rollout including setup, generation loop, finalize, and scoring. overhead captures any time not attributed to the named phases.
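A small sketch of the derivations above, using plain (start, end) tuples in place of TimeSpan/TimeSpans (all timestamps are made-up values):

```python
# Made-up span values for one rollout.
generation_start = 100.0
setup = (100.0, 100.5)                          # setup_state() span
model_spans = [(100.5, 102.0), (103.0, 104.5)]  # all model-call spans
env_spans = [(102.0, 103.0)]                    # all env-response spans
scoring = (104.5, 105.0)                        # rubric scoring span

def duration(span: tuple[float, float]) -> float:
    return span[1] - span[0]

# total = scoring.end - generation.start -> 5.0
total = scoring[1] - generation_start
# overhead = total minus every attributed phase -> 0.0 here
overhead = (total - duration(setup)
            - sum(duration(s) for s in model_spans)
            - sum(duration(s) for s in env_spans)
            - duration(scoring))
```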

TokenUsage

class TokenUsage(TypedDict, total=False):
    input_tokens: float
    output_tokens: float
    final_input_tokens: float
    final_output_tokens: float
  • input_tokens: Sum of prompt tokens across all turns. Shared context is counted each time it appears in a prompt.
  • output_tokens: Sum of completion tokens across all turns.
  • final_input_tokens: Non-completion tokens in the final turn’s context (system prompts, user messages, tool results, etc.).
  • final_output_tokens: Completion tokens in the final turn’s context. Equals output_tokens for single-turn rollouts.
In a single-turn rollout, input_tokens == final_input_tokens and output_tokens == final_output_tokens. In a multi-turn rollout, input_tokens > final_input_tokens because earlier turns’ prompts are counted again. The final_* metrics assume a single, continuously extended trajectory. Non-linear trajectories (multi-agent, context summarization, history rewriting) are not accounted for.
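A worked example with made-up numbers: a two-turn rollout where turn 1 has a 100-token prompt and 20-token completion, and turn 2's prompt is the full turn-1 context plus a 30-token environment message:

```python
turn1_prompt, turn1_completion = 100, 20
turn2_prompt, turn2_completion = 100 + 20 + 30, 40  # turn-1 context re-sent

usage = {
    # Prompt tokens summed across turns: shared context is counted twice.
    "input_tokens": turn1_prompt + turn2_prompt,                 # 250
    "output_tokens": turn1_completion + turn2_completion,        # 60
    # Final turn's context, split into non-completion vs completion tokens.
    "final_input_tokens": turn1_prompt + 30,                     # 130
    "final_output_tokens": turn1_completion + turn2_completion,  # 60
}
```

As stated above, input_tokens (250) exceeds final_input_tokens (130) because the turn-1 prompt is counted again in turn 2.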

GenerateOutputs

class GenerateOutputs(TypedDict):
    outputs: list[RolloutOutput]
    metadata: GenerateMetadata
Output from Environment.generate(). Contains a list of RolloutOutput objects (one per rollout) and generation metadata. Each RolloutOutput is a serialized, JSON-compatible dict containing the rollout’s prompt, completion, answer, reward, metrics, timing, and other per-rollout data.

GenerateMetadata

class VersionInfo(TypedDict):
    vf_version: str
    vf_commit: str | None
    env_version: str | None
    env_commit: str | None

class GenerateMetadata(TypedDict):
    env_id: str
    env_args: dict
    model: str
    base_url: str
    num_examples: int
    rollouts_per_example: int
    sampling_args: SamplingArgs
    date: str
    time_ms: float
    avg_reward: float
    avg_metrics: dict[str, float]
    avg_error: float
    pass_at_k: dict[str, float]
    pass_all_k: dict[str, float]
    pass_threshold: float
    usage: TokenUsage | None
    version_info: VersionInfo
    state_columns: list[str]
    path_to_save: Path
    tools: list[Tool] | None
base_url is always serialized as a string. For multi-endpoint runs (e.g., using ClientConfig.endpoint_configs), it is stored as a comma-separated list of URLs. version_info captures the verifiers framework version/commit and the environment package version/commit at generation time. Populated automatically by GenerateOutputsBuilder.

RolloutScore / RolloutScores

class RolloutScore(TypedDict):
    reward: float
    metrics: dict[str, float]

class RolloutScores(TypedDict):
    reward: list[float]
    metrics: dict[str, list[float]]

Classes

Environment Classes

Environment

class Environment(ABC):
    def __init__(
        self,
        dataset: Dataset | None = None,
        eval_dataset: Dataset | None = None,
        system_prompt: str | None = None,
        few_shot: list[ChatMessage] | None = None,
        parser: Parser | None = None,
        rubric: Rubric | None = None,
        sampling_args: SamplingArgs | None = None,
        message_type: MessageType = "chat",
        max_workers: int = 512,
        env_id: str | None = None,
        env_args: dict | None = None,
        max_seq_len: int | None = None,
        score_rollouts: bool = True,
        pass_threshold: float = 0.5,
        **kwargs,
    ): ...
Abstract base class for all environments. Generation methods:
  • generate(inputs, client, model, ...) -> GenerateOutputs: Run rollouts asynchronously. client accepts Client | ClientConfig.
  • generate_sync(inputs, client, ...) -> GenerateOutputs: Synchronous wrapper
  • evaluate(client, model, ...) -> GenerateOutputs: Evaluate on eval_dataset
  • evaluate_sync(client, model, ...) -> GenerateOutputs: Synchronous evaluation
Dataset methods:
  • get_dataset(n=-1, seed=None) -> Dataset: Get training dataset (optionally first n, shuffled)
  • get_eval_dataset(n=-1, seed=None) -> Dataset: Get evaluation dataset
  • make_dataset(...) -> Dataset: Static method to create a dataset from inputs
Rollout methods (used internally or by subclasses):
  • rollout(input, client, model, sampling_args) -> State: Abstract: run a single rollout
  • init_state(input, client, model, sampling_args) -> State: Create initial state from input
  • get_model_response(state, prompt, ...) -> Response: Get model response for prompt
  • is_completed(state) -> bool: Check all stop conditions
  • run_rollout(sem, input, client, model, sampling_args) -> State: Run rollout with semaphore
  • run_group(group_inputs, client, model, ...) -> list[State]: Generate and score one group
Configuration methods:
  • set_kwargs(**kwargs): Set attributes using setter methods when available
  • set_concurrency(concurrency): Set concurrency and scale all registered thread-pool executors to match
  • add_rubric(rubric): Add or merge a rubric
  • set_max_seq_len(max_seq_len): Set maximum sequence length
  • set_score_rollouts(bool): Enable/disable scoring

SingleTurnEnv

Single-response Q&A tasks. Inherits from Environment.

MultiTurnEnv

class MultiTurnEnv(Environment):
    def __init__(
        self,
        max_turns: int = -1,
        timeout_seconds: float | None = None,
        **kwargs,
    ): ...
Multi-turn interactions. Subclasses must implement env_response. Abstract method:
async def env_response(self, messages: Messages, state: State, **kwargs) -> Messages:
    """Generate environment feedback after model turn."""
Built-in stop conditions: has_error, prompt_too_long, max_turns_reached, timeout_reached, max_total_completion_tokens_reached, has_final_env_response.
Hooks:
  • setup_state(state): Initialize per-rollout state
  • get_prompt_messages(state): Customize prompt construction
  • render_completion(state): Customize completion rendering
  • add_trajectory_step(state, step): Customize trajectory handling
  • set_max_total_completion_tokens(int): Set maximum total completion tokens
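The env_response contract can be illustrated with a hypothetical guessing game (a sketch; in a real MultiTurnEnv subclass this logic would be the body of async def env_response, returning the environment's messages to append after the model's turn):

```python
def guessing_feedback(messages: list[dict], target: int) -> list[dict]:
    """Environment feedback for a made-up number-guessing task."""
    last = messages[-1]["content"]  # the model's latest assistant turn
    try:
        guess = int(last.strip())
    except ValueError:
        return [{"role": "user", "content": "Please answer with a number."}]
    if guess == target:
        return [{"role": "user", "content": "Correct!"}]
    hint = "higher" if guess < target else "lower"
    return [{"role": "user", "content": f"Try {hint}."}]
```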

ToolEnv

class ToolEnv(MultiTurnEnv):
    def __init__(
        self,
        tools: list[Callable] | None = None,
        max_turns: int = 10,
        error_formatter: Callable[[Exception], str] = lambda e: f"{e}",
        stop_errors: list[type[Exception]] | None = None,
        **kwargs,
    ): ...
Tool calling with stateless Python functions. Automatically converts functions to OpenAI tool format.
Built-in stop condition: no_tools_called (ends when the model responds without tool calls).
Methods:
  • add_tool(tool): Add a tool at runtime
  • remove_tool(tool): Remove a tool at runtime
  • call_tool(name, args, id): Override to customize tool execution
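A minimal sketch of a stateless tool: a plain function whose name, docstring, and type hints are converted to an OpenAI tool schema (the function name and the commented env construction line are illustrative, not from the library):

```python
def add_two_numbers(a: float, b: float) -> float:
    """Add two numbers and return the sum."""
    return a + b

# Hypothetical usage, assuming the constructor shown above:
# env = vf.ToolEnv(tools=[add_two_numbers], max_turns=10)
```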

StatefulToolEnv

Tools requiring per-rollout state. Override setup_state and update_tool_args to inject state.

SandboxEnv

class SandboxEnv(StatefulToolEnv):
    def __init__(
        self,
        sandbox_name: str = "sandbox-env",
        docker_image: str = "python:3.11-slim",
        start_command: str = "tail -f /dev/null",
        cpu_cores: int = 1,
        memory_gb: int = 2,
        disk_size_gb: int = 5,
        gpu_count: int = 0,
        timeout_minutes: int = 60,
        timeout_per_command_seconds: int = 30,
        environment_vars: dict[str, str] | None = None,
        team_id: str | None = None,
        advanced_configs: AdvancedConfigs | None = None,
        labels: list[str] | None = None,
        **kwargs,
    ): ...
Sandboxed container execution using prime sandboxes. Key parameters:
  • sandbox_name (str): Name prefix for sandbox instances
  • docker_image (str): Docker image to use for the sandbox
  • cpu_cores (int): Number of CPU cores
  • memory_gb (int): Memory allocation in GB
  • disk_size_gb (int): Disk size in GB
  • gpu_count (int): Number of GPUs
  • timeout_minutes (int): Sandbox timeout in minutes
  • timeout_per_command_seconds (int): Per-command execution timeout
  • environment_vars (dict[str, str] | None): Environment variables to set in the sandbox
  • labels (list[str] | None): Labels for sandbox categorization and filtering

PythonEnv

Persistent Python REPL in sandbox. Extends SandboxEnv.

OpenEnvEnv

class OpenEnvEnv(MultiTurnEnv):
    def __init__(
        self,
        openenv_project: str | Path,
        num_train_examples: int = 100,
        num_eval_examples: int = 50,
        seed: int = 0,
        prompt_renderer: Callable[..., ChatMessages] | None = None,
        max_turns: int = -1,
        rubric: Rubric | None = None,
        **kwargs,
    ): ...
OpenEnv integration that runs OpenEnv projects in Prime Sandboxes using a prebuilt image manifest (.build.json), supports both gym and MCP contracts, and requires a prompt_renderer to convert observations into chat messages.

SWEDebugEnv

class SWEDebugEnv(SandboxMixin, MultiTurnEnv):
    def __init__(
        self,
        taskset: SandboxTaskSet,
        dataset: Any = None,
        *,
        run_setup: bool = True,
        debug_step: Literal["none", "gold_patch", "command", "script"] = "gold_patch",
        run_tests: bool = True,
        debug_command: str | None = None,
        debug_script: str | None = None,
        debug_script_path: str | None = None,
        debug_timeout: int | None = None,
        test_timeout: int = 900,
        cpu_cores: int | None = None,
        memory_gb: int | None = None,
        disk_size_gb: int | None = None,
        labels: list[str] | None = None,
        timeout_seconds: float = 1800.0,
        output_tail_chars: int = 2000,
        **sandbox_kwargs,
    ): ...
No-agent debugger for SWE-style SandboxTaskSet instances. It creates the task sandbox, optionally runs task setup, runs one debug step (none, gold_patch, command, or script), and optionally runs tests and scores the result.

EnvGroup

env_group = vf.EnvGroup(
    envs=[env1, env2, env3],
    env_names=["math", "code", "qa"]  # optional
)
Combines multiple environments for mixed-task training. Combined datasets use info["env_id"] as internal routing metadata; it is not a top-level input, state, or output field.

v1 Taskset/Harness Classes

The v1 API is exposed as verifiers.v1 and documented in BYO Harness. Its core unit is:
state = await harness.run(task, state=None)
Taskset and Env wrap that runner for datasets, evals, and training.

Task

class Task(dict):
    def freeze(self) -> Task: ...
Immutable, JSON-serializable input data. A task is usually created by a Taskset, but can be run directly through a standalone Harness. Common top-level fields:
  • prompt: User/developer/tool messages for the rollout. Must not contain system messages.
  • system_prompt: Per-task system messages or string.
  • answer: Reference answer or target data. Stays on the task, not the state.
  • info: Serializable metadata.
  • max_turns: Per-task base-loop turn limit.
  • tools: Tool visibility: {"show": [...]} or {"hide": [...]}.
  • toolsets: Toolset visibility or rollout-local toolset config.
  • sandbox: Per-task sandbox overrides for sandboxed programs.
  • program: Task-owned files, dirs, env, setup, artifacts, bindings, and command args.
task.runtime is not part of the public schema. Runtime metadata belongs on State.

State

class State(dict):
    @classmethod
    def for_task(task: Mapping[str, Any], ...) -> State: ...
    def stop(self, condition: str = "state_done") -> None: ...
    def get_model(self) -> str: ...
    def get_client(api: str = "chat_completions", *, sync: bool = False) -> object: ...
    def get_endpoint_config(api: str = "chat_completions") -> dict[str, str]: ...
    def get_tools() -> dict[str, Callable[..., Awaitable[object]]]: ...
    def get_max_turns(default: int) -> int: ...
    def finalize() -> State: ...
Mutable rollout output. State starts from a task and accumulates trajectory, completion, metrics, reward, timing, artifacts, errors, and user-defined serializable fields. Framework-managed fields such as is_completed, stop_condition, is_truncated, and error cannot be written directly. Use state.stop(...) or raise vf.Error subclasses. State.for_task(...) can borrow selected active runtime handles from another state:
child_state = state.for_task(child_task, borrow=["model", "sandbox"], tools="bash")
child_state = await child_harness.run(child_task, child_state)
Borrowed handles are process-local and stripped before state crosses the serialization boundary.

Taskset

class Taskset:
    def __init__(
        source=None,
        eval_source=None,
        taskset_id: str | None = None,
        system_prompt=None,
        user=None,
        toolsets=(),
        stops=(),
        setups=(),
        updates=(),
        metrics=(),
        rewards=(),
        advantages=(),
        cleanups=(),
        config: TasksetConfig | Mapping[str, object] | None = None,
    ): ...

    def rows() -> list[dict[str, Any]]: ...
    def eval_rows() -> list[dict[str, Any]]: ...
    def task(row: Mapping[str, Any]) -> Task: ...
    def to_task(value: Mapping[str, Any] | Task | str) -> Task: ...
    async def init_group(task: Task, num_rollouts: int) -> tuple[list[Task], list[State]]: ...
    def get_dataset() -> Dataset: ...
    def get_eval_dataset() -> Dataset: ...
Packages task rows and task-owned behavior. source and eval_source may be iterables or zero-argument loaders. Loaders should close over resolved config instead of accepting runtime kwargs.

Harness

class Harness:
    def __init__(
        program=None,
        system_prompt=None,
        user=None,
        sandbox=None,
        client=None,
        model: str | None = None,
        sampling_args: SamplingArgs | None = None,
        max_turns: int | None = None,
        toolsets=None,
        stops=None,
        setups=None,
        updates=None,
        metrics=None,
        rewards=None,
        advantages=None,
        cleanups=None,
        config: HarnessConfig | Mapping[str, object] | None = None,
    ): ...

    async def run(task: Task | Mapping[str, Any], state: State | None = None) -> State: ...
    async def score_group(tasks: list[Task], states: list[State]) -> list[State]: ...
    async def cleanup_group(tasks: list[Task], states: list[State]) -> None: ...
    async def teardown() -> None: ...
Runs one task. All model calls go through the v1 interception endpoint so trajectory capture, sampling args, tool forwarding, and protocol translation use one path across local Python, sandboxed Python, command programs, and the base tool loop. program forms:
  • None: Default endpoint-backed tool loop.
  • callable: In-process Python program called with task, state.
  • {"base": true, ...}: Explicit default loop, usually with sandbox options.
  • {"fn": "pkg.module:run", ...}: Importable Python program.
  • {"command": ["cmd", "arg"], ...}: Local or sandboxed command.
Sandboxed program.fn refs resolve their owning local package from the resolved module root: single-file modules use pyproject.toml in the same directory as the module file, and package modules use pyproject.toml inside the package directory. v1 uploads and installs that package in the program sandbox. Package dependencies come from normal [project.dependencies].

Env

class Env(vf.Environment):
    def __init__(taskset: Taskset, harness: Harness | None = None): ...
Adapter that makes a v1 taskset/harness pair usable by eval and training workers. If harness is omitted, Env uses the base Harness.

Toolset And MCPTool

class Toolset:
    def __init__(
        tools=(),
        show=None,
        hide=None,
        bindings=None,
        objects=None,
        write: bool = False,
        scope: Literal["rollout", "group", "global"] = "rollout",
        sandbox=None,
        stops=(),
        setups=(),
        updates=(),
        metrics=(),
        rewards=(),
        advantages=(),
        cleanups=(),
        config: ToolsetConfig | Mapping[str, object] | None = None,
    ): ...

class MCPTool:
    def __init__(command: str, args=None, env=None, cwd: str | None = None): ...
Toolsets package callable tools, MCP servers, private dependency factories, and hidden bindings. objects.* bindings are private to the owning toolset/user and are not directly accessible from state.

v1 Config Models

TasksetConfig.from_toml(path, section=None)
HarnessConfig.from_toml(path, section=None)
ToolsetConfig(...)
SandboxConfig(...)
UserConfig(...)
MCPToolConfig(...)
v1 config models are Pydantic models. Constructors accept config objects or plain mappings; TOML config uses "module:object" refs for Python callables and loaders. Unknown fields fail validation.

Parser Classes

Parser

class Parser:
    def __init__(self, extract_fn: Callable[[str], str] = lambda x: x): ...
    
    def parse(self, text: str) -> Any: ...
    def parse_answer(self, completion: Messages) -> str | None: ...
    def get_format_reward_func(self) -> Callable: ...
Base parser. Default behavior returns text as-is.
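A sketch of a custom extract_fn: any str -> str callable works, so a preprocessing step can pull the final answer out of raw model text before parsing (the function below is illustrative, not from the library):

```python
import re

def extract_boxed(text: str) -> str:
    """Return the payload of the last \\boxed{...}, or the text unchanged."""
    matches = re.findall(r"\\boxed\{([^}]*)\}", text)
    return matches[-1] if matches else text

# Hypothetical usage with the constructor shown above:
# parser = vf.Parser(extract_fn=extract_boxed)
```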

XMLParser

class XMLParser(Parser):
    def __init__(
        self,
        fields: list[str | tuple[str, ...]],
        answer_field: str = "answer",
        extract_fn: Callable[[str], str] = lambda x: x,
    ): ...
Extracts structured fields from XML-tagged output.
parser = vf.XMLParser(fields=["reasoning", "answer"])
# Parses: <reasoning>...</reasoning><answer>...</answer>

# With alternatives:
parser = vf.XMLParser(fields=["reasoning", ("code", "answer")])
# Accepts either <code> or <answer> for second field
Methods:
  • parse(text) -> SimpleNamespace: Parse XML into an object with field attributes
  • parse_answer(completion) -> str | None: Extract the answer field from a completion
  • get_format_str() -> str: Get format description string
  • get_fields() -> list[str]: Get canonical field names
  • format(**kwargs) -> str: Format kwargs into an XML string

ThinkParser

class ThinkParser(Parser):
    def __init__(self, extract_fn: Callable[[str], str] = lambda x: x): ...
Extracts the content after the closing </think> tag. Use for models that always emit <think> tags when the serving stack does not parse them out automatically.

MaybeThinkParser

Handles optional <think> tags (for models that may or may not think).

Rubric Classes

Rubric

class Rubric:
    def __init__(
        self,
        funcs: list[RewardFunc] | None = None,
        weights: list[float] | None = None,
        parser: Parser | None = None,
    ): ...
Combines multiple reward functions with weights. Default weight is 1.0. Functions with weight=0.0 are tracked as metrics only. Methods:
  • add_reward_func(func, weight=1.0): Add a reward function
  • add_metric(func, weight=0.0): Add a metric (no reward contribution)
  • add_class_object(name, obj): Add an object accessible in reward functions
Reward function signature:
def my_reward(
    completion: Messages,
    answer: str = "",
    prompt: Messages | None = None,
    state: State | None = None,
    parser: Parser | None = None,  # if rubric has parser
    info: Info | None = None,
    **kwargs
) -> float:
    ...
Group reward function signature:
def my_group_reward(
    completions: list[Messages],
    answers: list[str],
    states: list[State],
    # ... plural versions of individual args
    **kwargs
) -> list[float]:
    ...
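How the weighted combination plausibly works can be sketched in plain Python (an assumption based on the description above: reward is the weight-scaled sum of per-function scores, and every function's raw score is also kept as a metric; the helper name is made up):

```python
def combine(scores: dict[str, float], weights: dict[str, float]) -> tuple[float, dict[str, float]]:
    """Weighted sum of per-function scores; raw scores become metrics."""
    reward = sum(weights[name] * s for name, s in scores.items())
    metrics = dict(scores)  # weight-0 functions still appear here
    return reward, metrics

reward, metrics = combine(
    {"correct": 1.0, "format": 0.5, "length": 0.9},
    {"correct": 1.0, "format": 0.2, "length": 0.0},  # length is metric-only
)
# reward: 1.0*1.0 + 0.2*0.5 + 0.0*0.9 = 1.1
```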

JudgeRubric

LLM-as-judge evaluation.

MathRubric

Math-specific evaluation using math-verify.

RubricGroup

Combines rubrics for EnvGroup.

Client Classes

Client

class Client(ABC, Generic[ClientT, MessagesT, ResponseT, ToolT]):
    def __init__(self, client_or_config: ClientT | ClientConfig) -> None: ...

    @property
    def client(self) -> ClientT: ...

    async def get_response(
        self,
        prompt: Messages,
        model: str,
        sampling_args: SamplingArgs,
        tools: list[Tool] | None = None,
        **kwargs,
    ) -> Response: ...

    async def close(self) -> None: ...
Abstract base class for all model clients. Wraps a provider-specific SDK client and translates between provider-agnostic vf types (Messages, Tool, Response) and provider-native formats. The client property exposes the underlying SDK client (e.g., AsyncOpenAI, AsyncAnthropic). get_response() is the main public method — it converts the prompt and tools to the native format, calls the provider API, validates the response, and converts it back to a vf.Response. Errors are wrapped in vf.ModelError unless they are already vf.Error or authentication errors. Abstract methods (for subclass implementors):
  • setup_client(config): Create the native SDK client from ClientConfig
  • to_native_prompt(messages): Convert Messages → native prompt format + extra kwargs
  • to_native_tool(tool): Convert Tool → native tool format
  • get_native_response(prompt, model, ...): Call the provider API
  • raise_from_native_response(response): Raise ModelError for invalid responses
  • from_native_response(response): Convert native response → vf.Response
  • close(): Close the underlying SDK client

Built-in Client Implementations

Each entry lists the class, its client_type value, and the SDK client it wraps:
  • OpenAIChatCompletionsClient ("openai_chat_completions", AsyncOpenAI): Chat Completions API (default)
  • OpenAICompletionsClient ("openai_completions", AsyncOpenAI): Legacy Completions API
  • OpenAIChatCompletionsTokenClient ("openai_chat_completions_token", AsyncOpenAI): Custom vLLM token route (/v1/chat/completions/tokens) with server-side templating and token IDs returned alongside content
  • OpenAIResponsesClient ("openai_responses", AsyncOpenAI): OpenAI Responses API
  • RendererClient ("renderer", AsyncOpenAI): Renderer-backed token-in generate client (client-side tokenization via the renderers package)
  • AnthropicMessagesClient ("anthropic_messages", AsyncAnthropic): Anthropic Messages API
  • NeMoRLChatCompletionsClient ("nemorl_chat_completions", AsyncOpenAI): NeMo-RL Chat Completions variant
All built-in clients are available as vf.OpenAIChatCompletionsClient, vf.AnthropicMessagesClient, etc. RendererClient requires the optional renderer package; install it with uv add "verifiers[renderers]" before importing vf.RendererClient or using client_type="renderer".

Response

class Response(BaseModel):
    id: str
    created: int
    model: str
    usage: Usage | None
    message: ResponseMessage

class ResponseMessage(BaseModel):
    content: str | None
    reasoning_content: str | None
    finish_reason: Literal["stop", "length", "tool_calls"] | None
    is_truncated: bool | None
    tokens: ResponseTokens | None
    tool_calls: list[ToolCall] | None
Provider-agnostic model response. All Client implementations return Response from get_response().

Tool

class Tool(BaseModel):
    name: str
    description: str
    parameters: dict[str, object]
    strict: bool | None = None
Provider-agnostic tool definition. Environments define tools using this type; each Client converts them to its native format via to_native_tool().

Configuration Types

v1 Config

class Config(BaseModel):
    def __init__(self, config: object | None = None, /, **data: object): ...

    @classmethod
    def from_config(cls, config: object | None = None, /, **data: object) -> Self: ...

    @classmethod
    def from_toml(
        cls, path: str | Path, section: str | Iterable[str] | None = None
    ) -> Self: ...

class EnvConfig(Config):
    taskset: object | None = None
    harness: object | None = None

class TasksetConfig(Config):
    taskset_id: str | None = None
    system_prompt: object | None = None
    source: object | None = None
    eval_source: object | None = None
    user: object | None = None

class HarnessConfig(Config):
    program: object | None = None
    system_prompt: object | None = None
    sandbox: SandboxConfig | None = None
    model: str | None = None
    sampling_args: dict[str, object] = {}
    max_turns: int = 10
EnvConfig is the typed v1 loader envelope. TOML [env.taskset] and [env.harness] sections flow to config.taskset and config.harness; environment-specific named args flow through [env.args]. Config subclasses accept a positional source config plus direct keyword overrides. The source object is positional-only so subclasses can define a real field named config.

ClientConfig

class ClientConfig(BaseModel):
    client_idx: int = 0
    client_type: ClientType = "openai_chat_completions"
    preserve_all_thinking: bool = False
    preserve_thinking_between_tool_calls: bool = False
    api_key_var: str = "PRIME_API_KEY"
    api_base_url: str = "https://api.pinference.ai/api/v1"
    endpoint_configs: list[EndpointClientConfig] = []
    timeout: float = 3600.0
    connect_timeout: float = 5.0
    max_connections: int = 28000
    max_keepalive_connections: int = 28000
    max_retries: int = 10
    extra_headers: dict[str, str] = {}
    extra_headers_from_state: dict[str, str] = {}
extra_headers_from_state maps HTTP header names to state field names. For each inference request, the header value is read dynamically from the rollout state dict. For example, {"X-Session-ID": "example_id"} adds an X-Session-ID header with the value of state["example_id"], enabling sticky routing at the inference router level.
client_type selects which Client implementation to instantiate (see Client Classes). Use endpoint_configs for multi-endpoint round-robin; in grouped scoring mode, groups are distributed round-robin across endpoint configs.
preserve_all_thinking and preserve_thinking_between_tool_calls are forwarded to the underlying renderer when client_type == "renderer". They control whether past-assistant reasoning_content is re-emitted on subsequent renders: preserve_all_thinking keeps every past-assistant turn’s thinking, while preserve_thinking_between_tool_calls keeps thinking only inside the in-flight assistant→tool→…→assistant block after the most recent user turn (when that block contains at least one tool response). Both default to False (the template default applies).
When api_key_var is "PRIME_API_KEY" (the default), credentials are loaded with the following precedence:
  • API key: PRIME_API_KEY env var > ~/.prime/config.json > "EMPTY"
  • Team ID: PRIME_TEAM_ID env var > ~/.prime/config.json > not set
This allows seamless use after running prime login.
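The extra_headers_from_state lookup described above amounts to a per-request merge of static headers with values read from the rollout state; a minimal sketch (the helper and field names are illustrative, not library API):

```python
def build_headers(static: dict, from_state: dict, state: dict) -> dict:
    """Merge static headers with values looked up in the rollout state."""
    headers = dict(static)
    for header, field in from_state.items():
        headers[header] = str(state[field])  # header values must be strings
    return headers

headers = build_headers(
    static={"X-Run": "eval-1"},
    from_state={"X-Session-ID": "example_id"},
    state={"example_id": 7},
)
# headers == {"X-Run": "eval-1", "X-Session-ID": "7"}
```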

EndpointClientConfig

class EndpointClientConfig(BaseModel):
    client_idx: int = 0
    api_key_var: str = "PRIME_API_KEY"
    api_base_url: str = "https://api.pinference.ai/api/v1"
    timeout: float = 3600.0
    max_connections: int = 28000
    max_keepalive_connections: int = 28000
    max_retries: int = 10
    extra_headers: dict[str, str] = {}
Leaf endpoint configuration used inside ClientConfig.endpoint_configs. Has the same fields as ClientConfig except endpoint_configs itself, preventing recursive nesting.

EvalConfig

class EvalConfig(BaseModel):
    env_id: str
    env_args: dict
    env_dir_path: str
    endpoint_id: str | None = None
    model: str
    client_config: ClientConfig
    sampling_args: SamplingArgs
    num_examples: int
    rollouts_per_example: int
    max_concurrent: int
    independent_scoring: bool = False
    extra_env_kwargs: dict = {}
    max_retries: int = 0
    verbose: bool = False
    state_columns: list[str] | None = None
    save_results: bool = False
    resume_path: Path | None = None
    save_to_hf_hub: bool = False
    hf_hub_dataset_name: str | None = None

Endpoint

Endpoint = TypedDict(
    "Endpoint",
    {
        "key": str,
        "url": str,
        "model": str,
        "api_client_type": NotRequired[ClientType],
        "extra_headers": NotRequired[dict[str, str]],
    },
)
Endpoints = dict[str, list[Endpoint]]
Endpoints maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list.
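A hypothetical registry with one single-variant endpoint and one two-variant endpoint might look like this (all ids, URLs, and model names below are made up for illustration):

```python
# Illustrative Endpoints mapping: endpoint id -> list of endpoint variants.
endpoints = {
    "gpt-main": [
        {"key": "OPENAI_API_KEY", "url": "https://api.openai.com/v1", "model": "gpt-4.1-mini"},
    ],
    "local-pair": [
        {"key": "EMPTY", "url": "http://localhost:8000/v1", "model": "my-model"},
        {"key": "EMPTY", "url": "http://localhost:8001/v1", "model": "my-model"},
    ],
}
```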

Prime CLI Plugin

Verifiers exposes a plugin contract consumed by prime for command execution.

PRIME_PLUGIN_API_VERSION

PRIME_PLUGIN_API_VERSION = 1
API version for compatibility checks between prime and verifiers.

PrimeCLIPlugin

@dataclass(frozen=True)
class PrimeCLIPlugin:
    api_version: int = PRIME_PLUGIN_API_VERSION
    eval_module: str = "verifiers.cli.commands.eval"
    gepa_module: str = "verifiers.cli.commands.gepa"
    install_module: str = "verifiers.cli.commands.install"
    init_module: str = "verifiers.cli.commands.init"
    setup_module: str = "verifiers.cli.commands.setup"
    build_module: str = "verifiers.cli.commands.build"

    def build_module_command(
        self, module_name: str, args: Sequence[str] | None = None
    ) -> list[str]:
        ...
build_module_command returns a subprocess command list for python -m <module> ....

get_plugin

def get_plugin() -> PrimeCLIPlugin:
    ...
Returns the plugin instance consumed by prime.

Decorators

@vf.stop

@vf.stop
async def my_condition(self, state: State) -> bool:
    """Return True to end the rollout."""
    ...

@vf.stop(priority=10)  # Higher priority runs first
async def early_check(self, state: State) -> bool:
    ...
Mark a method as a stop condition. All stop conditions are checked by is_completed().
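The priority semantics can be illustrated with a standalone sketch. vf_stop below is a stand-in for the real @vf.stop decorator, and the sorting is an assumption about how is_completed() orders its checks:

```python
def vf_stop(priority: int = 0):
    """Illustrative stand-in for @vf.stop: tag a function with a priority."""
    def wrap(fn):
        fn._stop_priority = priority
        return fn
    return wrap

@vf_stop(priority=10)
def early_check(state):
    return state.get("turn", 0) >= 5

@vf_stop()
def late_check(state):
    return state.get("done", False)

# Higher-priority conditions are checked first.
conditions = sorted([late_check, early_check], key=lambda f: f._stop_priority, reverse=True)
stopped = any(cond({"turn": 6}) for cond in conditions)
```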

@vf.cleanup

@vf.cleanup
async def my_cleanup(self, state: State) -> None:
    """Called after each rollout completes."""
    ...

@vf.cleanup(priority=10)
async def early_cleanup(self, state: State) -> None:
    ...
Mark a method as a rollout cleanup handler. Cleanup methods should be idempotent (safe to call multiple times) and handle errors gracefully, so that cleanup completes even when resources are in unexpected states.

@vf.teardown

@vf.teardown
async def my_teardown(self) -> None:
    """Called when environment is destroyed."""
    ...

@vf.teardown(priority=10)
async def early_teardown(self) -> None:
    ...
Mark a method as an environment teardown handler.

Utility Functions

Data Utilities

vf.load_example_dataset(name: str) -> Dataset
Load a built-in example dataset.
vf.extract_boxed_answer(text: str, strict: bool = False) -> str
Extract answer from LaTeX \boxed{} format. When strict=True, returns "" if no \boxed{} is found (used by MathRubric to avoid scoring unformatted responses). When strict=False (default), returns the original text as a passthrough.
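The behavior can be sketched with a brace-balancing scan. This is a simplified illustration of the documented semantics, not the library's implementation:

```python
def extract_boxed(text: str, strict: bool = False) -> str:
    """Sketch of vf.extract_boxed_answer: return the contents of the last
    \\boxed{...}, balancing nested braces. When no \\boxed{} is found,
    return "" if strict, else pass the original text through."""
    start = text.rfind(r"\boxed{")
    if start == -1:
        return "" if strict else text
    i = begin = start + len(r"\boxed{")
    depth = 1
    while i < len(text) and depth:
        if text[i] == "{":
            depth += 1
        elif text[i] == "}":
            depth -= 1
        i += 1
    return text[begin : i - 1]
```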
vf.extract_hash_answer(text: str) -> str | None
Extract answer after #### marker (GSM8K format).

Environment Utilities

vf.load_environment(env_id: str, **kwargs) -> Environment
Load an environment by ID (e.g., "primeintellect/gsm8k").

Configuration Utilities

vf.ensure_keys(keys: list[str]) -> None
Validate that required environment variables are set. Raises MissingKeyError (a ValueError subclass) with a clear message listing all missing keys and instructions for setting them.
class MissingKeyError(ValueError):
    keys: list[str]  # list of missing key names
Example:
def load_environment(api_key_var: str = "OPENAI_API_KEY") -> vf.Environment:
    vf.ensure_keys([api_key_var])
    # now safe to use os.environ[api_key_var]
    ...
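The validation itself can be sketched in a few lines. The error-message wording below is a guess; only the one-error-listing-all-missing-keys behavior is taken from the description above:

```python
import os

class MissingKeyError(ValueError):
    """Sketch of vf.MissingKeyError: carries the list of missing key names."""

    def __init__(self, keys: list[str]):
        self.keys = keys
        super().__init__(
            "Missing required environment variables: " + ", ".join(keys)
            + ". Set them with `export NAME=value` before running."
        )

def ensure_keys(keys: list[str]) -> None:
    """Sketch of vf.ensure_keys: raise a single error listing every missing key."""
    missing = [k for k in keys if not os.environ.get(k)]
    if missing:
        raise MissingKeyError(missing)
```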

Logging Utilities

vf.print_prompt_completions_sample(outputs: GenerateOutputs, n: int = 3)
Pretty-print sample rollouts.
vf.setup_logging(level: str = "INFO")
Configure verifiers logging. Set VF_LOG_LEVEL env var to change default.
vf.log_level(level: str | int)
Context manager to temporarily set the verifiers logger to a new log level. Useful for temporarily adjusting verbosity during specific operations.
with vf.log_level("DEBUG"):
    # verifiers logs at DEBUG level here
    ...
# reverts to previous level
vf.quiet_verifiers()
Context manager to temporarily silence verifiers logging by setting WARNING level. Shorthand for vf.log_level("WARNING").
with vf.quiet_verifiers():
    # verifiers logging is quieted here
    outputs = env.generate(...)
# logging restored