#!/usr/bin/env python3
"""Multiplex three text messages into one stream of 2-bit symbols.

Every output symbol is two bits wide:

* the high bit is one bit of the FIRST message (the "shape" mask):
  0 means the symbol is a "square", 1 means it is a "line";
* the low bit carries one payload bit: squares carry bits of the SECOND
  message as-is, lines carry bits of the THIRD message inverted.

Four symbols are packed per byte, most significant symbol first.  The
"pretty" form renders each symbol as one character from a configurable
4-character alphabet (default ``■□│┃``; ``--dna`` selects ``ACGT``).

Decoding cuts each recovered message at its first NUL byte, so the encoder
relies on zero padding to terminate the messages.
"""
from __future__ import annotations

import argparse
import sys
from typing import Dict, Iterable, List, Optional, Tuple

# Default glyphs for the symbol values 0b00, 0b01, 0b10, 0b11.
DEFAULT_SYMBOLS = "■□│┃"

# Number of zero bits that decode as one NUL (terminator) byte.
_NUL_BITS = 8


# ---------- IO ----------

def read_bytes(path: Optional[str]) -> bytes:
    """Return the raw contents of *path*; ``"-"`` or ``None`` reads stdin."""
    if path is None or path == "-":
        return sys.stdin.buffer.read()
    with open(path, "rb") as f:
        return f.read()


def write_bytes(path: Optional[str], data: bytes) -> None:
    """Write *data* to *path*; ``"-"`` or ``None`` writes to stdout."""
    if path is None or path == "-":
        sys.stdout.buffer.write(data)
        return
    with open(path, "wb") as f:
        f.write(data)


def read_text(path: Optional[str]) -> str:
    """Return the text in *path* (UTF-8); ``"-"`` or ``None`` reads stdin."""
    if path is None or path == "-":
        return sys.stdin.read()
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def write_text(path: Optional[str], text: str) -> None:
    """Write *text* to *path* (UTF-8); ``"-"`` or ``None`` writes to stdout."""
    if path is None or path == "-":
        sys.stdout.write(text)
        return
    # newline="" keeps "\n" untranslated so the file is identical everywhere.
    with open(path, "w", encoding="utf-8", newline="") as f:
        f.write(text)


# ---------- bits (MSB-first, 0-padding) ----------

def bytes_to_bits_msb(data: bytes) -> List[int]:
    """Expand *data* into a list of 0/1 ints, most significant bit first."""
    return [(byte >> shift) & 1 for byte in data for shift in range(7, -1, -1)]


def bits_to_bytes_msb_trim(bits: Iterable[int]) -> bytes:
    """Pack MSB-first *bits* into bytes, dropping a trailing partial byte.

    Only the least significant bit of each element is used.
    """
    bit_list = [bit & 1 for bit in bits]
    usable = len(bit_list) - len(bit_list) % 8  # ignore the incomplete tail
    out = bytearray()
    for start in range(0, usable, 8):
        value = 0
        for bit in bit_list[start:start + 8]:
            value = (value << 1) | bit
        out.append(value)
    return bytes(out)


def decode_bytes_nul_terminated(data: bytes) -> str:
    """Decode *data* up to (not including) its first NUL byte.

    UTF-8 is tried first; bytes that are not valid UTF-8 are decoded as
    Latin-1, which maps every byte to a character and therefore never fails.
    """
    payload = data.split(b"\x00", 1)[0]
    try:
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        return payload.decode("latin-1")


# ---------- 2-bit symbols packed into bytes ----------

def symbols_from_bytes(data: bytes) -> List[int]:
    """Split every byte of *data* into four 2-bit symbols, high pair first."""
    return [(byte >> shift) & 0b11 for byte in data for shift in (6, 4, 2, 0)]


def bytes_from_symbols(symbols: List[int]) -> bytes:
    """Pack 2-bit *symbols* four per byte, high pair first.

    The sequence is padded with 0 symbols up to a multiple of four.  Only
    the two low bits of each symbol are used.
    """
    syms = list(symbols)
    syms.extend([0] * (-len(syms) % 4))  # pad with 00
    out = bytearray()
    for start in range(0, len(syms), 4):
        s0, s1, s2, s3 = (sym & 0b11 for sym in syms[start:start + 4])
        out.append((s0 << 6) | (s1 << 4) | (s2 << 2) | s3)
    return bytes(out)


# ---------- configurable symbols ----------

def build_symbol_maps(symbols: str) -> Tuple[Dict[int, str], Dict[str, int]]:
    """Build the value->glyph and glyph->value tables for *symbols*.

    ``symbols[i]`` is the glyph for the 2-bit value ``i``.

    Raises:
        ValueError: if *symbols* is not exactly four distinct characters.
    """
    if len(symbols) != 4:
        raise ValueError("--symbols must be exactly 4 characters, e.g. --symbols ACGT")
    if len(set(symbols)) != 4:
        raise ValueError("--symbols characters must be distinct (no repeats)")
    bits_to_sym = dict(enumerate(symbols))
    sym_to_bits = {glyph: value for value, glyph in bits_to_sym.items()}
    return bits_to_sym, sym_to_bits


