Kez/python/kez/sigchain.py
Jason Tudisco b1240c13e5 Add Python implementation and cross-test interop
Add a Python port of the KEZ CLI under python/, mirroring the Rust and
Node implementations command-for-command and byte-for-byte:

- Pure-Python JCS (RFC 8785), BIP-340 Schnorr, and Bech32; cryptography
  for Ed25519 and zstandard for the compact zstd framing.
- Full CLI: identity new, claim create/dns, verify file, and
  sigchain add/revoke/show/export.

Wire Python into crosstest.sh with 35 new scenarios covering Python
against both Rust and Node, in every direction, across all wire formats,
both key types, DNS proofs, and sigchains (incl. JSONL byte parity).
All 55 scenarios pass.

Update root README and .gitignore for the new implementation.
2026-06-01 13:29:45 -06:00

113 lines
3.4 KiB
Python

"""Append-only signed sigchain (Spec §8) and on-disk storage."""
from __future__ import annotations
import hashlib
import os
from pathlib import Path
from typing import Any
from . import encodings
from .envelope import verify_sigchain_event
from .identity import Identity
from .jcs import canonical_bytes
class SigchainError(Exception):
    """Raised when an event fails a chain-consistency check on append."""
def event_hash(event: dict[str, Any]) -> str:
    """Return ``sha256:<hex>`` for the canonical (JCS) bytes of *event*.

    The digest is taken over the whole envelope that is passed in, not just
    its payload.
    """
    hasher = hashlib.sha256()
    hasher.update(canonical_bytes(event))
    return "sha256:" + hasher.hexdigest()
class Sigchain:
    """In-memory, append-only list of signed events for a single primary.

    Events enter the chain only through :meth:`append`, which checks the
    envelope tag, the primary, the sequence number and the ``prev`` hash
    before handing the event to ``verify_sigchain_event``.
    """

    def __init__(self, primary: Identity) -> None:
        self._primary = primary
        self._events: list[dict[str, Any]] = []

    @property
    def primary(self) -> Identity:
        """The identity this chain was constructed for."""
        return self._primary

    def events(self) -> list[dict[str, Any]]:
        """Return the chain's events in append order.

        NOTE: this is the chain's own list, not a copy; mutating it bypasses
        the checks performed by :meth:`append`.
        """
        return self._events

    def __len__(self) -> int:
        return len(self._events)

    def is_empty(self) -> bool:
        """Return ``True`` when no event has been appended yet."""
        return not self._events

    def next_seq(self) -> int:
        """Return the ``seq`` value the next appended event must carry."""
        return len(self._events)

    def head_hash(self) -> str | None:
        """Return the hash of the last event, or ``None`` for an empty chain."""
        if not self._events:
            return None
        return event_hash(self._events[-1])

    def append(self, event: dict[str, Any]) -> None:
        """Validate *event* against the chain head and store it.

        Checks run in this order: envelope shape and tag, primary, ``seq``,
        ``prev`` hash, and finally ``verify_sigchain_event``. The event is
        stored only if all of them succeed.

        Raises:
            SigchainError: if the event is malformed or does not extend the
                chain. Whatever ``verify_sigchain_event`` raises propagates
                unchanged.
        """
        # Events may come straight from parsed JSONL, so any JSON value can
        # show up here; report bad shapes as SigchainError rather than
        # leaking AttributeError/KeyError/TypeError to the caller.
        if not isinstance(event, dict):
            raise SigchainError(
                f"event must be a JSON object, got {type(event).__name__}"
            )
        if event.get("kez") != "sigchain_event":
            raise SigchainError(f"wrong envelope tag: {event.get('kez')!r}")

        payload = event.get("payload")
        if not isinstance(payload, dict):
            raise SigchainError("event payload must be a JSON object")

        if payload.get("primary") != str(self._primary):
            raise SigchainError("event primary does not match chain primary")

        seq = payload.get("seq")
        # bool is a subclass of int (False == 0, True == 1) and 1.0 == 1, so
        # a plain equality check would accept JSON true/false or floats.
        if isinstance(seq, bool) or not isinstance(seq, int):
            raise SigchainError(f"seq must be an integer, got {seq!r}")
        expected_seq = self.next_seq()
        if seq != expected_seq:
            raise SigchainError(
                f"seq mismatch: expected {expected_seq}, got {seq}"
            )

        # For the first event both sides are None: head_hash() of an empty
        # chain and a missing/null "prev".
        expected_prev = self.head_hash()
        if payload.get("prev") != expected_prev:
            raise SigchainError("prev hash mismatch")

        verify_sigchain_event(event)
        self._events.append(event)

    # ── serialization ──

    def to_jsonl(self) -> str:
        """Serialize the events with ``encodings.chain_to_jsonl``."""
        return encodings.chain_to_jsonl(self._events)

    def to_compact_bundle(self) -> str:
        """Serialize the events with ``encodings.chain_to_compact_bundle``."""
        return encodings.chain_to_compact_bundle(self._events)

    @classmethod
    def from_jsonl(cls, primary: Identity, text: str) -> "Sigchain":
        """Build a chain for *primary* from JSONL *text*.

        Every parsed event goes through :meth:`append`, so the same checks
        apply as for a freshly added event.
        """
        chain = cls(primary)
        for event in encodings.chain_from_jsonl(text):
            chain.append(event)
        return chain
