vk-gramatvediba/tools/register-transcoder/transcode.py
Rihards Gailums 66ce42ea37 Spec-conformance fix: correct stub levels and add BPMN-DI
Three corrections grounded in the UAPF SSOT specification (UAPFormat/
UAPF-specification, specification/01-concepts.md, 04-folder-structure.md,
05-level-composition.md, 10-conformance-checklist.md), which had not been
read in full before the initial workspace build.

1. Level relabel. The FG3 sub-process stubs fg3-2, fg3-3 and fg3-6 had
   been marked level: 4 by template inheritance from fg3-1 at Step 4 of
   the build, despite carrying no BPMN and no resources. Per the spec
   conformance checklist this fails the L4 requirement. The three are
   composition placeholders, which the spec models as L3 (composed
   subprocess / variant). Their uapf.yaml is now level: 3 with
   cornerstones.bpmn: false — conformant: L1-L3 packages MUST NOT
   duplicate L4 content. The three real executables fg3-1, fg3-4 and
   fg3-5 remain L4.

2. BPMN Diagram Interchange. All five .bpmn files in the workspace now
   carry a bpmndi:BPMNDiagram with BPMNShape and BPMNEdge elements
   produced by a swim-lane left-to-right auto-layout, so the diagrams
   preview in bpmn.io, Camunda Modeler and ProcessGit's web view. The
   spec doesn't require DI (its own examples have none) but practical
   reviewability does.

3. Transcoder. tools/register-transcoder gains bpmn_di.py — also runnable
   standalone for retrofitting existing BPMN files. transcode.py now
   imports it and emits DI by default for newly generated skeletons.
   sample-output/3.5.2.skeleton.bpmn and 3.5.3.skeleton.bpmn regenerated
   with DI; the logical-model content is byte-identical to the previous
   commit, only DI is added.
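
The "only DI is added" claim in item 3 can be spot-checked structurally. The following is a minimal sketch, not the check used for this commit: it compares two BPMN documents after dropping `bpmndi:BPMNDiagram`, using only the standard library. The helper names `strip_di`, `same_tree` and `same_logical_model` are hypothetical, and the comparison is structural (tags, attributes, trimmed text), which is weaker than the byte-level statement above.

```python
import xml.etree.ElementTree as ET

# Namespace of BPMN Diagram Interchange elements (bpmndi:*).
BPMNDI_NS = "http://www.omg.org/spec/BPMN/20100524/DI"


def strip_di(root):
    """Remove every bpmndi:BPMNDiagram child of the root element, in place."""
    for diagram in root.findall("{%s}BPMNDiagram" % BPMNDI_NS):
        root.remove(diagram)
    return root


def same_tree(a, b):
    """Compare two elements by tag, attributes, trimmed text and children."""
    if a.tag != b.tag or a.attrib != b.attrib:
        return False
    if (a.text or "").strip() != (b.text or "").strip():
        return False
    if len(a) != len(b):
        return False
    return all(same_tree(x, y) for x, y in zip(a, b))


def same_logical_model(old_xml, new_xml):
    """True when both documents match once diagram interchange is removed."""
    old = strip_di(ET.fromstring(old_xml))
    new = strip_di(ET.fromstring(new_xml))
    return same_tree(old, new)
```

Because ElementTree resolves prefixes to namespace URIs, the extra `xmlns:bpmndi` declaration on the regenerated file does not affect the comparison.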

docs/methodology.md updated: adds an explicit Workspace-structure section
grounding L0-L4 in the SSOT spec, a Conformance-correction section
documenting the Step-4 mislabel and its fix, and drops the now-untrue
'no DI' line from limitations.

Validation after the change, full L1-L4 sweep: uapf-cli validate green on
all 10 packages (domains/gramatvediba, fg1-fg6, fg3, fg3-1..fg3-6);
xmllint clean on all 8 .bpmn/.dmn; every .bpmn has BPMNDiagram present.
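
The last check in the sweep ("every .bpmn has BPMNDiagram present") is easy to reproduce. This is a hedged sketch of one way to do it, assuming the diagram sits directly under the root `definitions` element as BPMN 2.0 prescribes; `has_diagram` and `files_missing_di` are hypothetical names and are not part of uapf-cli or of this repository.

```python
from pathlib import Path
import xml.etree.ElementTree as ET

# Namespace of BPMN Diagram Interchange elements (bpmndi:*).
BPMNDI_NS = "http://www.omg.org/spec/BPMN/20100524/DI"


def has_diagram(xml_data):
    """True when the document root has a bpmndi:BPMNDiagram child."""
    root = ET.fromstring(xml_data)
    return root.find("{%s}BPMNDiagram" % BPMNDI_NS) is not None


def files_missing_di(workspace):
    """Return the .bpmn files under `workspace` that carry no diagram."""
    return sorted(
        str(p) for p in Path(workspace).rglob("*.bpmn")
        if not has_diagram(p.read_bytes())
    )
```

An empty list from `files_missing_di(".")` run at the workspace root would correspond to the result reported above.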
2026-05-20 06:44:14 +00:00

397 lines
14 KiB
Python

#!/usr/bin/env python3
"""
register-transcoder: Valsts Kase process register (.xlsx/.xlsm) -> BPMN skeleton.

Part of the vk-gramatvediba UAPF workspace. Reads a published Valsts Kase
"Grāmatvedības uzskeites procesu apraksts" function-group register and emits,
for any sub-process in it, a BPMN process skeleton: one task per register
step, swimlanes from the RACI columns, and sequence flows reconstructed from
the register's own predecessor / successor step references.

The output is a *skeleton*, not an executable package. It is the deterministic
first pass of the transcription pipeline; turning a skeleton into a Level 4
executable (explicit gateways, DMN decision extraction, resource mappings,
package manifest) is the human/AI-assisted refinement step; see the curated
FG3-1, FG3-4 and FG3-5 packages and docs/methodology.md.

Usage:
    transcode.py list <register.xlsx>
    transcode.py emit <register.xlsx> <subprocess> [-o <output.bpmn>]

Examples:
    transcode.py list fg3_process.xlsm
    transcode.py emit fg3_process.xlsm 3.5.2 -o 3.5.2.skeleton.bpmn

Dependencies: openpyxl.
"""

import sys
import os
import re
from xml.sax.saxutils import escape

# Make the sibling bpmn_di module importable when run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

try:
    import openpyxl
except ImportError:
    sys.exit("error: openpyxl is required (pip install openpyxl)")

import bpmn_di

BPMN_NS = "http://www.omg.org/spec/BPMN/20100524/MODEL"

# RACI actor columns, in register column order, mapped to BPMN lane ids/names.
ACTORS = [
    ("nodarbinatais", "Lane_Nodarbinatais", "Nodarbinātais"),
    ("iestade", "Lane_Iestade", "Iestāde"),
    ("vpc", "Lane_VPC", "VPC (Vienotais pakalpojumu centrs)"),
]

# Header cell texts used to locate columns (substring match, case-insensitive).
H_PRED = "no procesa darbības soļa"
H_NR = "nr.p.k"
H_NAME = "process, apakšprocess"
H_RACI = "atbildības sadalījums"
H_DESC = "darbību apraksts"
H_SYSTEM = "izmantotā is"
H_DEADLINE = "izpildes termiņš"
H_OUTPUTS = "sagatavotie dati"
H_SUCC = "uz procesa darbības soli"


def norm_nr(s):
    """Normalise a step number for matching: trim, drop leading/trailing dots."""
    return (s or "").strip().strip(".").strip()


def san(s):
    """Sanitise a string into a BPMN NCName fragment."""
    out = re.sub(r"[^A-Za-z0-9]+", "_", (s or "").strip()).strip("_")
    return out or "x"


def cell(ws, r, c):
    """Return the cell at (r, c) as a stripped string; '' if absent or empty."""
    if c is None:
        return ""
    v = ws.cell(row=r, column=c).value
    return "" if v is None else str(v).strip()


def find_sheet_and_header(wb):
    """Locate the function-group worksheet and its header row."""
    for ws in wb.worksheets:
        for r in range(1, 12):
            for c in range(1, 20):
                v = ws.cell(row=r, column=c).value
                if v and H_NR in str(v).lower():
                    return ws, r
    sys.exit("error: could not find a register sheet (no 'Nr.p.k.' header)")


def map_columns(ws, hrow):
    """Map logical fields to column indices using the header row."""
    cols = {}
    for c in range(1, ws.max_column + 1):
        t = (ws.cell(row=hrow, column=c).value or "")
        t = str(t).lower().strip()
        if not t:
            continue
        if H_PRED in t:
            cols["pred_fg"] = c      # predecessor FG-group column
            cols["pred_nr"] = c + 1  # predecessor step-number sub-column
        elif H_NR in t:
            cols["nr"] = c
        elif H_NAME in t:
            cols["name"] = c
        elif H_RACI in t:
            cols["raci"] = c         # RACI block spans raci, +1, +2
        elif H_DESC in t:
            cols["desc"] = c
        elif H_SYSTEM in t:
            cols["system"] = c
        elif H_DEADLINE in t:
            cols["deadline"] = c
        elif H_OUTPUTS in t:
            cols["outputs"] = c
        elif H_SUCC in t:
            cols["succ_fg"] = c      # successor FG-group column
            cols["succ_nr"] = c + 1  # successor step-number sub-column
    for req in ("nr", "name", "raci"):
        if req not in cols:
            sys.exit(f"error: register header is missing the '{req}' column")
    return cols


def parse_refs(fg_cell, nr_cell):
    """Parse a predecessor/successor cell pair into [(fg, nr_key), ...]."""
    fgs = [x.strip() for x in str(fg_cell).splitlines() if x.strip()]
    nrs = [x.strip() for x in str(nr_cell).splitlines() if x.strip()]
    if not nrs:
        return []
    # A single FG label applies to every step number listed next to it.
    if len(fgs) == 1 and len(nrs) > 1:
        fgs = fgs * len(nrs)
    refs = []
    for i, nr in enumerate(nrs):
        fg = fgs[i] if i < len(fgs) else (fgs[0] if fgs else "")
        key = norm_nr(nr)
        if key:
            refs.append((fg.upper(), key))
    return refs


def parse_register(path):
    """Return (steps, subprocesses). Each step is a dict; subprocesses maps
    a sub-process key -> its register name."""
    wb = openpyxl.load_workbook(path, data_only=True)
    ws, hrow = find_sheet_and_header(wb)
    cols = map_columns(ws, hrow)
    own_fg = re.sub(r"[^A-Za-z0-9]", "", ws.title).upper()  # e.g. FG3
    steps = []
    subprocesses = {}
    current_sub = None
    # Data starts two rows below the header (the row in between holds
    # the sub-column captions).
    for r in range(hrow + 2, ws.max_row + 1):
        nr = cell(ws, r, cols["nr"])
        name = cell(ws, r, cols["name"])
        if not nr or not name:
            continue
        raci = [cell(ws, r, cols["raci"] + i) for i in range(3)]
        desc = cell(ws, r, cols.get("desc"))
        is_step = bool(desc) or any(raci)
        if not is_step:
            # section / sub-process header row
            current_sub = norm_nr(nr)
            subprocesses[current_sub] = name
            continue
        steps.append({
            "nr": nr, "key": norm_nr(nr), "name": name,
            "sub": current_sub, "raci": raci, "desc": desc,
            "system": cell(ws, r, cols.get("system")),
            "deadline": cell(ws, r, cols.get("deadline")),
            "outputs": cell(ws, r, cols.get("outputs")),
            "pred": parse_refs(cell(ws, r, cols.get("pred_fg")),
                               cell(ws, r, cols.get("pred_nr"))),
            "succ": parse_refs(cell(ws, r, cols.get("succ_fg")),
                               cell(ws, r, cols.get("succ_nr"))),
            "own_fg": own_fg,
        })
    return steps, subprocesses


def primary_lane(raci):
    """Pick the swimlane for a step: the actor that is Responsible ('R'),
    else Accountable ('A'), else the first actor with any entry, else VPC."""
    for i, v in enumerate(raci):
        if "R" in v.upper():
            return ACTORS[i]
    for i, v in enumerate(raci):
        if "A" in v.upper():
            return ACTORS[i]
    for i, v in enumerate(raci):
        if v:
            return ACTORS[i]
    return ACTORS[2]  # default: VPC


def build_flows(group):
    """Reconstruct in-group sequence flows from predecessor/successor links.
    Returns a set of (src_key, dst_key)."""
    keys = {s["key"] for s in group}
    edges = set()
    for s in group:
        for fg, nr in s["pred"]:
            if nr in keys and nr != s["key"]:
                edges.add((nr, s["key"]))
        for fg, nr in s["succ"]:
            if nr in keys and nr != s["key"]:
                edges.add((s["key"], nr))
    return edges


def doc_text(s):
    """Assemble the <documentation> body for a step's task."""
    parts = []
    raci_bits = [f"{ACTORS[i][2].split(' ')[0]}={s['raci'][i]}"
                 for i in range(3) if s["raci"][i]]
    parts.append(f"Nr.p.k.: {s['nr']} | RACI: " + "; ".join(raci_bits))
    if s["desc"]:
        parts.append(s["desc"])
    meta = []
    if s["system"]:
        meta.append("Sistēma: " + s["system"].replace("\n", " "))
    if s["deadline"]:
        meta.append("Izpildes termiņš: " + s["deadline"].replace("\n", " "))
    if s["outputs"]:
        meta.append("Sagatavotie dati: " + s["outputs"].replace("\n", " "))
    if meta:
        parts.append(" | ".join(meta))
    # References that leave the current sub-process are recorded as text only.
    ext_p = [f"{fg}/{nr}" for fg, nr in s["pred"]
             if nr not in s["_groupkeys"]]
    ext_s = [f"{fg}/{nr}" for fg, nr in s["succ"]
             if nr not in s["_groupkeys"]]
    if ext_p:
        parts.append("Ārējais priekštecis: " + ", ".join(ext_p))
    if ext_s:
        parts.append("Ārējais pēctecis: " + ", ".join(ext_s))
    return "\n".join(parts)


def emit_bpmn(steps, subprocesses, sub):
    """Render the steps of one sub-process as BPMN skeleton XML (no DI)."""
    group = [s for s in steps if s["sub"] == sub]
    if not group:
        avail = ", ".join(sorted(subprocesses)) or "(none)"
        sys.exit(f"error: no steps for sub-process '{sub}'. Available: {avail}")
    gkeys = {s["key"] for s in group}
    for s in group:
        s["_groupkeys"] = gkeys
    edges = build_flows(group)
    indeg = {s["key"]: 0 for s in group}
    outdeg = {s["key"]: 0 for s in group}
    for a, b in edges:
        outdeg[a] += 1
        indeg[b] += 1
    # Steps without an in-group predecessor/successor become entries/exits.
    entries = [s for s in group if indeg[s["key"]] == 0] or [group[0]]
    exits = [s for s in group if outdeg[s["key"]] == 0] or [group[-1]]
    tid = {s["key"]: "Task_" + san(s["nr"]) for s in group}
    lanes_used = {}
    for s in group:
        lane = primary_lane(s["raci"])
        s["_lane"] = lane[1]
        lanes_used.setdefault(lane[1], (lane[1], lane[2]))
    name = subprocesses.get(sub, sub)
    proc_id = "Process_" + san(sub)
    # escape() leaves double quotes alone; attribute values need them escaped.
    quot = {'"': "&quot;"}
    L = []
    L.append('<?xml version="1.0" encoding="UTF-8"?>')
    L.append('<bpmn:definitions '
             'xmlns:bpmn="%s" id="Defs_%s" '
             'targetNamespace="https://uapf.dev/vk-gramatvediba/transcoded">'
             % (BPMN_NS, san(sub)))
    L.append('  <bpmn:process id="%s" name="%s" isExecutable="false">'
             % (proc_id, escape(name, quot)))

    # --- lanes ---
    node_lane = {}
    for s in group:
        node_lane[tid[s["key"]]] = s["_lane"]
    start_ids = ["Start_%d" % (i + 1) for i in range(len(entries))]
    end_ids = ["End_%d" % (i + 1) for i in range(len(exits))]
    L.append('    <bpmn:laneSet id="LaneSet_%s">' % san(sub))
    # start/end events go in the lane of the step they touch
    extra = {}
    for sid, st in zip(start_ids, entries):
        extra.setdefault(st["_lane"], []).append(sid)
    for eid, st in zip(end_ids, exits):
        extra.setdefault(st["_lane"], []).append(eid)
    for lid, lname in lanes_used.values():
        L.append('      <bpmn:lane id="%s" name="%s">'
                 % (lid, escape(lname, quot)))
        for s in group:
            if s["_lane"] == lid:
                L.append('        <bpmn:flowNodeRef>%s</bpmn:flowNodeRef>'
                         % tid[s["key"]])
        for nid in extra.get(lid, []):
            L.append('        <bpmn:flowNodeRef>%s</bpmn:flowNodeRef>' % nid)
        L.append('      </bpmn:lane>')
    L.append('    </bpmn:laneSet>')

    # --- collect flows: start->entry, edges, exit->end ---
    flows = []
    fc = 0
    incoming = {}
    outgoing = {}

    def add_flow(src, dst):
        nonlocal fc
        fc += 1
        fid = "Flow_%d" % fc
        flows.append((fid, src, dst))
        outgoing.setdefault(src, []).append(fid)
        incoming.setdefault(dst, []).append(fid)
        return fid

    for sid, st in zip(start_ids, entries):
        add_flow(sid, tid[st["key"]])
    for a, b in sorted(edges):
        add_flow(tid[a], tid[b])
    for eid, st in zip(end_ids, exits):
        add_flow(tid[st["key"]], eid)

    # --- events + tasks ---
    for sid, st in zip(start_ids, entries):
        L.append('    <bpmn:startEvent id="%s" name="Ieeja: %s">'
                 % (sid, escape(st["nr"], quot)))
        for f in outgoing.get(sid, []):
            L.append('      <bpmn:outgoing>%s</bpmn:outgoing>' % f)
        L.append('    </bpmn:startEvent>')
    for s in group:
        t = tid[s["key"]]
        L.append('    <bpmn:userTask id="%s" name="%s">'
                 % (t, escape(s["name"].replace("\n", " "), quot)))
        L.append('      <bpmn:documentation>%s</bpmn:documentation>'
                 % escape(doc_text(s)))
        for f in incoming.get(t, []):
            L.append('      <bpmn:incoming>%s</bpmn:incoming>' % f)
        for f in outgoing.get(t, []):
            L.append('      <bpmn:outgoing>%s</bpmn:outgoing>' % f)
        L.append('    </bpmn:userTask>')
    for eid, st in zip(end_ids, exits):
        L.append('    <bpmn:endEvent id="%s" name="Izeja: %s">'
                 % (eid, escape(st["nr"], quot)))
        for f in incoming.get(eid, []):
            L.append('      <bpmn:incoming>%s</bpmn:incoming>' % f)
        L.append('    </bpmn:endEvent>')
    for fid, src, dst in flows:
        L.append('    <bpmn:sequenceFlow id="%s" sourceRef="%s" '
                 'targetRef="%s"/>' % (fid, src, dst))
    L.append('  </bpmn:process>')
    L.append('</bpmn:definitions>')
    return "\n".join(L) + "\n"


def cmd_list(path):
    steps, subs = parse_register(path)
    counts = {}
    for s in steps:
        counts[s["sub"]] = counts.get(s["sub"], 0) + 1
    print(f"register: {path}")
    print(f"{len(steps)} steps in {len(counts)} sub-process(es) with steps:\n")
    # Steps that precede any sub-process header row have sub == None.
    for sub in sorted(counts, key=lambda k: k or ""):
        print(f"  {sub or '-':<10} {counts[sub]:>3} step(s)  {subs.get(sub, '')}")
    print("\nemit a sub-process: transcode.py emit <register> <subprocess>")


def cmd_emit(path, sub, out):
    steps, subs = parse_register(path)
    xml = emit_bpmn(steps, subs, sub)
    xml, _ = bpmn_di.annotate_text(xml)
    if out:
        with open(out, "w", encoding="utf-8") as fh:
            fh.write(xml)
        n = len([s for s in steps if s["sub"] == sub])
        label = f"{sub} {subs.get(sub, '')}".strip()
        print(f"wrote {out} ({n} step(s), sub-process {label})")
    else:
        sys.stdout.write(xml)


def main(argv):
    if len(argv) < 3 or argv[1] not in ("list", "emit"):
        sys.exit(__doc__.strip())
    if argv[1] == "list":
        cmd_list(argv[2])
    else:
        if len(argv) < 4:
            sys.exit("usage: transcode.py emit <register> <subprocess> [-o out]")
        out = None
        if "-o" in argv:
            i = argv.index("-o")
            if i + 1 >= len(argv):
                sys.exit("error: -o requires an output path")
            out = argv[i + 1]
        cmd_emit(argv[2], argv[3], out)


if __name__ == "__main__":
    main(sys.argv)
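
The flow reconstruction in `build_flows` and the entry/exit detection in `emit_bpmn` can be tried without a workbook. The sketch below copies that logic into a standalone snippet and runs it on hand-written step dicts; the step numbers (`9.9.x`, `2.1.4`, `4.2.1`) and FG labels are invented for illustration and do not come from any real register.

```python
def build_flows(group):
    """Same logic as in transcode.py: keep links whose ends are both in the group."""
    keys = {s["key"] for s in group}
    edges = set()
    for s in group:
        for _fg, nr in s["pred"]:
            if nr in keys and nr != s["key"]:
                edges.add((nr, s["key"]))
        for _fg, nr in s["succ"]:
            if nr in keys and nr != s["key"]:
                edges.add((s["key"], nr))
    return edges


# Hypothetical steps: the first has an external predecessor,
# the last an external successor.
group = [
    {"key": "9.9.1", "pred": [("FG2", "2.1.4")], "succ": [("FG9", "9.9.2")]},
    {"key": "9.9.2", "pred": [("FG9", "9.9.1")], "succ": [("FG9", "9.9.3")]},
    {"key": "9.9.3", "pred": [], "succ": [("FG4", "4.2.1")]},
]

edges = build_flows(group)
has_incoming = {dst for _src, dst in edges}
has_outgoing = {src for src, _dst in edges}
# Steps with no in-group predecessor get a start event; those with no
# in-group successor get an end event.
entries = [s["key"] for s in group if s["key"] not in has_incoming]
exits = [s["key"] for s in group if s["key"] not in has_outgoing]

print(sorted(edges))
print("entries:", entries, "exits:", exits)
```

The link declared twice (as a successor of `9.9.1` and as a predecessor of `9.9.2`) collapses into one edge because the edges are held in a set, and the external references are dropped from the flow graph.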