# ---------- pretty/unpretty (combined binary <-> chosen symbols) ----------

def pretty_print_combined(data: bytes, bits_to_sym: Dict[int, str], wrap: int = 0) -> str:
    """Render *data* as glyphs, one per 2-bit symbol, ending with a newline.

    A positive *wrap* breaks the output into lines of at most that many
    glyphs; zero or a negative value yields a single line.
    """
    glyphs = [bits_to_sym[sym] for sym in symbols_from_bytes(data)]
    if wrap and wrap > 0:
        rows = ["".join(glyphs[i:i + wrap]) for i in range(0, len(glyphs), wrap)]
        return "\n".join(rows) + "\n"
    return "".join(glyphs) + "\n"


def unpretty_print_combined(text: str, sym_to_bits: Dict[str, int]) -> bytes:
    """Convert glyph *text* back to packed bytes.

    Characters that are not in *sym_to_bits* (newlines, spaces, ...) are
    skipped, so wrapped output can be read back unchanged.
    """
    syms = [sym_to_bits[ch] for ch in text if ch in sym_to_bits]
    return bytes_from_symbols(syms)


# ---------- mux (TEXT messages; NUL-terminated; 0-padded) ----------

def ensure_shape_capacity(shape_bits: List[int], squares_needed: int, lines_needed: int) -> List[int]:
    """Return *shape_bits* extended so it has enough 0s and 1s.

    Missing 0 bits (square slots) are appended first, followed by missing
    1 bits (line slots).  The input list is never modified; it is returned
    as-is when it is already large enough.
    """
    missing_sq = max(0, squares_needed - shape_bits.count(0))
    missing_ln = max(0, lines_needed - shape_bits.count(1))
    if missing_sq or missing_ln:
        shape_bits = shape_bits + [0] * missing_sq + [1] * missing_ln
    return shape_bits


def mux_encode_text(first: str, second: str, third: str, *, encoding: str = "utf-8") -> bytes:
    """Interleave three messages into packed 2-bit symbols.

    *first* supplies the high (shape) bit of every symbol.  Where that bit
    is 0 the low bit is the next bit of *second*; where it is 1 the low bit
    is the inverse of the next bit of *third*.  When *first* does not offer
    enough slots of either kind, extra shape bits are appended; exhausted
    payloads are continued with 0 bits.

    Raises:
        UnicodeEncodeError: if a message cannot be encoded with *encoding*.
        LookupError: if *encoding* is not a known codec.
        ValueError: if a payload could not be placed completely (defensive;
            the shape mask is grown beforehand so this should not trigger).
    """
    shape_bits = bytes_to_bits_msb(first.encode(encoding))
    squares_bits = bytes_to_bits_msb(second.encode(encoding))
    lines_bits = bytes_to_bits_msb(third.encode(encoding))

    squares_needed = len(squares_bits)
    lines_needed = len(lines_bits)
    have_squares = shape_bits.count(0)
    missing_squares = max(0, squares_needed - have_squares)
    missing_lines = max(0, lines_needed - shape_bits.count(1))
    # Appended 1-bits that follow fewer than 8 appended 0-bits would decode
    # as garbage glued onto the first message.  Request a full zero byte of
    # extra square slots in that case so the decoder sees a NUL terminator.
    # Inputs that already decoded correctly produce unchanged output.
    if missing_lines and missing_squares < _NUL_BITS:
        squares_needed = have_squares + _NUL_BITS
    shape_bits = ensure_shape_capacity(shape_bits, squares_needed, lines_needed)

    used_squares = 0
    used_lines = 0
    syms: List[int] = []
    for shape in shape_bits:
        if shape == 0:
            if used_squares < len(squares_bits):
                bit = squares_bits[used_squares] & 1
                used_squares += 1
            else:
                bit = 0  # squares payload exhausted: pad with 0
            syms.append(bit)
        else:
            if used_lines < len(lines_bits):
                bit = lines_bits[used_lines] & 1
                used_lines += 1
            else:
                bit = 0  # lines payload exhausted: pad with 0
            # Line symbols store the payload bit inverted.
            syms.append(0b10 | (1 - bit))

    # Only fail if the payloads were not consumed in full.
    if used_squares < len(squares_bits) or used_lines < len(lines_bits):
        raise ValueError(
            f"shape mask too small: squares {used_squares}/{len(squares_bits)} "
            f"lines {used_lines}/{len(lines_bits)}"
        )

    return bytes_from_symbols(syms)


