Async Sandbox Client

Why Async?

Most sandbox automations spin up more than one environment. The async client lets you fan out creates, waits, commands, and teardown without juggling threads. Want to explore every helper? Peek at src/prime_cli/api/sandbox.py for the complete API.
import asyncio
from prime_cli.api.sandbox import AsyncSandboxClient, CreateSandboxRequest

async def launch_demo() -> None:
    """Create a sandbox, run a one-liner inside it, then tear it down."""
    async with AsyncSandboxClient() as client:
        spec = CreateSandboxRequest(
            name="sdk-demo",
            docker_image="python:3.11-slim",
            labels=["experiment", "ml-pipeline", "team-research"],
            timeout_minutes=120,
        )
        sandbox = await client.create(spec)
        # Creation is asynchronous server-side; block until it is ready.
        await client.wait_for_creation(sandbox.id)
        result = await client.execute_command(sandbox.id, "python -c 'print(42)'")
        print(result.stdout.strip())
        await client.delete(sandbox.id)

asyncio.run(launch_demo())

Launch a Fleet

async def create_many(images: list[str]) -> None:
    """Spin up one sandbox per image concurrently and wait for the batch."""
    async with AsyncSandboxClient() as client:
        # Fan out all creation calls in parallel instead of awaiting serially.
        created = await asyncio.gather(
            *(
                client.create(
                    CreateSandboxRequest(name=f"batch-{i}", docker_image=image)
                )
                for i, image in enumerate(images, start=1)
            )
        )
        await client.bulk_wait_for_creation([sbx.id for sbx in created])
        print("Ready:", ", ".join(sbx.name for sbx in created))

# asyncio.run(create_many(["python:3.11-slim", "node:20-slim"]))
bulk_wait_for_creation polls via the list endpoint, backing off automatically if the API throttles you.

Run Commands & Collect Logs

async def smoke_test(sandbox_id: str) -> None:
    """Run a quick version check in the sandbox and show a log excerpt."""
    async with AsyncSandboxClient() as client:
        version_cmd = "python -c 'import platform; print(platform.python_version())'"
        outcome = await client.execute_command(sandbox_id, version_cmd)
        print("stdout:", outcome.stdout.strip())
        # Trim the log stream to the first 120 characters for display.
        logs = await client.get_logs(sandbox_id)
        print("logs snippet:", logs[:120])
Command responses include stdout, stderr, and exit code so you can short-circuit pipelines when something breaks.

Move Data In and Out

async def sync_artifacts(sandbox_id: str) -> None:
    """Push a model binary into the sandbox and pull a report back out."""
    async with AsyncSandboxClient() as client:
        await client.upload_file(sandbox_id, "/workspace/model.bin", "./artifacts/model.bin")
        await client.download_file(sandbox_id, "/workspace/report.csv", "./reports/report.csv")
Note: File uploads are limited to 200MB per file. Uploads/downloads use short-lived gateway tokens stored in a local cache. Call sandboxes.clear_auth_cache() if you rotate credentials or hit 401s.

Expose Ports

Make HTTP services inside your sandbox accessible over the internet.
Currently only HTTP is supported. TCP/UDP support is coming soon.
async def run_web_server() -> None:
    """Start an HTTP server in a fresh sandbox and publish it on a public URL."""
    async with AsyncSandboxClient() as client:
        sandbox = await client.create(
            CreateSandboxRequest(name="web-server", docker_image="python:3.11-slim")
        )
        await client.wait_for_creation(sandbox.id)

        # nohup + '&' detaches the server so it keeps running after this
        # execute_command call returns.
        await client.execute_command(
            sandbox.id,
            "nohup python -m http.server 8000 --bind 0.0.0.0 > /dev/null 2>&1 &",
        )

        exposed = await client.expose(sandbox.id, port=8000, name="web-server")
        print(f"Server available at: {exposed.url}")

        # Remove the exposure first, then delete the sandbox itself.
        await client.unexpose(sandbox.id, exposed.exposure_id)
        await client.delete(sandbox.id)
Works with Flask, FastAPI, Jupyter, or any HTTP service.

