The SDK ships two clients with identical methods: SandboxClient for synchronous scripts and AsyncSandboxClient for concurrent workloads. Both are importable from prime_sandboxes.
Sync Client
SandboxClient is the simplest way to get started — no async/await needed.
```python
from prime_sandboxes import SandboxClient, CreateSandboxRequest, APIClient

client = SandboxClient(APIClient())

# Create and wait
sandbox = client.create(
    CreateSandboxRequest(
        name="sdk-demo",
        docker_image="python:3.11-slim",
        labels=["experiment"],
        timeout_minutes=120,
        environment_vars={"LOG_LEVEL": "debug"},
        secrets={"API_KEY": "sk-abc123"},
    )
)
client.wait_for_creation(sandbox.id)

# Run a command
result = client.execute_command(sandbox.id, "python -c 'print(42)'")
print(result.stdout.strip())

# Upload / download files
client.upload_file(sandbox.id, "/workspace/data.csv", "./data.csv")
client.download_file(sandbox.id, "/workspace/output.csv", "./output.csv")

# Expose a port
exposed = client.expose(sandbox.id, port=8000, name="web")
print(exposed.url)

# Clean up
client.delete(sandbox.id)
```
Every method shown in the async sections below has an identical synchronous counterpart on SandboxClient.
Async Client
Most sandbox automations spin up more than one environment. The async client lets you fan out creates, waits, commands, and teardown without juggling threads.
```python
import asyncio

from prime_sandboxes import AsyncSandboxClient, CreateSandboxRequest


async def launch_demo() -> None:
    async with AsyncSandboxClient() as sandboxes:
        request = CreateSandboxRequest(
            name="sdk-demo",
            docker_image="python:3.11-slim",
            labels=["experiment", "ml-pipeline", "team-research"],
            timeout_minutes=120,
        )
        sandbox = await sandboxes.create(request)
        await sandboxes.wait_for_creation(sandbox.id)

        result = await sandboxes.execute_command(sandbox.id, "python -c 'print(42)'")
        print(result.stdout.strip())

        await sandboxes.delete(sandbox.id)


asyncio.run(launch_demo())
```
Launch a Fleet
```python
async def create_many(images: list[str]) -> None:
    async with AsyncSandboxClient() as sandboxes:
        requests = [
            CreateSandboxRequest(name=f"batch-{i}", docker_image=image)
            for i, image in enumerate(images, start=1)
        ]
        created = await asyncio.gather(*[sandboxes.create(req) for req in requests])
        await sandboxes.bulk_wait_for_creation([sbx.id for sbx in created])
        print("Ready:", ", ".join(sbx.name for sbx in created))


# asyncio.run(create_many(["python:3.11-slim", "node:20-slim"]))
```
bulk_wait_for_creation polls via the list endpoint, backing off automatically if the API throttles you.
Run Commands & Collect Logs
```python
async def smoke_test(sandbox_id: str) -> None:
    async with AsyncSandboxClient() as sandboxes:
        result = await sandboxes.execute_command(
            sandbox_id,
            "python -c 'import platform; print(platform.python_version())'",
        )
        print("stdout:", result.stdout.strip())

        logs = await sandboxes.get_logs(sandbox_id)
        print("logs snippet:", logs[:120])
```
Command responses include stdout, stderr, and the exit code, so you can short-circuit a pipeline as soon as a step fails.
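For example, a small helper can turn a non-zero exit code into an exception. The helper and its name are ours, not part of the SDK; it only assumes the stdout/stderr/exit_code fields described above:

```python
class StepFailed(RuntimeError):
    """Raised when a pipeline step exits non-zero."""


def check_step(result, step: str) -> str:
    """Return stdout on success; raise StepFailed with stderr otherwise.

    Works with any command result exposing stdout, stderr, and exit_code.
    """
    if result.exit_code != 0:
        raise StepFailed(f"{step} failed (exit {result.exit_code}): {result.stderr.strip()}")
    return result.stdout


# Usage in a pipeline (sketch):
#   result = await sandboxes.execute_command(sandbox_id, "pytest -q")
#   output = check_step(result, "tests")
```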
Move Data In and Out
```python
async def sync_artifacts(sandbox_id: str) -> None:
    async with AsyncSandboxClient() as sandboxes:
        await sandboxes.upload_file(sandbox_id, "/workspace/model.bin", "./artifacts/model.bin")
        await sandboxes.download_file(sandbox_id, "/workspace/report.csv", "./reports/report.csv")
```
Note: File uploads are limited to 200MB per file.
Uploads/downloads use short-lived gateway tokens stored in a local cache. Call sandboxes.clear_auth_cache() if you rotate credentials or hit 401s.
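One pattern is to retry the operation once after clearing the cache when a 401 surfaces. A generic sketch under those assumptions; the wrapper itself is ours, and in real code you would pass the SDK's UnauthorizedError and sandboxes.clear_auth_cache:

```python
async def with_fresh_token(op, clear_cache, unauthorized_exc=Exception):
    """Run `op`; on a 401-style error, clear the token cache and retry once."""
    try:
        return await op()
    except unauthorized_exc:
        clear_cache()      # drop the stale gateway token
        return await op()  # a second failure propagates to the caller


# Usage (sketch):
#   await with_fresh_token(
#       lambda: sandboxes.upload_file(sbx_id, "/workspace/data.csv", "./data.csv"),
#       sandboxes.clear_auth_cache,
#       UnauthorizedError,
#   )
```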
Expose Ports
Make services inside your sandbox accessible over the internet. Both HTTP and TCP protocols are supported.
Ports must be in the range 22–9000. Ports 8080, 2222, and 8081 cannot be exposed.
HTTP
Expose an HTTP service and get a public HTTPS URL:
```python
async def run_web_server() -> None:
    async with AsyncSandboxClient() as sandboxes:
        sandbox = await sandboxes.create(
            CreateSandboxRequest(name="web-server", docker_image="python:3.11-slim")
        )
        await sandboxes.wait_for_creation(sandbox.id)

        # Start the server as a background job so it keeps running
        await sandboxes.start_background_job(
            sandbox.id, "python -m http.server 8000 --bind 0.0.0.0"
        )

        exposed = await sandboxes.expose(sandbox.id, port=8000, name="web-server")
        await asyncio.sleep(10)  # give the server a moment to start
        print(f"Server available at: {exposed.url}")
```
TCP
Expose a raw TCP service and get a public host:port endpoint:
```python
async def run_tcp_server() -> None:
    async with AsyncSandboxClient() as sandboxes:
        sandbox = await sandboxes.create(
            CreateSandboxRequest(name="tcp-server", docker_image="python:3.11-slim")
        )
        await sandboxes.wait_for_creation(sandbox.id)

        # Start a one-line-per-connection TCP echo server as a background job
        await sandboxes.start_background_job(
            sandbox.id,
            "python -c \"import socketserver; "
            "H = type('Echo', (socketserver.StreamRequestHandler,), "
            "{'handle': lambda self: self.wfile.write(self.rfile.readline())}); "
            "socketserver.TCPServer(('0.0.0.0', 9000), H).serve_forever()\"",
        )

        exposed = await sandboxes.expose(sandbox.id, port=9000, name="tcp-server", protocol="TCP")
        print(f"TCP endpoint: {exposed.external_endpoint}")
        print(f"External port: {exposed.external_port}")
```
TCP exposures return an external_endpoint (host:port) and external_port instead of a URL. Connect using any TCP client, for example Python’s socket.create_connection().
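A minimal client sketch using only the standard library; the host and port come from the expose() call above:

```python
import socket


def tcp_ping(host: str, port: int, payload: bytes = b"hello\n") -> bytes:
    """Open a TCP connection, send one line, and return the first reply chunk."""
    with socket.create_connection((host, port), timeout=10) as conn:
        conn.sendall(payload)
        return conn.recv(1024)


# Usage (sketch):
#   host, port = exposed.external_endpoint.split(":")
#   print(tcp_ping(host, int(port)))
```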
Start Command
By default, sandboxes run tail -f /dev/null to keep the container alive for interactive use. Pass start_command to override the image’s ENTRYPOINT with your own process:
```python
sandbox = await sandboxes.create(
    CreateSandboxRequest(
        name="api-server",
        docker_image="python:3.11-slim",
        start_command="python serve.py --port 8000",
    )
)
```
If you omit start_command, the default keeps the sandbox idle and ready for execute_command calls.
Environment Variables & Secrets
Pass configuration and credentials when creating a sandbox:
```python
sandbox = await sandboxes.create(
    CreateSandboxRequest(
        name="configured-runner",
        docker_image="python:3.11-slim",
        environment_vars={
            "APP_ENV": "staging",
            "LOG_LEVEL": "debug",
        },
        secrets={
            "DB_PASSWORD": "hunter2",
            "API_KEY": "sk-abc123",
        },
    )
)
```
Environment variables are stored in plain text. Secrets are encrypted at rest and never returned in API responses — use them for API keys, passwords, and other sensitive values. Both are injected into the container as standard environment variables.
You can also pass per-command environment variables to execute_command:
```python
result = await sandboxes.execute_command(
    sandbox.id,
    "echo $CUSTOM_VAR",
    env={"CUSTOM_VAR": "hello"},
)
```
Network Isolation
For running untrusted code, create sandboxes without internet access:
```python
async def isolated_sandbox() -> None:
    async with AsyncSandboxClient() as sandboxes:
        # Create a sandbox without outbound internet access
        request = CreateSandboxRequest(
            name="isolated-runner",
            docker_image="python:3.11-slim",
            network_access=False,  # Disable outbound internet
        )
        sandbox = await sandboxes.create(request)
        await sandboxes.wait_for_creation(sandbox.id)

        # Code runs in isolation: this call will fail with a network error
        result = await sandboxes.execute_command(
            sandbox.id,
            "python -c 'import urllib.request; urllib.request.urlopen(\"https://example.com\")'",
        )

        await sandboxes.delete(sandbox.id)
```
When network_access=False:
- Outbound connections to the internet are blocked
- DNS resolution for internal services still works
By default, network_access=True and sandboxes have full internet access.
Long-Running Commands
A command can run for up to 15 minutes; control this with the timeout parameter (in seconds):
```python
async def run_long_command(sandbox_id: str) -> None:
    async with AsyncSandboxClient() as sandboxes:
        # Run a command that takes up to 15 minutes
        result = await sandboxes.execute_command(
            sandbox_id,
            "python preprocessing.py --dataset large",
            timeout=900,  # 15 minutes max
        )
        print(f"Exit code: {result.exit_code}")
```
For tasks longer than 15 minutes, use background jobs instead. They’re more
reliable and won’t tie up your connection.
Background Jobs
Use start_background_job for tasks that run longer than 15 minutes. The job continues running in the sandbox while you poll for completion.
```python
async def run_training_job() -> None:
    async with AsyncSandboxClient() as sandboxes:
        sandbox = await sandboxes.create(
            CreateSandboxRequest(
                name="training-job",
                docker_image="python:3.11-slim",
                timeout_minutes=1440,  # 24 hours
                cpu_cores=4,
                memory_gb=16,
            )
        )
        await sandboxes.wait_for_creation(sandbox.id)

        # Start a long-running job in the background
        job = await sandboxes.start_background_job(
            sandbox.id,
            "python train.py --epochs 100",
        )
        print(f"Job started: {job.job_id}")

        # Poll for completion
        while True:
            status = await sandboxes.get_background_job(sandbox.id, job.job_id)
            if status.completed:
                print(f"Exit code: {status.exit_code}")
                print(status.stdout)
                break
            print("Still running...")
            await asyncio.sleep(30)

        # Download results
        await sandboxes.download_file(sandbox.id, "/app/model.pt", "./model.pt")
        await sandboxes.delete(sandbox.id)
```
The timeout_minutes parameter controls how long the sandbox stays alive. Background jobs persist across API calls until completion or sandbox termination.
Error Handling
The SDK raises typed exceptions so you can handle specific failure modes. All exceptions are importable directly from prime_sandboxes.
```python
from prime_sandboxes import (
    # Sandbox lifecycle errors
    SandboxNotRunningError,
    SandboxOOMError,
    SandboxTimeoutError,
    SandboxImagePullError,
    # Operation errors
    CommandTimeoutError,
    UploadTimeoutError,
    DownloadTimeoutError,
    # API errors
    APIError,
    UnauthorizedError,
    PaymentRequiredError,
    APITimeoutError,
)
```
Sandbox Lifecycle Errors
These are raised when a sandbox is no longer in RUNNING state. They form a hierarchy — catch the base class for broad handling, or specific subclasses for targeted recovery.
| Exception | Cause | Typical Fix |
|---|---|---|
| SandboxNotRunningError | Operation attempted on a non-running sandbox (terminated, errored, or timed out) | Check sandbox status before operating on it |
| SandboxOOMError | Sandbox killed due to out-of-memory | Increase memory_gb in CreateSandboxRequest or optimize memory usage |
| SandboxTimeoutError | Sandbox exceeded its timeout_minutes and was terminated | Increase the timeout or split work into smaller tasks |
| SandboxImagePullError | Docker image could not be pulled | Verify image name, tag, and registry credentials |
SandboxOOMError, SandboxTimeoutError, and SandboxImagePullError are all subclasses of SandboxNotRunningError.
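Because of this hierarchy, `except` clause order matters: list the subclasses before the base class, or the base clause will swallow them. A sketch of the pattern, using local stand-in classes that mirror the documented hierarchy (import the real ones from prime_sandboxes in actual code; the recovery strings are placeholders):

```python
# Stand-ins for illustration only; the real classes live in prime_sandboxes.
class SandboxNotRunningError(Exception): ...
class SandboxOOMError(SandboxNotRunningError): ...
class SandboxTimeoutError(SandboxNotRunningError): ...


def recovery_action(run) -> str:
    """Call `run` and map lifecycle errors to actions, most specific first."""
    try:
        run()
        return "ok"
    except SandboxOOMError:
        return "increase memory_gb and recreate"
    except SandboxTimeoutError:
        return "raise timeout_minutes or split the work"
    except SandboxNotRunningError:  # broad fallback: must come last
        return "check status and recreate"
```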
Operation Errors
Raised during specific operations when the sandbox is still running but the operation itself fails.
| Exception | Cause | Typical Fix |
|---|---|---|
| CommandTimeoutError | execute_command exceeded its timeout parameter | Increase the per-command timeout, or use start_background_job for long tasks |
| UploadTimeoutError | File upload timed out | Check file size (200MB limit) and network conditions |
| DownloadTimeoutError | File download timed out | Check file size and network conditions |
API Errors
Raised for HTTP-level failures when communicating with the platform API.
| Exception | Cause | Typical Fix |
|---|---|---|
| APIError | Base class for all API errors (non-2xx response, malformed response, network failure) | Inspect the error message for details |
| UnauthorizedError | Invalid or expired API key (HTTP 401) | Check PRIME_API_KEY or re-run prime login |
| PaymentRequiredError | Insufficient balance (HTTP 402) | Top up your account balance |
| APITimeoutError | API request timed out before receiving a response | Retry the request; check network connectivity |
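Timeouts are usually transient, so a bounded retry with backoff is a reasonable default. A generic sketch; the wrapper is ours, and in real code you would pass APITimeoutError as `timeout_exc`:

```python
import time


def retry_on_timeout(op, attempts: int = 3, base_delay: float = 0.5,
                     timeout_exc=TimeoutError):
    """Retry `op` with exponential backoff while it raises `timeout_exc`."""
    for attempt in range(attempts):
        try:
            return op()
        except timeout_exc:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the timeout
            time.sleep(base_delay * 2 ** attempt)


# Usage (sketch):
#   sandbox = retry_on_timeout(lambda: client.create(request), timeout_exc=APITimeoutError)
```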
Clean Exit
Delete a single sandbox or use bulk_delete to tear down many at once by IDs or labels:
```python
async def teardown() -> None:
    async with AsyncSandboxClient() as sandboxes:
        # Delete one sandbox
        await sandboxes.delete("sbx_123")

        # Bulk delete by IDs
        result = await sandboxes.bulk_delete(sandbox_ids=["sbx_456", "sbx_789"])
        print(f"Deleted: {result.succeeded}, Failed: {result.failed}")

        # Bulk delete by labels (removes all sandboxes matching ALL given labels)
        result = await sandboxes.bulk_delete(labels=["experiment", "staging"])
        print(f"Deleted: {result.succeeded}, Failed: {result.failed}")
```
You must pass either sandbox_ids or labels, not both.
For a full script, see prime-cli/examples/sandbox_async_demo.py, which covers create → wait → run → logs → delete.