"""Append-only signed sigchain (Spec §8) and on-disk storage.""" from __future__ import annotations import hashlib import os from pathlib import Path from typing import Any from . import encodings from .envelope import verify_sigchain_event from .identity import Identity from .jcs import canonical_bytes class SigchainError(Exception): pass def event_hash(event: dict[str, Any]) -> str: """``sha256:`` of the JCS bytes of the entire signed envelope.""" digest = hashlib.sha256(canonical_bytes(event)).hexdigest() return f"sha256:{digest}" class Sigchain: def __init__(self, primary: Identity) -> None: self._primary = primary self._events: list[dict[str, Any]] = [] @property def primary(self) -> Identity: return self._primary def events(self) -> list[dict[str, Any]]: return self._events def __len__(self) -> int: return len(self._events) def is_empty(self) -> bool: return not self._events def next_seq(self) -> int: return len(self._events) def head_hash(self) -> str | None: if not self._events: return None return event_hash(self._events[-1]) def append(self, event: dict[str, Any]) -> None: if event.get("kez") != "sigchain_event": raise SigchainError(f"wrong envelope tag: {event.get('kez')!r}") payload = event["payload"] if payload["primary"] != str(self._primary): raise SigchainError("event primary does not match chain primary") expected_seq = self.next_seq() if payload["seq"] != expected_seq: raise SigchainError(f"seq mismatch: expected {expected_seq}, got {payload['seq']}") expected_prev = self.head_hash() if payload.get("prev") != expected_prev: raise SigchainError("prev hash mismatch") verify_sigchain_event(event) self._events.append(event) # ── serialization ── def to_jsonl(self) -> str: return encodings.chain_to_jsonl(self._events) def to_compact_bundle(self) -> str: return encodings.chain_to_compact_bundle(self._events) @classmethod def from_jsonl(cls, primary: Identity, text: str) -> "Sigchain": chain = cls(primary) for event in encodings.chain_from_jsonl(text): chain.append(event) return chain def subject_of(event: dict[str, Any]) -> str | None: op_payload = event.get("payload", {}).get("payload", {}) subject = op_payload.get("subject") return subject if isinstance(subject, str) else None # ── storage ───────────────────────────────────────────────────────────────── def sigchain_dir() -> Path: return Path(os.path.expanduser("~")) / ".kez" / "sigchains" def sigchain_path(primary: Identity) -> Path: d = sigchain_dir() d.mkdir(parents=True, exist_ok=True) safe = str(primary).replace(":", "_") return d / f"{safe}.jsonl" def load_chain(primary: Identity) -> Sigchain: path = sigchain_path(primary) if not path.exists(): return Sigchain(primary) return Sigchain.from_jsonl(primary, path.read_text(encoding="utf-8")) def save_chain(chain: Sigchain) -> None: path = sigchain_path(chain.primary) path.write_text(chain.to_jsonl(), encoding="utf-8")