"""KEZ command-line interface (Python implementation).

Mirrors the Rust and Node CLIs command-for-command and byte-for-byte in its
output, so the cross-implementation interop suite (../crosstest.sh) passes in
every direction.
"""

from __future__ import annotations

import argparse
import sys
from pathlib import Path

from . import encodings, sigchain
from .channels import parse_proof
from .envelope import (
    new_add_payload,
    new_claim_payload,
    new_revoke_payload,
    sign_claim,
    sign_sigchain_event,
    verify_claim,
)
from .identity import Identity
from .keys import Ed25519Secret, NostrSecret, signer_from_flags
from .mnemonic import (
    ed25519_from_mnemonic,
    generate_ed25519_with_mnemonic,
    generate_mnemonic,
)

# Word counts accepted for generated BIP-39 phrases, and the one used when the
# caller does not ask for a specific length.
_MNEMONIC_WORD_COUNTS = (12, 24)
_DEFAULT_MNEMONIC_WORDS = 24


def _eprint(msg: str) -> None:
    """Print *msg* to standard error."""
    print(msg, file=sys.stderr)


def write_or_print(out: str | None, output: str) -> None:
    """Write *output* to the file named by *out*, or to stdout if *out* is None.

    When writing to a file the text is stored unchanged (UTF-8). When printing,
    exactly one trailing newline is guaranteed.
    """
    if out is not None:
        Path(out).write_text(output, encoding="utf-8")
        return
    # Match Rust/Node: avoid double newlines if output already ends in one.
    if output.endswith("\n"):
        sys.stdout.write(output)
    else:
        print(output)


def _resolve_word_count(requested: int | None) -> int:
    """Return the mnemonic word count to use, defaulting to 24.

    Raises:
        ValueError: if the resulting count is neither 12 nor 24.
    """
    words = _DEFAULT_MNEMONIC_WORDS if requested is None else requested
    if words not in _MNEMONIC_WORD_COUNTS:
        raise ValueError(f"mnemonic word count must be 12 or 24, got {words}")
    return words


# ── identity ────────────────────────────────────────────────────────────────


def cmd_identity_new(args: argparse.Namespace) -> int:
    """Generate a new identity and print its public and secret material.

    For ``--key-type nostr`` a fresh Nostr secret is generated and
    ``--mnemonic-words`` is rejected. Otherwise an Ed25519 secret is generated
    together with a 12- or 24-word mnemonic (24 by default).

    Returns:
        0 on success.

    Raises:
        ValueError: if ``--mnemonic-words`` is combined with the nostr key
            type, or the word count is neither 12 nor 24.
    """
    mnemonic_words = getattr(args, "mnemonic_words", None)

    if args.key_type == "nostr":
        if mnemonic_words is not None:
            raise ValueError("--mnemonic-words is only valid with --key-type ed25519")
        secret = NostrSecret.generate()
        print(f"Primary: nostr:{secret.npub()}")
        print(f"Public: {secret.npub()}")
        print(f"Secret: {secret.nsec()}")
        print()
        print("Store the secret somewhere safe. Anyone with the nsec can sign as this identity.")
        return 0

    # ed25519: default 24 words.
    words = _resolve_word_count(mnemonic_words)
    secret, phrase = generate_ed25519_with_mnemonic(words)
    print(f"Primary: {secret.identity()}")
    print(f"Public: {secret.pubkey_hex()}")
    print(f"Secret: {secret.seed_hex()} (32-byte seed)")
    print(f'Mnemonic ({words} words): "{phrase}"')
    print()
    if words == 24:
        print(
            "The 24-word phrase and the hex seed are equivalent backups —\n"
            "either restores this identity. Store at least one safely."
        )
    else:
        print(
            "The 12-word phrase is the canonical backup. The hex seed is\n"
            "derived from it (one-way) — you can't reconstruct the phrase\n"
            "from the seed. Store the phrase safely."
        )
    return 0


def cmd_identity_mnemonic(args: argparse.Namespace) -> int:
    """Print a freshly generated mnemonic of ``--words`` words (default 24).

    Returns:
        0 on success.

    Raises:
        ValueError: if the word count is neither 12 nor 24.
    """
    print(generate_mnemonic(_resolve_word_count(args.words)))
    return 0