def subject_of(event: dict[str, Any]) -> str | None:
    """Return the ``subject`` string nested under ``payload.payload``.

    Returns ``None`` when either ``payload`` level or the ``subject`` key is
    absent, or when the subject is not a string.
    """
    outer = event.get("payload", {})
    inner = outer.get("payload", {})
    candidate = inner.get("subject")
    if isinstance(candidate, str):
        return candidate
    return None
# ── storage ─────────────────────────────────────────────────────────────────
def sigchain_dir() -> Path:
    """Return the path ``~/.kez/sigchains`` for the current user.

    The path is only computed here; nothing is created on disk.
    """
    home = Path(os.path.expanduser("~"))
    return home.joinpath(".kez", "sigchains")
def sigchain_path(primary: Identity) -> Path:
    """Return the JSONL file path for *primary* inside the sigchain directory.

    Side effect: the sigchain directory (and any missing parents) is created
    if it does not exist yet. The file name is the string form of *primary*
    with every ``:`` replaced by ``_``, plus the ``.jsonl`` suffix.
    """
    directory = sigchain_dir()
    directory.mkdir(parents=True, exist_ok=True)
    filename = "_".join(str(primary).split(":")) + ".jsonl"
    return directory / filename
def load_chain(primary: Identity) -> Sigchain:
    """Load the stored chain for *primary*, or an empty chain if none exists.

    The file is read as UTF-8 and rebuilt with ``Sigchain.from_jsonl``, so
    every stored event is re-validated on load; errors raised during that
    validation propagate to the caller.
    """
    path = sigchain_path(primary)
    # Read first and handle absence afterwards: checking exists() and then
    # reading leaves a window in which the file can disappear in between.
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return Sigchain(primary)
    return Sigchain.from_jsonl(primary, text)
def save_chain(chain: Sigchain) -> None:
    """Write *chain* as JSONL to its on-disk location.

    The data is written to a sibling ``<name>.tmp`` file first and then moved
    over the real file with ``os.replace``. Writing in place would truncate
    the existing chain before the new content is complete, so an interrupted
    save could leave a corrupted file; with the rename, the previous file
    stays intact until the new one is fully written.
    """
    path = sigchain_path(chain.primary)
    # Same directory as the target so the rename never crosses filesystems.
    # A leftover .tmp from a failed save is simply overwritten next time.
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(chain.to_jsonl(), encoding="utf-8")
    os.replace(tmp_path, path)