Refery.

04. El motor del pipeline

El motor del pipeline es responsable de uno de los problemas más difíciles en cualquier plataforma de reclutamiento: mantener sincronizada con la realidad la visión que el sistema tiene de la etapa de cada candidato. La realidad reside en conversaciones de Gmail, invitaciones de calendario, mensajes internos de Slack, actualizaciones de scouts y confirmaciones verbales. El motor del pipeline reconcilia todas esas fuentes de vuelta en la base de datos sin mover nunca una fila hacia atrás sin autorización humana explícita.

El motor se construye sobre tres principios:

  1. Máquina de estados de solo avance. Las etapas solo pueden avanzar, nunca retroceder. Esto elimina toda una clase de errores.
  2. Transiciones ligadas a evidencia. Cada cambio de etapa requiere evidencia, y la evidencia se registra junto con la transición.
  3. Historial de solo adición. Ninguna transición de etapa se sobrescribe jamás. El registro de auditoría completo reside en pipeline_stage_history.

La máquina de estados

stateDiagram-v2
  [*] --> sourced
  sourced --> job_matched: par candidato × puesto creado
  job_matched --> job_shared: correo saliente que nombra<br/>puesto + empresa específicos
  job_shared --> interest_confirmed: respuesta positiva<br/>explícita del candidato
  interest_confirmed --> screening: el cliente recibe<br/>el perfil del candidato
  screening --> interview: el cliente agenda una llamada
  interview --> offer: oferta extendida
  offer --> hired: oferta aceptada
  job_matched --> rejected: el cliente declina
  job_shared --> rejected
  interest_confirmed --> rejected
  screening --> rejected
  interview --> rejected
  offer --> rejected
  job_matched --> withdrawn: el candidato abandona el proceso
  job_shared --> withdrawn
  interest_confirmed --> withdrawn
  screening --> withdrawn
  interview --> withdrawn
  hired --> [*]
  rejected --> [*]
  withdrawn --> [*]

Por qué solo avance

Las transiciones de solo avance son una decisión de ingeniería deliberada. La alternativa, un estado libremente reversible, suena más flexible pero en la práctica es la fuente de cada ticket de soporte de "a dónde fue este candidato" en toda plataforma de reclutamiento que haya existido.

Si un candidato en screening produce evidencia ambigua que podría interpretarse como interest_confirmed, el sistema no lo degrada automáticamente. Expone la ambigüedad al operador junto con la evidencia y solicita una decisión explícita. Esto es más lento para casos límite individuales y dramáticamente más correcto en conjunto.

Enum de etapas

El conjunto completo de etapas válidas, aplicado a nivel de base de datos mediante una restricción CHECK:

ALTER TABLE job_candidate_pipeline
  ADD CONSTRAINT pipeline_stage_check
  CHECK (stage IN (
    'sourced',
    'job_matched',
    'job_shared',
    'interest_confirmed',
    'screening',
    'interview',
    'offer',
    'hired',
    'rejected',
    'withdrawn'
  ));

La restricción en la capa SQL es el muro de carga. El código de la aplicación puede tener errores, pero la base de datos rechazará cualquier valor de etapa no válido. Este es el mismo patrón usado en sistemas financieros donde la integridad de los datos importa más que la agilidad.

Escaleras de evidencia

Cada transición tiene un requisito de evidencia explícito. El motor de reconciliación recorre la escalera hacia abajo y se detiene en la primera coincidencia. Las etapas nunca se omiten sin autorización explícita del operador.

job_matched → job_shared

Evidencia requerida. Un correo saliente de Refery al candidato que nombre este puesto y empresa específicos en el cuerpo del correo. Las lecturas a nivel de fragmento son insuficientes. El motor de reconciliación obtiene el contenido completo del mensaje y verifica que el título del puesto se mencione realmente.

Un caso límite típico: un correo dice "Me encantaría compartir tres puestos contigo: Stripe SWE, Anthropic ML Eng, más otros tres." Los dos primeros puestos transicionan a job_shared. Las filas de "otros tres" no transicionan hasta que esos puestos se nombren explícitamente en un correo de seguimiento. Esta es la diferencia entre un sistema que tiene un 80% de precisión de datos y uno que tiene un 99%.

job_shared → interest_confirmed

