Python SDK

Async Python SDK for the FastVM microVM platform. Launch, snapshot, restore, and run commands on microVMs over a multiplexed HTTP/2 connection.

Installation

shell
pip install fastvm

Or with uv:

shell
uv add fastvm

Quick Start

Set your API key, then use the async context manager:

python
import asyncio
from fastvm import FastVM

async def main():
    async with FastVM() as client:
        vm = await client.launch(machine="c1m2")
        snapshot = await client.snapshot(vm, name="my-snap")
        restored = await client.restore(snapshot)
        result = await client.run(restored, "echo hello")
        print(result.stdout)  # hello

asyncio.run(main())

launch, restore, and snapshot all block until the resource is ready. The client pre-establishes an HTTP/2 connection on entry so that all subsequent calls reuse a single multiplexed TLS session with zero additional handshake cost. If name is omitted, a UUID is generated automatically.

Configuration

The SDK reads environment variables when constructor arguments are not provided:

VariableRequiredDefaultDescription
FASTVM_API_KEYYesAPI key for authentication
FASTVM_BASE_URLNohttps://api.fastvm.orgAPI base URL

You can also pass credentials explicitly:

python
client = FastVM(api_key="your-key", base_url="https://api.fastvm.org")

FastVM()

FastVM(
    api_key: str | None = None,
    base_url: str | None = None,
    timeout: float = 300.0,
) -> None

Creates a new SDK client. Uses HTTP/2 for multiplexed connections. Supports async with for automatic cleanup:

python
async with FastVM() as client:
    # warmup() called on entry, close() on exit
    ...

VM Management

client.launch()

async def launch(
    *,
    machine: str = "c1m2",
    name: str | None = None,
    disk_gib: int | None = None,
    firewall: FirewallPolicy | None = None,
    timeout: float = 300.0,
) -> VM

Description

Launch a new microVM and wait until it is ready.

Parameters

ParamTypeDefaultDescription
machinestr"c1m2"Machine type — c{vCPUs}m{GiB RAM}. Options: c1m2, c2m4, c4m8, c8m16
namestr | NoneNoneVM name. Auto-generates UUID if omitted
disk_gibint | NoneNoneDisk size in GiB
firewallFirewallPolicy | NoneNonePublic IPv6 ingress firewall policy
timeoutfloat300.0Seconds to wait for 'running' status

Returns

VM
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

Example

python
vm = await client.launch(machine="c2m4", name="my-vm", disk_gib=20)

client.restore()

async def restore(
    snapshot: str | Snapshot,
    *,
    name: str | None = None,
    firewall: FirewallPolicy | None = None,
    timeout: float = 300.0,
) -> VM

Description

Restore a VM from a snapshot and wait until it is ready.

Parameters

ParamTypeDefaultDescription
snapshotstr | SnapshotSnapshot ID or Snapshot object
namestr | NoneNoneVM name. Auto-generates UUID if omitted
firewallFirewallPolicy | NoneNoneOverride the snapshot's firewall policy
timeoutfloat300.0Seconds to wait

Returns

VM
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

Example

python
restored = await client.restore(snapshot, name="worker-01")

client.get()

async def get(vm: str | VM) -> VM

Description

Fetch the current state of a VM.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM ID or VM object

Returns

VM
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

client.list()

async def list() -> list[VM]

Description

List all VMs in the organization.

Returns

list[VM]
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

client.remove()

async def remove(vm: str | VM) -> None

Description

Delete a VM.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM ID or VM object

client.rename()

async def rename(vm: str | VM, name: str) -> VM

Description

Rename a VM.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM ID or VM object
namestrNew name for the VM

Returns

VM
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

Example

python
vm = await client.rename(vm, "new-name")

client.set_firewall()

async def set_firewall(vm: str | VM, policy: FirewallPolicy) -> VM

Description

Replace the VM's public IPv6 ingress firewall policy.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM ID or VM object
policyFirewallPolicyNew firewall policy

