MOS — Management Operating System
MOS es la capa transversal de auditoría y automatización por eventos de
Gestión Civis. Vive en app/core/mos/ y ofrece tres capacidades:
- Auditoría e historial: registrar de forma trazable qué pasó, quién lo hizo
y con qué datos (
emit_event). - Reglas y acciones: ejecutar automatismos cuando se cumplen condiciones sobre un evento.
- Tareas programadas (crons): ejecutar acciones de forma periódica.
flowchart LR
H["Handler de módulo"] -->|emit_event| EV["events.py"]
EV --> AL[("core_audit_log")]
EV --> AP[("core_audit_payload")]
EV --> HE[("core_history_event")]
EV --> HP[("core_history_payload")]
CRON["core_cron_job"] --> SCH["scheduler.py"]
SCH --> Q["core_event_queue"]
Q --> REG["registry.py<br/>@register_action"]
REG --> ACT["Acción del módulo"]
Contexto de petición
El hook before_request llama a init_mos_context() (app/core/mos/context.py),
que fija en g:
g.request_id— elX-Request-Identrante o un UUID nuevo.g.user_id— desdeg.current_usersi existe.
Este contexto enriquece automáticamente la metadata de los eventos.
Auditoría: emit_event()
app/core/mos/events.py. Firma (argumentos por nombre):
def emit_event(
*,
entidad_id: int,
actor_user_id: int | None,
module: str,
resource_type: str,
resource_id: str | int,
action: str,
before: dict | None = None,
after: dict | None = None,
event_code: str | None = None,
message: str | None = None,
severity: str = "INFO",
meta: dict | None = None,
request_id: str | None = None,
actor_label: str | None = None,
):
...
Lo que hace:
- Crea un registro técnico en
core_audit_log(module, resource, action, actor, request_id). - Si hay
before/after/meta, guarda los snapshots encore_audit_payload(JSONB). - Si se aporta
event_code, crea la narrativa funcional encore_history_event(mensaje, motivo, aprobado por, severidad, vector de búsqueda de texto completo). - Si hay snapshots, calcula el
diffencore_history_payload. - Enriquece
metacon contexto HTTP (IP, endpoint, método, user-agent, request_id).
Auditoría ≠ automatización
emit_event solo registra. La automatización (disparo de reglas) la
gestiona dispatch_event().
Helper de auditoría por módulo
Patrón recomendado: cada módulo expone emit_<modulo>_event(...) que envuelve a
emit_event, construye la metadata funcional del recurso, añade el motivo
si lo hay y hace commit autónomo para que ningún evento se pierda.
# patrón general
emit_<modulo>_event(
current_user=...,
entidad_id=...,
<recurso>=...,
action="CREATE|UPDATE|DELETE|...",
event_code="MODULO_ACCION",
message="Descripción legible",
before=snapshot_antes, # snapshot_estado()
after=snapshot_despues, # snapshot_completo() en altas
motivo="si aplica",
severity="INFO|WARNING|ERROR",
)
Implementado en módulos como Registro (emit_registro_event) y Padrón
(emit_padron_event), con snapshots que omiten datos sensibles.
Regla del proyecto
Todo endpoint nuevo con una acción importante debe emitir un evento para
trazabilidad, preferiblemente a través del helper emit_<modulo>_event del
módulo (metadata funcional + before/after + motivo separado + severidad
razonable).
Reglas, condiciones y acciones
Modelos en app/core/mos/models.py:
| Modelo | Tabla | Propósito |
|---|---|---|
CoreEventCatalog |
core_event_catalog |
Catálogo maestro de eventos (code, module, activo). |
CoreEventRule |
core_event_rule |
Regla que vincula un evento con acciones (prioridad, activo). |
CoreEventCondition |
core_event_condition |
Condición (campo, operador, valor). |
CoreEventAction |
core_event_action |
Acción a ejecutar (accion, modo, parametros). |
CoreEventActionType |
core_event_action_type |
Catálogo de tipos de acción reutilizables. |
CoreEventQueue |
core_event_queue |
Cola de ejecución (async/scheduled) con reintentos. |
Operadores de condición soportados: =, !=, contains, in, >, <, >=,
<=.
Registro de acciones
app/core/mos/registry.py. Un módulo expone una acción con el decorador
@register_action(code):
@register_action("recaudacion.contabilizar_cobro")
def contabilizar_cobro(payload, params, entidad_id):
...
La acción se invoca con (payload, params, entidad_id) cuando la cola la procesa.
Tareas programadas (crons)
| Modelo | Tabla | Propósito |
|---|---|---|
CoreCronJob |
core_cron_job |
Tarea periódica (code, action, cron_expr, params, next_run). |
scheduler.py:
process_cron_jobs()detectaCoreCronJobconnext_run <= now, encola unCoreEventQueuey recalculanext_runconcroniter.process_event_queue()procesa los pendientes, ejecuta la acción registrada y reintenta con backoff (hasta 5 intentos).
Crons de fábrica
Los siguientes cron jobs se siembran automáticamente en cada entidad desde
app/core/mos/seed_cron_jobs.py (CRON_JOBS_DEFAULT). Son idempotentes:
la primera vez que se ejecuta el seed se crean; en sucesivas se respeta lo que
el administrador haya cambiado desde la UI de "Tareas programadas".
code |
Acción | Periodicidad | Módulo |
|---|---|---|---|
der_ing_facturar_auto |
contabilidad.derechos_ingreso.facturar_automatico |
0 6 * * * |
Contabilidad |
sla_facturas_sin_propuesta |
contabilidad.sla_facturas_sin_propuesta |
0 8 * * * |
Contabilidad |
sla_facturas_retenidas |
contabilidad.sla_facturas_retenidas |
0 8 * * * |
Contabilidad |
workflow_verificar_sla |
workflow.verificar_sla |
*/15 * * * * |
Documental |
recaudacion_prescripcion |
recaudacion.prescripcion.ejecutar |
0 4 * * * |
Recaudación |
tesoreria_pj_aviso |
tesoreria.pj.aviso_vencimiento |
0 9 * * * |
Tesorería |
recaudacion_plan_pago_sepa |
recaudacion.plan_pago.sepa_orquestar |
0 5 * * * |
Recaudación |
padron_renovaciones_detectar |
padron_habitantes.renovaciones.detectar |
0 5 1 * * |
Padrón |
padron_inspeccion_diaria |
padron_habitantes.inspeccion.detectar |
0 4 * * * |
Padrón |
padron_snapshot_anual |
padron_habitantes.snapshot.generar_anual |
0 3 2 1 * |
Padrón |
padron_ida_mensual |
padron_habitantes.ida.generar_mensual |
0 6 5 * * |
Padrón |
padron_cert_sello_caducidad |
padron_habitantes.cert_sello.vigilar_caducidad |
0 7 * * * |
Padrón |
notificaciones_vigilar_vencimientos |
notificaciones.vigilar_vencimientos |
0 5 * * * |
Notificaciones |
Crons registrados como acciones pero no sembrados de fábrica (se activan cuando el módulo entra en producción real):
| Acción | Periodicidad sugerida | Módulo |
|---|---|---|
registro.sir.enviar_pendientes |
*/5 * * * * |
Registro (SIR salida) |
registro.sir.descargar_pendientes |
*/10 * * * * |
Registro (SIR entrada) |
Arranque del scheduler
Para que los crons de la tabla anterior se ejecuten, el sistema necesita un
proceso permanente que invoque process_cron_jobs() y process_event_queue()
de forma continua. Hay dos comandos en la plataforma y conviene no
confundirlos:
| Comando | Para qué sirve | Modo de invocación |
|---|---|---|
python run_scheduler.py |
Scheduler MOS real: procesa CoreCronJob y CoreEventQueue en un bucle infinito. Es el que ejecuta los 14 crons de fábrica. |
Worker permanente (systemd / Servicio Windows / Cloud Run). |
flask civis-system-run |
Tareas de mantenimiento del propio MOS: particiones mensuales de core_audit_log y core_history_event (cobertura 12 meses adelante) y política de retención (audit 5 años, history 3 años). |
Programación periódica (cron del SO / Programador de tareas), basta una vez al día. |
Es obligatorio arrancar run_scheduler.py como servicio
Sin ese proceso permanente vivo, los 14 cron jobs de fábrica no se
ejecutan: las renovaciones de Padrón no se detectan, las prescripciones de
Recaudación no se aplican, los SLA de facturas no avisan, las
notificaciones DEHú vencidas no se marcan como rechazadas, etc.
flask civis-system-run por sí solo no es suficiente — solo se ocupa del
mantenimiento de las tablas particionadas.
Una sola instancia
El scheduler usa SELECT ... FOR UPDATE SKIP LOCKED en la cola y un
advisory lock PostgreSQL en las tareas de mantenimiento, por lo que
arrancar varios run_scheduler.py contra la misma BD es seguro (uno toma
cada job). Aun así, para el despliegue típico de un ayuntamiento se
recomienda un único worker por simplicidad operativa.
Detalles concretos de cómo dejar esto arrancado en cada plataforma:
- Despliegue en servidor Linux
→ unidad
systemdque mantenga vivorun_scheduler.py+crontabparacivis-system-rundiario. - Despliegue en servidor Windows
→ servicio Windows con
nssmpararun_scheduler.py+ tarea programada paracivis-system-run. - Despliegue en Google Cloud
→ Cloud Run service (mínimo 1 instancia, sin escalado a cero) para el
scheduler + Cloud Scheduler diario para
civis-system-run.
Pausar el scheduler
Existe un kill-switch global: el valor scheduler_enabled en
core_system_setting. Si vale "false", el bucle queda en espera (dormido 5 s)
sin tomar jobs. Se cambia desde el endpoint POST /api/core/scheduler/toggle
(útil durante una migración delicada de BD).
Endpoints MOS
Blueprint mos_bp, prefijo /api/core. Selección:
| Método | Ruta | Propósito |
|---|---|---|
GET |
/api/core/audit |
Listado de auditoría técnica (cursor). |
GET |
/api/core/historial |
Historial funcional con búsqueda de texto. |
GET |
/api/core/catalog |
Catálogo de eventos. |
GET/POST |
/api/core/rules |
Reglas de automatización. |
GET/POST |
/api/core/conditions |
Condiciones de reglas. |
GET/POST |
/api/core/actions |
Acciones de reglas. |
GET/POST |
/api/core/cron-jobs |
Tareas programadas. |
POST |
/api/core/cron-jobs/<id>/run-now |
Ejecutar un cron manualmente. |
GET |
/api/core/scheduler/status |
Estado de cola y jobs. |
GET |
/api/core/admin/system-health |
Tamaño de tablas particionadas. |
Tablas particionadas
core_audit_log y core_history_event están particionadas por rango sobre
created_at para sostener el volumen de auditoría.