Beltic Python SDK API Reference
Complete reference for all types, classes, and functions in the Beltic Python SDK.

Validation

validate_developer_credential

Validate a DeveloperCredential against the v1 JSON schema.
def validate_developer_credential(
    input: dict
) -> ValidationResult[DeveloperCredential]

validate_agent_credential

Validate an AgentCredential against the v1 JSON schema.
def validate_agent_credential(
    input: dict
) -> ValidationResult[AgentCredential]

is_developer_credential

Type guard to check if a credential is a DeveloperCredential.
def is_developer_credential(credential: dict) -> bool

is_agent_credential

Type guard to check if a credential is an AgentCredential.
def is_agent_credential(credential: dict) -> bool

Signing

sign_credential

Sign a credential as a JWS token.
def sign_credential(
    credential: dict,
    private_key: CryptoKey,
    options: SignOptions
) -> str

generate_key_pair

Generate a new key pair for the EdDSA (Ed25519) or ES256 (ECDSA with P-256) signing algorithm.
def generate_key_pair(
    algorithm: Literal["EdDSA", "ES256"]
) -> tuple[CryptoKey, CryptoKey]

import_key_from_pem

Import a key from PEM format.
def import_key_from_pem(
    pem: bytes,
    algorithm: Literal["EdDSA", "ES256"],
    private: bool = False
) -> CryptoKey

export_key_to_pem

Export a key to PEM format.
def export_key_to_pem(
    key: CryptoKey,
    private: bool = False
) -> bytes

export_public_key

Export a public key as JWK.
def export_public_key(key: CryptoKey) -> dict

import_public_key

Import a public key from JWK.
def import_public_key(
    jwk: dict,
    algorithm: Literal["EdDSA", "ES256"]
) -> CryptoKey

Verification

verify_credential

Verify a JWS token’s signature and claims.
async def verify_credential(
    token: str,
    options: VerifyOptions
) -> VerifiedCredential

decode_token

Decode a JWT without verifying its signature or claims. Intended for debugging only; do not treat the decoded contents as trusted.
def decode_token(token: str) -> DecodedToken

get_credential_type

Get the type of credential from a token.
def get_credential_type(
    token: str
) -> Literal["developer", "agent"] | None

Trust Chain

verify_agent_trust_chain

Verify the complete agent trust chain, applying the trust policy supplied in `TrustChainOptions.policy` (if any).
async def verify_agent_trust_chain(
    agent_token: str,
    options: TrustChainOptions
) -> TrustChainResult

Status List

decode_status_list

Decode a compressed Status List 2021 bitstring.
def decode_status_list(encoded_list: str) -> bytes

is_bit_set

Check if a bit is set in a bitstring.
def is_bit_set(bitstring: bytes, index: int) -> bool

check_status_list_2021

Check credential status using Status List 2021.
async def check_status_list_2021(
    status_list_url: str,
    status_list_index: int
) -> CredentialStatus

parse_status_entry

Parse a credentialStatus object from a credential.
def parse_status_entry(
    credential_status: dict | None
) -> StatusEntry | None

StatusListCache

Cache for Status List 2021 fetches. The `ttl_ms` constructor argument is in milliseconds (default 300000, i.e. 5 minutes).
class StatusListCache:
    def __init__(self, ttl_ms: int = 300000): ...
    async def fetch(self, url: str) -> bytes: ...
    def clear(self) -> None: ...

HTTP Signing (Web Bot Auth)

sign_http_request

Sign an HTTP request per RFC 9421 (HTTP Message Signatures).
async def sign_http_request(
    request: HttpRequestInfo,
    options: HttpSignOptions
) -> HttpSignatureHeaders

create_signed_request

Create a fully signed HTTP request.
async def create_signed_request(
    request: HttpRequestInfo,
    options: HttpSignOptions
) -> dict

compute_jwk_thumbprint

Compute JWK thumbprint (RFC 7638).
def compute_jwk_thumbprint(jwk: dict) -> str

compute_content_digest

Compute Content-Digest header value.
def compute_content_digest(body: bytes) -> str

generate_nonce

Generate a cryptographic nonce.
def generate_nonce() -> str

HTTP Verification

verify_http_signature

Verify HTTP message signature.
async def verify_http_signature(
    request: IncomingHttpRequest,
    options: HttpVerifyOptions
) -> HttpVerificationResult

fetch_key_directory

Fetch a key directory from URL.
async def fetch_key_directory(url: str) -> KeyDirectory

find_key_by_thumbprint

Find a key in a directory by thumbprint.
def find_key_by_thumbprint(
    directory: KeyDirectory,
    thumbprint: str
) -> dict | None

create_default_key_resolver

Create a default key resolver for HTTP verification.
def create_default_key_resolver() -> Callable

verify_content_digest

Verify Content-Digest header.
def verify_content_digest(body: bytes, header: str) -> bool

Key Directory

generate_key_directory

Generate a key directory JSON structure.
def generate_key_directory(
    options: GenerateDirectoryOptions
) -> dict

sign_directory_response

Sign a key directory response.
async def sign_directory_response(
    directory: dict,
    options: SignDirectoryOptions
) -> SignedDirectoryResponse

generate_web_bot_auth_setup

Generate complete Web Bot Auth setup.
async def generate_web_bot_auth_setup(
    base_url: str | None = None
) -> dict

Replay Protection

InMemoryJtiStore

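In-memory JTI store for replay protection. The `ttl_seconds` constructor argument is in seconds (default 300), unlike `StatusListCache`, whose `ttl_ms` is in milliseconds.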
class InMemoryJtiStore:
    def __init__(self, ttl_seconds: int = 300): ...
    async def has(self, jti: str) -> bool: ...
    async def add(self, jti: str, exp: int) -> None: ...
    async def cleanup(self) -> None: ...

ReplayDetectedError

Error raised when a replay is detected, i.e. a `jti` that has already been seen is presented again.
class ReplayDetectedError(BelticError):
    jti: str
    first_seen: datetime

Content Integrity

compute_content_hash

Compute hash of content.
def compute_content_hash(
    content: bytes,
    algorithm: ContentHashAlgorithm = "sha-256"
) -> bytes

verify_content_hash

Verify content against hash.
def verify_content_hash(
    content: bytes,
    hash: bytes,
    algorithm: ContentHashAlgorithm = "sha-256"
) -> bool

create_hash_claim

Create a hash claim for embedding in credentials.
def create_hash_claim(
    content: bytes,
    algorithm: ContentHashAlgorithm = "sha-256"
) -> dict

verify_hash_claim

Verify a hash claim.
def verify_hash_claim(content: bytes, claim: dict) -> bool

Hash Chains

CredentialChain

Credential hash chain for auditing.
class CredentialChain:
    def __init__(self): ...
    def add(self, credential: dict, token: str) -> ChainEntry: ...
    def get_proof(self) -> ChainProof: ...
    def verify(self) -> bool: ...
    def __len__(self) -> int: ...

compute_chain_hash

Compute hash for chain entry.
def compute_chain_hash(
    credential_hash: bytes,
    previous_hash: bytes | None
) -> bytes

verify_chain_integrity

Verify chain integrity.
def verify_chain_integrity(entries: list[ChainEntry]) -> bool

Multi-Signature

MultiSigCredential

Credential with multiple signatures.
class MultiSigCredential:
    def __init__(self, credential: dict): ...
    async def add_signature(
        self,
        private_key: CryptoKey,
        signer_id: str,
        options: SignOptions
    ) -> None: ...
    def get_signatures(self) -> list[SignatureEntry]: ...
    async def verify_all(
        self,
        key_resolver: MultiSigKeyResolver
    ) -> MultiSigVerificationResult: ...

add_signature

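Add a signature to a multi-sig credential. Module-level counterpart of `MultiSigCredential.add_signature`; it takes the credential as its first argument and otherwise the same parameters.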
async def add_signature(
    credential: MultiSigCredential,
    private_key: CryptoKey,
    signer_id: str,
    options: SignOptions
) -> None

verify_multi_signatures

Verify all signatures on a multi-sig credential.
async def verify_multi_signatures(
    credential: MultiSigCredential,
    key_resolver: MultiSigKeyResolver
) -> MultiSigVerificationResult

Audit Logging

AuditLogger

Audit logger with pluggable handlers.
class AuditLogger:
    def __init__(self): ...
    def add_handler(self, handler: AuditHandler) -> None: ...
    def log(self, event: AuditEvent) -> None: ...

AuditEvent

Audit event structure.
@dataclass
class AuditEvent:
    type: AuditEventType
    timestamp: datetime
    credential_id: str | None
    action: str
    actor: str | None
    details: dict

AuditEventType

Audit event types.
class AuditEventType(Enum):
    CREDENTIAL_ISSUED = "credential_issued"
    CREDENTIAL_VERIFIED = "credential_verified"
    CREDENTIAL_REVOKED = "credential_revoked"
    SIGNATURE_CREATED = "signature_created"
    SIGNATURE_VERIFIED = "signature_verified"
    TRUST_CHAIN_VERIFIED = "trust_chain_verified"
    POLICY_VIOLATION = "policy_violation"

ConsoleAuditHandler

Log audit events to console.
class ConsoleAuditHandler(AuditHandler):
    def handle(self, event: AuditEvent) -> None: ...

FileAuditHandler

Log audit events to file.
class FileAuditHandler(AuditHandler):
    def __init__(self, path: str): ...
    def handle(self, event: AuditEvent) -> None: ...

InMemoryAuditHandler

Store audit events in memory.
class InMemoryAuditHandler(AuditHandler):
    events: list[AuditEvent]
    def handle(self, event: AuditEvent) -> None: ...
    def clear(self) -> None: ...

Cloud Signers

LocalSigner

Local key signer. Implements the `CloudSigner` interface using a private key supplied by the caller.
class LocalSigner(CloudSigner):
    def __init__(
        self,
        private_key: CryptoKey,
        algorithm: Literal["EdDSA", "ES256"]
    ): ...
    async def sign(self, data: bytes) -> bytes: ...
    def get_info(self) -> SignerInfo: ...

CloudSigner

Abstract base class for cloud signers.
class CloudSigner(ABC):
    @abstractmethod
    async def sign(self, data: bytes) -> bytes: ...
    @abstractmethod
    def get_info(self) -> SignerInfo: ...

Selective Disclosure (SD-JWT)

create_sd_jwt

Create a selective disclosure JWT.
async def create_sd_jwt(
    credential: dict,
    private_key: CryptoKey,
    options: CreateSdJwtOptions
) -> SDJwt

create_presentation

Create a presentation from SD-JWT with selected disclosures.
def create_presentation(
    sd_jwt: SDJwt,
    disclosed_paths: list[str]
) -> str

verify_sd_jwt

Verify an SD-JWT presentation.
async def verify_sd_jwt(
    presentation: str,
    options: VerifySdJwtOptions
) -> SdJwtVerificationResult

create_disclosure

Create a disclosure for a claim.
def create_disclosure(
    salt: str,
    claim_name: str,
    claim_value: Any
) -> Disclosure

decode_disclosure

Decode a disclosure from base64url.
def decode_disclosure(encoded: str) -> Disclosure

Error Classes

BelticError

Base error class.
class BelticError(Exception):
    code: str
    message: str

ValidationError

Validation error with detailed issues.
class ValidationError(BelticError):
    issues: list[ValidationErrorDetail]
    
    def format(self) -> str: ...
    def by_path(self) -> dict[str, list[ValidationErrorDetail]]: ...

SignatureError

Signature verification error.
class SignatureError(BelticError):
    step: Literal["PARSE", "KEY_RESOLUTION", "SIGNATURE", "CLAIMS", "SCHEMA"]
    code: str

TrustChainError

Trust chain verification error.
class TrustChainError(BelticError):
    step: str
    agent_credential: dict | None
    developer_credential: dict | None

PolicyViolationError

Policy violation error.
class PolicyViolationError(BelticError):
    violations: list[PolicyViolation]
    
    def has_type(self, violation_type: str) -> bool: ...

Types

SignOptions

Options accepted by `sign_credential`, `MultiSigCredential.add_signature`, and `add_signature`.
@dataclass
class SignOptions:
    alg: Literal["EdDSA", "ES256"]
    issuer_did: str
    subject_did: str
    key_id: str | None = None
    audience: str | None = None

VerifyOptions

Options accepted by `verify_credential`. Only `key_resolver` is required.
@dataclass
class VerifyOptions:
    key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
    expected_issuer: str | None = None
    expected_audience: str | None = None
    allowed_algorithms: list[str] | None = None

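TrustChainOptions

Options accepted by `verify_agent_trust_chain`. `key_resolver` and `fetch_developer_credential` are required; `check_status` and `policy` are optional.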
@dataclass
class TrustChainOptions:
    key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
    fetch_developer_credential: Callable[[str], Awaitable[str]]
    check_status: Callable[[StatusCheckInput], Awaitable[CredentialStatus]] | None = None
    policy: TrustPolicy | None = None

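TrustPolicy

Policy constraints supplied through `TrustChainOptions.policy`. Every field is optional and defaults to `None`.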
@dataclass
class TrustPolicy:
    min_kyb_tier: str | None = None
    min_prompt_injection_score: int | None = None
    min_pii_leakage_score: int | None = None
    min_tool_abuse_score: int | None = None
    min_harm_refusal_score: int | None = None
    min_verification_level: str | None = None
    prohibited_data_categories: list[str] | None = None

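HttpRequestInfo

Description of an outgoing HTTP request, passed to `sign_http_request` and `create_signed_request`.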
@dataclass
class HttpRequestInfo:
    method: str
    url: str
    headers: dict[str, str] = field(default_factory=dict)
    body: bytes | None = None

HttpSignOptions

Signing options accepted by `sign_http_request` and `create_signed_request`.
@dataclass
class HttpSignOptions:
    private_key: CryptoKey
    key_id: str
    key_directory_url: str
    components: list[str] | None = None
    expires_in: int = 60

See Also