Saltar a contenido

RF06 Sistema de monitorización y alertas de procesos

Entendimiento

Sistema de monitorización para controlar que todos los procesos estén funcionando correctamente y alertas para avisar de posibles errores en cualquier parte del proceso.

Situación Actual

El ecosistema de monitorización carece de estrategia unificada. Dependemos de procesos manuales, revisiones visuales y entornos no productivos para detectar la pérdida de datos.

  • Detección Humana y Reactiva (PoS): No hay alerta sistémica centralizada. La integridad de los datos de tiendas depende de una revisión manual diaria de un operador de Soporte, que coteja un informe en Looker Studio comparando FAT contra Silver en BigQuery.
  • Dependencia de Entornos de Prueba (UAT): Ante la falta de herramientas en Producción, usamos UAT como repositorio de seguridad. Un proceso carga los backups de las cajas en UAT solo para consultarlos, y la recuperación se ejecuta con scripts ad-hoc heredados que extraen datos de pruebas para inyectarlos en producción.
  • Monitorización Silenciosa (eCommerce): La detección de fallos web es reactiva. Se basa en procesos cíclicos en Yoda que revisan los últimos 2-3 días. Si una cola de mensajería se detiene silenciosamente (como ocurrió recientemente), no hay alerta de "latido" que avise de la ausencia de tráfico hasta que el impacto llega a negocio.
  • Logs Técnicos Inexplotables: Los procesos generan trazas desestructuradas (texto plano, print estándar). Esto impide buscar de forma automatizada o correlacionar un error técnico (ej. "NullPointerException") con una entidad de negocio (ej. "Ticket A100"), dificultando el diagnóstico raíz.

Solución Técnica

Para garantizar la gobernanza del dato y la mantenibilidad operativa de los nuevos flujos, proponemos una estrategia de monitorización doble y complementaria. Las necesidades de Operaciones (Business Ops) difieren de las de Ingeniería (SRE), así que la solución se articula en dos capas paralelas:

  1. Observabilidad de Negocio (El "QUÉ"): Cuadro de mando centralizado para controlar la integridad de las ventas.
  2. Diagnóstico Técnico (El "POR QUÉ"): Estandarización de trazas para resolución rápida de incidencias.

Detallamos cada pilar:

A. Nivel de Negocio: Tablero de Integridad Operacional

  • Objetivo: "¿Faltan datos de alguna tienda ahora mismo?"
  • Herramienta: Looker Studio conectado a BigQuery.
  • Fuente de Verdad: Explotación directa de las Tablas de Control Unificadas en Staging. No monitorizamos logs, sino el estado del dato.
  • Alcance: Visualización de indicadores de negocio (KPIs) generados por los procesos de auditoría (transaction_integrity_controller):
    • Tickets con integridad estructural fallida (NO_ITEMS).
    • Descuadre entre Backup y Cloud (BACKUP_MISSING).
    • Fallos de entrega a sistemas finales (RDO_ERROR, PUBSUB_PENDING).
    • Alertas de "latido" (ausencia de tráfico en colas).

B. Nivel Técnico: Estandarización de Logs Estructurados

  • Objetivo: "¿Por qué falló el proceso que debía integrar ese ticket?"
  • Herramienta: Google Cloud Logging (Consola GCP).
  • Implementación: Instrumentación de todas las Google Cloud Functions (Dataflow, GCF_01, GCF_02) bajo el estándar de "Log Estructurado JSON".
  • Mecanismo: Abandonamos el logging de texto plano. Cada traza de error incluirá metadatos obligatorios para correlación inmediata con el panel de negocio:
    • business_key: ID del Ticket, Tienda o Pedido (la llave que une Looker con los Logs).
    • error_type: Categorización técnica (TIMEOUT, VALIDATION, AUTH).
    • severity: Nivel de criticidad para filtrado rápido.

Estandarización de Logs

Nuestra misión se circunscribe a la instrumentación de las Google Cloud Functions (las nuevas GCF de recuperación y el Dataflow refactorizado) bajo el estándar de "Log Estructurado JSON".

El objetivo no es logarlo todo, sino generar "pocos logs, pero buenos". Implementamos en el código la escritura de logs en formato JSON nativo compatible con Cloud Logging, garantizando que cada entrada incluya:

  1. Contexto de Negocio (business_key): Cada error o evento relevante incluirá el ID del Ticket, ID de Tienda o Código de Pedido. Cuando Negocio detecte una falta, Ingeniería podrá encontrar la traza técnica exacta buscando solo por ese ID.
  2. Trazabilidad (correlation_id): Identificador único para seguir el recorrido de un dato a través de los distintos saltos del sistema.
  3. Clasificación Automática: Uso estricto de severity (ERROR, WARN, INFO) y error_type (VALIDATION, TIMEOUT, QUOTA) para facilitar el filtrado.
  4. Estructura Homogénea:
{
  "severity": "ERROR",
  "function_name": "gcf_pos_backup_reconciler",
  "business_key": "STORE_999_TICKET_1234",
  "error_type": "DATA_CORRUPTION",
  "message": "Header present but lines missing in binary payload"
}