Network Isolation

For running untrusted code, create sandboxes without internet access:
async def isolated_sandbox() -> None:
    """Run untrusted code in a sandbox with outbound networking disabled.

    The HTTP request attempted below fails because the sandbox is created
    with network_access=False, which blocks outbound internet connections.
    """
    async with AsyncSandboxClient() as sandboxes:
        # Create a sandbox without outbound internet access
        request = CreateSandboxRequest(
            name="isolated-runner",
            docker_image="python:3.11-slim",
            network_access=False,  # Disable outbound internet
        )
        sandbox = await sandboxes.create(request)
        await sandboxes.wait_for_creation(sandbox.id)

        # Code runs in isolation - no external network calls possible
        result = await sandboxes.execute_command(
            sandbox.id,
            "python -c 'import urllib.request; urllib.request.urlopen(\"https://example.com\")'",
        )
        # Surface the failure instead of discarding the result: a non-zero
        # exit code (with the error on stderr) confirms the call was blocked.
        print(f"Exit code: {result.exit_code}")
        if result.stderr:
            print(result.stderr.strip())

        await sandboxes.delete(sandbox.id)
When network_access=False:
  • Outbound connections to the internet are blocked
  • DNS resolution for internal services still works
By default, network_access=True and sandboxes have full internet access.

Long-Running Commands

Commands can run up to 15 minutes using the timeout parameter:
async def run_long_command(sandbox_id: str) -> None:
    """Execute a command with the maximum allowed 15-minute timeout."""
    async with AsyncSandboxClient() as client:
        # timeout is in seconds; 900s is the ceiling for a single
        # execute_command call — longer work belongs in a background job.
        outcome = await client.execute_command(
            sandbox_id,
            "python preprocessing.py --dataset large",
            timeout=900,  # 15 minutes max
        )
        print(f"Exit code: {outcome.exit_code}")
For tasks longer than 15 minutes, use background jobs instead. They’re more reliable and won’t tie up your connection.

Background Jobs

Use start_background_job for tasks that run longer than 15 minutes. The job continues running in the sandbox while you poll for completion.
async def run_training_job() -> None:
    """Launch a long-running training job in a sandbox and poll to completion.

    Background jobs persist across API calls, so work longer than the
    15-minute execute_command ceiling can run to completion while the
    client polls for status every 30 seconds.
    """
    async with AsyncSandboxClient() as sandboxes:
        sandbox = await sandboxes.create(
            CreateSandboxRequest(
                name="training-job",
                docker_image="python:3.11-slim",
                timeout_minutes=1440,  # 24 hours
                cpu_cores=4,
                memory_gb=16,
            )
        )
        await sandboxes.wait_for_creation(sandbox.id)

        # Start a long-running job in the background
        job = await sandboxes.start_background_job(
            sandbox.id,
            "python train.py --epochs 100",
        )
        print(f"Job started: {job.job_id}")

        # Poll for completion. Pass the job id (not the job object) —
        # every other endpoint in this API is id-based, and job.job_id is
        # the identifier reported above.
        while True:
            status = await sandboxes.get_background_job(sandbox.id, job.job_id)
            if status.completed:
                print(f"Exit code: {status.exit_code}")
                print(status.stdout)
                break
            print("Still running...")
            await asyncio.sleep(30)

        # Download results
        await sandboxes.download_file(sandbox.id, "/app/model.pt", "./model.pt")
        await sandboxes.delete(sandbox.id)
The timeout_minutes parameter controls how long the sandbox stays alive. Background jobs persist across API calls until completion or sandbox termination.

Clean Exit

async def teardown(ids: list[str]) -> None:
    """Delete every sandbox in *ids* concurrently."""
    async with AsyncSandboxClient() as client:
        # One delete coroutine per id, awaited as a group.
        await asyncio.gather(*(client.delete(sandbox_id) for sandbox_id in ids))

# asyncio.run(teardown(["sbx_123", "sbx_456"]))
For a full script, see prime-cli/examples/sandbox_async_demo.py, which covers create → wait → run → logs → delete.