Kez/python/kez/cli.py
Jason Tudisco b0cc1a74a0 feat(python,crosstest): mirror BIP-39 mnemonic to Python + add interop scenarios
Completes the three-way BIP-39 mnemonic surface (Rust + Node landed in
0058d9b) and pins down byte-for-byte agreement with crosstest scenarios.

Python (mirrors rust/crates/kez-core/src/mnemonic.rs + nodejs's mnemonic.ts):
  • python/kez/mnemonic.py — generate_mnemonic, seed_from_mnemonic,
    mnemonic_from_seed_24, ed25519_from_mnemonic,
    generate_ed25519_with_mnemonic. Same 24-word-bijection / 12-word-
    SHA-256-domain-tagged semantics. Uses Trezor's `mnemonic` library
    (v0.21) for the BIP-39 wordlist + entropy parsing; deliberately does
    NOT use BIP-39's PBKDF2 to_seed function.
  • python/kez/keys.py — Ed25519Secret.from_mnemonic() +
    generate_with_mnemonic() classmethods; signer_from_flags widened to
    accept --mnemonic.
  • python/kez/cli.py — identity new --mnemonic-words, identity
    mnemonic [--words], identity from-mnemonic; --mnemonic flag on
    claim create/dns and sigchain add/revoke/show/export. Output format
    matches Rust + Node verbatim so the crosstest harness can grep
    Primary/Public/Secret/Mnemonic lines.
  • python/tests/test_mnemonic.py — 19 tests covering all three
    canonical vectors (exact-match Secret + Public hex), round-trip,
    determinism, whitespace tolerance, bad-checksum, bad-word-count,
    the literal domain-tag bytes, and the 12-vs-24 entropy-overlap
    non-collision case.

Note: --mnemonic is NOT added to `sigchain publish` because that
subcommand doesn't exist in the Python CLI yet (rust + node only). When
the publish surface is ported, --mnemonic should follow it the same way.

Ground truth — python/MNEMONIC-TEST-VECTORS.md:
  V1: 24-word zero-entropy phrase ("abandon… art")
      seed   = 0000…0000
      pubkey = 3b6a27bcceb6a42d62a3a8d02a6f0d73653215771de243a63ac048a18b59da29
  V2: 12-word zero-entropy phrase ("abandon… about")
      seed   = 09451c0f06588db78205e32a793536e15ae263c8f9ee6d14f5c6fd82b8bd20da
      pubkey = 9403c32e0d3b4ce51105c0bcac09a0d73be0cca98a6bf7b3cd434651be866d70
  V3: 12-word "legal winner thank year wave sausage worth useful legal winner thank yellow"
      seed   = 9df434a2bd5dc767ee949d8ab95ca09c4ebbb88cefc3d0b1523f6b2a744ca824
      pubkey = cc99d06b15ccb83a5ca43f25dd3d27f50638c1c6fbe3a822352da3e07156ce03

  The domain tag for the 12-word derivation is exactly the 15 ASCII
  bytes of "kez-bip39-12-v1", documented in the spec doc.

crosstest.sh — new "BIP-39 mnemonic interop" section:
  • Vector match: each impl × each vector × Public hex == expected (9
    scenarios). Catches any silent derivation drift.
  • Cross-impl claim signing via --mnemonic: every signer → verifier
    pair in both directions (rust↔node, rust↔py, node↔py), every format
    (json/compact/markdown). 6 directed pairings × 3 formats = 18 scenarios.
  • Bijection sanity: the 24-word phrase printed by `identity from-
    mnemonic` round-trips to itself byte-for-byte (rust + node).
  • Python-involving scenarios auto-skip if `python/.venv/bin/python
    kez_cli.py identity from-mnemonic` returns non-zero, so the harness
    stays runnable on machines where Python isn't set up.

Verified end-to-end: `bash crosstest.sh` reports
  "All 84 scenarios passed."

Test totals across implementations:
  Rust:   114 (9 mnemonic-specific in kez-core)
  Node:    99 (8 mnemonic-specific in @kez/core)
  Python:  19 (mnemonic only; there was no test suite before)
  Crosstest: 84 scenarios end-to-end

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-05 17:50:34 -06:00

380 lines
13 KiB
Python

"""KEZ command-line interface (Python implementation).
Mirrors the Rust and Node CLIs command-for-command and byte-for-byte in its
output, so the cross-implementation interop suite (../crosstest.sh) passes in
every direction.
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from . import encodings, sigchain
from .channels import parse_proof
from .envelope import (
new_add_payload,
new_claim_payload,
new_revoke_payload,
sign_claim,
sign_sigchain_event,
verify_claim,
)
from .identity import Identity
from .keys import Ed25519Secret, NostrSecret, signer_from_flags
from .mnemonic import (
ed25519_from_mnemonic,
generate_ed25519_with_mnemonic,
generate_mnemonic,
)
def _eprint(msg: str) -> None:
    """Print *msg*, followed by a newline, to standard error."""
    stream = sys.stderr
    print(msg, file=stream)
def write_or_print(out: str | None, output: str) -> None:
    """Deliver *output* to the file at *out*, or to stdout when *out* is None.

    Args:
        out: Destination file path, or ``None`` to write to standard output.
        output: The text to emit.

    When writing to a file the text is encoded to UTF-8 and written as raw
    bytes. Text-mode writing would translate LF to CRLF on Windows, which
    would make the file differ byte-for-byte between platforms; this module
    aims for byte-for-byte identical output, so no newline translation is
    applied. The file receives *output* exactly as given (no newline added).
    """
    if out is not None:
        Path(out).write_bytes(output.encode("utf-8"))
        return
    # Match Rust/Node: avoid double newlines if output already ends in one.
    if output.endswith("\n"):
        sys.stdout.write(output)
    else:
        print(output)
# ── identity ────────────────────────────────────────────────────────────────
def cmd_identity_new(args: argparse.Namespace) -> int:
    """Generate a new identity and print its key material to stdout.

    For ``--key-type nostr`` a Nostr secret is generated and its npub/nsec are
    printed. Otherwise an Ed25519 key is generated together with a BIP-39
    phrase of 12 or 24 words (24 when ``--mnemonic-words`` is not given).

    Returns:
        0 on success.

    Raises:
        ValueError: if ``--mnemonic-words`` is combined with the nostr key
            type, or if the requested word count is neither 12 nor 24.
    """
    requested = getattr(args, "mnemonic_words", None)
    wants_nostr = args.key_type == "nostr"

    if wants_nostr and requested is not None:
        raise ValueError("--mnemonic-words is only valid with --key-type ed25519")

    if wants_nostr:
        nostr_key = NostrSecret.generate()
        print(f"Primary: nostr:{nostr_key.npub()}")
        print(f"Public: {nostr_key.npub()}")
        print(f"Secret: {nostr_key.nsec()}")
        print()
        print("Store the secret somewhere safe. Anyone with the nsec can sign as this identity.")
        return 0

    # ed25519: fall back to 24 words when no count was requested.
    count = 24 if requested is None else requested
    if count != 12 and count != 24:
        raise ValueError(f"mnemonic word count must be 12 or 24, got {count}")

    ed_key, backup_phrase = generate_ed25519_with_mnemonic(count)
    print(f"Primary: {ed_key.identity()}")
    print(f"Public: {ed_key.pubkey_hex()}")
    print(f"Secret: {ed_key.seed_hex()} (32-byte seed)")
    print(f'Mnemonic ({count} words): "{backup_phrase}"')
    print()

    # The backup guidance differs by phrase length.
    if count == 24:
        guidance = (
            "The 24-word phrase and the hex seed are equivalent backups —\n"
            "either restores this identity. Store at least one safely."
        )
    else:
        guidance = (
            "The 12-word phrase is the canonical backup. The hex seed is\n"
            "derived from it (one-way) — you can't reconstruct the phrase\n"
            "from the seed. Store the phrase safely."
        )
    print(guidance)
    return 0
def cmd_identity_mnemonic(args: argparse.Namespace) -> int:
    """Print a freshly generated BIP-39 phrase without deriving a key.

    Uses ``args.words`` as the word count, defaulting to 24 when it is None.

    Returns:
        0 on success.

    Raises:
        ValueError: if the word count is neither 12 nor 24.
    """
    count = 24 if args.words is None else args.words
    if count != 12 and count != 24:
        raise ValueError(f"mnemonic word count must be 12 or 24, got {count}")
    fresh_phrase = generate_mnemonic(count)
    print(fresh_phrase)
    return 0
def cmd_identity_from_mnemonic(args: argparse.Namespace) -> int:
    """Derive an Ed25519 identity from ``args.phrase`` and print it.

    The word count shown is the number of whitespace-separated tokens in the
    phrase; the phrase itself is echoed with surrounding whitespace trimmed.
    For a 24-word phrase, the phrase is re-derived from the seed with
    ``mnemonic_from_seed_24`` and a note is printed when the result differs
    from the trimmed input.

    Returns:
        0 on success.

    Raises:
        ValueError: if the phrase is missing, empty, or only whitespace.
    """
    raw = args.phrase
    if not (raw and raw.strip()):
        raise ValueError("identity from-mnemonic needs the phrase in quotes")

    key = ed25519_from_mnemonic(raw)
    n_words = len(raw.split())

    print(f"Primary: {key.identity()}")
    print(f"Public: {key.pubkey_hex()}")
    print(f"Secret: {key.seed_hex()} (32-byte seed)")
    print(f'Mnemonic ({n_words} words): "{raw.strip()}"')

    if n_words != 24:
        return 0

    # Confirm canonical round-trip; flag if not.
    from .mnemonic import mnemonic_from_seed_24

    canonical = mnemonic_from_seed_24(bytes.fromhex(key.seed_hex()))
    if canonical.strip() != raw.strip():
        print(f'(note: canonical form is "{canonical}")')
    return 0
# ── claim ─────────────────────────────────────────────────────────────────────
def _signer(args: argparse.Namespace):
    """Build a signer from the key flags parsed onto *args*.

    Passes ``args.nsec``, ``args.ed25519_seed`` and the optional
    ``args.mnemonic`` (None when the attribute is absent) to
    ``signer_from_flags`` and returns its result.
    """
    nsec = args.nsec
    seed = args.ed25519_seed
    phrase = getattr(args, "mnemonic", None)
    return signer_from_flags(nsec, seed, phrase)
def _build_claim(subject: str, args: argparse.Namespace):
    """Create and sign a claim that binds *subject* to the signer's identity.

    The signer comes from the key flags on *args*; *subject* is parsed with
    ``Identity.parse`` before being placed in the claim payload.
    """
    key = _signer(args)
    signed_by = key.identity()
    about = Identity.parse(subject)
    claim_body = new_claim_payload(about, signed_by)
    return sign_claim(claim_body, key)
def cmd_claim_create(args: argparse.Namespace) -> int:
    """Create a signed claim for ``args.subject`` and emit it.

    The claim is rendered as markdown or compact text when ``args.format``
    names one of those, and as pretty JSON otherwise. The result goes to
    ``args.out`` if set, else to stdout.

    Returns:
        0 on success.
    """
    claim = _build_claim(args.subject, args)
    renderers = {
        "markdown": encodings.to_markdown,
        "compact": encodings.to_compact,
    }
    # Any other format value falls back to pretty JSON.
    render = renderers.get(args.format, encodings.to_pretty_json)
    write_or_print(args.out, render(claim))
    return 0
def cmd_claim_dns(args: argparse.Namespace) -> int:
    """Create a signed DNS claim and print the TXT record that carries it.

    ``args.domain`` is prefixed with ``dns:`` unless it already starts with
    it. Prints the record name, the compact-encoded claim value, and a
    zone-file line with the value quoted by ``quote_dns_txt_value``.

    Returns:
        0 on success.
    """
    target = args.domain
    if not target.startswith("dns:"):
        target = f"dns:{target}"

    proof = _build_claim(target, args)
    record_name = encodings.dns_txt_name(proof["payload"]["subject"])
    record_value = encodings.to_compact(proof)

    print(f"Name: {record_name}")
    print(f"Value: {record_value}")
    print()
    print("Zone file:")
    print(f"{record_name} TXT {encodings.quote_dns_txt_value(record_value)}")
    return 0
# ── verify ────────────────────────────────────────────────────────────────────
def _print_status(status: dict) -> None:
    """Print a verification status report to stdout.

    Reads the ``primary``, ``verified`` (iterable), ``status`` and
    ``confidence`` keys of *status* and prints them as a short report with
    one ``- item`` line per verified identity.
    """
    print(f"Primary: {status['primary']}", end="\n\n")
    print("Verified identities:")
    for verified in status["verified"]:
        print(f"- {verified}")
    # Blank line separates the identity list from the summary.
    print()
    print(f"Status: {status['status']}")
    print(f"Confidence: {status['confidence']}")
def cmd_verify_file(args: argparse.Namespace) -> int:
    """Verify the proof stored in the file at ``args.path`` and print the result.

    The file is read as UTF-8, parsed with ``parse_proof``, checked with
    ``verify_claim``, and the resulting status is printed.

    Returns:
        0 once the status has been printed.
    """
    text = Path(args.path).read_text(encoding="utf-8")
    outcome = verify_claim(parse_proof(text))
    _print_status(outcome)
    return 0
def cmd_verify_id(args: argparse.Namespace) -> int:
    """Report that ``verify id`` is not available in the Python CLI.

    Writes an explanatory message to stderr; *args* is not used.

    Returns:
        2, always.
    """
    notice = (
        "verify id requires network channel resolution, which is not implemented "
        "in the Python CLI; use 'verify file' instead."
    )
    _eprint(notice)
    return 2
# ── sigchain ──────────────────────────────────────────────────────────────────
def _resolve_primary_readonly(args: argparse.Namespace) -> Identity:
    """Determine the primary identity for a read-only sigchain command.

    A non-empty ``args.primary`` is parsed and returned directly; otherwise
    the identity is taken from the signer built from the key flags on *args*.
    """
    explicit = getattr(args, "primary", None)
    if not explicit:
        return _signer(args).identity()
    return Identity.parse(explicit)
def cmd_sigchain_add(args: argparse.Namespace) -> int:
    """Append a signed ``add`` event for ``args.subject`` to the signer's chain.

    Loads the chain for the signer's identity, builds an add payload from the
    chain's next sequence number and current head hash (plus the parsed
    subject and ``args.proof_url``), signs it, appends it, saves the chain,
    and prints a confirmation with the new head hash and the chain path.

    Returns:
        0 on success.
    """
    key = _signer(args)
    owner = key.identity()
    log = sigchain.load_chain(owner)

    seq = log.next_seq()
    prev_hash = log.head_hash()
    target = Identity.parse(args.subject)
    body = new_add_payload(owner, seq, prev_hash, target, args.proof_url)

    log.append(sign_sigchain_event(body, key))
    sigchain.save_chain(log)

    # head_hash() is read again here, after the append, for the confirmation.
    print(
        f"Appended add {args.subject} at seq {body['seq']} "
        f"(head hash: {log.head_hash()})"
    )
    print(f"Chain saved to {sigchain.sigchain_path(owner)}")
    return 0
def cmd_sigchain_revoke(args: argparse.Namespace) -> int:
    """Append a signed ``revoke`` event for ``args.subject`` to the signer's chain.

    Loads the chain for the signer's identity, builds a revoke payload from
    the chain's next sequence number, current head hash and the parsed
    subject, signs it, appends it, saves the chain, and prints a confirmation
    with the new head hash and the chain path.

    Returns:
        0 on success.
    """
    key = _signer(args)
    owner = key.identity()
    log = sigchain.load_chain(owner)

    seq = log.next_seq()
    prev_hash = log.head_hash()
    target = Identity.parse(args.subject)
    body = new_revoke_payload(owner, seq, prev_hash, target)

    log.append(sign_sigchain_event(body, key))
    sigchain.save_chain(log)

    # head_hash() is read again here, after the append, for the confirmation.
    print(
        f"Appended revoke {args.subject} at seq {body['seq']} "
        f"(head hash: {log.head_hash()})"
    )
    print(f"Chain saved to {sigchain.sigchain_path(owner)}")
    return 0
def cmd_sigchain_show(args: argparse.Namespace) -> int:
    """Print a summary of the sigchain belonging to the resolved primary.

    Shows the primary, the chain's path and length, one line per event
    (index, seq, op, subject), and the head hash when the chain is non-empty.

    Returns:
        0 on success.
    """
    owner = _resolve_primary_readonly(args)
    log = sigchain.load_chain(owner)

    print(f"Primary: {owner}")
    print(f"Path: {sigchain.sigchain_path(owner)}")
    print(f"Length: {len(log)} event(s)")
    print()

    for index, entry in enumerate(log.events()):
        # Events for which subject_of returns a falsy value get a placeholder.
        label = sigchain.subject_of(entry) or "<no subject>"
        body = entry["payload"]
        operation = body["op"]
        sequence = body["seq"]
        print(f" [{index}] seq={sequence} op={operation:<6} subject={label}")

    if log.is_empty():
        return 0
    print()
    print(f"Head hash: {log.head_hash()}")
    return 0
def cmd_sigchain_export(args: argparse.Namespace) -> int:
    """Export the resolved primary's sigchain to ``args.out`` or stdout.

    The chain is serialized as a compact bundle when ``args.format`` is
    ``"compact"`` and as JSONL otherwise.

    Returns:
        0 on success, or 1 (with a message on stderr) when the chain is empty.
    """
    owner = _resolve_primary_readonly(args)
    log = sigchain.load_chain(owner)

    if log.is_empty():
        _eprint(f"no chain found for {owner}")
        return 1

    serialized = log.to_compact_bundle() if args.format == "compact" else log.to_jsonl()
    write_or_print(args.out, serialized)
    return 0
# ── argument parsing ──────────────────────────────────────────────────────────
def _add_key_flags(p: argparse.ArgumentParser) -> None:
    """Register the signing-key options on parser *p*.

    Adds ``--nsec``, ``--ed25519-seed`` (stored as ``ed25519_seed``) and
    ``--mnemonic``; each takes one string value and defaults to None.
    """
    for flag in ("--nsec", "--ed25519-seed", "--mnemonic"):
        # Spell out the destination argparse would derive from the flag name.
        attribute = flag[2:].replace("-", "_")
        p.add_argument(flag, dest=attribute)
def build_parser() -> argparse.ArgumentParser:
    """Construct the ``kez`` argument parser with every subcommand attached.

    The top-level command groups are ``identity``, ``claim``, ``verify`` and
    ``sigchain``; each requires a subcommand. Every leaf parser stores its
    handler in ``func`` via ``set_defaults`` so the caller can dispatch with
    ``args.func(args)``.

    Returns:
        The fully configured parser.
    """
    root = argparse.ArgumentParser(prog="kez", description="KEZ portable identity CLI")
    groups = root.add_subparsers(dest="command", required=True)

    # identity
    identity_group = groups.add_parser("identity", help="key management")
    identity_cmds = identity_group.add_subparsers(dest="identity_command", required=True)

    new_cmd = identity_cmds.add_parser("new", help="generate a new identity")
    new_cmd.add_argument(
        "--key-type", dest="key_type", choices=["nostr", "ed25519"], default="nostr"
    )
    new_cmd.add_argument(
        "--mnemonic-words",
        dest="mnemonic_words",
        type=int,
        default=None,
        help="(ed25519 only) generate from a 12- or 24-word BIP-39 phrase",
    )
    new_cmd.set_defaults(func=cmd_identity_new)

    mnemonic_cmd = identity_cmds.add_parser(
        "mnemonic", help="print a fresh BIP-39 phrase without deriving a key"
    )
    mnemonic_cmd.add_argument("--words", type=int, default=None)
    mnemonic_cmd.set_defaults(func=cmd_identity_mnemonic)

    from_mnemonic_cmd = identity_cmds.add_parser(
        "from-mnemonic", help="derive an Ed25519 identity from a BIP-39 phrase"
    )
    from_mnemonic_cmd.add_argument("phrase")
    from_mnemonic_cmd.set_defaults(func=cmd_identity_from_mnemonic)

    # claim
    claim_group = groups.add_parser("claim", help="create claims")
    claim_cmds = claim_group.add_subparsers(dest="claim_command", required=True)

    create_cmd = claim_cmds.add_parser("create", help="create a signed claim")
    create_cmd.add_argument("subject")
    _add_key_flags(create_cmd)
    create_cmd.add_argument(
        "--format", choices=["json", "compact", "markdown"], default="json"
    )
    create_cmd.add_argument("--out")
    create_cmd.set_defaults(func=cmd_claim_create)

    dns_cmd = claim_cmds.add_parser("dns", help="create a DNS-zone proof for a domain")
    dns_cmd.add_argument("domain")
    _add_key_flags(dns_cmd)
    dns_cmd.set_defaults(func=cmd_claim_dns)

    # verify
    verify_group = groups.add_parser("verify", help="verify proofs")
    verify_cmds = verify_group.add_subparsers(dest="verify_command", required=True)

    verify_file_cmd = verify_cmds.add_parser("file", help="verify a proof file")
    verify_file_cmd.add_argument("path")
    verify_file_cmd.set_defaults(func=cmd_verify_file)

    verify_id_cmd = verify_cmds.add_parser(
        "id", help="verify an identifier via its channels"
    )
    verify_id_cmd.add_argument("identifier")
    verify_id_cmd.set_defaults(func=cmd_verify_id)

    # sigchain
    sigchain_group = groups.add_parser("sigchain", help="manage a sigchain")
    sigchain_cmds = sigchain_group.add_subparsers(dest="sigchain_command", required=True)

    add_cmd = sigchain_cmds.add_parser("add", help="append an add event")
    add_cmd.add_argument("subject")
    _add_key_flags(add_cmd)
    add_cmd.add_argument("--proof-url", dest="proof_url")
    add_cmd.set_defaults(func=cmd_sigchain_add)

    revoke_cmd = sigchain_cmds.add_parser("revoke", help="append a revoke event")
    revoke_cmd.add_argument("subject")
    _add_key_flags(revoke_cmd)
    revoke_cmd.set_defaults(func=cmd_sigchain_revoke)

    show_cmd = sigchain_cmds.add_parser("show", help="show a sigchain")
    show_cmd.add_argument("--primary")
    _add_key_flags(show_cmd)
    show_cmd.set_defaults(func=cmd_sigchain_show)

    export_cmd = sigchain_cmds.add_parser("export", help="export a sigchain")
    export_cmd.add_argument("--primary")
    _add_key_flags(export_cmd)
    export_cmd.add_argument("--format", choices=["jsonl", "compact"], default="jsonl")
    export_cmd.add_argument("--out")
    export_cmd.set_defaults(func=cmd_sigchain_export)

    return root
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and run the selected subcommand.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process's command line.

    Returns:
        The handler's return value, or 1 when the handler raises an
        ``Exception`` (its message is written to stderr as ``error: ...``).
    """
    args = build_parser().parse_args(argv)
    try:
        exit_code = args.func(args)
    except Exception as exc:  # noqa: BLE001 — top-level CLI error boundary
        _eprint(f"error: {exc}")
        return 1
    return exit_code
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())