Kez/python/kez/envelope.py
Jason Tudisco b1240c13e5 Add Python implementation and cross-test interop
Add a Python port of the KEZ CLI under python/, mirroring the Rust and
Node implementations command-for-command and byte-for-byte:

- Pure-Python JCS (RFC 8785), BIP-340 Schnorr, and Bech32; cryptography
  for Ed25519 and zstandard for the compact zstd framing.
- Full CLI: identity new, claim create/dns, verify file, and
  sigchain add/revoke/show/export.

Wire Python into crosstest.sh with 35 new scenarios covering Python
against both Rust and Node, in every direction, across all wire formats,
both key types, DNS proofs, and sigchains (incl. JSONL byte parity).
All 55 scenarios pass.

Update root README and .gitignore for the new implementation.
2026-06-01 13:29:45 -06:00

155 lines
4.6 KiB
Python

"""Signature envelopes, claim payloads and verification (Spec §4, §5)."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from .identity import Identity
from .keys import verify_signature
# Value of the "type" field written into claim payloads by new_claim_payload().
CLAIM_TYPE = "kez.claim"
# Value of the "type" field written into sigchain event payloads.
SIGCHAIN_EVENT_TYPE = "kez.sigchain.event"
# Value of the "version" field written into both claim and event payloads.
FORMAT_VERSION = 1
# NOTE(review): the two prefixes below are not referenced in this module;
# presumably they tag the compact (zstd-framed) proof and chain encodings
# handled elsewhere in the package -- confirm against the callers.
COMPACT_PROOF_PREFIX = "kez:z1:"
COMPACT_CHAIN_PREFIX = "kez:zc1:"
class VerificationError(Exception):
    """Raised when a claim or sigchain event envelope fails a check in this module."""
def rfc3339_utc(dt: datetime | None = None) -> str:
    """Return an RFC 3339 UTC timestamp with microsecond precision and a trailing ``Z``.

    Args:
        dt: Moment to format. ``None`` means "now". An aware datetime is
            converted to UTC; a naive datetime is interpreted as local time,
            which is what ``datetime.astimezone`` does for naive values.

    Returns:
        A string of the form ``YYYY-MM-DDTHH:MM:SS.ffffffZ``.
    """
    if dt is None:
        dt = datetime.now(timezone.utc)
    utc = dt.astimezone(timezone.utc)
    # isoformat() always zero-pads the year to four digits, whereas
    # strftime("%Y") is platform dependent for years below 1000. Dropping the
    # tzinfo suppresses the "+00:00" suffix so the literal "Z" can be appended.
    return utc.replace(tzinfo=None).isoformat(timespec="microseconds") + "Z"
def new_claim_payload(
    subject: Identity,
    primary: Identity,
    created_at: str | None = None,
) -> dict[str, Any]:
    """Build the unsigned payload of a claim linking ``subject`` to ``primary``.

    Args:
        subject: Identity being claimed; stored as ``str(subject)``.
        primary: Identity making the claim; stored as ``str(primary)``.
        created_at: Timestamp string to record. When falsy (``None`` or
            empty), the current time from ``rfc3339_utc()`` is used.

    Returns:
        A dict with the keys ``type``, ``version``, ``subject``, ``primary``
        and ``created_at``, inserted in that order.
    """
    claim: dict[str, Any] = {"type": CLAIM_TYPE, "version": FORMAT_VERSION}
    claim["subject"] = str(subject)
    claim["primary"] = str(primary)
    # Resolved last so the timestamp is only generated once the identities
    # have been stringified.
    claim["created_at"] = created_at if created_at else rfc3339_utc()
    return claim
def sign_claim(payload: dict[str, Any], signer) -> dict[str, Any]:
    """Wrap ``payload`` in a signed claim envelope.

    Args:
        payload: Claim payload; its ``"primary"`` entry must equal the string
            form of the signer's identity.
        signer: Object providing ``identity()``, ``alg()`` and
            ``sign_payload(payload)``.

    Returns:
        ``{"kez": "claim", "payload": payload, "signature": {...}}`` where the
        signature block holds ``alg``, ``key`` and ``sig``.

    Raises:
        VerificationError: If ``payload["primary"]`` is not the signer's identity.
    """
    key = signer.identity()
    claimed_primary = payload["primary"]
    if claimed_primary != str(key):
        raise VerificationError(
            f"claim primary {claimed_primary!r} does not match signing key {key}"
        )

    # Filled in the same order the signer methods were originally invoked:
    # alg(), then the key string, then sign_payload().
    signature_block: dict[str, Any] = {}
    signature_block["alg"] = signer.alg()
    signature_block["key"] = str(key)
    signature_block["sig"] = signer.sign_payload(payload)
    return {"kez": "claim", "payload": payload, "signature": signature_block}
def verify_claim(envelope: dict[str, Any]) -> dict[str, Any]:
    """Check a claim envelope and report what it proves.

    The envelope must be tagged ``"kez": "claim"``, its ``signature.key`` must
    equal ``payload.primary``, and ``verify_signature`` must accept the
    signature over the payload.

    Args:
        envelope: Decoded claim envelope with ``kez``, ``payload`` and
            ``signature`` entries.

    Returns:
        A dict with ``primary`` (parsed identity), ``verified`` (a one-element
        list holding the parsed subject), ``status`` set to ``"valid"`` and
        ``confidence`` set to ``"strong"``.

    Raises:
        VerificationError: On a wrong tag, a key/primary mismatch, or a
            signature that does not verify.
    """
    tag = envelope.get("kez")
    if tag != "claim":
        raise VerificationError(f"not a claim envelope: kez={tag!r}")

    body = envelope["payload"]
    sig_block = envelope["signature"]

    signer_text = sig_block["key"]
    primary_text = body["primary"]
    if signer_text != primary_text:
        raise VerificationError("signature.key does not match payload.primary")

    primary = Identity.parse(primary_text)
    key = Identity.parse(signer_text)

    alg = sig_block["alg"]
    signature_ok = verify_signature(body, alg, key, sig_block["sig"])
    if not signature_ok:
        raise VerificationError(f"signature did not verify (alg={alg})")

    # The subject is only parsed once the signature has been accepted.
    subject = Identity.parse(body["subject"])
    result: dict[str, Any] = {"primary": primary, "verified": [subject]}
    result["status"] = "valid"
    result["confidence"] = "strong"
    return result
# ── Sigchain event payloads ─────────────────────────────────────────────────
def _event_payload(
    primary: Identity,
    seq: int,
    prev: str | None,
    op: str,
    op_payload: dict[str, Any],
    created_at: str | None = None,
) -> dict[str, Any]:
    """Assemble the unsigned payload shared by every sigchain event.

    Args:
        primary: Identity recorded under ``primary`` as ``str(primary)``.
        seq: Value stored under ``seq``.
        prev: Value stored under ``prev``; the key is omitted when ``None``.
        op: Operation name stored under ``op``.
        op_payload: Operation-specific dict stored under ``payload``.
        created_at: Timestamp string; when falsy, ``rfc3339_utc()`` is used.

    Returns:
        A dict whose keys are inserted in the order ``type``, ``version``,
        ``primary``, ``seq``, optional ``prev``, ``created_at``, ``op``,
        ``payload``.
    """
    header: dict[str, Any] = {
        "type": SIGCHAIN_EVENT_TYPE,
        "version": FORMAT_VERSION,
        "primary": str(primary),
        "seq": seq,
    }
    # An empty mapping contributes no key, so "prev" appears only when given.
    link: dict[str, Any] = {} if prev is None else {"prev": prev}
    return {
        **header,
        **link,
        "created_at": created_at or rfc3339_utc(),
        "op": op,
        "payload": op_payload,
    }
def new_add_payload(
    primary: Identity,
    seq: int,
    prev: str | None,
    subject: Identity,
    proof_url: str | None = None,
    created_at: str | None = None,
) -> dict[str, Any]:
    """Build the unsigned payload of an ``add`` sigchain event.

    Args:
        primary: Identity the event payload is built for.
        seq: Sequence value for the event.
        prev: Value for the event's ``prev`` field, or ``None`` to omit it.
        subject: Identity being added; stored as ``str(subject)``.
        proof_url: Optional value stored under ``proof_url``; left out when
            falsy (``None`` or empty).
        created_at: Optional timestamp string forwarded to ``_event_payload``.

    Returns:
        The event payload produced by ``_event_payload`` with ``op`` ``"add"``.
    """
    subject_text = str(subject)
    if proof_url:
        details: dict[str, Any] = {"subject": subject_text, "proof_url": proof_url}
    else:
        details = {"subject": subject_text}
    return _event_payload(
        primary,
        seq,
        prev,
        op="add",
        op_payload=details,
        created_at=created_at,
    )
def new_revoke_payload(
    primary: Identity,
    seq: int,
    prev: str | None,
    subject: Identity,
    created_at: str | None = None,
) -> dict[str, Any]:
    """Build the unsigned payload of a ``revoke`` sigchain event.

    Args:
        primary: Identity the event payload is built for.
        seq: Sequence value for the event.
        prev: Value for the event's ``prev`` field, or ``None`` to omit it.
        subject: Identity being revoked; stored as ``str(subject)``.
        created_at: Optional timestamp string forwarded to ``_event_payload``.

    Returns:
        The event payload produced by ``_event_payload`` with ``op`` ``"revoke"``.
    """
    details: dict[str, Any] = {"subject": str(subject)}
    return _event_payload(
        primary,
        seq,
        prev,
        op="revoke",
        op_payload=details,
        created_at=created_at,
    )
def sign_sigchain_event(payload: dict[str, Any], signer) -> dict[str, Any]:
    """Wrap ``payload`` in a signed sigchain event envelope.

    Args:
        payload: Sigchain event payload to sign.
        signer: Object providing ``alg()``, ``identity()`` and
            ``sign_payload(payload)``.

    Returns:
        ``{"kez": "sigchain_event", "payload": payload, "signature": {...}}``
        where the signature block holds ``alg``, ``key`` and ``sig``.
    """
    # The signer is queried in the order alg(), identity(), sign_payload().
    algorithm = signer.alg()
    key_text = str(signer.identity())
    sig = signer.sign_payload(payload)
    signature_block = {"alg": algorithm, "key": key_text, "sig": sig}
    return {"kez": "sigchain_event", "payload": payload, "signature": signature_block}
def verify_sigchain_event(event: dict[str, Any]) -> None:
    """Check the envelope and signature of a single sigchain event.

    The event must be tagged ``"kez": "sigchain_event"``, its ``signature.key``
    must equal ``payload.primary``, and ``verify_signature`` must accept the
    signature over the payload. Returns ``None`` on success.

    Args:
        event: Decoded event envelope with ``kez``, ``payload`` and
            ``signature`` entries.

    Raises:
        VerificationError: On a wrong tag, a key/primary mismatch, or a
            signature that does not verify.
    """
    # NOTE(review): payload["type"], "seq" and "prev" are not inspected here;
    # presumably chain-level validation happens in the caller -- confirm.
    tag = event.get("kez")
    if tag != "sigchain_event":
        raise VerificationError(f"wrong envelope tag: {tag!r}")

    body = event["payload"]
    sig_block = event["signature"]

    signer_text = sig_block["key"]
    if signer_text != body["primary"]:
        raise VerificationError("signature.key does not match payload.primary")

    key = Identity.parse(signer_text)
    signature_ok = verify_signature(body, sig_block["alg"], key, sig_block["sig"])
    if not signature_ok:
        raise VerificationError("sigchain event signature did not verify")