def cmd_identity_from_mnemonic(args: argparse.Namespace) -> int:
    """Derive an Ed25519 identity from ``args.phrase`` and print it.

    For a 24-word phrase, the phrase is additionally re-derived from the seed
    and a note is printed when the re-derived text differs from the input.

    Returns:
        0 on success.

    Raises:
        ValueError: if the phrase is empty or only whitespace.
    """
    phrase = args.phrase
    if not phrase or not phrase.strip():
        raise ValueError("identity from-mnemonic needs the phrase in quotes")

    secret = ed25519_from_mnemonic(phrase)
    word_count = len(phrase.split())
    print(f"Primary: {secret.identity()}")
    print(f"Public: {secret.pubkey_hex()}")
    print(f"Secret: {secret.seed_hex()} (32-byte seed)")
    print(f'Mnemonic ({word_count} words): "{phrase.strip()}"')

    if word_count == 24:
        # Confirm canonical round-trip; flag if not.
        from .mnemonic import mnemonic_from_seed_24

        derived = mnemonic_from_seed_24(bytes.fromhex(secret.seed_hex()))
        if derived.strip() != phrase.strip():
            print(f'(note: canonical form is "{derived}")')
    return 0


# ── claim ─────────────────────────────────────────────────────────────────────


def _signer(args: argparse.Namespace):
    """Build a signer from the key flags present on *args*.

    ``--mnemonic`` is read with ``getattr`` so a namespace without that
    attribute is treated as if the flag were not given.
    """
    return signer_from_flags(
        args.nsec,
        args.ed25519_seed,
        getattr(args, "mnemonic", None),
    )


def _build_claim(subject: str, args: argparse.Namespace):
    """Create and sign a claim binding *subject* to the signer's identity."""
    signer = _signer(args)
    primary = signer.identity()
    payload = new_claim_payload(Identity.parse(subject), primary)
    return sign_claim(payload, signer)


def cmd_claim_create(args: argparse.Namespace) -> int:
    """Create a signed claim and emit it in the requested ``--format``.

    ``markdown`` and ``compact`` select the matching encoder; anything else
    falls back to pretty-printed JSON. Output goes to ``--out`` or stdout.

    Returns:
        0 on success.
    """
    signed = _build_claim(args.subject, args)
    if args.format == "markdown":
        output = encodings.to_markdown(signed)
    elif args.format == "compact":
        output = encodings.to_compact(signed)
    else:
        output = encodings.to_pretty_json(signed)
    write_or_print(args.out, output)
    return 0


def cmd_claim_dns(args: argparse.Namespace) -> int:
    """Create a claim for a domain and print the DNS TXT record to publish.

    The ``dns:`` prefix is added to the domain when it is not already there.

    Returns:
        0 on success.
    """
    domain = args.domain if args.domain.startswith("dns:") else f"dns:{args.domain}"
    signed = _build_claim(domain, args)
    name = encodings.dns_txt_name(signed["payload"]["subject"])
    value = encodings.to_compact(signed)
    print(f"Name: {name}")
    print(f"Value: {value}")
    print()
    print("Zone file:")
    print(f"{name} TXT {encodings.quote_dns_txt_value(value)}")
    return 0


# ── verify ────────────────────────────────────────────────────────────────────


def _print_status(status: dict) -> None:
    """Print a verification status report.

    Reads the ``primary``, ``verified`` (iterable), ``status`` and
    ``confidence`` keys of *status*.
    """
    print(f"Primary: {status['primary']}")
    print()
    print("Verified identities:")
    for identity in status["verified"]:
        print(f"- {identity}")
    print()
    print(f"Status: {status['status']}")
    print(f"Confidence: {status['confidence']}")


def cmd_verify_file(args: argparse.Namespace) -> int:
    """Read the proof at ``args.path``, verify it, and print the result.

    Returns:
        0 once the status has been printed.
    """
    # NOTE(review): the exit code does not depend on status['status'];
    # presumably this matches the Rust/Node CLIs — confirm before changing.
    raw = Path(args.path).read_text(encoding="utf-8")
    proof = parse_proof(raw)
    status = verify_claim(proof)
    _print_status(status)
    return 0