Returns

VM
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

Example

python
vm = await client.set_firewall(vm, FirewallPolicy(mode="open"))

client.patch_firewall()

async def patch_firewall(vm: str | VM, patch: PatchFirewallPolicy) -> VM

Description

Partially update the VM's firewall. Only the provided fields are changed.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM ID or VM object
patchPatchFirewallPolicyPartial firewall update (mode and/or ingress rules)

Returns

VM
FieldTypeDescription
idstrUnique VM identifier
namestrVM name
org_idstrOrganization ID
machine_namestrMachine type name
cpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
statusstrCurrent status (provisioning, running, stopped, deleting, error)
created_atstrISO 8601 creation timestamp
source_namestrSource snapshot or image name
public_ipv6str | NonePublic IPv6 address if assigned
firewallFirewallPolicy | NoneFirewall policy if configured
deleted_atstr | NoneDeletion timestamp or None

Example

python
vm = await client.patch_firewall(vm, PatchFirewallPolicy(mode="restricted"))

Command Execution

client.run()

async def run(
    vm: str | VM,
    command: str | list[str],
    *,
    timeout_sec: int | None = None,
    timeout: float | None = None,
) -> CommandResult

Description

Execute a command on a VM. String commands are wrapped as sh -c '…'; pass a list for an explicit argv.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM ID or VM object
commandstr | list[str]Shell command string or explicit argv list
timeout_secint | NoneNoneServer-side command timeout in seconds
timeoutfloat | NoneNoneClient-side transport timeout (defaults to constructor timeout)

Returns

CommandResult
FieldTypeDescription
exit_codeintProcess exit code
stdoutstrStandard output
stderrstrStandard error
timed_outboolWhether the command timed out
stdout_truncatedboolWhether stdout was truncated
stderr_truncatedboolWhether stderr was truncated
duration_msintExecution time in milliseconds

Example

python
result = await client.run(vm, "python3 --version")
print(result.stdout)       # Python 3.11.4
print(result.exit_code)    # 0
print(result.duration_ms)  # 42

Long-Running Tasks

client.run() is connection-bound — if the SDK disconnects or the transport timeout is reached, the command is killed. For tasks that must outlive the connection, detach them inside the VM.

Fire-and-forget with nohup

Use nohup to start a background process that survives disconnection:

python
# Start a long-running training job in the background
await client.run(vm, "nohup python3 train.py > /tmp/train.log 2>&1 &")

# Check on it later
result = await client.run(vm, "tail -20 /tmp/train.log")
print(result.stdout)

Persistent sessions with tmux

Use tmux when you need to re-attach or inspect live output:

python
# Start a tmux session with the long-running task
await client.run(vm, "tmux new-session -d -s train 'python3 train.py'")

# Capture output later
result = await client.run(vm, "tmux capture-pane -t train -p")
print(result.stdout)
PatternUse when
client.run(vm, cmd)Commands that finish within the transport timeout
nohup cmd &Fire-and-forget background jobs
tmux / screenNeed to re-attach or inspect live output

Snapshots

client.snapshot()

async def snapshot(
    vm: str | VM,
    *,
    name: str = "",
    timeout: float = 300.0,
) -> Snapshot

Description

Create a snapshot of a running VM. Blocks until the snapshot is ready.

Parameters

ParamTypeDefaultDescription
vmstr | VMVM to snapshot
namestr""Snapshot name. Auto-generates UUID if empty
timeoutfloat300.0Seconds to wait

Returns

Snapshot
FieldTypeDescription
idstrUnique snapshot identifier
namestrSnapshot name
org_idstrOrganization ID
vm_idstrSource VM ID
statusstrCurrent status (creating, ready, error)
created_atstrISO 8601 creation timestamp
firewallFirewallPolicy | NoneFirewall policy from the source VM

Example

python
snap = await client.snapshot(vm, name="before-deploy")

client.list_snapshots()

async def list_snapshots() -> list[Snapshot]

Description

