#!/usr/bin/env python3
"""
register-transcoder — Valsts Kase process register (.xlsx/.xlsm) -> BPMN skeleton.

Part of the vk-gramatvediba UAPF workspace.

Reads a published Valsts Kase "Grāmatvedības uzskeites procesu apraksts"
function-group register and emits, for any sub-process in it, a BPMN process
skeleton: one task per register step, swimlanes from the RACI columns, and
sequence flows reconstructed from the register's own predecessor / successor
step references.

The output is a *skeleton*, not an executable package. It is the deterministic
first pass of the transcription pipeline; turning a skeleton into a Level 4
executable (explicit gateways, DMN decision extraction, resource mappings,
package manifest) is the human/AI-assisted refinement step — see the curated
FG3-1, FG3-4 and FG3-5 packages and docs/methodology.md.

Usage:
    transcode.py list <register.xlsx>
    transcode.py emit <register.xlsx> <sub-process> [-o <out.bpmn>]

Examples:
    transcode.py list fg3_process.xlsm
    transcode.py emit fg3_process.xlsm 3.5.2 -o 3.5.2.skeleton.bpmn

Dependencies: openpyxl.
"""

import re
import sys
from collections import Counter
from xml.sax.saxutils import escape

try:
    import openpyxl
except ImportError:
    # Reported by parse_register() when a workbook is actually needed, so the
    # usage text and the pure helpers stay usable without the dependency.
    openpyxl = None

BPMN_NS = "http://www.omg.org/spec/BPMN/20100524/MODEL"

# RACI actor columns, in register column order, mapped to BPMN lane ids/names.
ACTORS = [
    ("nodarbinatais", "Lane_Nodarbinatais", "Nodarbinātais"),
    ("iestade", "Lane_Iestade", "Iestāde"),
    ("vpc", "Lane_VPC", "VPC (Vienotais pakalpojumu centrs)"),
]

# Header cell texts used to locate columns (substring match, case-insensitive).
H_PRED = "no procesa darbības soļa"
H_NR = "nr.p.k"
H_NAME = "process, apakšprocess"
H_RACI = "atbildības sadalījums"
H_DESC = "darbību apraksts"
H_SYSTEM = "izmantotā is"
H_DEADLINE = "izpildes termiņš"
H_OUTPUTS = "sagatavotie dati"
H_SUCC = "uz procesa darbības soli"

EMIT_USAGE = "usage: transcode.py emit <register.xlsx> <sub-process> [-o out]"


def norm_nr(s):
    """Normalise a step number for matching: trim whitespace and strip dots
    from both ends (so '3.5.2.' and ' 3.5.2 ' compare equal)."""
    return (s or "").strip().strip(".").strip()


def san(s):
    """Sanitise a string into a BPMN NCName fragment.

    Every run of non-ASCII-alphanumeric characters becomes a single '_';
    an empty result is replaced by 'x' so an id is never empty.
    """
    out = re.sub(r"[^A-Za-z0-9]+", "_", (s or "").strip()).strip("_")
    return out or "x"


def cell(ws, r, c):
    """Return the stripped text of worksheet cell (r, c).

    Returns '' when the column index is None (column not present in the
    register) or the cell is empty.
    """
    if c is None:
        return ""
    v = ws.cell(row=r, column=c).value
    return "" if v is None else str(v).strip()


def find_sheet_and_header(wb):
    """Locate the function-group worksheet and its header row.

    Scans rows 1-11 and columns 1-19 of each worksheet for a cell containing
    the 'Nr.p.k' header text and returns (worksheet, header_row). Exits the
    program when no sheet matches.
    """
    for ws in wb.worksheets:
        for r in range(1, 12):
            for c in range(1, 20):
                v = ws.cell(row=r, column=c).value
                if v and H_NR in str(v).lower():
                    return ws, r
    sys.exit("error: could not find a register sheet (no 'Nr.p.k.' header)")