def cmd_verify_id(args: argparse.Namespace) -> int:
    """Report that ``verify id`` is unsupported in this implementation.

    Returns:
        2, after writing an explanation to stderr.
    """
    _eprint(
        "verify id requires network channel resolution, which is not implemented "
        "in the Python CLI; use 'verify file' instead."
    )
    return 2


# ── sigchain ──────────────────────────────────────────────────────────────────


def _resolve_primary_readonly(args: argparse.Namespace) -> Identity:
    """Return the primary identity for read-only sigchain commands.

    A non-empty ``--primary`` is parsed and returned; otherwise the identity is
    taken from the signer built from the key flags.
    """
    if getattr(args, "primary", None):
        return Identity.parse(args.primary)
    signer = _signer(args)
    return signer.identity()


def _append_sigchain_event(args: argparse.Namespace, op: str, build_payload) -> int:
    """Sign one event, append it to the signer's chain, save, and report.

    Args:
        args: parsed CLI arguments carrying the key flags and ``subject``.
        op: operation name used in the confirmation line ("add"/"revoke").
        build_payload: callable ``(primary, chain) -> payload``. It is invoked
            only after the chain has been loaded.

    Returns:
        0 on success.
    """
    signer = _signer(args)
    primary = signer.identity()
    chain = sigchain.load_chain(primary)
    payload = build_payload(primary, chain)
    event = sign_sigchain_event(payload, signer)
    chain.append(event)
    sigchain.save_chain(chain)
    print(
        f"Appended {op} {args.subject} at seq {payload['seq']} "
        f"(head hash: {chain.head_hash()})"
    )
    print(f"Chain saved to {sigchain.sigchain_path(primary)}")
    return 0


def cmd_sigchain_add(args: argparse.Namespace) -> int:
    """Append an ``add`` event for ``args.subject`` to the signer's sigchain.

    Returns:
        0 on success.
    """

    def build_payload(primary, chain):
        return new_add_payload(
            primary,
            chain.next_seq(),
            chain.head_hash(),
            Identity.parse(args.subject),
            args.proof_url,
        )

    return _append_sigchain_event(args, "add", build_payload)


def cmd_sigchain_revoke(args: argparse.Namespace) -> int:
    """Append a ``revoke`` event for ``args.subject`` to the signer's sigchain.

    Returns:
        0 on success.
    """

    def build_payload(primary, chain):
        return new_revoke_payload(
            primary,
            chain.next_seq(),
            chain.head_hash(),
            Identity.parse(args.subject),
        )

    return _append_sigchain_event(args, "revoke", build_payload)


def cmd_sigchain_show(args: argparse.Namespace) -> int:
    """Print a summary of a sigchain: path, length, each event, and head hash.

    The head hash is printed only when the chain is not empty.

    Returns:
        0 on success.
    """
    primary = _resolve_primary_readonly(args)
    chain = sigchain.load_chain(primary)
    print(f"Primary: {primary}")
    print(f"Path: {sigchain.sigchain_path(primary)}")
    print(f"Length: {len(chain)} event(s)")
    print()
    for i, event in enumerate(chain.events()):
        subject = sigchain.subject_of(event) or ""
        op = event["payload"]["op"]
        seq = event["payload"]["seq"]
        print(f" [{i}] seq={seq} op={op:<6} subject={subject}")
    if not chain.is_empty():
        print()
        print(f"Head hash: {chain.head_hash()}")
    return 0


def cmd_sigchain_export(args: argparse.Namespace) -> int:
    """Export a sigchain as a compact bundle or as JSONL.

    Returns:
        0 on success; 1 (with a message on stderr) when the chain is empty.
    """
    primary = _resolve_primary_readonly(args)
    chain = sigchain.load_chain(primary)
    if chain.is_empty():
        _eprint(f"no chain found for {primary}")
        return 1
    if args.format == "compact":
        output = chain.to_compact_bundle()
    else:
        output = chain.to_jsonl()
    write_or_print(args.out, output)
    return 0


# ── argument parsing ──────────────────────────────────────────────────────────


def _add_key_flags(p: argparse.ArgumentParser) -> None:
    """Register the ``--nsec``, ``--ed25519-seed`` and ``--mnemonic`` options."""
    p.add_argument("--nsec")
    p.add_argument("--ed25519-seed", dest="ed25519_seed")
    p.add_argument("--mnemonic")


