"""Signature envelopes, claim payloads and verification (Spec §4, §5).""" from __future__ import annotations from datetime import datetime, timezone from typing import Any from .identity import Identity from .keys import verify_signature CLAIM_TYPE = "kez.claim" SIGCHAIN_EVENT_TYPE = "kez.sigchain.event" FORMAT_VERSION = 1 COMPACT_PROOF_PREFIX = "kez:z1:" COMPACT_CHAIN_PREFIX = "kez:zc1:" class VerificationError(Exception): pass def rfc3339_utc(dt: datetime | None = None) -> str: """RFC 3339 UTC timestamp with microsecond precision and a trailing ``Z``.""" if dt is None: dt = datetime.now(timezone.utc) dt = dt.astimezone(timezone.utc) return dt.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z" def new_claim_payload( subject: Identity, primary: Identity, created_at: str | None = None, ) -> dict[str, Any]: return { "type": CLAIM_TYPE, "version": FORMAT_VERSION, "subject": str(subject), "primary": str(primary), "created_at": created_at or rfc3339_utc(), } def sign_claim(payload: dict[str, Any], signer) -> dict[str, Any]: key = signer.identity() if payload["primary"] != str(key): raise VerificationError( f"claim primary {payload['primary']!r} does not match signing key {key}" ) return { "kez": "claim", "payload": payload, "signature": { "alg": signer.alg(), "key": str(key), "sig": signer.sign_payload(payload), }, } def verify_claim(envelope: dict[str, Any]) -> dict[str, Any]: """Verify a claim envelope; return a status dict on success, else raise.""" if envelope.get("kez") != "claim": raise VerificationError(f"not a claim envelope: kez={envelope.get('kez')!r}") payload = envelope["payload"] signature = envelope["signature"] if signature["key"] != payload["primary"]: raise VerificationError("signature.key does not match payload.primary") primary = Identity.parse(payload["primary"]) key = Identity.parse(signature["key"]) if not verify_signature(payload, signature["alg"], key, signature["sig"]): raise VerificationError(f"signature did not verify (alg={signature['alg']})") subject = Identity.parse(payload["subject"]) return { "primary": primary, "verified": [subject], "status": "valid", "confidence": "strong", } # ── Sigchain event payloads ───────────────────────────────────────────────── def _event_payload( primary: Identity, seq: int, prev: str | None, op: str, op_payload: dict[str, Any], created_at: str | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = { "type": SIGCHAIN_EVENT_TYPE, "version": FORMAT_VERSION, "primary": str(primary), "seq": seq, } if prev is not None: payload["prev"] = prev payload["created_at"] = created_at or rfc3339_utc() payload["op"] = op payload["payload"] = op_payload return payload def new_add_payload( primary: Identity, seq: int, prev: str | None, subject: Identity, proof_url: str | None = None, created_at: str | None = None, ) -> dict[str, Any]: op_payload: dict[str, Any] = {"subject": str(subject)} if proof_url: op_payload["proof_url"] = proof_url return _event_payload(primary, seq, prev, "add", op_payload, created_at) def new_revoke_payload( primary: Identity, seq: int, prev: str | None, subject: Identity, created_at: str | None = None, ) -> dict[str, Any]: return _event_payload(primary, seq, prev, "revoke", {"subject": str(subject)}, created_at) def sign_sigchain_event(payload: dict[str, Any], signer) -> dict[str, Any]: return { "kez": "sigchain_event", "payload": payload, "signature": { "alg": signer.alg(), "key": str(signer.identity()), "sig": signer.sign_payload(payload), }, } def verify_sigchain_event(event: dict[str, Any]) -> None: if event.get("kez") != "sigchain_event": raise VerificationError(f"wrong envelope tag: {event.get('kez')!r}") payload = event["payload"] signature = event["signature"] if signature["key"] != payload["primary"]: raise VerificationError("signature.key does not match payload.primary") key = Identity.parse(signature["key"]) if not verify_signature(payload, signature["alg"], key, signature["sig"]): raise VerificationError("sigchain event signature did not verify")