Evidencia requerida. El candidato responde con lenguaje positivo explícito sobre este puesto específico: "Estoy interesado", "procedamos", "con gusto me postulo", "comparte mi perfil, por favor". El reconocimiento pasivo ("gracias por compartir", "entendido", "anotado") no cuenta. "Quizás más tarde" no cuenta. "Cuéntame más primero" no cuenta.

Este umbral es intencionalmente alto. Un falso positivo de transición aquí contamina las métricas posteriores del pipeline y hace que Refery presente candidatos no confirmados a los clientes, lo que daña la relación con el cliente. Los falsos negativos son recuperables; los falsos positivos no lo son.

interest_confirmed → screening

Evidencia requerida. Refery envía un correo al cliente (contacto de contratación en la empresa) presentando el perfil del candidato, con el correo haciendo referencia a este candidato específico. Las invitaciones de calendario o los mensajes de Slack con confirmación equivalente también cuentan.

screening → interview

Evidencia requerida. El cliente responde agendando una llamada o entrevista con el candidato, o se crea una invitación de calendario que involucra al candidato.

* → rejected

Evidencia requerida. El cliente declina ("no encaja", "seguimos adelante", "vamos en otra dirección") o el candidato ya no está siendo considerado. El campo notes en la fila de historial debe especificar si el rechazo provino del cliente o del candidato.

* → withdrawn

Evidencia requerida. El candidato se retira explícitamente: "ya no estoy interesado", "acepté otra oferta", "no es el momento adecuado".

El motor de reconciliación

El motor de reconciliación se invoca bajo demanda. Lee Gmail, hace coincidir los hilos con filas específicas del pipeline y emite transiciones de etapa según las escaleras de evidencia anteriores.

Paso 1: Definir el alcance

Alcance predeterminado: cada fila del pipeline actualmente en movimiento (job_matched, job_shared, interest_confirmed, screening). Los estados terminales (hired, rejected, withdrawn, interview una vez concluida) se excluyen a menos que se soliciten explícitamente.

SELECT
  p.id AS pipeline_id,
  p.candidate_id,
  p.job_id,
  p.stage AS current_stage,
  p.updated_at,
  c.name AS candidate_name,
  c.email AS candidate_email,
  j.title AS role,
  COALESCE(co.name, j.company_name) AS company
FROM job_candidate_pipeline p
JOIN candidates c ON c.id = p.candidate_id
JOIN jobs j ON j.id = p.job_id
LEFT JOIN companies co ON co.id = j.company_id
WHERE p.stage IN ('job_matched', 'job_shared', 'interest_confirmed', 'screening')
ORDER BY c.name, company, role;

Paso 2: Lectura por lotes de Gmail

Para minimizar las idas y vueltas a la API, el motor combina todos los correos de candidatos en movimiento en una única consulta de búsqueda de Gmail.

// reconciliation/gmail-batch.ts

export async function batchReadGmail(
  candidates: Candidate[],
  windowDays: number = 30
): Promise<GmailThread[]> {
  // Combina todos los correos de candidatos en una sola consulta
  const emailClauses = candidates
    .map(c => `to:${c.email} OR from:${c.email}`)
    .join(' OR ');

  const query = `(${emailClauses}) newer_than:${windowDays}d`;

  const threads = await gmail.searchThreads(query);

  // Para cada hilo, obtiene FULL_CONTENT.
  // El nivel de fragmento omite nombres de puestos, negociación de compensación,
  // y señales explícitas de interés en el puesto.
  const fullThreads = await Promise.all(
    threads.map(t => gmail.getThread(t.id, { messageFormat: 'FULL_CONTENT' }))
  );

  return fullThreads;
}

Una única consulta combinada es aproximadamente 30x más eficiente que N consultas individuales contra la API de Gmail, y se mantiene cómodamente dentro de los límites de tasa de la API de Gmail incluso con cientos de filas del pipeline en movimiento.

Paso 3: Decidir cada transición

Para cada fila del pipeline, el motor hace coincidir los hilos de Gmail con esa fila específica y recorre la escalera de evidencia.

// reconciliation/decide-transition.ts