def build_parser() -> argparse.ArgumentParser:
    """Build the ``kez`` argument parser with all sub-commands.

    Every leaf sub-command stores its handler under ``func`` via
    ``set_defaults`` so the caller can dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(prog="kez", description="KEZ portable identity CLI")
    sub = parser.add_subparsers(dest="command", required=True)

    # identity
    p_identity = sub.add_parser("identity", help="key management")
    identity_sub = p_identity.add_subparsers(dest="identity_command", required=True)

    p_new = identity_sub.add_parser("new", help="generate a new identity")
    p_new.add_argument("--key-type", dest="key_type", choices=["nostr", "ed25519"], default="nostr")
    p_new.add_argument(
        "--mnemonic-words",
        dest="mnemonic_words",
        type=int,
        default=None,
        help="(ed25519 only) generate from a 12- or 24-word BIP-39 phrase",
    )
    p_new.set_defaults(func=cmd_identity_new)

    p_mn = identity_sub.add_parser(
        "mnemonic", help="print a fresh BIP-39 phrase without deriving a key"
    )
    p_mn.add_argument("--words", type=int, default=None)
    p_mn.set_defaults(func=cmd_identity_mnemonic)

    p_fm = identity_sub.add_parser(
        "from-mnemonic", help="derive an Ed25519 identity from a BIP-39 phrase"
    )
    p_fm.add_argument("phrase")
    p_fm.set_defaults(func=cmd_identity_from_mnemonic)

    # claim
    p_claim = sub.add_parser("claim", help="create claims")
    claim_sub = p_claim.add_subparsers(dest="claim_command", required=True)

    p_create = claim_sub.add_parser("create", help="create a signed claim")
    p_create.add_argument("subject")
    _add_key_flags(p_create)
    p_create.add_argument("--format", choices=["json", "compact", "markdown"], default="json")
    p_create.add_argument("--out")
    p_create.set_defaults(func=cmd_claim_create)

    p_dns = claim_sub.add_parser("dns", help="create a DNS-zone proof for a domain")
    p_dns.add_argument("domain")
    _add_key_flags(p_dns)
    p_dns.set_defaults(func=cmd_claim_dns)

    # verify
    p_verify = sub.add_parser("verify", help="verify proofs")
    verify_sub = p_verify.add_subparsers(dest="verify_command", required=True)

    p_vfile = verify_sub.add_parser("file", help="verify a proof file")
    p_vfile.add_argument("path")
    p_vfile.set_defaults(func=cmd_verify_file)

    p_vid = verify_sub.add_parser("id", help="verify an identifier via its channels")
    p_vid.add_argument("identifier")
    p_vid.set_defaults(func=cmd_verify_id)

    # sigchain
    p_sig = sub.add_parser("sigchain", help="manage a sigchain")
    sig_sub = p_sig.add_subparsers(dest="sigchain_command", required=True)

    p_add = sig_sub.add_parser("add", help="append an add event")
    p_add.add_argument("subject")
    _add_key_flags(p_add)
    p_add.add_argument("--proof-url", dest="proof_url")
    p_add.set_defaults(func=cmd_sigchain_add)

    p_revoke = sig_sub.add_parser("revoke", help="append a revoke event")
    p_revoke.add_argument("subject")
    _add_key_flags(p_revoke)
    p_revoke.set_defaults(func=cmd_sigchain_revoke)

    p_show = sig_sub.add_parser("show", help="show a sigchain")
    p_show.add_argument("--primary")
    _add_key_flags(p_show)
    p_show.set_defaults(func=cmd_sigchain_show)

    p_export = sig_sub.add_parser("export", help="export a sigchain")
    p_export.add_argument("--primary")
    _add_key_flags(p_export)
    p_export.add_argument("--format", choices=["jsonl", "compact"], default="jsonl")
    p_export.add_argument("--out")
    p_export.set_defaults(func=cmd_sigchain_export)

    return parser


def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and run the selected command.

    Args:
        argv: argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The handler's exit code, or 1 if the handler raised an exception (the
        message is written to stderr as ``error: ...``).
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        return args.func(args)
    except Exception as exc:  # noqa: BLE001 — top-level CLI error boundary
        _eprint(f"error: {exc}")
        return 1


if __name__ == "__main__":
    sys.exit(main())