Entregable: Todas las funciones desarrolladas escriben sus logs siguiendo esta plantilla JSON directamente en stdout, lo que permite su ingesta automática por Cloud Logging.

Las Tablas de Control como Bitácora Universal

El sistema de monitorización no consultará logs dispersos ni realizará conteos en tiempo real sobre tablas masivas (raw o sales). Se basa exclusivamente en las Tablas de Control Unificadas alojadas en el dataset de staging de BigQuery.

La función supervisora GCF_02 (transaction_integrity_controller) actúa como auditor, escaneando periódicamente el sistema y escribiendo/actualizando el estado de cada ticket o pedido. Esto convierte la tabla de control en un registro histórico y operacional con dos visiones simultáneas:

  1. Foto Fija (Snapshot): ¿Qué está roto ahora mismo?
  2. Histórico (Time-Series): ¿Cuándo falló el sistema la semana pasada?

Estrategia de Visualización (Tablero Looker Studio)

El tablero ofrece dos perspectivas temporales sobre los mismos datos:

Vista 1: Sala de Operaciones (Current Status / Actionable)

  • Objetivo: Respuesta inmediata a incidencias.
  • Filtro: Solo tickets/pedidos con estados de error activos y no resueltos (status != OK).
  • Visualización: Tablas de detalle con ID de Tienda, ID de Ticket/Pedido y Tipo de Error.
  • Uso: El operador (Soporte) filtra por error (ej. "NO_ITEMS"), extrae los IDs y ejecuta acción correctiva.

Vista 2: Análisis de Estabilidad (Historical / Trends)

  • Objetivo: Análisis de calidad del software y detección de patrones.
  • Filtro: Evolución temporal de errores, incluyendo los ya resueltos.
  • Visualización: Gráficos de barras apiladas por día y tipo de error.
  • Valor: Permite medir la eficacia de las GCFs de autorecuperación (GCF_01 y GCF_02) observando la disminución de incidencias sin intervención humana.

Fuera de Alcance

Exclusiones explícitas

Aunque los logs se generarán con una estructura perfecta para análisis, la explotación masiva y visualización de estos logs técnicos en dashboards (Looker Studio) queda FUERA DEL ALCANCE.

La arquitectura de referencia establece que el enrutado de logs a BigQuery "no aplica de momento" y se reserva para una futura monitorización corporativa.

¿Cómo se explotaría a futuro? (Roadmap Corporativo): Para que estos logs fueran visibles en un cuadro de mando analítico, habría que activar un Log Sink (Router) en GCP:

  1. Exportación: Configurar el Log Router para volcar los logs JSON desde Cloud Logging hacia tablas particionadas en BigQuery.
  2. Consulta: Dado que el log ya es JSON estructurado, BigQuery permite consultas SQL directas sobre los campos (ej. SELECT count(*) FROM logs WHERE jsonPayload.error_type = 'TIMEOUT').
  3. Visualización: Looker Studio se conectaría a estas tablas de BigQuery para mostrar gráficas de latencia, tasas de error por versión o tiempos de ejecución.

Enfoque del Proyecto Actual: Nos limitamos al Nivel 1 de Diagnóstico:

  • El tablero de Looker (RF06) consulta las Tablas de Control (Negocio/Integridad).
  • Los Logs JSON (Técnicos) residen en Cloud Logging para consulta puntual por ingenieros cuando necesiten investigar la causa raíz de una alerta de negocio.

Arquitectura

graph TB
    subgraph "Google Cloud Functions"
        GCF1["GCF_1 (Reconciliador)"]
        GCF2["GCF_2 (Supervisor)"]
        DF["Dataflow"]
    end

    subgraph "Staging"
        TC["Tablas de Control Unificadas"]
    end

    subgraph "Observabilidad de Negocio"
        LOOKER["Looker Studio<br/>(Panel)"]
    end

    subgraph "Diagnóstico Técnico"
        CL["Cloud Logging"]
    end

    GCF1 -->|escribe estado| TC
    GCF2 -->|escribe estado| TC
    DF -->|escribe estado| TC
    TC --> LOOKER
    GCF1 -->|log JSON| CL
    GCF2 -->|log JSON| CL
    DF -->|log JSON| CL
    LOOKER -.->|"business_key"| CL

Indicadores y Casuísticas de Error (KPIs)

Clasificamos los indicadores en dos categorías: Calidad de Dato (Visible en Panel) y Salud de Infraestructura (Alertado Operativo).

A. Tablero de Calidad de Dato (Panel Visible)