export function decideTransition(
  row: PipelineRow,
  threads: GmailThread[]
): Transition | null {
  const relevantThreads = threads.filter(t =>
    threadInvolvesParty(t, row.candidate_email) ||
    threadInvolvesParty(t, row.client_hiring_contact_email)
  );

  // Recorre la escalera desde la etapa actual. Se detiene en la primera transición coincidente.
  switch (row.current_stage) {
    case 'job_matched':
      const sharedEvidence = findRoleNamedOutbound(
        relevantThreads,
        row.role,
        row.company
      );
      if (sharedEvidence) {
        return {
          to: 'job_shared',
          evidence: `Email ${formatDate(sharedEvidence.date)}: outbound naming "${row.role}" at ${row.company}`,
        };
      }
      return null;

    case 'job_shared':
      const interestEvidence = findExplicitInterestReply(
        relevantThreads,
        row.candidate_email,
        row.role
      );
      if (interestEvidence) {
        return {
          to: 'interest_confirmed',
          evidence: `Email ${formatDate(interestEvidence.date)}: candidate reply "${interestEvidence.snippet}"`,
        };
      }
      return null;

    case 'interest_confirmed':
      const submissionEvidence = findClientSubmission(
        relevantThreads,
        row.client_hiring_contact_email,
        row.candidate_name
      );
      if (submissionEvidence) {
        return {
          to: 'screening',
          evidence: `Email ${formatDate(submissionEvidence.date)}: profile submitted to ${row.client_hiring_contact_email}`,
        };
      }
      return null;

    case 'screening':
      const interviewEvidence = findInterviewScheduled(
        relevantThreads,
        row.candidate_email
      );
      if (interviewEvidence) {
        return {
          to: 'interview',
          evidence: `Calendar invite ${formatDate(interviewEvidence.date)}: ${interviewEvidence.subject}`,
        };
      }
      return null;
  }
}

Paso 4: Idempotencia

El motor es completamente idempotente. Volver a ejecutarlo sin nueva evidencia de Gmail produce cero escrituras. Esta es una propiedad de muro de carga: significa que el motor puede ejecutarse de forma programada, bajo demanda o después de cada sincronización de Gmail sin riesgo de transiciones duplicadas.

export function buildTransitionList(
  rows: PipelineRow[],
  threads: GmailThread[]
): Transition[] {
  return rows
    .map(row => ({ row, transition: decideTransition(row, threads) }))
    .filter(({ row, transition }) =>
      transition !== null && transition.to !== row.current_stage
    )
    .map(({ row, transition }) => ({
      pipelineId: row.pipeline_id,
      from: row.current_stage,
      to: transition.to,
      evidence: transition.evidence,
    }));
}

El filtro sobre transition.to !== row.current_stage es la salvaguarda de idempotencia. Una fila que ya está en su etapa correcta no produce trabajo.

Paso 5: Escritura atómica

Las actualizaciones de etapa y las filas de historial se escriben de forma atómica. Nunca se permite que la inserción del historial falle silenciosamente si la actualización de etapa tiene éxito, y viceversa.

BEGIN;

UPDATE job_candidate_pipeline
SET stage = $1, updated_at = now()
WHERE id = $2 AND stage = $3;  -- comprobación de concurrencia optimista
-- Si el número de filas afectadas es 0, otro escritor cambió la etapa. Abortar.

INSERT INTO pipeline_stage_history (
  id, pipeline_id, job_id, candidate_id,
  previous_stage, new_stage,
  changed_by_user_id, changed_at, notes
)
VALUES (
  gen_random_uuid(), $2, $4, $5,
  $3, $1,
  $6, now(), $7
);

COMMIT;

La comprobación de concurrencia optimista en el UPDATE (AND stage = $3) evita actualizaciones perdidas si dos ejecuciones de reconciliación ocurren de forma concurrente, lo que puede suceder si el motor se invoca desde múltiples lugares. El perdedor de la carrera ve cero filas afectadas y aborta limpiamente.

Historial de solo adición

La tabla pipeline_stage_history es de solo adición por convención y por ruta de código. No existe ninguna ruta de código de la aplicación que actualice o elimine filas de esta tabla. Esto es lo que hace que el sistema sea auditable.

Esquema

CREATE TABLE pipeline_stage_history (
  id                  uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  pipeline_id         uuid NOT NULL REFERENCES job_candidate_pipeline(id) ON DELETE CASCADE,
  job_id              uuid NOT NULL,
  candidate_id        uuid NOT NULL,
  previous_stage      text,
  new_stage           text NOT NULL,
  changed_by_user_id  uuid NOT NULL,
  changed_at          timestamptz NOT NULL DEFAULT now(),
  notes               text NOT NULL CHECK (length(notes) > 0)
);