def mux_decode_text(combined: bytes) -> Tuple[str, str, str]:
    """Split packed symbols back into the (first, second, third) messages.

    Each recovered bit stream is packed into whole bytes and decoded up to
    its first NUL byte.
    """
    shape_bits: List[int] = []
    squares_bits: List[int] = []
    lines_bits: List[int] = []
    for sym in symbols_from_bytes(combined):
        shape = (sym >> 1) & 1
        low = sym & 1
        shape_bits.append(shape)
        if shape == 0:
            squares_bits.append(low)
        else:
            lines_bits.append(1 - low)  # undo the inversion used for lines

    shape_txt = decode_bytes_nul_terminated(bits_to_bytes_msb_trim(shape_bits))
    squares_txt = decode_bytes_nul_terminated(bits_to_bytes_msb_trim(squares_bits))
    lines_txt = decode_bytes_nul_terminated(bits_to_bytes_msb_trim(lines_bits))
    return shape_txt, squares_txt, lines_txt


# ---------- CLI ----------

def cmd_pretty(args: argparse.Namespace) -> int:
    """``pretty``: read packed bytes and write them as glyphs."""
    bits_to_sym, _ = build_symbol_maps(args.symbols)
    data = read_bytes(args.input)
    write_text(args.output, pretty_print_combined(data, bits_to_sym, wrap=args.wrap))
    return 0


def cmd_unpretty(args: argparse.Namespace) -> int:
    """``unpretty``: read glyph text and write the packed bytes."""
    _, sym_to_bits = build_symbol_maps(args.symbols)
    text = read_text(args.input)
    write_bytes(args.output, unpretty_print_combined(text, sym_to_bits))
    return 0


def cmd_mux_encode(args: argparse.Namespace) -> int:
    """``encode``: multiplex three messages; emit glyphs, or bytes with --binary."""
    out = mux_encode_text(args.first, args.second, args.third, encoding=args.encoding)
    if getattr(args, "binary", False):
        write_bytes(args.output, out)
    else:
        bits_to_sym, _ = build_symbol_maps(args.symbols)
        write_text(args.output, pretty_print_combined(out, bits_to_sym, wrap=args.wrap))
    return 0


def cmd_mux_decode(args: argparse.Namespace) -> int:
    """``decode``: recover the three messages and write them one per line."""
    if getattr(args, "binary", False):
        combined = read_bytes(args.input)
    else:
        _, sym_to_bits = build_symbol_maps(args.symbols)
        combined = unpretty_print_combined(read_text(args.input), sym_to_bits)

    first, second, third = mux_decode_text(combined)
    # Namespaces built without an "output" attribute keep writing to stdout.
    write_text(getattr(args, "output", "-"), f"{first}\n{second}\n{third}\n")
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv* (default: ``sys.argv[1:]``) and run the chosen subcommand.

    Returns the subcommand's exit status.  Invalid values such as a bad
    ``--symbols`` alphabet are reported through the parser's error handler,
    which prints the message and exits with status 2.
    """
    p = argparse.ArgumentParser(
        description="Binary mux for 3 UTF-8/ASCII messages (NUL-terminated, 0-padded). "
                    "Pretty/unpretty uses a configurable 4-symbol alphabet."
    )

    # Global options: the alphabet applies to every subcommand that reads or
    # writes glyphs, and must be given before the subcommand name.
    p.add_argument(
        "--symbols",
        default=DEFAULT_SYMBOLS,
        help="4 symbols for 00,01,10,11 (e.g. --symbols ACGT). Default: ■□│┃",
    )
    p.add_argument(
        "--dna",
        dest="symbols",
        action="store_const",
        const="ACGT",
        help="Shortcut for --symbols ACGT",
    )

    sub = p.add_subparsers(dest="cmd", required=True)

    pr = sub.add_parser("pretty", help="combined.bin -> symbols")
    pr.add_argument("input", nargs="?", default="-")
    pr.add_argument("-o", "--output", default="-")
    pr.add_argument("--wrap", type=int, default=0)
    pr.set_defaults(func=cmd_pretty)

    up = sub.add_parser("unpretty", help="symbols -> combined.bin")
    up.add_argument("input", nargs="?", default="-")
    up.add_argument("-o", "--output", default="-")
    up.set_defaults(func=cmd_unpretty)

    me = sub.add_parser("encode", help="3 messages -> combined.bin")
    me.add_argument("--first", required=True)
    me.add_argument("--second", required=True)
    me.add_argument("--third", required=True)
    me.add_argument("--encoding", default="utf-8")
    me.add_argument("-o", "--output", default="-")
    me.add_argument("--binary", action="store_true", help="Output binary instead of symbols")
    me.add_argument("--wrap", type=int, default=0,
                    help="Wrap width for symbol output (0 = no wrap)")
    me.set_defaults(func=cmd_mux_encode)

    md = sub.add_parser("decode", help="combined.txt -> prints 3 messages")
    md.add_argument("input", nargs="?", default="-")
    md.add_argument("-o", "--output", default="-",
                    help="Where to write the decoded messages (default: stdout)")
    md.add_argument("--binary", action="store_true", help="Treat input as binary symbols")
    md.set_defaults(func=cmd_mux_decode)

    args = p.parse_args(argv)
    try:
        return args.func(args)
    except ValueError as exc:
        # Bad alphabet, un-encodable message, undecodable input text, ...
        p.error(str(exc))


if __name__ == "__main__":
    raise SystemExit(main())