def map_columns(ws, hrow):
    """Map logical fields to column indices using the header row.

    Returns a dict of field name -> 1-based column index. Exits the program
    when any of the mandatory 'nr', 'name' or 'raci' columns is missing.
    """
    cols = {}
    for c in range(1, ws.max_column + 1):
        t = ws.cell(row=hrow, column=c).value or ""
        t = str(t).lower().strip()
        if not t:
            continue
        # Order matters: H_PRED is tested before H_NR so a predecessor header
        # is never claimed by the shorter 'nr.p.k' substring.
        if H_PRED in t:
            cols["pred_fg"] = c        # predecessor FG-group column
            cols["pred_nr"] = c + 1    # predecessor step-number sub-column
        elif H_NR in t:
            cols["nr"] = c
        elif H_NAME in t:
            cols["name"] = c
        elif H_RACI in t:
            cols["raci"] = c           # RACI block spans raci, +1, +2
        elif H_DESC in t:
            cols["desc"] = c
        elif H_SYSTEM in t:
            cols["system"] = c
        elif H_DEADLINE in t:
            cols["deadline"] = c
        elif H_OUTPUTS in t:
            cols["outputs"] = c
        elif H_SUCC in t:
            cols["succ_fg"] = c        # successor FG-group column
            cols["succ_nr"] = c + 1    # successor step-number sub-column
    for req in ("nr", "name", "raci"):
        if req not in cols:
            sys.exit(f"error: register header is missing the '{req}' column")
    return cols


def parse_refs(fg_cell, nr_cell):
    """Parse a predecessor/successor cell pair into [(fg, nr_key), ...].

    Both cells may hold several newline-separated entries. A single FG entry
    is applied to every step number; otherwise entries pair up by position,
    falling back to the first FG entry when the FG list is shorter.
    """
    # `or ""` guards against None, which str() would turn into the text "None".
    fgs = [x.strip() for x in str(fg_cell or "").splitlines() if x.strip()]
    nrs = [x.strip() for x in str(nr_cell or "").splitlines() if x.strip()]
    if not nrs:
        return []
    if len(fgs) == 1 and len(nrs) > 1:
        fgs = fgs * len(nrs)
    refs = []
    for i, nr in enumerate(nrs):
        fg = fgs[i] if i < len(fgs) else (fgs[0] if fgs else "")
        key = norm_nr(nr)
        if key:
            refs.append((fg.upper(), key))
    return refs


def parse_register(path):
    """Return (steps, subprocesses).

    Each step is a dict; subprocesses maps a sub-process key -> its register
    name. Exits the program when openpyxl is not installed or the register
    sheet/header cannot be found.
    """
    if openpyxl is None:
        sys.exit("error: openpyxl is required (pip install openpyxl)")
    wb = openpyxl.load_workbook(path, data_only=True)
    ws, hrow = find_sheet_and_header(wb)
    cols = map_columns(ws, hrow)
    own_fg = re.sub(r"[^A-Za-z0-9]", "", ws.title).upper()  # e.g. FG3

    steps = []
    subprocesses = {}
    current_sub = None
    # Data starts two rows below the header row (hrow + 1 is skipped).
    for r in range(hrow + 2, ws.max_row + 1):
        nr = cell(ws, r, cols["nr"])
        name = cell(ws, r, cols["name"])
        if not nr or not name:
            continue
        raci = [cell(ws, r, cols["raci"] + i) for i in range(3)]
        desc = cell(ws, r, cols.get("desc"))
        is_step = bool(desc) or any(raci)
        if not is_step:
            # section / sub-process header row
            current_sub = norm_nr(nr)
            subprocesses[current_sub] = name
            continue
        steps.append({
            "nr": nr,
            "key": norm_nr(nr),
            "name": name,
            "sub": current_sub,
            "raci": raci,
            "desc": desc,
            "system": cell(ws, r, cols.get("system")),
            "deadline": cell(ws, r, cols.get("deadline")),
            "outputs": cell(ws, r, cols.get("outputs")),
            "pred": parse_refs(cell(ws, r, cols.get("pred_fg")),
                               cell(ws, r, cols.get("pred_nr"))),
            "succ": parse_refs(cell(ws, r, cols.get("succ_fg")),
                               cell(ws, r, cols.get("succ_nr"))),
            "own_fg": own_fg,
        })
    return steps, subprocesses


def primary_lane(raci):
    """Pick the swimlane for a step: the actor that is Responsible ('R').

    Falls back to the first actor marked 'A', then to the first actor with
    any marking, then to VPC. Returns the matching ACTORS tuple.
    """
    for i, v in enumerate(raci):
        if "R" in v.upper():
            return ACTORS[i]
    for i, v in enumerate(raci):
        if "A" in v.upper():
            return ACTORS[i]
    for i, v in enumerate(raci):
        if v:
            return ACTORS[i]
    return ACTORS[2]  # default: VPC


def build_flows(group):
    """Reconstruct in-group sequence flows from predecessor/successor links.

    Returns a set of (src_key, dst_key). References to steps outside the
    group and self-references are ignored. Matching is by step number only;
    the FG part of a reference is not consulted.
    """
    keys = {s["key"] for s in group}
    edges = set()
    for s in group:
        for _fg, nr in s["pred"]:
            if nr in keys and nr != s["key"]:
                edges.add((nr, s["key"]))
        for _fg, nr in s["succ"]:
            if nr in keys and nr != s["key"]:
                edges.add((s["key"], nr))
    return edges


