Python SDK
Async Python SDK for the FastVM microVM platform. Launch, snapshot, restore, and run commands on microVMs over a multiplexed HTTP/2 connection.
Installation

    pip install fastvm

Or with uv:

    uv add fastvm

Quick Start
Set your API key, then use the async context manager:

    import asyncio

    from fastvm import FastVM


    async def main():
        async with FastVM() as client:
            vm = await client.launch(machine="c1m2")
            snapshot = await client.snapshot(vm, name="my-snap")
            restored = await client.restore(snapshot)
            result = await client.run(restored, "echo hello")
            print(result.stdout)  # hello


    asyncio.run(main())

launch, restore, and snapshot all block until the resource is ready. The client establishes an HTTP/2 connection on entry, so all subsequent calls reuse a single multiplexed TLS session and pay no further handshake cost. If name is omitted from any of these calls, a UUID is generated automatically.
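A common variation on the quick start is to make sure a VM is deleted even when a command fails. The sketch below relies only on the documented launch(), run(), and remove() methods; `run_once` is a hypothetical helper name and not part of the SDK.

```python
async def run_once(client, command, machine="c1m2"):
    """Launch a throwaway VM, run one command, and always delete the VM."""
    vm = await client.launch(machine=machine)
    try:
        return await client.run(vm, command)
    finally:
        # Runs on success and on error, so the VM is not left behind.
        await client.remove(vm)
```

Inside `async with FastVM() as client:`, this could be called as `result = await run_once(client, "echo hello")`.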
Configuration
The SDK reads environment variables when constructor arguments are not provided:
| Variable | Required | Default | Description |
|---|---|---|---|
| FASTVM_API_KEY | Yes | — | API key for authentication |
| FASTVM_BASE_URL | No | https://api.fastvm.org | API base URL |
You can also pass credentials explicitly:
client = FastVM(api_key="your-key", base_url="https://api.fastvm.org")
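The precedence described above (explicit argument first, then environment variable, then default) can be sketched as a small standalone function. This is an illustration of the documented behavior, not the SDK's actual implementation; `resolve_config` and the `ValueError` are assumptions made for the example.

```python
import os

DEFAULT_BASE_URL = "https://api.fastvm.org"


def resolve_config(api_key=None, base_url=None, env=None):
    """Return (api_key, base_url), preferring explicit arguments over env vars."""
    env = os.environ if env is None else env
    key = api_key or env.get("FASTVM_API_KEY")
    url = base_url or env.get("FASTVM_BASE_URL") or DEFAULT_BASE_URL
    if not key:
        # Assumption: the real SDK may raise a different exception type.
        raise ValueError("FASTVM_API_KEY is not set and no api_key was passed")
    return key, url
```

Passing `env` explicitly keeps the function easy to test without touching the real process environment.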
FastVM()

    FastVM(
        api_key: str | None = None,
        base_url: str | None = None,
        timeout: float = 300.0,
    ) -> None

Creates a new SDK client. Uses HTTP/2 for multiplexed connections. Supports async with for automatic cleanup:

    async with FastVM() as client:
        # warmup() called on entry, close() on exit
        ...

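When a client has to outlive a single `async with` block, the same lifecycle can be driven by hand. The sketch below assumes that warmup() and close() are awaitable methods on the client, as the comment in the example suggests; `with_manual_lifecycle` is a hypothetical helper name.

```python
async def with_manual_lifecycle(client, work):
    """Call warmup(), run `work(client)`, and always call close()."""
    await client.warmup()  # assumed equivalent of entering `async with`
    try:
        return await work(client)
    finally:
        await client.close()  # assumed equivalent of leaving `async with`
```

Here `work` is any coroutine function that takes the client, for example one that launches a VM and runs a command on it.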
VM Management
client.launch()

    async def launch(
        *,
        machine: str = "c1m2",
        name: str | None = None,
        disk_gib: int | None = None,
        firewall: FirewallPolicy | None = None,
        timeout: float = 300.0,
    ) -> VM

Description
Launch a new microVM and wait until it is ready.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| machine | str | "c1m2" | Machine type — c{vCPUs}m{GiB RAM}. Options: c1m2, c2m4, c4m8, c8m16 |
| name | str \| None | None | VM name. Auto-generates UUID if omitted |
| disk_gib | int \| None | None | Disk size in GiB |
| firewall | FirewallPolicy \| None | None | Public IPv6 ingress firewall policy |
| timeout | float | 300.0 | Seconds to wait for 'running' status |
Returns
VM
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
Example
vm = await client.launch(machine="c2m4", name="my-vm", disk_gib=20)
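The machine naming scheme c{vCPUs}m{GiB RAM} is easy to decode on the client side, for example to validate user input before calling launch(). The helper below is a sketch; `parse_machine` is a hypothetical name and not part of the SDK.

```python
import re

_MACHINE_RE = re.compile(r"c(\d+)m(\d+)")


def parse_machine(machine):
    """Split a machine type such as "c2m4" into (vcpus, memory_gib)."""
    match = _MACHINE_RE.fullmatch(machine)
    if match is None:
        raise ValueError(f"not a c{{vCPUs}}m{{GiB RAM}} machine type: {machine!r}")
    return int(match.group(1)), int(match.group(2))
```

With this, `parse_machine("c2m4")` yields 2 vCPUs and 4 GiB of RAM. The function only checks the format, not whether the type is one of the listed options.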
client.restore()

    async def restore(
        snapshot: str | Snapshot,
        *,
        name: str | None = None,
        firewall: FirewallPolicy | None = None,
        timeout: float = 300.0,
    ) -> VM

Description
Restore a VM from a snapshot and wait until it is ready.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| snapshot | str \| Snapshot | — | Snapshot ID or Snapshot object |
| name | str \| None | None | VM name. Auto-generates UUID if omitted |
| firewall | FirewallPolicy \| None | None | Override the snapshot's firewall policy |
| timeout | float | 300.0 | Seconds to wait |
Returns
VM
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
Example
restored = await client.restore(snapshot, name="worker-01")
client.get()
async def get(vm: str | VM) -> VM
Description
Fetch the current state of a VM.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM ID or VM object |
Returns
VM
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
client.list()
async def list() -> list[VM]
Description
List all VMs in the organization.
Returns
list[VM]
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
client.remove()
async def remove(vm: str | VM) -> None
Description
Delete a VM.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM ID or VM object |
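list() and remove() combine naturally into a cleanup routine, for example to delete leftover VMs from a test run. The sketch below assumes VMs were named with a common prefix; `remove_by_prefix` is a hypothetical helper, and skipping VMs that are already in the deleting state is a design choice made here, not documented SDK behavior.

```python
import asyncio


async def remove_by_prefix(client, prefix):
    """Delete every VM whose name starts with `prefix`; return the removed IDs."""
    vms = await client.list()
    targets = [
        vm for vm in vms
        if vm.name.startswith(prefix) and vm.status != "deleting"
    ]
    # Issue the deletions concurrently over the shared connection.
    await asyncio.gather(*(client.remove(vm) for vm in targets))
    return [vm.id for vm in targets]
```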
client.rename()
async def rename(vm: str | VM, name: str) -> VM
Description
Rename a VM.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM ID or VM object |
| name | str | — | New name for the VM |
Returns
VM
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
Example
vm = await client.rename(vm, "new-name")
client.set_firewall()
async def set_firewall(vm: str | VM, policy: FirewallPolicy) -> VM
Description
Replace the VM's public IPv6 ingress firewall policy.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM ID or VM object |
| policy | FirewallPolicy | — | New firewall policy |
Returns
VM
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
Example
vm = await client.set_firewall(vm, FirewallPolicy(mode="open"))
client.patch_firewall()
async def patch_firewall(vm: str | VM, patch: PatchFirewallPolicy) -> VM
Description
Partially update the VM's firewall. Only the provided fields are changed.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM ID or VM object |
| patch | PatchFirewallPolicy | — | Partial firewall update (mode and/or ingress rules) |
Returns
VM
| Field | Type | Description |
|---|---|---|
| id | str | Unique VM identifier |
| name | str | VM name |
| org_id | str | Organization ID |
| machine_name | str | Machine type name |
| cpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| status | str | Current status (provisioning, running, stopped, deleting, error) |
| created_at | str | ISO 8601 creation timestamp |
| source_name | str | Source snapshot or image name |
| public_ipv6 | str \| None | Public IPv6 address if assigned |
| firewall | FirewallPolicy \| None | Firewall policy if configured |
| deleted_at | str \| None | Deletion timestamp or None |
Example
vm = await client.patch_firewall(vm, PatchFirewallPolicy(mode="restricted"))
Command Execution
client.run()

    async def run(
        vm: str | VM,
        command: str | list[str],
        *,
        timeout_sec: int | None = None,
        timeout: float | None = None,
    ) -> CommandResult

Description
Execute a command on a VM. String commands are wrapped as sh -c '…'; pass a list for an explicit argv.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM ID or VM object |
| command | str \| list[str] | — | Shell command string or explicit argv list |
| timeout_sec | int \| None | None | Server-side command timeout in seconds |
| timeout | float \| None | None | Client-side transport timeout (defaults to constructor timeout) |
Returns
CommandResult
| Field | Type | Description |
|---|---|---|
| exit_code | int | Process exit code |
| stdout | str | Standard output |
| stderr | str | Standard error |
| timed_out | bool | Whether the command timed out |
| stdout_truncated | bool | Whether stdout was truncated |
| stderr_truncated | bool | Whether stderr was truncated |
| duration_ms | int | Execution time in milliseconds |
Example

    result = await client.run(vm, "python3 --version")
    print(result.stdout)       # Python 3.11.4
    print(result.exit_code)    # 0
    print(result.duration_ms)  # 42

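As the CommandResult fields suggest, a failed command is reported through exit_code and timed_out rather than through an exception. Callers that prefer exceptions could wrap run() as sketched below. `run_checked` and `CommandFailed` are hypothetical names introduced for this example and are not part of the SDK.

```python
class CommandFailed(RuntimeError):
    """Hypothetical error type for this sketch; not defined by the SDK."""


async def run_checked(client, vm, command, **kwargs):
    """Run a command and raise CommandFailed on timeout or non-zero exit."""
    result = await client.run(vm, command, **kwargs)
    if result.timed_out:
        raise CommandFailed(f"timed out after {result.duration_ms} ms: {command!r}")
    if result.exit_code != 0:
        raise CommandFailed(f"exit code {result.exit_code}: {result.stderr.strip()}")
    return result
```

Extra keyword arguments such as `timeout_sec` are passed through unchanged. Passing the command as a list, for example `["ls", "-l", path]`, avoids shell interpretation of `path`.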
Long-Running Tasks
client.run() is connection-bound: if the SDK disconnects or the transport timeout is reached, the command is killed. For tasks that must outlive the connection, detach them inside the VM.
Fire-and-forget with nohup
Use nohup to start a background process that survives disconnection:

    # Start a long-running training job in the background
    await client.run(vm, "nohup python3 train.py > /tmp/train.log 2>&1 &")

    # Check on it later
    result = await client.run(vm, "tail -20 /tmp/train.log")
    print(result.stdout)

Persistent sessions with tmux
Use tmux when you need to re-attach or inspect live output:

    # Start a tmux session with the long-running task
    await client.run(vm, "tmux new-session -d -s train 'python3 train.py'")

    # Capture output later
    result = await client.run(vm, "tmux capture-pane -t train -p")
    print(result.stdout)

| Pattern | Use when |
|---|---|
| client.run(vm, cmd) | Commands that finish within the transport timeout |
| nohup cmd & | Fire-and-forget background jobs |
| tmux / screen | Need to re-attach or inspect live output |
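The nohup pattern can be extended so the caller gets a PID back and can poll for completion later. The sketch below builds on client.run() only; the helper names, the default log path, and the use of `echo $!` and `kill -0` are choices made for this example, and they assume a POSIX shell inside the VM.

```python
import shlex


async def start_background(client, vm, command, log_path="/tmp/job.log"):
    """Start `command` detached with nohup and return its PID."""
    shell = (
        f"nohup sh -c {shlex.quote(command)} "
        f"> {shlex.quote(log_path)} 2>&1 & echo $!"
    )
    result = await client.run(vm, shell)
    # `echo $!` prints the PID of the job that was just backgrounded.
    return int(result.stdout.strip())


async def is_alive(client, vm, pid):
    """Return True while a process with this PID still exists in the VM."""
    result = await client.run(vm, f"kill -0 {int(pid)}")
    return result.exit_code == 0
```

A caller might start a job with `pid = await start_background(client, vm, "python3 train.py")` and later check `await is_alive(client, vm, pid)` before reading the log.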
Snapshots
client.snapshot()

    async def snapshot(
        vm: str | VM,
        *,
        name: str = "",
        timeout: float = 300.0,
    ) -> Snapshot

Description
Create a snapshot of a running VM. Blocks until the snapshot is ready.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| vm | str \| VM | — | VM to snapshot |
| name | str | "" | Snapshot name. Auto-generates UUID if empty |
| timeout | float | 300.0 | Seconds to wait |
Returns
Snapshot
| Field | Type | Description |
|---|---|---|
| id | str | Unique snapshot identifier |
| name | str | Snapshot name |
| org_id | str | Organization ID |
| vm_id | str | Source VM ID |
| status | str | Current status (creating, ready, error) |
| created_at | str | ISO 8601 creation timestamp |
| firewall | FirewallPolicy \| None | Firewall policy from the source VM |
Example
snap = await client.snapshot(vm, name="before-deploy")
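A snapshot taken just before a risky change can serve as a rollback point. The sketch below uses the documented snapshot(), run(), restore(), and remove() calls; `run_or_rollback` is a hypothetical helper. It assumes that restore() creates a new VM, so the caller has to switch to the returned VM after a rollback.

```python
async def run_or_rollback(client, vm, command, snapshot_name="pre-change"):
    """Snapshot `vm`, run `command`, and fall back to the snapshot on failure.

    Returns (vm_to_keep_using, result).
    """
    snap = await client.snapshot(vm, name=snapshot_name)
    result = await client.run(vm, command)
    if result.exit_code == 0 and not result.timed_out:
        return vm, result
    # The command failed: replace the VM with a fresh one from the checkpoint.
    restored = await client.restore(snap)
    await client.remove(vm)
    return restored, result
```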
client.list_snapshots()
async def list_snapshots() -> list[Snapshot]
Description
List all snapshots in the organization.
Returns
list[Snapshot]
| Field | Type | Description |
|---|---|---|
| id | str | Unique snapshot identifier |
| name | str | Snapshot name |
| org_id | str | Organization ID |
| vm_id | str | Source VM ID |
| status | str | Current status (creating, ready, error) |
| created_at | str | ISO 8601 creation timestamp |
| firewall | FirewallPolicy \| None | Firewall policy from the source VM |
client.remove_snapshot()
async def remove_snapshot(snapshot: str | Snapshot) -> None
Description
Delete a snapshot.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| snapshot | str \| Snapshot | — | Snapshot ID or object |
client.rename_snapshot()
async def rename_snapshot(snapshot: str | Snapshot, name: str) -> Snapshot
Description
Rename a snapshot.
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| snapshot | str \| Snapshot | — | Snapshot ID or object |
| name | str | — | New name for the snapshot |
Returns
Snapshot
| Field | Type | Description |
|---|---|---|
| id | str | Unique snapshot identifier |
| name | str | Snapshot name |
| org_id | str | Organization ID |
| vm_id | str | Source VM ID |
| status | str | Current status (creating, ready, error) |
| created_at | str | ISO 8601 creation timestamp |
| firewall | FirewallPolicy \| None | Firewall policy from the source VM |
Example
snap = await client.rename_snapshot(snap, "v2-stable")
Firewall
VMs support public IPv6 ingress firewall policies. Pass a FirewallPolicy to launch() or restore() to configure which ports are open:

    from fastvm import FastVM, FirewallPolicy, FirewallRule

    async with FastVM() as client:
        policy = FirewallPolicy(
            mode="restricted",
            ingress=[
                FirewallRule(protocol="tcp", port_start=22, description="SSH"),
                FirewallRule(protocol="tcp", port_start=8080, description="HTTP"),
            ],
        )
        vm = await client.launch(firewall=policy)

FirewallPolicy
| Field | Type | Description |
|---|---|---|
| mode | str | Firewall mode ("open" or "restricted") |
| ingress | list[FirewallRule] | List of ingress allow rules |
FirewallRule
| Field | Type | Description |
|---|---|---|
| protocol | str | Protocol ("tcp" or "udp") |
| port_start | int | Start of port range |
| port_end | int \| None | End of port range (None = single port) |
| source_cidrs | list[str] | Allowed source CIDRs (empty = any) |
| description | str | Human-readable description |
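To make the FirewallRule field semantics concrete, the sketch below models how a single allow rule could be matched against incoming traffic: a port_end of None means only port_start, and an empty source_cidrs list means any source. The `Rule` dataclass is a local stand-in that mirrors the documented fields, and `rule_matches` is hypothetical. Actual enforcement happens on the platform and may differ in detail.

```python
import ipaddress
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Rule:
    """Local stand-in mirroring the documented FirewallRule fields."""
    protocol: str
    port_start: int
    port_end: Optional[int] = None
    source_cidrs: List[str] = field(default_factory=list)


def rule_matches(rule, protocol, port, source_ip):
    """Return True if traffic with these properties falls under the rule."""
    if protocol != rule.protocol:
        return False
    last_port = rule.port_start if rule.port_end is None else rule.port_end
    if not rule.port_start <= port <= last_port:
        return False
    if not rule.source_cidrs:  # empty list: any source is allowed
        return True
    addr = ipaddress.ip_address(source_ip)
    return any(addr in ipaddress.ip_network(cidr) for cidr in rule.source_cidrs)
```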
PatchFirewallPolicy
| Field | Type | Description |
|---|---|---|
| mode | str \| None | Firewall mode to set (None = unchanged) |
| ingress | list[FirewallRule] \| None | Ingress rules to set (None = unchanged) |
Quotas
client.quotas()
async def quotas() -> OrgQuotaUsage
Description
Fetch organization quota limits and current usage.
Returns
OrgQuotaUsage
| Field | Type | Description |
|---|---|---|
| org_id | str | Organization ID |
| limits | OrgQuotaValues | Maximum allowed resources |
| usage | OrgQuotaValues | Currently consumed resources |
Example

    usage = await client.quotas()
    print(f"vCPUs: {usage.usage.vcpu} / {usage.limits.vcpu}")

OrgQuotaUsage
| Field | Type | Description |
|---|---|---|
| org_id | str | Organization ID |
| limits | OrgQuotaValues | Maximum allowed resources |
| usage | OrgQuotaValues | Currently consumed resources |
OrgQuotaValues
| Field | Type | Description |
|---|---|---|
| vcpu | int | Number of vCPUs |
| memory_mib | int | Memory in MiB |
| disk_gib | int | Disk in GiB |
| snapshot_count | int | Number of snapshots |
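The limits and usage values can be compared before a launch to avoid requesting a machine that would not fit. The sketch below works on the object returned by quotas(); `fits_quota` is a hypothetical helper. It assumes a machine type c{N}m{M} consumes N vCPUs and M × 1024 MiB of memory against the quota, which the source does not state explicitly.

```python
import re


def fits_quota(quota, machine, disk_gib=0):
    """Return True if launching `machine` would stay within the org limits."""
    match = re.fullmatch(r"c(\d+)m(\d+)", machine)
    if match is None:
        raise ValueError(f"unrecognized machine type: {machine!r}")
    vcpus, mem_gib = int(match.group(1)), int(match.group(2))
    free_vcpu = quota.limits.vcpu - quota.usage.vcpu
    free_mem_mib = quota.limits.memory_mib - quota.usage.memory_mib
    free_disk_gib = quota.limits.disk_gib - quota.usage.disk_gib
    return (
        vcpus <= free_vcpu
        and mem_gib * 1024 <= free_mem_mib
        and disk_gib <= free_disk_gib
    )
```

A caller might guard a launch with `if fits_quota(await client.quotas(), "c4m8"):`. The check is advisory only, since usage can change between the two calls.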
Branching
Branching is implemented via snapshot + restore. Snapshot a VM, then restore the snapshot as many times as needed. The restores can run concurrently with asyncio.gather:

    # Create a branching point
    snap = await client.snapshot(base_vm, name="branch-point")

    # Fork into 10 parallel paths
    branches = await asyncio.gather(*(
        client.restore(snap, name=f"branch-{i}")
        for i in range(10)
    ))

    # Run different strategies in parallel
    results = await asyncio.gather(*(
        client.run(branch, f"python3 solve.py --seed={i}")
        for i, branch in enumerate(branches)
    ))

    # Pick the winner
    best = min(results, key=lambda r: r.duration_ms)

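For larger fan-outs it may help to cap how many branches exist at once and to delete each branch when its command finishes. The sketch below is one way to do that with asyncio.Semaphore; `branch_and_run` and `fastest_success` are hypothetical helpers, and the default cap of 5 is arbitrary. Unlike the example above, the winner is chosen only among commands that exited with code 0.

```python
import asyncio


async def branch_and_run(client, snapshot, commands, max_parallel=5):
    """Restore one VM per command, run it, delete the VM; results keep input order."""
    gate = asyncio.Semaphore(max_parallel)

    async def one(index, command):
        async with gate:  # at most `max_parallel` branches exist at once
            vm = await client.restore(snapshot, name=f"branch-{index}")
            try:
                return await client.run(vm, command)
            finally:
                await client.remove(vm)

    return await asyncio.gather(*(one(i, c) for i, c in enumerate(commands)))


def fastest_success(results):
    """Return the quickest result with exit code 0, or None if none succeeded."""
    ok = [r for r in results if r.exit_code == 0 and not r.timed_out]
    return min(ok, key=lambda r: r.duration_ms) if ok else None
```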