List all snapshots in the organization.

Returns

list[Snapshot]
FieldTypeDescription
idstrUnique snapshot identifier
namestrSnapshot name
org_idstrOrganization ID
vm_idstrSource VM ID
statusstrCurrent status (creating, ready, error)
created_atstrISO 8601 creation timestamp
firewallFirewallPolicy | NoneFirewall policy from the source VM

client.remove_snapshot()

async def remove_snapshot(snapshot: str | Snapshot) -> None

Description

Delete a snapshot.

Parameters

ParamTypeDefaultDescription
snapshotstr | SnapshotSnapshot ID or object

client.rename_snapshot()

async def rename_snapshot(snapshot: str | Snapshot, name: str) -> Snapshot

Description

Rename a snapshot.

Parameters

ParamTypeDefaultDescription
snapshotstr | SnapshotSnapshot ID or object
namestrNew name for the snapshot

Returns

Snapshot
FieldTypeDescription
idstrUnique snapshot identifier
namestrSnapshot name
org_idstrOrganization ID
vm_idstrSource VM ID
statusstrCurrent status (creating, ready, error)
created_atstrISO 8601 creation timestamp
firewallFirewallPolicy | NoneFirewall policy from the source VM

Example

python
snap = await client.rename_snapshot(snap, "v2-stable")

Firewall

VMs support public IPv6 ingress firewall policies. Pass a FirewallPolicy to launch() or restore() to configure which ports are open:

python
from fastvm import FastVM, FirewallPolicy, FirewallRule

async with FastVM() as client:
    policy = FirewallPolicy(
        mode="restricted",
        ingress=[
            FirewallRule(protocol="tcp", port_start=22, description="SSH"),
            FirewallRule(protocol="tcp", port_start=8080, description="HTTP"),
        ],
    )
    vm = await client.launch(firewall=policy)

FirewallPolicy

FieldTypeDescription
modestrFirewall mode ("open" or "restricted")
ingresslist[FirewallRule]List of ingress allow rules

FirewallRule

FieldTypeDescription
protocolstrProtocol ("tcp" or "udp")
port_startintStart of port range
port_endint | NoneEnd of port range (None = single port)
source_cidrslist[str]Allowed source CIDRs (empty = any)
descriptionstrHuman-readable description

PatchFirewallPolicy

FieldTypeDescription
modestr | NoneFirewall mode to set (None = unchanged)
ingresslist[FirewallRule] | NoneIngress rules to set (None = unchanged)

Quotas

client.quotas()

async def quotas() -> OrgQuotaUsage

Description

Fetch organization quota limits and current usage.

Returns

OrgQuotaUsage
FieldTypeDescription
org_idstrOrganization ID
limitsOrgQuotaValuesMaximum allowed resources
usageOrgQuotaValuesCurrently consumed resources

Example

python
usage = await client.quotas()
print(f"vCPUs: {usage.usage.vcpu} / {usage.limits.vcpu}")

OrgQuotaUsage

FieldTypeDescription
org_idstrOrganization ID
limitsOrgQuotaValuesMaximum allowed resources
usageOrgQuotaValuesCurrently consumed resources

OrgQuotaValues

FieldTypeDescription
vcpuintNumber of vCPUs
memory_mibintMemory in MiB
disk_gibintDisk in GiB
snapshot_countintNumber of snapshots

Branching

Branching is implemented via snapshot + restore. Snapshot a VM, then restore it as many times as needed — concurrently with asyncio.gather:

python
# Create a branching point
snap = await client.snapshot(base_vm, name="branch-point")

# Fork into 10 parallel paths
branches = await asyncio.gather(*(
    client.restore(snap, name=f"branch-{i}")
    for i in range(10)
))

# Run different strategies in parallel
results = await asyncio.gather(*(
    client.run(branch, f"python3 solve.py --seed={i}")
    for i, branch in enumerate(branches)
))

# Pick the winner
best = min(results, key=lambda r: r.duration_ms)