CREATE INDEX idx_history_pipeline ON pipeline_stage_history(pipeline_id);
CREATE INDEX idx_history_candidate ON pipeline_stage_history(candidate_id);
CREATE INDEX idx_history_changed_at ON pipeline_stage_history(changed_at DESC);

La restricción CHECK (length(notes) > 0) en la capa SQL aplica que cada fila de historial tenga evidencia adjunta. Las filas de historial vacías son inútiles para la auditoría y están prohibidas por el esquema.

Lectura del historial

La tabla de historial funciona también como la fuente de verdad para las métricas del pipeline basadas en el tiempo: tiempo en etapa, tasas de conversión por fuente, análisis de abandono. Dado que cada transición se preserva con su evidencia, estas métricas son reproducibles únicamente a partir de la tabla de historial, sin ningún sistema de registro de series temporales.

-- Tasa de conversión de job_shared → interest_confirmed, últimos 90 días
WITH stage_transitions AS (
  SELECT
    pipeline_id,
    new_stage,
    LAG(new_stage) OVER (PARTITION BY pipeline_id ORDER BY changed_at) AS previous_in_pipeline
  FROM pipeline_stage_history
  WHERE changed_at > now() - interval '90 days'
)
SELECT
  COUNT(*) FILTER (WHERE new_stage = 'interest_confirmed' AND previous_in_pipeline = 'job_shared')::float /
  NULLIF(COUNT(*) FILTER (WHERE new_stage = 'job_shared'), 0) AS conversion_rate
FROM stage_transitions;

Marcado para atención

Después de que se ejecuta la reconciliación, el motor expone las filas que necesitan atención humana pero que no produjeron transiciones automáticas. Se calculan dos marcas:

  1. job_shared estancado. Una fila en job_shared durante más de 7 días sin respuesta del candidato. Acción: considerar un seguimiento.
  2. screening estancado. Una fila en screening durante más de 7 días sin respuesta del cliente. Acción: considerar un recordatorio.
-- Consulta de marcado para atención
SELECT
  c.name AS candidate,
  COALESCE(co.name, j.company_name) AS company,
  j.title AS role,
  p.stage,
  age(now(), p.updated_at) AS time_in_stage,
  CASE p.stage
    WHEN 'job_shared' THEN 'Consider candidate follow-up'
    WHEN 'screening' THEN 'Consider client nudge'
  END AS suggested_action
FROM job_candidate_pipeline p
JOIN candidates c ON c.id = p.candidate_id
JOIN jobs j ON j.id = p.job_id
LEFT JOIN companies co ON co.id = j.company_id
WHERE p.stage IN ('job_shared', 'screening')
  AND p.updated_at < now() - interval '7 days'
ORDER BY p.updated_at ASC;

Este es el tipo de inteligencia operativa que un ATS produce, en el mejor de los casos, por accidente. Refery la produce de forma determinista, en cada ejecución de reconciliación, como una salida de primera clase del motor del pipeline.

Por qué esto es novedoso

  1. Máquina de estados de solo avance con invariantes aplicadas por la base de datos. La mayoría de las plataformas de reclutamiento permiten la mutación libre de estado, lo que conduce a la deriva de datos. La restricción CHECK de Refery es el muro de carga.
  2. Transiciones ligadas a evidencia con verificación de FULL_CONTENT. Otros sistemas tratan los metadatos del correo como evidencia. Refery requiere que el nombre del puesto aparezca realmente en el cuerpo del mensaje. Esta es la diferencia entre un 80% de precisión y un 99%.
  3. Concurrencia optimista en cada transición. Dos ejecuciones de reconciliación no pueden producir transiciones duplicadas, ni siquiera bajo concurrencia agresiva.
  4. Reconciliación idempotente. El motor puede invocarse libremente sin riesgo de efectos secundarios. Esto hace que sea seguro ejecutarlo en cada sincronización de Gmail, en cada tick programado y en cada acción del operador.
  5. Historial de solo adición como fuente de métricas. Las métricas del pipeline se calculan a partir de la propia tabla de historial, no de un pipeline de métricas separado. El sistema tiene una única fuente de verdad.

El motor del pipeline es la parte de Refery que un ingeniero de software empresarial experimentado reconoce de inmediato como de grado de producción. Está construido como un libro mayor financiero, no como un CRM.