diff --git a/.gitignore b/.gitignore index 7b60514..d1306dc 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,13 @@ npm-debug.log* yarn-debug.log* yarn-error.log* +# Python +.venv/ +__pycache__/ +*.py[cod] +*.egg-info/ +.pytest_cache/ + # Local runtime state *.db *.db-journal diff --git a/README.md b/README.md index 73db254..1e3fc69 100644 --- a/README.md +++ b/README.md @@ -17,13 +17,15 @@ relay event). Anyone can verify the graph without trusting a server. ├── SPEC.md ← The protocol. Language-agnostic, normative. ├── rust/ ← Rust implementation (kez-core, kez-channels, kez-cli) ├── nodejs/ ← TypeScript/Node implementation (same shape, same CLI) +├── python/ ← Python implementation (same shape, same CLI) ├── rust-sig-server/ ← Optional HTTP store for sigchains (axum + SQLite) ├── crosstest.sh ← Interop test: artifacts move between implementations └── README.md ← (this file) ``` -Two parallel implementations. **Wire-compatible**: a claim signed in Rust -verifies in Node and vice versa. The cross-test harness proves it. +Three parallel implementations. **Wire-compatible**: a claim signed in Rust +verifies in Node and Python and vice versa, in every direction. The cross-test +harness proves it. A separate [`rust-sig-server/`](rust-sig-server/) crate provides an optional HTTP storage tier for sigchains — useful when a user doesn't want to set up @@ -41,6 +43,9 @@ Start here: - [**`nodejs/README.md`**](nodejs/README.md) — Node/TypeScript port: same shape as Rust, npm workspaces layout, crypto stack rationale, CLI reference. +- [**`python/README.md`**](python/README.md) — Python port: single + `kez` package, virtualenv setup, crypto stack rationale (pure-Python + BIP-340 Schnorr + `cryptography` for Ed25519), CLI reference. - [**`rust-sig-server/README.md`**](rust-sig-server/README.md) — the optional storage server: API reference, no-auth design + threat model, deployment recipes (bare-metal, Docker, PaaS), and how @@ -67,6 +72,15 @@ npm run cli -- verify id github:jason ``` Full guide: [`nodejs/README.md`](nodejs/README.md). +### Python +```sh +cd python +python3 -m venv .venv +.venv/bin/pip install -r requirements.txt +.venv/bin/python kez_cli.py identity new +``` +Full guide: [`python/README.md`](python/README.md). + ### Sigchain storage server (optional) ```sh cd rust-sig-server @@ -81,27 +95,20 @@ Full guide: [`rust-sig-server/README.md`](rust-sig-server/README.md). ./crosstest.sh ``` -Runs 19 scenarios that swap implementations at the artifact boundary: +Runs 55 scenarios that swap implementations at the artifact boundary: -| # | Scenario | +| # | Scenarios | |---|---| -| 1–2 | nostr-signed JSON claim, both directions | -| 3–4 | nostr-signed compact claim, both directions | -| 5–6 | nostr-signed markdown claim, both directions | -| 7–8 | nostr-signed DNS zone form, both directions | -| 9–10 | ed25519-signed JSON claim, both directions | -| 11–12 | ed25519-signed compact claim, both directions | -| 13–14 | ed25519-signed markdown claim, both directions | -| 15 | rust builds 3-event nostr sigchain → node parses + shows | -| 16 | rust-exported sigchain JSONL == node-exported JSONL (byte-identical) | -| 17 | node builds 3-event nostr sigchain → rust parses + shows | -| 18 | rust builds ed25519 sigchain → node parses + shows | -| 19 | node builds ed25519 sigchain → rust parses + shows | +| 1–14 | Rust ↔ Node: JSON / compact / markdown / DNS claims, nostr + ed25519 | +| 15–20 | Rust ↔ Node sigchains: build in one, parse + show in the other; JSONL byte parity | +| 21–44 | **Python ↔ Rust and Python ↔ Node** claims: every format × key type, both directions | +| — | Python ↔ both peers DNS zone form, both directions | +| — | Python ↔ both peers sigchains: build/show both ways, JSONL byte parity, ed25519 | -If all 19 pass: JCS canonicalization, both signature suites (BIP-340 Schnorr +If all 55 pass: JCS canonicalization, both signature suites (BIP-340 Schnorr and Ed25519), the compact `kez:z1:` zstd+base64url encoding, the Markdown fence, the DNS TXT shape, and the sigchain JSONL bundle format are all -byte-compatible across implementations. +byte-compatible across all three implementations. Pass `-v` for verbose output (echoes intermediate commands and proofs). diff --git a/crosstest.sh b/crosstest.sh index b2ea405..9173d8c 100755 --- a/crosstest.sh +++ b/crosstest.sh @@ -23,6 +23,14 @@ if [[ ! -f "$TSX_LOADER" ]]; then exit 1 fi NODE_CLI=(node --import "$TSX_LOADER" "$ROOT/nodejs/packages/kez-cli/src/cli.ts") +# Python CLI runs inside its own virtualenv so its native deps (cryptography, +# zstandard) are isolated from the system interpreter. +PYTHON_VENV="$ROOT/python/.venv/bin/python" +if [[ ! -x "$PYTHON_VENV" ]]; then + printf "Python venv not found at %s — run 'cd python && python3 -m venv .venv && .venv/bin/pip install -r requirements.txt' first\n" "$PYTHON_VENV" >&2 + exit 1 +fi +PYTHON_CLI=("$PYTHON_VENV" "$ROOT/python/kez_cli.py") VERBOSE=0 [[ "${1:-}" == "-v" ]] && VERBOSE=1 @@ -73,6 +81,35 @@ assert_verify_valid() { fi } +# Dispatch to one of the three implementations by name. Keeps the Python +# interop scenarios below readable without juggling array variables. +run_cli() { + local which="$1"; shift + case "$which" in + rust) "${RUST_CLI[@]}" "$@" ;; + node) "${NODE_CLI[@]}" "$@" ;; + py) "${PYTHON_CLI[@]}" "$@" ;; + *) printf "unknown impl: %s\n" "$which" >&2; return 2 ;; + esac +} + +# Sign a claim with one impl and verify it with another, for a given wire +# format. Remaining args are the signing-key flags (--nsec / --ed25519-seed). +claim_roundtrip() { + local title="$1" signer="$2" verifier="$3" fmt="$4"; shift 4 + scenario "$title" + case "$fmt" in + json) + run_cli "$signer" claim create github:jason "$@" > "$TMP/rt.proof" 2>"$TMP/rt.err" ;; + markdown) + run_cli "$signer" claim create github:jason "$@" --format markdown --out "$TMP/rt.proof" 2>"$TMP/rt.err" ;; + *) + run_cli "$signer" claim create github:jason "$@" --format "$fmt" > "$TMP/rt.proof" 2>"$TMP/rt.err" ;; + esac + run_cli "$verifier" verify file "$TMP/rt.proof" > "$TMP/rt.out" 2>&1 + assert_verify_valid "$title" "$TMP/rt.out" && ok +} + # Pre-flight: build Rust release once (much faster reruns). printf "%sBuilding Rust impl…%s\n" "$YELLOW" "$RESET" cargo build --quiet --manifest-path "$ROOT/rust/Cargo.toml" -p kez-cli @@ -181,6 +218,40 @@ scenario "node ed25519 markdown ⇒ rust verify file" "${RUST_CLI[@]}" verify file "$TMP/n.kez.md" > "$TMP/n.out" 2>&1 assert_verify_valid "node→rust ed25519 markdown" "$TMP/n.out" && ok +# ── Python interop (claims) ───────────────────────────────────────────────── +# The Python implementation must round-trip with BOTH peers in BOTH directions, +# across every wire encoding and both key types — proving its JCS bytes, +# signatures (pure-Python BIP-340 Schnorr + ed25519) and zstd compact framing +# are all byte-compatible with Rust and Node. +printf "%sPython interop (claims):%s\n" "$YELLOW" "$RESET" +for peer in node rust; do + for fmt in json compact markdown; do + for kt in nostr ed25519; do + if [[ "$kt" == nostr ]]; then key=(--nsec "$NSEC"); else key=(--ed25519-seed "$SEED"); fi + claim_roundtrip "py $kt $fmt ⇒ $peer verify" py "$peer" "$fmt" "${key[@]}" + claim_roundtrip "$peer $kt $fmt ⇒ py verify" "$peer" py "$fmt" "${key[@]}" + done + done +done + +# Python DNS zone form → both peers verify the extracted compact token. +for peer in node rust; do + scenario "py DNS zone form ⇒ $peer verify file" + "${PYTHON_CLI[@]}" claim dns jason.example.com --nsec "$NSEC" > "$TMP/pd.dns" + awk '/^Value:/ {print $2}' "$TMP/pd.dns" > "$TMP/pd.compact" + run_cli "$peer" verify file "$TMP/pd.compact" > "$TMP/pd.out" 2>&1 + assert_verify_valid "py DNS→$peer compact" "$TMP/pd.out" && ok +done + +# Each peer's DNS compact token → Python verifies. +for peer in node rust; do + scenario "$peer DNS zone form ⇒ py verify file" + run_cli "$peer" claim dns jason.example.com --nsec "$NSEC" > "$TMP/xd.dns" + awk '/^Value:/ {print $2}' "$TMP/xd.dns" > "$TMP/xd.compact" + "${PYTHON_CLI[@]}" verify file "$TMP/xd.compact" > "$TMP/xd.out" 2>&1 + assert_verify_valid "$peer DNS→py compact" "$TMP/xd.out" && ok +done + # ── Sigchain interop ──────────────────────────────────────────────────────── # Sigchain state lives in ~/.kez/sigchains/.jsonl. Both CLIs # read/write the same paths, so we can build a chain in one and inspect it @@ -289,6 +360,89 @@ if grep -q "Length: 2 event(s)" "$TMP/sc_f.out"; then ok; else fi rm -f "$SC_ED_FILE" +# ── Python sigchain interop ───────────────────────────────────────────────── +# Build chains with Python and inspect them from each peer (and vice versa), +# over the same ~/.kez/sigchains state files. Uses fresh keys to avoid +# colliding with the scenarios above. +printf "%sPython interop (sigchains):%s\n" "$YELLOW" "$RESET" + +PY_NOSTR_OUT="$("${PYTHON_CLI[@]}" identity new)" +PY_NSEC="$(printf '%s\n' "$PY_NOSTR_OUT" | extract_nsec /dev/stdin)" +PY_NOSTR_PRIMARY="$(printf '%s\n' "$PY_NOSTR_OUT" | sed -n 's/^Primary:[[:space:]]*//p' | head -1)" +PY_NOSTR_FILE="$(chain_file_for "$PY_NOSTR_PRIMARY")" + +PY_ED_OUT="$("${PYTHON_CLI[@]}" identity new --key-type ed25519)" +PY_SEED="$(printf '%s\n' "$PY_ED_OUT" | extract_ed25519_seed /dev/stdin)" +PY_ED_PRIMARY="$(printf '%s\n' "$PY_ED_OUT" | sed -n 's/^Primary:[[:space:]]*//p' | head -1)" +PY_ED_FILE="$(chain_file_for "$PY_ED_PRIMARY")" + +rm -f "$PY_NOSTR_FILE" "$PY_ED_FILE" + +# Python builds a nostr chain; each peer must see all 3 events incl the revoke. +"${PYTHON_CLI[@]}" sigchain add github:jason --nsec "$PY_NSEC" > /dev/null +"${PYTHON_CLI[@]}" sigchain add dns:jason.example --nsec "$PY_NSEC" > /dev/null +"${PYTHON_CLI[@]}" sigchain revoke github:jason --nsec "$PY_NSEC" > /dev/null +for peer in node rust; do + scenario "py nostr chain ⇒ $peer sigchain show" + run_cli "$peer" sigchain show --primary "$PY_NOSTR_PRIMARY" > "$TMP/psc.out" 2>&1 + if grep -q "Length: 3 event(s)" "$TMP/psc.out" \ + && grep -q "op=revoke subject=github:jason" "$TMP/psc.out"; then + ok + else + bad "py→$peer sigchain show" "see $TMP/psc.out" + cat "$TMP/psc.out" >&2 + fi +done +rm -f "$PY_NOSTR_FILE" + +# Each peer builds a nostr chain; Python must see all 3 events. +for peer in node rust; do + rm -f "$PY_NOSTR_FILE" + run_cli "$peer" sigchain add github:jason --nsec "$PY_NSEC" > /dev/null + run_cli "$peer" sigchain add dns:jason.example --nsec "$PY_NSEC" > /dev/null + run_cli "$peer" sigchain revoke github:jason --nsec "$PY_NSEC" > /dev/null + scenario "$peer nostr chain ⇒ py sigchain show" + "${PYTHON_CLI[@]}" sigchain show --primary "$PY_NOSTR_PRIMARY" > "$TMP/psc2.out" 2>&1 + if grep -q "Length: 3 event(s)" "$TMP/psc2.out" \ + && grep -q "op=revoke subject=github:jason" "$TMP/psc2.out"; then + ok + else + bad "$peer→py sigchain show" "see $TMP/psc2.out" + cat "$TMP/psc2.out" >&2 + fi + rm -f "$PY_NOSTR_FILE" +done + +# JSONL byte parity: Python and each peer must export the same on-disk chain +# to byte-identical JSONL. +"${PYTHON_CLI[@]}" sigchain add github:jason --nsec "$PY_NSEC" > /dev/null +"${PYTHON_CLI[@]}" sigchain add dns:jason.example --nsec "$PY_NSEC" > /dev/null +for peer in node rust; do + scenario "py JSONL == $peer JSONL for same chain" + "${PYTHON_CLI[@]}" sigchain export --nsec "$PY_NSEC" --format jsonl > "$TMP/pj_py.jsonl" + run_cli "$peer" sigchain export --nsec "$PY_NSEC" --format jsonl > "$TMP/pj_peer.jsonl" + if diff -q "$TMP/pj_py.jsonl" "$TMP/pj_peer.jsonl" > /dev/null; then + ok + else + bad "py/$peer JSONL parity" "exported bytes differ" + diff "$TMP/pj_py.jsonl" "$TMP/pj_peer.jsonl" | head -20 >&2 + fi +done +rm -f "$PY_NOSTR_FILE" + +# Python ed25519 chain ⇒ each peer validates. +"${PYTHON_CLI[@]}" sigchain add github:jason --ed25519-seed "$PY_SEED" > /dev/null +"${PYTHON_CLI[@]}" sigchain add dns:jason.example --ed25519-seed "$PY_SEED" > /dev/null +for peer in node rust; do + scenario "py ed25519 chain ⇒ $peer sigchain show" + run_cli "$peer" sigchain show --primary "$PY_ED_PRIMARY" > "$TMP/pse.out" 2>&1 + if grep -q "Length: 2 event(s)" "$TMP/pse.out"; then ok; else + bad "py→$peer ed25519 chain" "$peer did not see all events" + cat "$TMP/pse.out" >&2 + fi +done +rm -f "$PY_ED_FILE" + printf "\n" if [[ $FAIL -eq 0 ]]; then printf "%sAll %d scenarios passed.%s\n" "$GREEN" "$PASS" "$RESET" diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..b2f6be0 --- /dev/null +++ b/python/README.md @@ -0,0 +1,106 @@ +# KEZ — Python Implementation + +KEZ is a portable, decentralized identity graph. It lets one person say: + +> "These accounts, keys, domains, and identities are all me." + +…without depending on any central authority. Every connection is proven by a +signature against a key the user already controls. The protocol is specified in +[`../SPEC.md`](../SPEC.md); this directory is the Python implementation of that +spec. + +It is **wire-compatible** with the [Rust](../rust/) and [Node](../nodejs/) +implementations: a claim signed here verifies there and vice versa, in every +direction. The repo-root [`crosstest.sh`](../crosstest.sh) proves it. + +--- + +## What's in this directory + +``` +python/ +├── pyproject.toml Package metadata + entry point (`kez`) +├── requirements.txt Runtime deps (cryptography, zstandard) +├── kez_cli.py Standalone launcher (used by ../crosstest.sh) +└── kez/ + ├── jcs.py RFC 8785 JSON canonicalization + ├── bech32.py Bech32 (nsec/npub) encode/decode + ├── schnorr.py Pure-Python BIP-340 Schnorr over secp256k1 + ├── identity.py `system:identifier` parsing + normalization + ├── keys.py NostrSecret / Ed25519Secret signers + verification + ├── envelope.py Envelope, claim & sigchain-event payloads, sign/verify + ├── encodings.py JSON / compact (kez:z1:) / markdown / DNS / JSONL bundle + ├── sigchain.py Append-only signed sigchain + on-disk storage + ├── channels.py parse_proof across all four wire encodings + └── cli.py The `kez` command-line interface +``` + +--- + +## Setup + +```sh +cd python +python3 -m venv .venv +.venv/bin/pip install -r requirements.txt +``` + +Then run the CLI either through the launcher or the installed entry point: + +```sh +.venv/bin/python kez_cli.py identity new +# or, after `.venv/bin/pip install -e .`: +.venv/bin/kez identity new +``` + +--- + +## Crypto stack + +| Concern | Choice | Why | +|---|---|---| +| JCS (RFC 8785) | hand-rolled (`jcs.py`) | KEZ payloads are strings/ints/objects only; a tiny dependency-free canonicalizer guarantees byte-identical output | +| secp256k1 Schnorr (BIP-340) | pure-Python reference (`schnorr.py`) | the native `coincurve`/`secp256k1` bindings fail to build on recent CPython; signing fixed-size digests is fast enough for a CLI. Signs with zero aux-rand to match Rust/Node exactly | +| Ed25519 (RFC 8032) | [`cryptography`](https://cryptography.io) | well-maintained, ships wheels | +| zstd | [`zstandard`](https://pypi.org/project/zstandard/) | level 3, matching the other impls; `decompressobj` handles frames without a content-size header | +| Bech32 | hand-rolled (`bech32.py`) | the BIP-173 reference is small and avoids a dependency | + +All signing is **deterministic**, so the same claim signs identically every +time. + +--- + +## CLI reference + +``` +kez identity new [--key-type nostr|ed25519] + +kez claim create (--nsec | --ed25519-seed ) + [--format json|compact|markdown] [--out ] +kez claim dns (--nsec | --ed25519-seed ) + +kez verify file + +kez sigchain add (--nsec | --ed25519-seed) [--proof-url ] +kez sigchain revoke (--nsec | --ed25519-seed) +kez sigchain show [--primary | --nsec | --ed25519-seed] +kez sigchain export [--primary | --nsec | --ed25519-seed] + [--format jsonl|compact] [--out ] +``` + +Sigchain state lives in `~/.kez/sigchains/.jsonl` +— the same paths the Rust and Node CLIs use, so chains built by one are +readable by the others. + +--- + +## What's not done yet + +Matching the gap list in [`../rust/README.md`](../rust/README.md), the Python +CLI implements `claim`, `verify file`, and `sigchain add/revoke/show/export`. +Not yet ported: `verify id` channel resolution (network fetch), `sigchain +publish`, and the `rotate`/`add_device` ops. + +## License + +Dual-licensed under MIT or Apache-2.0. diff --git a/python/kez/__init__.py b/python/kez/__init__.py new file mode 100644 index 0000000..bf27091 --- /dev/null +++ b/python/kez/__init__.py @@ -0,0 +1,7 @@ +"""KEZ — portable identity graph, Python implementation. + +Byte-compatible with the Rust and Node.js implementations: claims signed by one +verify in the others, in every direction (see ../../crosstest.sh). +""" + +__version__ = "0.3.0" diff --git a/python/kez/__main__.py b/python/kez/__main__.py new file mode 100644 index 0000000..dbdd066 --- /dev/null +++ b/python/kez/__main__.py @@ -0,0 +1,6 @@ +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/kez/bech32.py b/python/kez/bech32.py new file mode 100644 index 0000000..adbd1df --- /dev/null +++ b/python/kez/bech32.py @@ -0,0 +1,94 @@ +"""Bech32 encoding (BIP-173 variant) for nostr nsec/npub strings. + +Reference implementation adapted from BIP-173 (Pieter Wuille, MIT licensed). +KEZ uses the original Bech32 checksum constant (not Bech32m), matching the +nostr NIP-19 convention and the Rust/Node implementations. +""" + +from __future__ import annotations + +CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" + + +def _polymod(values: list[int]) -> int: + generator = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3] + chk = 1 + for v in values: + b = chk >> 25 + chk = ((chk & 0x1FFFFFF) << 5) ^ v + for i in range(5): + chk ^= generator[i] if ((b >> i) & 1) else 0 + return chk + + +def _hrp_expand(hrp: str) -> list[int]: + return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] + + +def _verify_checksum(hrp: str, data: list[int]) -> bool: + return _polymod(_hrp_expand(hrp) + data) == 1 + + +def _create_checksum(hrp: str, data: list[int]) -> list[int]: + values = _hrp_expand(hrp) + data + polymod = _polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1 + return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] + + +def _bech32_encode(hrp: str, data: list[int]) -> str: + combined = data + _create_checksum(hrp, data) + return hrp + "1" + "".join(CHARSET[d] for d in combined) + + +def _bech32_decode(bech: str) -> tuple[str, list[int]]: + if any(ord(x) < 33 or ord(x) > 126 for x in bech): + raise ValueError("bech32: invalid character") + if bech.lower() != bech and bech.upper() != bech: + raise ValueError("bech32: mixed case") + bech = bech.lower() + pos = bech.rfind("1") + if pos < 1 or pos + 7 > len(bech): + raise ValueError("bech32: invalid separator position") + hrp = bech[:pos] + if any(c not in CHARSET for c in bech[pos + 1 :]): + raise ValueError("bech32: invalid data character") + data = [CHARSET.find(c) for c in bech[pos + 1 :]] + if not _verify_checksum(hrp, data): + raise ValueError("bech32: bad checksum") + return hrp, data[:-6] + + +def _convertbits(data: bytes | list[int], frombits: int, tobits: int, pad: bool) -> list[int]: + acc = 0 + bits = 0 + ret: list[int] = [] + maxv = (1 << tobits) - 1 + max_acc = (1 << (frombits + tobits - 1)) - 1 + for value in data: + if value < 0 or (value >> frombits): + raise ValueError("bech32: invalid value in convertbits") + acc = ((acc << frombits) | value) & max_acc + bits += frombits + while bits >= tobits: + bits -= tobits + ret.append((acc >> bits) & maxv) + if pad: + if bits: + ret.append((acc << (tobits - bits)) & maxv) + elif bits >= frombits or ((acc << (tobits - bits)) & maxv): + raise ValueError("bech32: invalid padding in convertbits") + return ret + + +def encode(hrp: str, payload: bytes) -> str: + """Encode raw ``payload`` bytes as a bech32 string under ``hrp``.""" + data = _convertbits(payload, 8, 5, True) + return _bech32_encode(hrp, data) + + +def decode(expected_hrp: str, bech: str) -> bytes: + """Decode a bech32 string, asserting its HRP and returning the raw bytes.""" + hrp, data = _bech32_decode(bech) + if hrp != expected_hrp: + raise ValueError(f"bech32: expected hrp {expected_hrp!r}, got {hrp!r}") + return bytes(_convertbits(data, 5, 8, False)) diff --git a/python/kez/channels.py b/python/kez/channels.py new file mode 100644 index 0000000..998ae79 --- /dev/null +++ b/python/kez/channels.py @@ -0,0 +1,42 @@ +"""Proof parsing across the four wire encodings (Spec §6).""" + +from __future__ import annotations + +import json +from typing import Any + +from .encodings import extract_markdown_proof, from_compact +from .envelope import COMPACT_PROOF_PREFIX + +_B64URL = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_") + + +def extract_compact_token(text: str) -> str | None: + idx = text.find(COMPACT_PROOF_PREFIX) + if idx < 0: + return None + body_chars = [] + for ch in text[idx + len(COMPACT_PROOF_PREFIX) :]: + if ch in _B64URL: + body_chars.append(ch) + else: + break + if not body_chars: + return None + return COMPACT_PROOF_PREFIX + "".join(body_chars) + + +def parse_proof(raw: str) -> dict[str, Any]: + trimmed = raw.strip() + + # Markdown fence is the most specific marker — check it first. + if "```kez" in trimmed: + return extract_markdown_proof(trimmed) + # Raw JSON envelope. + if trimmed.startswith("{"): + return json.loads(trimmed) + # Compact: extract the kez:z1: token anywhere in the input. + token = extract_compact_token(trimmed) + if token is not None: + return from_compact(token) + raise ValueError("unknown KEZ proof format") diff --git a/python/kez/cli.py b/python/kez/cli.py new file mode 100644 index 0000000..84ad65c --- /dev/null +++ b/python/kez/cli.py @@ -0,0 +1,299 @@ +"""KEZ command-line interface (Python implementation). + +Mirrors the Rust and Node CLIs command-for-command and byte-for-byte in its +output, so the cross-implementation interop suite (../crosstest.sh) passes in +every direction. +""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +from . import encodings, sigchain +from .channels import parse_proof +from .envelope import ( + new_add_payload, + new_claim_payload, + new_revoke_payload, + sign_claim, + sign_sigchain_event, + verify_claim, +) +from .identity import Identity +from .keys import Ed25519Secret, NostrSecret, signer_from_flags + + +def _eprint(msg: str) -> None: + print(msg, file=sys.stderr) + + +def write_or_print(out: str | None, output: str) -> None: + if out is not None: + Path(out).write_text(output, encoding="utf-8") + return + # Match Rust/Node: avoid double newlines if output already ends in one. + if output.endswith("\n"): + sys.stdout.write(output) + else: + print(output) + + +# ── identity ──────────────────────────────────────────────────────────────── + + +def cmd_identity_new(args: argparse.Namespace) -> int: + if args.key_type == "ed25519": + secret = Ed25519Secret.generate() + print(f"Primary: {secret.identity()}") + print(f"Public: {secret.pubkey_hex()}") + print(f"Secret: {secret.seed_hex()} (32-byte seed)") + print() + print("Store the secret somewhere safe. Anyone with the seed can sign as this identity.") + else: + secret = NostrSecret.generate() + print(f"Primary: nostr:{secret.npub()}") + print(f"Public: {secret.npub()}") + print(f"Secret: {secret.nsec()}") + print() + print("Store the secret somewhere safe. Anyone with the nsec can sign as this identity.") + return 0 + + +# ── claim ───────────────────────────────────────────────────────────────────── + + +def _build_claim(subject: str, args: argparse.Namespace): + signer = signer_from_flags(args.nsec, args.ed25519_seed) + primary = signer.identity() + payload = new_claim_payload(Identity.parse(subject), primary) + return sign_claim(payload, signer) + + +def cmd_claim_create(args: argparse.Namespace) -> int: + signed = _build_claim(args.subject, args) + if args.format == "markdown": + output = encodings.to_markdown(signed) + elif args.format == "compact": + output = encodings.to_compact(signed) + else: + output = encodings.to_pretty_json(signed) + write_or_print(args.out, output) + return 0 + + +def cmd_claim_dns(args: argparse.Namespace) -> int: + domain = args.domain if args.domain.startswith("dns:") else f"dns:{args.domain}" + signed = _build_claim(domain, args) + name = encodings.dns_txt_name(signed["payload"]["subject"]) + value = encodings.to_compact(signed) + print(f"Name: {name}") + print(f"Value: {value}") + print() + print("Zone file:") + print(f"{name} TXT {encodings.quote_dns_txt_value(value)}") + return 0 + + +# ── verify ──────────────────────────────────────────────────────────────────── + + +def _print_status(status: dict) -> None: + print(f"Primary: {status['primary']}") + print() + print("Verified identities:") + for identity in status["verified"]: + print(f"- {identity}") + print() + print(f"Status: {status['status']}") + print(f"Confidence: {status['confidence']}") + + +def cmd_verify_file(args: argparse.Namespace) -> int: + raw = Path(args.path).read_text(encoding="utf-8") + proof = parse_proof(raw) + status = verify_claim(proof) + _print_status(status) + return 0 + + +def cmd_verify_id(args: argparse.Namespace) -> int: + _eprint( + "verify id requires network channel resolution, which is not implemented " + "in the Python CLI; use 'verify file' instead." + ) + return 2 + + +# ── sigchain ────────────────────────────────────────────────────────────────── + + +def _resolve_primary_readonly(args: argparse.Namespace) -> Identity: + if getattr(args, "primary", None): + return Identity.parse(args.primary) + signer = signer_from_flags(args.nsec, args.ed25519_seed) + return signer.identity() + + +def cmd_sigchain_add(args: argparse.Namespace) -> int: + signer = signer_from_flags(args.nsec, args.ed25519_seed) + primary = signer.identity() + chain = sigchain.load_chain(primary) + payload = new_add_payload( + primary, + chain.next_seq(), + chain.head_hash(), + Identity.parse(args.subject), + args.proof_url, + ) + event = sign_sigchain_event(payload, signer) + chain.append(event) + sigchain.save_chain(chain) + print( + f"Appended add {args.subject} at seq {payload['seq']} " + f"(head hash: {chain.head_hash()})" + ) + print(f"Chain saved to {sigchain.sigchain_path(primary)}") + return 0 + + +def cmd_sigchain_revoke(args: argparse.Namespace) -> int: + signer = signer_from_flags(args.nsec, args.ed25519_seed) + primary = signer.identity() + chain = sigchain.load_chain(primary) + payload = new_revoke_payload( + primary, + chain.next_seq(), + chain.head_hash(), + Identity.parse(args.subject), + ) + event = sign_sigchain_event(payload, signer) + chain.append(event) + sigchain.save_chain(chain) + print( + f"Appended revoke {args.subject} at seq {payload['seq']} " + f"(head hash: {chain.head_hash()})" + ) + print(f"Chain saved to {sigchain.sigchain_path(primary)}") + return 0 + + +def cmd_sigchain_show(args: argparse.Namespace) -> int: + primary = _resolve_primary_readonly(args) + chain = sigchain.load_chain(primary) + print(f"Primary: {primary}") + print(f"Path: {sigchain.sigchain_path(primary)}") + print(f"Length: {len(chain)} event(s)") + print() + for i, event in enumerate(chain.events()): + subject = sigchain.subject_of(event) or "" + op = event["payload"]["op"] + seq = event["payload"]["seq"] + print(f" [{i}] seq={seq} op={op:<6} subject={subject}") + if not chain.is_empty(): + print() + print(f"Head hash: {chain.head_hash()}") + return 0 + + +def cmd_sigchain_export(args: argparse.Namespace) -> int: + primary = _resolve_primary_readonly(args) + chain = sigchain.load_chain(primary) + if chain.is_empty(): + _eprint(f"no chain found for {primary}") + return 1 + if args.format == "compact": + output = chain.to_compact_bundle() + else: + output = chain.to_jsonl() + write_or_print(args.out, output) + return 0 + + +# ── argument parsing ────────────────────────────────────────────────────────── + + +def _add_key_flags(p: argparse.ArgumentParser) -> None: + p.add_argument("--nsec") + p.add_argument("--ed25519-seed", dest="ed25519_seed") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="kez", description="KEZ portable identity CLI") + sub = parser.add_subparsers(dest="command", required=True) + + # identity + p_identity = sub.add_parser("identity", help="key management") + identity_sub = p_identity.add_subparsers(dest="identity_command", required=True) + p_new = identity_sub.add_parser("new", help="generate a new identity") + p_new.add_argument("--key-type", dest="key_type", choices=["nostr", "ed25519"], default="nostr") + p_new.set_defaults(func=cmd_identity_new) + + # claim + p_claim = sub.add_parser("claim", help="create claims") + claim_sub = p_claim.add_subparsers(dest="claim_command", required=True) + p_create = claim_sub.add_parser("create", help="create a signed claim") + p_create.add_argument("subject") + _add_key_flags(p_create) + p_create.add_argument("--format", choices=["json", "compact", "markdown"], default="json") + p_create.add_argument("--out") + p_create.set_defaults(func=cmd_claim_create) + + p_dns = claim_sub.add_parser("dns", help="create a DNS-zone proof for a domain") + p_dns.add_argument("domain") + _add_key_flags(p_dns) + p_dns.set_defaults(func=cmd_claim_dns) + + # verify + p_verify = sub.add_parser("verify", help="verify proofs") + verify_sub = p_verify.add_subparsers(dest="verify_command", required=True) + p_vfile = verify_sub.add_parser("file", help="verify a proof file") + p_vfile.add_argument("path") + p_vfile.set_defaults(func=cmd_verify_file) + p_vid = verify_sub.add_parser("id", help="verify an identifier via its channels") + p_vid.add_argument("identifier") + p_vid.set_defaults(func=cmd_verify_id) + + # sigchain + p_sig = sub.add_parser("sigchain", help="manage a sigchain") + sig_sub = p_sig.add_subparsers(dest="sigchain_command", required=True) + + p_add = sig_sub.add_parser("add", help="append an add event") + p_add.add_argument("subject") + _add_key_flags(p_add) + p_add.add_argument("--proof-url", dest="proof_url") + p_add.set_defaults(func=cmd_sigchain_add) + + p_revoke = sig_sub.add_parser("revoke", help="append a revoke event") + p_revoke.add_argument("subject") + _add_key_flags(p_revoke) + p_revoke.set_defaults(func=cmd_sigchain_revoke) + + p_show = sig_sub.add_parser("show", help="show a sigchain") + p_show.add_argument("--primary") + _add_key_flags(p_show) + p_show.set_defaults(func=cmd_sigchain_show) + + p_export = sig_sub.add_parser("export", help="export a sigchain") + p_export.add_argument("--primary") + _add_key_flags(p_export) + p_export.add_argument("--format", choices=["jsonl", "compact"], default="jsonl") + p_export.add_argument("--out") + p_export.set_defaults(func=cmd_sigchain_export) + + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + try: + return args.func(args) + except Exception as exc: # noqa: BLE001 — top-level CLI error boundary + _eprint(f"error: {exc}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/kez/encodings.py b/python/kez/encodings.py new file mode 100644 index 0000000..fe82c03 --- /dev/null +++ b/python/kez/encodings.py @@ -0,0 +1,129 @@ +"""Wire encodings: JSON, compact (kez:z1:), markdown, DNS (Spec §6).""" + +from __future__ import annotations + +import base64 +import json +from typing import Any + +import zstandard + +from .envelope import COMPACT_CHAIN_PREFIX, COMPACT_PROOF_PREFIX + +_ZSTD_LEVEL = 3 + + +def _b64url_nopad(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") + + +def _b64url_decode(s: str) -> bytes: + pad = "=" * (-len(s) % 4) + return base64.urlsafe_b64decode(s + pad) + + +def _zstd_compress(data: bytes) -> bytes: + return zstandard.ZstdCompressor(level=_ZSTD_LEVEL).compress(data) + + +def _zstd_decompress(data: bytes) -> bytes: + # decompressobj handles frames that omit the content-size header, which + # some encoders (e.g. Node's zstd) produce. + dobj = zstandard.ZstdDecompressor().decompressobj() + return dobj.decompress(data) + dobj.flush() + + +def to_pretty_json(envelope: dict[str, Any]) -> str: + return json.dumps(envelope, indent=2, ensure_ascii=False) + + +def to_compact_json(envelope: dict[str, Any]) -> str: + return json.dumps(envelope, separators=(",", ":"), ensure_ascii=False) + + +def to_compact(envelope: dict[str, Any]) -> str: + raw = to_compact_json(envelope).encode("utf-8") + return COMPACT_PROOF_PREFIX + _b64url_nopad(_zstd_compress(raw)) + + +def from_compact(value: str) -> dict[str, Any]: + trimmed = value.strip() + if not trimmed.startswith(COMPACT_PROOF_PREFIX): + raise ValueError("compact proof missing kez:z1: prefix") + body = trimmed[len(COMPACT_PROOF_PREFIX) :] + raw = _zstd_decompress(_b64url_decode(body)) + return json.loads(raw.decode("utf-8")) + + +def to_markdown(envelope: dict[str, Any]) -> str: + payload = envelope["payload"] + return ( + "# KEZ Proof\n\n" + "This account publishes a signed KEZ identity claim.\n\n" + f"- Primary: `{payload['primary']}`\n" + f"- Subject: `{payload['subject']}`\n" + f"- Created: `{payload['created_at']}`\n\n" + "```kez\n" + f"{to_pretty_json(envelope)}\n" + "```\n" + ) + + +def extract_markdown_proof(markdown: str) -> dict[str, Any]: + fence = "```kez" + start = markdown.find(fence) + if start < 0: + raise ValueError("missing ```kez proof block") + body_start = start + len(fence) + end = markdown.find("```", body_start) + if end < 0: + raise ValueError("unterminated ```kez proof block") + return json.loads(markdown[body_start:end].strip()) + + +# ── Sigchain compact bundle (kez:zc1:) ────────────────────────────────────── + + +def chain_to_jsonl(events: list[dict[str, Any]]) -> str: + if not events: + return "" + return "\n".join(json.dumps(e, separators=(",", ":"), ensure_ascii=False) for e in events) + "\n" + + +def chain_from_jsonl(text: str) -> list[dict[str, Any]]: + return [json.loads(line) for line in text.splitlines() if line.strip()] + + +def chain_to_compact_bundle(events: list[dict[str, Any]]) -> str: + raw = chain_to_jsonl(events).encode("utf-8") + return COMPACT_CHAIN_PREFIX + _b64url_nopad(_zstd_compress(raw)) + + +def chain_from_compact_bundle(value: str) -> list[dict[str, Any]]: + trimmed = value.strip() + if not trimmed.startswith(COMPACT_CHAIN_PREFIX): + raise ValueError("compact chain missing kez:zc1: prefix") + body = trimmed[len(COMPACT_CHAIN_PREFIX) :] + raw = _zstd_decompress(_b64url_decode(body)) + return chain_from_jsonl(raw.decode("utf-8")) + + +# ── DNS TXT helpers ───────────────────────────────────────────────────────── + + +def dns_txt_name(subject) -> str: + from .identity import Identity + + ident = subject if isinstance(subject, Identity) else Identity.parse(str(subject)) + if ident.scheme != "dns": + raise ValueError("DNS TXT proof requires a dns: subject") + return f"_kez.{ident.value}" + + +def quote_dns_txt_value(value: str) -> str: + chunks = [value[i : i + 240] for i in range(0, len(value), 240)] + quoted = [] + for chunk in chunks: + escaped = chunk.replace("\\", "\\\\").replace('"', '\\"') + quoted.append(f'"{escaped}"') + return " ".join(quoted) diff --git a/python/kez/envelope.py b/python/kez/envelope.py new file mode 100644 index 0000000..57e54d4 --- /dev/null +++ b/python/kez/envelope.py @@ -0,0 +1,154 @@ +"""Signature envelopes, claim payloads and verification (Spec §4, §5).""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from .identity import Identity +from .keys import verify_signature + +CLAIM_TYPE = "kez.claim" +SIGCHAIN_EVENT_TYPE = "kez.sigchain.event" +FORMAT_VERSION = 1 +COMPACT_PROOF_PREFIX = "kez:z1:" +COMPACT_CHAIN_PREFIX = "kez:zc1:" + + +class VerificationError(Exception): + pass + + +def rfc3339_utc(dt: datetime | None = None) -> str: + """RFC 3339 UTC timestamp with microsecond precision and a trailing ``Z``.""" + if dt is None: + dt = datetime.now(timezone.utc) + dt = dt.astimezone(timezone.utc) + return dt.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z" + + +def new_claim_payload( + subject: Identity, + primary: Identity, + created_at: str | None = None, +) -> dict[str, Any]: + return { + "type": CLAIM_TYPE, + "version": FORMAT_VERSION, + "subject": str(subject), + "primary": str(primary), + "created_at": created_at or rfc3339_utc(), + } + + +def sign_claim(payload: dict[str, Any], signer) -> dict[str, Any]: + key = signer.identity() + if payload["primary"] != str(key): + raise VerificationError( + f"claim primary {payload['primary']!r} does not match signing key {key}" + ) + return { + "kez": "claim", + "payload": payload, + "signature": { + "alg": signer.alg(), + "key": str(key), + "sig": signer.sign_payload(payload), + }, + } + + +def verify_claim(envelope: dict[str, Any]) -> dict[str, Any]: + """Verify a claim envelope; return a status dict on success, else raise.""" + if envelope.get("kez") != "claim": + raise VerificationError(f"not a claim envelope: kez={envelope.get('kez')!r}") + payload = envelope["payload"] + signature = envelope["signature"] + if signature["key"] != payload["primary"]: + raise VerificationError("signature.key does not match payload.primary") + + primary = Identity.parse(payload["primary"]) + key = Identity.parse(signature["key"]) + if not verify_signature(payload, signature["alg"], key, signature["sig"]): + raise VerificationError(f"signature did not verify (alg={signature['alg']})") + + subject = Identity.parse(payload["subject"]) + return { + "primary": primary, + "verified": [subject], + "status": "valid", + "confidence": "strong", + } + + +# ── Sigchain event payloads ───────────────────────────────────────────────── + + +def _event_payload( + primary: Identity, + seq: int, + prev: str | None, + op: str, + op_payload: dict[str, Any], + created_at: str | None = None, +) -> dict[str, Any]: + payload: dict[str, Any] = { + "type": SIGCHAIN_EVENT_TYPE, + "version": FORMAT_VERSION, + "primary": str(primary), + "seq": seq, + } + if prev is not None: + payload["prev"] = prev + payload["created_at"] = created_at or rfc3339_utc() + payload["op"] = op + payload["payload"] = op_payload + return payload + + +def new_add_payload( + primary: Identity, + seq: int, + prev: str | None, + subject: Identity, + proof_url: str | None = None, + created_at: str | None = None, +) -> dict[str, Any]: + op_payload: dict[str, Any] = {"subject": str(subject)} + if proof_url: + op_payload["proof_url"] = proof_url + return _event_payload(primary, seq, prev, "add", op_payload, created_at) + + +def new_revoke_payload( + primary: Identity, + seq: int, + prev: str | None, + subject: Identity, + created_at: str | None = None, +) -> dict[str, Any]: + return _event_payload(primary, seq, prev, "revoke", {"subject": str(subject)}, created_at) + + +def sign_sigchain_event(payload: dict[str, Any], signer) -> dict[str, Any]: + return { + "kez": "sigchain_event", + "payload": payload, + "signature": { + "alg": signer.alg(), + "key": str(signer.identity()), + "sig": signer.sign_payload(payload), + }, + } + + +def verify_sigchain_event(event: dict[str, Any]) -> None: + if event.get("kez") != "sigchain_event": + raise VerificationError(f"wrong envelope tag: {event.get('kez')!r}") + payload = event["payload"] + signature = event["signature"] + if signature["key"] != payload["primary"]: + raise VerificationError("signature.key does not match payload.primary") + key = Identity.parse(signature["key"]) + if not verify_signature(payload, signature["alg"], key, signature["sig"]): + raise VerificationError("sigchain event signature did not verify") diff --git a/python/kez/identity.py b/python/kez/identity.py new file mode 100644 index 0000000..cba9019 --- /dev/null +++ b/python/kez/identity.py @@ -0,0 +1,77 @@ +"""KEZ identifiers: always ``system:identifier`` (Spec §3).""" + +from __future__ import annotations + +import re + +from . import bech32 + +_HEX64 = re.compile(r"^[0-9a-f]{64}$") + + +class IdentityError(ValueError): + pass + + +class Identity: + """A canonical ``system:identifier`` string.""" + + __slots__ = ("_raw",) + + def __init__(self, raw: str) -> None: + self._raw = raw + + @classmethod + def parse(cls, raw: str) -> "Identity": + trimmed = raw.strip() + if not trimmed: + raise IdentityError("empty identity") + + # CLI ergonomics: a bare npub normalizes to nostr:npub... + if trimmed.startswith("npub1"): + _validate_npub(trimmed) + return cls(f"nostr:{trimmed}") + + colon = trimmed.find(":") + if colon <= 0 or colon == len(trimmed) - 1: + raise IdentityError(f"invalid identity (need scheme:value): {raw!r}") + scheme = trimmed[:colon] + rest = trimmed[colon + 1 :] + if scheme == "nostr": + _validate_npub(rest) + elif scheme == "ed25519": + _validate_ed25519_hex(rest) + return cls(f"{scheme}:{rest}") + + @property + def scheme(self) -> str: + return self._raw.split(":", 1)[0] + + @property + def value(self) -> str: + return self._raw.split(":", 1)[1] + + def __str__(self) -> str: + return self._raw + + def __repr__(self) -> str: + return f"Identity({self._raw!r})" + + def __eq__(self, other: object) -> bool: + return isinstance(other, Identity) and other._raw == self._raw + + def __hash__(self) -> int: + return hash(self._raw) + + +def _validate_npub(value: str) -> None: + if not value.startswith("npub1"): + raise IdentityError(f"invalid nostr identifier (expected npub1...): {value!r}") + raw = bech32.decode("npub", value) + if len(raw) != 32: + raise IdentityError("invalid npub: expected 32-byte key") + + +def _validate_ed25519_hex(value: str) -> None: + if not _HEX64.match(value): + raise IdentityError("invalid ed25519 identifier: expected 64 lowercase hex chars") diff --git a/python/kez/jcs.py b/python/kez/jcs.py new file mode 100644 index 0000000..3d02734 --- /dev/null +++ b/python/kez/jcs.py @@ -0,0 +1,78 @@ +"""RFC 8785 JSON Canonicalization Scheme (JCS). + +This is the heart of cross-implementation interop: signatures are computed over +the JCS-canonicalized bytes of a payload, so two implementations that agree on +these bytes produce universally-verifiable signatures. + +Payloads in KEZ only ever contain strings, integers, booleans, nulls, arrays +and objects — never floating-point numbers — so we implement the integer-only +subset of the number rules. A float would be a bug, so we reject it loudly. +""" + +from __future__ import annotations + +from typing import Any + + +def _canon_string(s: str) -> str: + out = ['"'] + for ch in s: + c = ord(ch) + if ch == '"': + out.append('\\"') + elif ch == "\\": + out.append("\\\\") + elif c == 0x08: + out.append("\\b") + elif c == 0x09: + out.append("\\t") + elif c == 0x0A: + out.append("\\n") + elif c == 0x0C: + out.append("\\f") + elif c == 0x0D: + out.append("\\r") + elif c < 0x20: + out.append("\\u%04x" % c) + else: + out.append(ch) + out.append('"') + return "".join(out) + + +def _canon(value: Any) -> str: + if value is True: + return "true" + if value is False: + return "false" + if value is None: + return "null" + if isinstance(value, str): + return _canon_string(value) + if isinstance(value, bool): # unreachable (handled above) but explicit + return "true" if value else "false" + if isinstance(value, int): + return str(value) + if isinstance(value, float): + # KEZ payloads never carry floats; refuse rather than risk a + # non-canonical number serialization. + if value.is_integer(): + return str(int(value)) + raise ValueError("JCS: floating-point numbers are not supported in KEZ payloads") + if isinstance(value, (list, tuple)): + return "[" + ",".join(_canon(v) for v in value) + "]" + if isinstance(value, dict): + # RFC 8785: sort object members by their UTF-16 code-unit sequence. + items = sorted(value.items(), key=lambda kv: kv[0].encode("utf-16-be")) + return "{" + ",".join(_canon_string(k) + ":" + _canon(v) for k, v in items) + "}" + raise TypeError(f"JCS: unsupported type {type(value)!r}") + + +def canonicalize(value: Any) -> str: + """Return the RFC 8785 canonical JSON string for ``value``.""" + return _canon(value) + + +def canonical_bytes(value: Any) -> bytes: + """Return the RFC 8785 canonical JSON bytes (UTF-8) for ``value``.""" + return _canon(value).encode("utf-8") diff --git a/python/kez/keys.py b/python/kez/keys.py new file mode 100644 index 0000000..e5d87ea --- /dev/null +++ b/python/kez/keys.py @@ -0,0 +1,142 @@ +"""Signing keys: nostr (secp256k1 Schnorr) and ed25519.""" + +from __future__ import annotations + +import hashlib +import os + +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) +from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, PublicFormat + +from . import bech32, schnorr +from .identity import Identity +from .jcs import canonical_bytes + +NOSTR_SCHNORR_ALG = "nostr-secp256k1-schnorr-sha256-jcs" +ED25519_SHA512_ALG = "ed25519-sha512-jcs" + + +class NostrSecret: + """A nostr secp256k1 secret key, addressed by its npub.""" + + __slots__ = ("_sk", "_pub") + + def __init__(self, secret_key: bytes) -> None: + if len(secret_key) != 32: + raise ValueError("nostr secret key must be 32 bytes") + self._sk = secret_key + self._pub = schnorr.pubkey_gen(secret_key) + + @classmethod + def generate(cls) -> "NostrSecret": + while True: + sk = os.urandom(32) + i = int.from_bytes(sk, "big") + if 1 <= i < schnorr.n: + return cls(sk) + + @classmethod + def from_nsec(cls, nsec: str) -> "NostrSecret": + raw = bech32.decode("nsec", nsec.strip()) + if len(raw) != 32: + raise ValueError("invalid nsec: expected 32-byte key") + return cls(raw) + + def nsec(self) -> str: + return bech32.encode("nsec", self._sk) + + def npub(self) -> str: + return bech32.encode("npub", self._pub) + + def identity(self) -> Identity: + return Identity.parse(f"nostr:{self.npub()}") + + def alg(self) -> str: + return NOSTR_SCHNORR_ALG + + def sign_payload(self, payload) -> str: + digest = hashlib.sha256(canonical_bytes(payload)).digest() + return schnorr.sign(digest, self._sk).hex() + + +class Ed25519Secret: + """An ed25519 secret seed, addressed by its hex public key.""" + + __slots__ = ("_seed", "_key", "_pub") + + def __init__(self, seed: bytes) -> None: + if len(seed) != 32: + raise ValueError("ed25519 seed must be 32 bytes") + self._seed = seed + self._key = Ed25519PrivateKey.from_private_bytes(seed) + self._pub = self._key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + + @classmethod + def generate(cls) -> "Ed25519Secret": + key = Ed25519PrivateKey.generate() + seed = key.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) + return cls(seed) + + @classmethod + def from_seed_hex(cls, seed_hex: str) -> "Ed25519Secret": + seed = bytes.fromhex(seed_hex.strip()) + if len(seed) != 32: + raise ValueError("invalid ed25519 seed: expected 32-byte (64 hex char) seed") + return cls(seed) + + def seed_hex(self) -> str: + return self._seed.hex() + + def pubkey_hex(self) -> str: + return self._pub.hex() + + def identity(self) -> Identity: + return Identity.parse(f"ed25519:{self.pubkey_hex()}") + + def alg(self) -> str: + return ED25519_SHA512_ALG + + def sign_payload(self, payload) -> str: + return self._key.sign(canonical_bytes(payload)).hex() + + +def verify_signature(payload, alg: str, key: Identity, sig_hex: str) -> bool: + """Verify a signature block against an arbitrary JCS-canonicalizable payload.""" + try: + sig = bytes.fromhex(sig_hex) + except ValueError: + return False + if len(sig) != 64: + return False + + if alg == NOSTR_SCHNORR_ALG: + if key.scheme != "nostr": + return False + pubkey = bech32.decode("npub", key.value) + digest = hashlib.sha256(canonical_bytes(payload)).digest() + return schnorr.verify(digest, pubkey, sig) + + if alg == ED25519_SHA512_ALG: + if key.scheme != "ed25519": + return False + try: + pub = Ed25519PublicKey.from_public_bytes(bytes.fromhex(key.value)) + pub.verify(sig, canonical_bytes(payload)) + return True + except Exception: + return False + + return False + + +def signer_from_flags(nsec: str | None, ed25519_seed: str | None): + if nsec and ed25519_seed: + raise ValueError("pass only one of --nsec or --ed25519-seed") + if nsec: + return NostrSecret.from_nsec(nsec) + if ed25519_seed: + return Ed25519Secret.from_seed_hex(ed25519_seed) + raise ValueError("missing key: pass --nsec or --ed25519-seed") diff --git a/python/kez/schnorr.py b/python/kez/schnorr.py new file mode 100644 index 0000000..c18ddc6 --- /dev/null +++ b/python/kez/schnorr.py @@ -0,0 +1,151 @@ +"""BIP-340 Schnorr signatures over secp256k1 (pure Python). + +This is the reference implementation from BIP-340 (public domain), used for the +``nostr-secp256k1-schnorr-sha256-jcs`` suite. We sign with an all-zero auxiliary +randomness value, matching the Rust ``sign_schnorr_no_aux_rand`` and the Node +``schnorr.sign(digest, sk, ZERO_AUX)`` calls — so every implementation produces +byte-identical, deterministic signatures. + +We use a pure-Python implementation because the native ``coincurve``/``secp256k1`` +bindings fail to build on bleeding-edge CPython. Signing/verifying short +fixed-size digests is well within pure-Python performance for a CLI tool. +""" + +from __future__ import annotations + +import hashlib + +p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F +n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 +G = ( + 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, + 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, +) + +Point = tuple[int, int] | None + + +def tagged_hash(tag: str, msg: bytes) -> bytes: + tag_hash = hashlib.sha256(tag.encode()).digest() + return hashlib.sha256(tag_hash + tag_hash + msg).digest() + + +def is_infinite(P: Point) -> bool: + return P is None + + +def x(P: Point) -> int: + assert P is not None + return P[0] + + +def y(P: Point) -> int: + assert P is not None + return P[1] + + +def point_add(P1: Point, P2: Point) -> Point: + if P1 is None: + return P2 + if P2 is None: + return P1 + if x(P1) == x(P2) and y(P1) != y(P2): + return None + if P1 == P2: + lam = (3 * x(P1) * x(P1) * pow(2 * y(P1), p - 2, p)) % p + else: + lam = ((y(P2) - y(P1)) * pow(x(P2) - x(P1), p - 2, p)) % p + x3 = (lam * lam - x(P1) - x(P2)) % p + return (x3, (lam * (x(P1) - x3) - y(P1)) % p) + + +def point_mul(P: Point, scalar: int) -> Point: + R: Point = None + for i in range(256): + if (scalar >> i) & 1: + R = point_add(R, P) + P = point_add(P, P) + return R + + +def bytes_from_int(x_: int) -> bytes: + return x_.to_bytes(32, byteorder="big") + + +def bytes_from_point(P: Point) -> bytes: + return bytes_from_int(x(P)) + + +def lift_x(b: bytes) -> Point: + x_ = int.from_bytes(b, byteorder="big") + if x_ >= p: + return None + y_sq = (pow(x_, 3, p) + 7) % p + y_ = pow(y_sq, (p + 1) // 4, p) + if pow(y_, 2, p) != y_sq: + return None + return (x_, y_ if y_ & 1 == 0 else p - y_) + + +def int_from_bytes(b: bytes) -> int: + return int.from_bytes(b, byteorder="big") + + +def has_even_y(P: Point) -> bool: + assert P is not None + return y(P) % 2 == 0 + + +def pubkey_gen(seckey: bytes) -> bytes: + """Return the 32-byte x-only public key for a 32-byte secret key.""" + d0 = int_from_bytes(seckey) + if not (1 <= d0 <= n - 1): + raise ValueError("schnorr: secret key out of range") + P = point_mul(G, d0) + assert P is not None + return bytes_from_point(P) + + +def sign(msg: bytes, seckey: bytes, aux_rand: bytes = b"\x00" * 32) -> bytes: + """Produce a 64-byte BIP-340 Schnorr signature over ``msg``.""" + d0 = int_from_bytes(seckey) + if not (1 <= d0 <= n - 1): + raise ValueError("schnorr: secret key out of range") + P = point_mul(G, d0) + assert P is not None + d = d0 if has_even_y(P) else n - d0 + t = (d ^ int_from_bytes(tagged_hash("BIP0340/aux", aux_rand))).to_bytes(32, "big") + k0 = int_from_bytes(tagged_hash("BIP0340/nonce", t + bytes_from_point(P) + msg)) % n + if k0 == 0: + raise ValueError("schnorr: nonce generation failed") + R = point_mul(G, k0) + assert R is not None + k = k0 if has_even_y(R) else n - k0 + e = ( + int_from_bytes( + tagged_hash("BIP0340/challenge", bytes_from_point(R) + bytes_from_point(P) + msg) + ) + % n + ) + sig = bytes_from_point(R) + ((k + e * d) % n).to_bytes(32, "big") + if not verify(msg, bytes_from_point(P), sig): + raise ValueError("schnorr: produced an invalid signature") + return sig + + +def verify(msg: bytes, pubkey: bytes, sig: bytes) -> bool: + """Verify a 64-byte BIP-340 Schnorr signature ``sig`` over ``msg``.""" + if len(pubkey) != 32 or len(sig) != 64: + return False + P = lift_x(pubkey) + r = int_from_bytes(sig[0:32]) + s = int_from_bytes(sig[32:64]) + if P is None or r >= p or s >= n: + return False + e = ( + int_from_bytes(tagged_hash("BIP0340/challenge", sig[0:32] + pubkey + msg)) % n + ) + R = point_add(point_mul(G, s), point_mul(P, n - e)) + if R is None or not has_even_y(R) or x(R) != r: + return False + return True diff --git a/python/kez/sigchain.py b/python/kez/sigchain.py new file mode 100644 index 0000000..38aa5b2 --- /dev/null +++ b/python/kez/sigchain.py @@ -0,0 +1,112 @@ +"""Append-only signed sigchain (Spec §8) and on-disk storage.""" + +from __future__ import annotations + +import hashlib +import os +from pathlib import Path +from typing import Any + +from . import encodings +from .envelope import verify_sigchain_event +from .identity import Identity +from .jcs import canonical_bytes + + +class SigchainError(Exception): + pass + + +def event_hash(event: dict[str, Any]) -> str: + """``sha256:`` of the JCS bytes of the entire signed envelope.""" + digest = hashlib.sha256(canonical_bytes(event)).hexdigest() + return f"sha256:{digest}" + + +class Sigchain: + def __init__(self, primary: Identity) -> None: + self._primary = primary + self._events: list[dict[str, Any]] = [] + + @property + def primary(self) -> Identity: + return self._primary + + def events(self) -> list[dict[str, Any]]: + return self._events + + def __len__(self) -> int: + return len(self._events) + + def is_empty(self) -> bool: + return not self._events + + def next_seq(self) -> int: + return len(self._events) + + def head_hash(self) -> str | None: + if not self._events: + return None + return event_hash(self._events[-1]) + + def append(self, event: dict[str, Any]) -> None: + if event.get("kez") != "sigchain_event": + raise SigchainError(f"wrong envelope tag: {event.get('kez')!r}") + payload = event["payload"] + if payload["primary"] != str(self._primary): + raise SigchainError("event primary does not match chain primary") + expected_seq = self.next_seq() + if payload["seq"] != expected_seq: + raise SigchainError(f"seq mismatch: expected {expected_seq}, got {payload['seq']}") + expected_prev = self.head_hash() + if payload.get("prev") != expected_prev: + raise SigchainError("prev hash mismatch") + verify_sigchain_event(event) + self._events.append(event) + + # ── serialization ── + + def to_jsonl(self) -> str: + return encodings.chain_to_jsonl(self._events) + + def to_compact_bundle(self) -> str: + return encodings.chain_to_compact_bundle(self._events) + + @classmethod + def from_jsonl(cls, primary: Identity, text: str) -> "Sigchain": + chain = cls(primary) + for event in encodings.chain_from_jsonl(text): + chain.append(event) + return chain + + +def subject_of(event: dict[str, Any]) -> str | None: + op_payload = event.get("payload", {}).get("payload", {}) + subject = op_payload.get("subject") + return subject if isinstance(subject, str) else None + + +# ── storage ───────────────────────────────────────────────────────────────── + + +def sigchain_dir() -> Path: + return Path(os.path.expanduser("~")) / ".kez" / "sigchains" + + +def sigchain_path(primary: Identity) -> Path: + d = sigchain_dir() + d.mkdir(parents=True, exist_ok=True) + safe = str(primary).replace(":", "_") + return d / f"{safe}.jsonl" + + +def load_chain(primary: Identity) -> Sigchain: + path = sigchain_path(primary) + if not path.exists(): + return Sigchain(primary) + return Sigchain.from_jsonl(primary, path.read_text(encoding="utf-8")) + + +def save_chain(chain: Sigchain) -> None: + path = sigchain_path(chain.primary) + path.write_text(chain.to_jsonl(), encoding="utf-8") diff --git a/python/kez_cli.py b/python/kez_cli.py new file mode 100644 index 0000000..db5b9ca --- /dev/null +++ b/python/kez_cli.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +"""Standalone launcher for the KEZ Python CLI. + +Lets the cross-implementation test harness invoke the CLI from any working +directory without installing the package: + + python/.venv/bin/python python/kez_cli.py +""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from kez.cli import main # noqa: E402 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 0000000..2d3f202 --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "kez" +version = "0.3.0" +description = "KEZ portable identity graph — Python implementation" +requires-python = ">=3.10" +dependencies = [ + "cryptography>=42", + "zstandard>=0.22", +] + +[project.scripts] +kez = "kez.cli:main" + +[build-system] +requires = ["setuptools>=61"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +packages = ["kez"] diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 0000000..0f9027e --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1,2 @@ +cryptography>=42 +zstandard>=0.22