Estos indicadores generan filas específicas en Looker Studio (Store ID + Ticket ID) y requieren acción correctiva sobre entidades de negocio.

1. Integridad Estructural (CORRUPTED_TICKET)

  • Origen: RF01 / GCF_2.
  • Qué vigila: "Tickets zombis" — cabecera pero sin líneas (o viceversa).
  • Mecanismo: Cruce SQL (ticket_to_reprocess_no_items.sql).
💾 Ver Lógica SQL (ticket_to_reprocess_no_items.sql)
with headers as (
  SELECT transaction_id FROM `alcampo-data-silver-dev.sales.headers` WHERE transaction_date_local between "DAY-TO-REPROCESS_INIT" and "DAY-TO-REPROCESS_END" and net_unit_quantity != 0
), tickets as (
  SELECT ticket_id, transaction_payload  FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` WHERE transaction_journey between "DAY-TO-REPROCESS_INIT" and "DAY-TO-REPROCESS_END"  and type = 'pubsub_success_row'
    QUALIFY ROW_NUMBER() OVER (PARTITION BY ticket_id ORDER BY created_at DESC) = 1
) 
select ticket_id, transaction_payload from tickets 
  left join headers on transaction_id = ticket_id 
  where
    transaction_id not in ( SELECT distinct transaction_id  FROM `alcampo-data-silver-dev.sales.items` WHERE transaction_date_local between "DAY-TO-REPROCESS_INIT" and "DAY-TO-REPROCESS_END" )

2. Desincronización de Backup (BACKUP_MISSING_IN_CONTROL)

  • Origen: RF01 / GCF_1.
  • Qué vigila: Tickets presentes en el ZIP de la caja pero desconocidos para BigQuery.
  • Mecanismo: Comparación en memoria (Python) dentro de gcf_load_backup.py.
🐍 Ver Lógica Python
# Pseudocódigo extraído de backup_zip.md
def reconcile(zip_content, db_inventory):
    for ticket in zip_content.binaries:
        if ticket.id not in db_inventory:
             mark_as_missing_in_control(ticket.id) # KPI: Backup Missing
        if ticket.id not in zip_content.texts:
             mark_as_missing_text(ticket.id)       # KPI: Missing Physical Text

3. Incompletitud Física (MISSING_PHYSICAL_TEXT)

  • Origen: gcf_load_backup.py.
  • Qué vigila: Transacción fiscal correcta (binario) pero falta el ticket visual (texto) para el cliente.
  • Mecanismo: Procesamiento del ZIP detecta paridad ADX_IDT4 vs ALC_TDGT.

4. Cierre Diario Operacional (RDO_DELIVERY_FAILURE)

  • Origen: RF01 / GCF_2.
  • Qué vigila: Tickets ingeridos que no llegaron a Smart v2 (RDO) al cierre del día.
  • Mecanismo: Consulta de estado (digital_ticket_control_rdo_previous_day.sql).
💾 Ver Lógica SQL (digital_ticket_control_rdo_previous_day.sql)
with received as (
  Select store_number,  count(*) as tickets  from ( 
    SELECT store_number, ticket_id FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` WHERE transaction_journey = CURRENT_DATE()-1
          and  type = "first_row" and to_rdo = true
        group by store_number, ticket_id
    ) group by store_number
),upload_rdo as (
    Select store_number,  count(*) as tickets  from ( 
      SELECT store_number, ticket_id FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` WHERE transaction_journey = CURRENT_DATE()-1
        and type = 'rdo_success_row'
      group by store_number, ticket_id
    ) group by store_number
)
select received.store_number, received.tickets-upload_rdo.tickets tickets_left from received left join upload_rdo on received.store_number = upload_rdo.store_number 
where 
 received.tickets-upload_rdo.tickets <> 0

5. Cierre Diario Eventos (PUBSUB_DELIVERY_FAILURE)

  • Origen: RF01 / GCF_2.
  • Qué vigila: Tickets que no generaron evento para sistemas satélites.
  • Mecanismo: Consulta de estado (digital_ticket_control_pubsub.sql).
💾 Ver Lógica SQL (digital_ticket_control_pubsub_previous_day.sql)
with received as (
  Select store_number,  count(*) as tickets  from ( 
    SELECT store_number, ticket_id FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` WHERE transaction_journey = CURRENT_DATE()-1
          and  type = "first_row" and to_rdo = true
        group by store_number, ticket_id
    ) group by store_number
),upload_rdo as (
    Select store_number,  count(*) as tickets  from ( 
      SELECT store_number, ticket_id FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` WHERE transaction_journey = CURRENT_DATE()-1
        and type = 'pubsub_success_row'
      group by store_number, ticket_id
    ) group by store_number
)
select received.store_number, received.tickets-upload_rdo.tickets tickets_left from received left join upload_rdo on received.store_number = upload_rdo.store_number 
where 
 received.tickets-upload_rdo.tickets <> 0

6. eCommerce Gap (MISSING_ORDER_BRONZE)

  • Origen: RF01.B / RF07.
  • Qué vigila: Pedidos cobrados (raw_osp) que no aparecen en el Data Lake (bronze) en una ventana de 3 días.
  • Mecanismo: Comparación de ventana deslizante.

B. Alertas de Infraestructura (Alertas Operativas)

Estos indicadores señalan problemas sistémicos ("Tubería Rota"). No suelen aparecer en el panel como filas de error, sino como Alertas Prioritarias vía email/chat.

7. Latido (PIPELINE_STALLED)

  • Origen: RF07 / GCF_2.
  • Qué vigila: "Muerte Silenciosa" — falta repentina de ingesta en un periodo operativo.
  • Mecanismo: Conteo de volumen reciente (digital_ticket_control.sql).
💾 Ver Lógica SQL (digital_ticket_control.sql)
with tickets_upload_yoda as (
  SELECT 'tickets_upload_yoda' as clave, count(*) valor
    FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` 
      WHERE transaction_journey = CURRENT_DATE() and type = 'first_row' 
      and created_at BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
                      AND CURRENT_TIMESTAMP() 
),
tickets_upload_yoda_rdo as (
  SELECT 'tickets_upload_rdo', count(*) 
    FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` 
      WHERE transaction_journey = CURRENT_DATE() and to_rdo = true and type = 'first_row' 
      and created_at BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
                      AND CURRENT_TIMESTAMP() 
) ,
tickets_upload_rdo as (
  SELECT 'tickets_upload_rdo', count(*) 
    FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` 
        WHERE transaction_journey = CURRENT_DATE() and to_rdo = true and type = 'rdo_success_row'
              AND created_at BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
                              AND CURRENT_TIMESTAMP() 
),
tickets_upload_pubsub as (
  SELECT 'tickets_upload_pubsub', count(*) 
    FROM `alcampo-data-staging-dev.pos_stores_sales_staging.ticket_publication_control` 
        WHERE transaction_journey = CURRENT_DATE() and type = 'pubsub_success_row'
              AND created_at BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
                             AND CURRENT_TIMESTAMP() 
),
tickets_upload_datapool as (
  SELECT 'tickets_upload_datapool', count(*) 
    FROM `alcampo-data-bronze-dev.pos_stores_sales_raw.logtran` 
      WHERE transaction_journey = CURRENT_DATE()
          and transaction_datetime  BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
                                    AND CURRENT_TIMESTAMP() 
)               
select * from tickets_upload_yoda 
Union ALL
select * from tickets_upload_yoda_rdo
Union ALL
select * from tickets_upload_rdo
Union ALL
select * from tickets_upload_pubsub
Union ALL
select * from tickets_upload_datapool

Validación de Tienda Abierta:

💾 Ver Lógica SQL (active_stores.sql)
SELECT store_id FROM `alcampo-data-silver-dev.core.sites` where is_active = true and dbt_valid_to is null and  type_name = 'MAG' and format_name != 'CFC' order by store_id

8. Ausencia de Ficheros (DATAFEED_MISSING)

  • Origen: RF08.
  • Qué vigila: El fichero batch diario no ha llegado a la hora de corte.
  • Mecanismo: Verificación de conteo 0 para la partición de fecha actual.

Resumen Maestro de Indicadores

Canal Casuística de Fallo Indicador Estandarizado Detección (Mecanismo) Acción Correctiva Automática Visibilidad
PoS Integridad Estructural CORRUPTED_TICKET GCF_2 (SQL Join) Reprocesa API PoS Panel
PoS Backup vs Cloud BACKUP_MISSING_IN_CONTROL GCF_1 (Python Reconcile) Invoca API Ticket Digital Panel
PoS Ticket Físico Faltante MISSING_PHYSICAL_TEXT GCF_1 (Python Check) Invoca API Ticket Digital Panel
PoS Fallo Entrega RDO RDO_DELIVERY_FAILURE GCF_2 (Audit T-1) Re-publica Pub/Sub Panel
PoS Fallo PubSub PUBSUB_DELIVERY_FAILURE GCF_2 (Audit T-1) Re-publica Pub/Sub Panel
eCom Pedidos Perdidos MISSING_ORDER_BRONZE GCF_2 (Gap Analysis) Re-inyecta Pedido Panel
eCom Colas Caídas PIPELINE_STALLED GCF_2 (Latido) Alerta SRE (Tubería Rota) Alerta / Email
Data Fichero Missing DATAFEED_MISSING GCF_2 (Zero-row check) Alerta Operativa Alerta / Email
PoS Duplicados N/A Procedimiento SQL Eliminación Batch -