def doc_text(s):
    """Assemble the <documentation> body for a step's task.

    Reads s['_groupkeys'] (the set of step keys in the emitted group) to
    decide which references are external; when that key is absent every
    reference is listed as external.
    """
    group_keys = s.get("_groupkeys", ())
    parts = []
    raci_bits = [f"{ACTORS[i][2].split(' ')[0]}={s['raci'][i]}"
                 for i in range(3) if s["raci"][i]]
    parts.append(f"Nr.p.k.: {s['nr']} | RACI: " + "; ".join(raci_bits))
    if s["desc"]:
        parts.append(s["desc"])
    meta = []
    if s["system"]:
        meta.append("Sistēma: " + s["system"].replace("\n", " "))
    if s["deadline"]:
        meta.append("Izpildes termiņš: " + s["deadline"].replace("\n", " "))
    if s["outputs"]:
        meta.append("Sagatavotie dati: " + s["outputs"].replace("\n", " "))
    if meta:
        parts.append(" | ".join(meta))
    ext_p = [f"{fg}/{nr}" for fg, nr in s["pred"] if nr not in group_keys]
    ext_s = [f"{fg}/{nr}" for fg, nr in s["succ"] if nr not in group_keys]
    if ext_p:
        parts.append("Ārējais priekštecis: " + ", ".join(ext_p))
    if ext_s:
        parts.append("Ārējais pēctecis: " + ", ".join(ext_s))
    return "\n".join(parts)


def _attr(value):
    """Escape text for use inside a double-quoted XML attribute value."""
    return escape(value, {'"': "&quot;"})


def emit_bpmn(steps, subprocesses, sub):
    """Render the BPMN skeleton for sub-process `sub` as an XML string.

    One task per step, one lane per actor in use, one start event per step
    with no in-group predecessor and one end event per step with no in-group
    successor. Exits the program when `sub` has no steps. The input step
    dicts are not modified.

    NOTE(review): the element markup below was reconstructed from the BPMN
    2.0 schema; compare attribute details (targetNamespace, event names,
    indentation) against a pristine copy of this script if one exists.
    """
    # Work on shallow copies so the '_groupkeys' / '_lane' annotations do not
    # leak into the caller's step dicts.
    group = [dict(s) for s in steps if s["sub"] == sub]
    if not group:
        avail = ", ".join(sorted(subprocesses)) or "(none)"
        sys.exit(f"error: no steps for sub-process '{sub}'. Available: {avail}")

    gkeys = {s["key"] for s in group}
    for s in group:
        s["_groupkeys"] = gkeys

    edges = build_flows(group)
    indeg = {s["key"]: 0 for s in group}
    outdeg = {s["key"]: 0 for s in group}
    for a, b in edges:
        outdeg[a] += 1
        indeg[b] += 1
    # A fully cyclic group has no natural entry/exit; fall back to first/last.
    entries = [s for s in group if indeg[s["key"]] == 0] or [group[0]]
    exits = [s for s in group if outdeg[s["key"]] == 0] or [group[-1]]

    tid = {s["key"]: "Task_" + san(s["nr"]) for s in group}

    lanes_used = {}
    for s in group:
        lane = primary_lane(s["raci"])
        s["_lane"] = lane[1]
        lanes_used.setdefault(lane[1], (lane[1], lane[2]))

    name = subprocesses.get(sub, sub)
    proc_id = "Process_" + san(sub)

    L = []
    L.append('<?xml version="1.0" encoding="UTF-8"?>')
    L.append('<definitions xmlns="%s" id="Definitions_%s" '
             'targetNamespace="http://bpmn.io/schema/bpmn">'
             % (BPMN_NS, san(sub)))
    L.append('  <process id="%s" name="%s" isExecutable="false">'
             % (proc_id, _attr(name)))

    # --- lanes ---
    start_ids = ["Start_%d" % (i + 1) for i in range(len(entries))]
    end_ids = ["End_%d" % (i + 1) for i in range(len(exits))]

    L.append('    <laneSet id="LaneSet_%s">' % san(sub))
    # start/end events go in the lane of the step they touch
    extra = {}
    for sid, st in zip(start_ids, entries):
        extra.setdefault(st["_lane"], []).append(sid)
    for eid, st in zip(end_ids, exits):
        extra.setdefault(st["_lane"], []).append(eid)
    for lid, lname in lanes_used.values():
        L.append('      <lane id="%s" name="%s">' % (lid, _attr(lname)))
        for s in group:
            if s["_lane"] == lid:
                L.append('        <flowNodeRef>%s</flowNodeRef>' % tid[s["key"]])
        for nid in extra.get(lid, []):
            L.append('        <flowNodeRef>%s</flowNodeRef>' % nid)
        L.append('      </lane>')
    L.append('    </laneSet>')

    # --- collect flows: start->entry, edges, exit->end ---
    flows = []
    incoming = {}
    outgoing = {}

    def add_flow(src, dst):
        fid = "Flow_%d" % (len(flows) + 1)
        flows.append((fid, src, dst))
        outgoing.setdefault(src, []).append(fid)
        incoming.setdefault(dst, []).append(fid)
        return fid

    for sid, st in zip(start_ids, entries):
        add_flow(sid, tid[st["key"]])
    for a, b in sorted(edges):
        add_flow(tid[a], tid[b])
    for eid, st in zip(end_ids, exits):
        add_flow(tid[st["key"]], eid)

    # --- events + tasks ---
    for sid, st in zip(start_ids, entries):
        L.append('    <startEvent id="%s" name="Start %s">'
                 % (sid, _attr(st["nr"])))
        for f in outgoing.get(sid, []):
            L.append('      <outgoing>%s</outgoing>' % f)
        L.append('    </startEvent>')

    for s in group:
        t = tid[s["key"]]
        L.append('    <task id="%s" name="%s">'
                 % (t, _attr(s["name"].replace("\n", " "))))
        L.append('      <documentation>%s</documentation>' % escape(doc_text(s)))
        for f in incoming.get(t, []):
            L.append('      <incoming>%s</incoming>' % f)
        for f in outgoing.get(t, []):
            L.append('      <outgoing>%s</outgoing>' % f)
        L.append('    </task>')

    for eid, st in zip(end_ids, exits):
        L.append('    <endEvent id="%s" name="End %s">'
                 % (eid, _attr(st["nr"])))
        for f in incoming.get(eid, []):
            L.append('      <incoming>%s</incoming>' % f)
        L.append('    </endEvent>')

    for fid, src, dst in flows:
        L.append('    <sequenceFlow id="%s" sourceRef="%s" targetRef="%s"/>'
                 % (fid, src, dst))

    L.append('  </process>')
    L.append('</definitions>')
    return "\n".join(L) + "\n"


