Why Async?

Most sandbox automations spin up more than one environment. The async client lets you fan out creates, waits, commands, and teardown without juggling threads. Want to explore every helper? Peek at `src/prime_cli/api/sandbox.py` for the complete API.
import asyncio
from prime_cli.api.sandbox import AsyncSandboxClient, CreateSandboxRequest

async def launch_demo() -> None:
    """Create a sandbox, run one command in it, print the output, and delete it.

    The sandbox is deleted even if waiting for creation or running the
    command raises, so a failed demo does not leave a sandbox behind.
    """
    async with AsyncSandboxClient() as sandboxes:
        request = CreateSandboxRequest(
            name="sdk-demo",
            docker_image="python:3.11-slim",
            timeout_minutes=120,
        )
        sandbox = await sandboxes.create(request)
        try:
            await sandboxes.wait_for_creation(sandbox.id)
            result = await sandboxes.execute_command(sandbox.id, "python -c 'print(42)'")
            print(result.stdout.strip())
        finally:
            # Once create() has returned, the sandbox exists; always tear it
            # down, whether the steps above succeeded or raised.
            await sandboxes.delete(sandbox.id)

# Entry point: run the demo coroutine to completion.
asyncio.run(launch_demo())

Launch a Fleet

async def create_many(images: list[str]) -> None:
    """Create one sandbox per image concurrently, then wait for all of them.

    Sandboxes are named ``batch-1``, ``batch-2``, ... in the order the images
    are given. The names of the created sandboxes are printed after the bulk
    wait returns.

    Args:
        images: Docker image references; one sandbox is requested per entry.
    """
    async with AsyncSandboxClient() as sandboxes:
        # Build every request first so the create calls are issued together.
        specs = [
            CreateSandboxRequest(name=f"batch-{position}", docker_image=image)
            for position, image in enumerate(images, 1)
        ]
        created = await asyncio.gather(*map(sandboxes.create, specs))

        sandbox_ids = [sbx.id for sbx in created]
        await sandboxes.bulk_wait_for_creation(sandbox_ids)

        ready_names = ", ".join(sbx.name for sbx in created)
        print("Ready:", ready_names)

# asyncio.run(create_many(["python:3.11-slim", "node:20-slim"]))
`bulk_wait_for_creation` polls via the list endpoint, backing off automatically if the API throttles you.

Run Commands & Collect Logs

async def smoke_test(sandbox_id: str) -> None:
    """Run a Python version check in a sandbox and print its stdout and logs.

    Args:
        sandbox_id: Identifier of the sandbox to run the command in.
    """
    version_probe = "python -c 'import platform; print(platform.python_version())'"
    async with AsyncSandboxClient() as sandboxes:
        outcome = await sandboxes.execute_command(sandbox_id, version_probe)
        print("stdout:", outcome.stdout.strip())

        # Only the leading 120 elements of the logs are printed
        # (characters, assuming get_logs returns a string).
        log_text = await sandboxes.get_logs(sandbox_id)
        print("logs snippet:", log_text[:120])
Command responses include stdout, stderr, and exit code so you can short-circuit pipelines when something breaks.

Move Data In and Out

async def sync_artifacts(sandbox_id: str) -> None:
    """Upload the model file to a sandbox, then download the report from it.

    The two transfers run one after the other: the upload is awaited before
    the download starts.

    Args:
        sandbox_id: Identifier of the sandbox to exchange files with.
    """
    # Each pair is passed positionally after the sandbox id, in this order.
    model_paths = ("/workspace/model.bin", "./artifacts/model.bin")
    report_paths = ("/workspace/report.csv", "./reports/report.csv")
    async with AsyncSandboxClient() as sandboxes:
        await sandboxes.upload_file(sandbox_id, *model_paths)
        await sandboxes.download_file(sandbox_id, *report_paths)
Uploads and downloads use short-lived gateway tokens stored in a local cache. Call `sandboxes.clear_auth_cache()` if you rotate credentials or start getting 401 responses.

Clean Exit

async def teardown(ids: list[str]) -> None:
    """Delete every sandbox listed in ``ids`` concurrently.

    Args:
        ids: Identifiers of the sandboxes to delete.
    """
    async with AsyncSandboxClient() as sandboxes:
        # One delete call per id, all awaited together.
        deletions = [sandboxes.delete(sandbox_id) for sandbox_id in ids]
        await asyncio.gather(*deletions)

# asyncio.run(teardown(["sbx_123", "sbx_456"]))
For a full script, see `prime-cli/examples/sandbox_async_demo.py`, which covers create → wait → run → logs → delete.