def _sub_sort_key(sub):
    """Sort key for sub-process keys: numeric dotted order ('3.2' before
    '3.10'); the None key (steps before any section header) sorts last."""
    parts = (sub or "").split(".")
    return [(0, int(p), "") if p.isdecimal() else (1, 0, p) for p in parts]


def cmd_list(path):
    """Print every sub-process in the register with its step count."""
    steps, subs = parse_register(path)
    counts = Counter(s["sub"] for s in steps)
    print(f"register: {path}")
    print(f"{len(steps)} steps in {len(counts)} sub-process(es) with steps:\n")
    for sub in sorted(counts, key=_sub_sort_key):
        # Steps that precede any section header carry sub=None.
        label = "(none)" if sub is None else sub
        print(f"  {label:<10} {counts[sub]:>3} step(s)  {subs.get(sub, '')}")
    print("\nemit a sub-process: transcode.py emit <register.xlsx> <sub-process>")


def cmd_emit(path, sub, out):
    """Emit the BPMN skeleton for `sub` to file `out`, or to stdout when
    `out` is falsy."""
    steps, subs = parse_register(path)
    xml = emit_bpmn(steps, subs, sub)
    if out:
        with open(out, "w", encoding="utf-8") as fh:
            fh.write(xml)
        n = sum(1 for s in steps if s["sub"] == sub)
        print(f"wrote {out} ({n} step(s), sub-process {sub} — {subs.get(sub,'')})")
    else:
        sys.stdout.write(xml)


def main(argv):
    """CLI entry point; `argv` is the full argument vector (sys.argv)."""
    if len(argv) < 3 or argv[1] not in ("list", "emit"):
        sys.exit(__doc__.strip())
    if argv[1] == "list":
        cmd_list(argv[2])
        return

    args = list(argv[2:])
    out = None
    if "-o" in args:
        i = args.index("-o")
        if i + 1 >= len(args):
            # '-o' given without a file name
            sys.exit(EMIT_USAGE)
        out = args[i + 1]
        del args[i:i + 2]
    if len(args) < 2:
        sys.exit(EMIT_USAGE)
    cmd_emit(args[0], args[1], out)


if __name__ == "__main__":
    main(sys.argv)