Refery.

04. The Pipeline Engine

The pipeline engine is responsible for one of the hardest problems in any recruiting platform: keeping the system's view of every candidate's stage in sync with reality. Reality lives across Gmail conversations, calendar invites, internal Slack messages, scout updates, and verbal confirmations. The pipeline engine reconciles all of those sources back into the database and never moves a row backward unless a human explicitly authorizes it.

The engine is built on three principles:

  1. Forward-only state machine. Stages can only advance, never regress. This eliminates the class of bugs in which a row silently drifts back to an earlier stage.
  2. Evidence-bound transitions. Every stage change requires evidence, and the evidence is logged with the transition.
  3. Append-only history. No stage transition is ever overwritten. The full audit trail lives in pipeline_stage_history.

The state machine

stateDiagram-v2
  [*] --> sourced
  sourced --> job_matched: candidate × role pair created
  job_matched --> job_shared: outbound email naming<br/>specific role + company
  job_shared --> interest_confirmed: candidate explicit<br/>positive reply
  interest_confirmed --> screening: client receives<br/>candidate profile
  screening --> interview: client schedules call
  interview --> offer: offer extended
  offer --> hired: offer accepted
  job_matched --> rejected: client passes
  job_shared --> rejected
  interest_confirmed --> rejected
  screening --> rejected
  interview --> rejected
  offer --> rejected
  job_matched --> withdrawn: candidate exits process
  job_shared --> withdrawn
  interest_confirmed --> withdrawn
  screening --> withdrawn
  interview --> withdrawn
  hired --> [*]
  rejected --> [*]
  withdrawn --> [*]
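
The diagram can be mirrored in application code as a lookup table. The following is a minimal sketch under that assumption, not Refery's actual implementation; the names `NEXT_STAGES` and `canTransition` are hypothetical, and the edges are copied one-for-one from the diagram above.

```typescript
// Hypothetical mirror of the state diagram: each stage maps to the
// stages it is allowed to move to. Terminal stages map to an empty list.
type Stage =
  | 'sourced' | 'job_matched' | 'job_shared' | 'interest_confirmed'
  | 'screening' | 'interview' | 'offer'
  | 'hired' | 'rejected' | 'withdrawn';

const NEXT_STAGES: Record<Stage, readonly Stage[]> = {
  sourced: ['job_matched'],
  job_matched: ['job_shared', 'rejected', 'withdrawn'],
  job_shared: ['interest_confirmed', 'rejected', 'withdrawn'],
  interest_confirmed: ['screening', 'rejected', 'withdrawn'],
  screening: ['interview', 'rejected', 'withdrawn'],
  interview: ['offer', 'rejected', 'withdrawn'],
  offer: ['hired', 'rejected'],
  hired: [],
  rejected: [],
  withdrawn: [],
};

// True only when the diagram contains an edge from `from` to `to`.
function canTransition(from: Stage, to: Stage): boolean {
  return NEXT_STAGES[from].indexOf(to) !== -1;
}
```

Because no stage lists an earlier stage as a target, a backward move such as screening to interest_confirmed is rejected by construction.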

Why forward-only

Forward-only transitions are a deliberate engineering choice. The alternative, freely-reversible state, sounds more flexible but in practice is the source of every "where did this candidate go" support ticket in every recruiting platform that has ever shipped.

If a candidate at screening produces ambiguous evidence that might be interpreted as interest_confirmed, the system does not automatically demote them. It surfaces the ambiguity to the operator with the evidence and asks for an explicit decision. This is slower for individual edge cases and dramatically more correct in aggregate.
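
One way to picture this rule is as a three-way outcome: advance by one rung, do nothing, or hand the case to the operator. The sketch below is illustrative only; `LADDER`, `Outcome`, and `resolve` are assumed names, and the real engine may represent review requests differently.

```typescript
// Linear order of the main ladder, taken from the state diagram.
const LADDER = [
  'sourced', 'job_matched', 'job_shared', 'interest_confirmed',
  'screening', 'interview', 'offer', 'hired',
] as const;
type LadderStage = (typeof LADDER)[number];

type Outcome =
  | { kind: 'advance'; to: LadderStage; evidence: string }
  | { kind: 'no_change' }
  | { kind: 'needs_operator_review'; suggested: LadderStage; evidence: string };

// Compare the stage the evidence points at with the row's current stage.
function resolve(current: LadderStage, suggested: LadderStage, evidence: string): Outcome {
  const cur = LADDER.indexOf(current);
  const sug = LADDER.indexOf(suggested);
  if (sug === cur) return { kind: 'no_change' };
  if (sug === cur + 1) return { kind: 'advance', to: suggested, evidence };
  // Backward moves and skipped stages are never applied automatically.
  return { kind: 'needs_operator_review', suggested, evidence };
}
```

Here a candidate at screening whose evidence points at interest_confirmed yields `needs_operator_review` rather than a demotion.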

Stage enum

The full set of valid stages, enforced at the database level via a CHECK constraint:

ALTER TABLE job_candidate_pipeline
  ADD CONSTRAINT pipeline_stage_check
  CHECK (stage IN (
    'sourced',
    'job_matched',
    'job_shared',
    'interest_confirmed',
    'screening',
    'interview',
    'offer',
    'hired',
    'rejected',
    'withdrawn'
  ));

The constraint at the SQL layer is the load-bearing wall. Application code may have bugs, but the database will reject any invalid stage value. This is the same pattern used in financial systems where data integrity matters more than agility.
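
Application code can mirror the same list to fail earlier and with a clearer error, while the CHECK constraint remains the final authority. This is a sketch under that assumption; `STAGES`, `isStage`, and `assertStage` are hypothetical names, not part of the documented codebase.

```typescript
// Same ten values as the CHECK constraint, kept in one place.
const STAGES = [
  'sourced', 'job_matched', 'job_shared', 'interest_confirmed',
  'screening', 'interview', 'offer', 'hired', 'rejected', 'withdrawn',
] as const;
type Stage = (typeof STAGES)[number];

// Type guard: narrows an arbitrary string to the Stage union.
function isStage(value: string): value is Stage {
  return (STAGES as readonly string[]).indexOf(value) !== -1;
}

// Throws before the value ever reaches the database.
function assertStage(value: string): Stage {
  if (!isStage(value)) {
    throw new Error(`invalid pipeline stage: ${value}`);
  }
  return value;
}
```

If the two lists ever drift apart, the database still rejects the write, which is the point of keeping the constraint in SQL.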

Evidence ladders

Every transition has an explicit evidence requirement. The reconciliation engine walks the ladder downward and stops at the first match. Stages are never skipped without explicit operator authorization.

job_matched → job_shared

Required evidence. An outbound email from Refery to the candidate that names this specific role and company in the email body. Snippet-level reads are insufficient. The reconciliation engine pulls full message content and verifies the role title is actually mentioned.

A typical edge case: an email reads "I'd love to share five roles with you: Stripe SWE, Anthropic ML Eng, plus three more." The first two roles transition to job_shared. The "three more" rows do not transition until those roles are explicitly named in a follow-up email. This is the difference between a system that has 80% data accuracy and one that has 99%.
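
The body check can be approximated with a whole-word match on the role title and company name. The sketch below is an assumption about how such a check might look; `mentions` and `bodyNamesRole` are hypothetical helpers, and it only verifies that both strings appear somewhere in the body, not that they appear together.

```typescript
// Escape characters that have special meaning inside a regular expression.
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Case-insensitive whole-phrase match, so "SWE" does not match "answer".
function mentions(body: string, phrase: string): boolean {
  const words = phrase.trim().split(/\s+/).map(escapeRegExp).join('\\s+');
  if (words.length === 0) return false;
  const pattern = new RegExp('(^|[^a-z0-9])' + words + '([^a-z0-9]|$)', 'i');
  return pattern.test(body);
}

// A row qualifies only if the full body names both its role and its company.
function bodyNamesRole(body: string, role: string, company: string): boolean {
  return mentions(body, role) && mentions(body, company);
}

const body =
  "I'd love to share five roles with you: Stripe SWE, Anthropic ML Eng, plus three more.";
const stripeNamed = bodyNamesRole(body, 'SWE', 'Stripe');
const unnamedRole = bodyNamesRole(body, 'Data Engineer', 'Figma');
```

With the example email, `stripeNamed` is true and `unnamedRole` is false, which matches the rule that the unnamed roles stay at job_matched.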

job_shared → interest_confirmed

Required evidence. Candidate replies with explicit positive language about this specific role: "I'm interested," "let's proceed," "happy to apply," "please share my profile." Passive acknowledgment ("thanks for sharing," "got it," "noted") does not count. "Maybe later" does not count. "Tell me more first" does not count.

This bar is intentionally high. A false positive transition here pollutes downstream pipeline metrics and causes Refery to pitch unconfirmed candidates to clients, which damages the client relationship. False negatives are recoverable; false positives are not.
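
A deliberately conservative phrase matcher illustrates the asymmetry: anything that is not clearly positive stays unconfirmed. This is only a sketch built from the example phrases quoted above; `classifyReply` and both phrase lists are assumptions, and phrase matching alone would not be reliable enough for production use.

```typescript
type ReplySignal = 'explicit_interest' | 'not_confirmed';

// Example phrases taken from the evidence rule above.
const POSITIVE_PHRASES = [
  "i'm interested", "let's proceed", 'happy to apply', 'please share my profile',
];
// Phrases that veto a match even when a positive phrase is also present.
const BLOCKING_PHRASES = ['maybe later', 'tell me more first', 'not interested'];

function classifyReply(text: string): ReplySignal {
  // Lowercase and normalize curly apostrophes so "I’m" matches "i'm".
  const normalized = text.toLowerCase().replace(/[\u2018\u2019]/g, "'");
  const blocked = BLOCKING_PHRASES.some(p => normalized.includes(p));
  const positive = POSITIVE_PHRASES.some(p => normalized.includes(p));
  // Default to not_confirmed: a false negative is recoverable.
  return positive && !blocked ? 'explicit_interest' : 'not_confirmed';
}
```

The default branch returns `not_confirmed`, so an unrecognized reply never advances a row on its own.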

interest_confirmed → screening

Required evidence. Refery emails the client (hiring contact at the company) submitting the candidate's profile, with the email referencing this specific candidate. Calendar invites or Slack messages with equivalent confirmation also count.

screening → interview

Required evidence. Client replies scheduling a call or interview with the candidate, or a calendar invite is created involving the candidate.

* → rejected

Required evidence. Client passes ("not a fit," "moving on," "going in another direction") or candidate is no longer being considered. The notes field on the history row must specify whether rejection came from client or candidate.

* → withdrawn

Required evidence. Candidate explicitly pulls out: "not interested anymore," "accepted another offer," "not the right time."
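
The two terminal rules can be expressed as a small builder that refuses to produce a transition without a source and a quote. The names below (`buildTerminalTransition`, `Party`) and the exact notes format are assumptions for illustration.

```typescript
type TerminalStage = 'rejected' | 'withdrawn';
type Party = 'client' | 'candidate';

interface TerminalTransition {
  to: TerminalStage;
  notes: string;
}

// Builds the notes string so that the originating party is always recorded.
function buildTerminalTransition(
  to: TerminalStage,
  source: Party,
  quote: string
): TerminalTransition {
  // Per the rule above, withdrawn means the candidate pulled out.
  if (to === 'withdrawn' && source !== 'candidate') {
    throw new Error('withdrawn requires evidence from the candidate');
  }
  const trimmed = quote.trim();
  if (trimmed.length === 0) {
    throw new Error('a terminal transition needs a quoted piece of evidence');
  }
  return { to, notes: `${to} by ${source}: "${trimmed}"` };
}
```

For example, `buildTerminalTransition('rejected', 'client', 'not a fit')` yields notes that name the client as the source of the rejection.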

The reconciliation engine

The reconciliation engine is invoked on demand. It reads Gmail, matches threads to specific pipeline rows, and emits stage transitions according to the evidence ladders above.

Step 1: Define scope

Default scope: every pipeline row currently in motion (job_matched, job_shared, interest_confirmed, screening). Rows at any other stage, including interview and offer and the terminal states (hired, rejected, withdrawn), are excluded unless explicitly requested.

SELECT
  p.id AS pipeline_id,
  p.candidate_id,
  p.job_id,
  p.stage AS current_stage,
  p.updated_at,
  c.name AS candidate_name,
  c.email AS candidate_email,
  j.title AS role,
  COALESCE(co.name, j.company_name) AS company
FROM job_candidate_pipeline p
JOIN candidates c ON c.id = p.candidate_id
JOIN jobs j ON j.id = p.job_id
LEFT JOIN companies co ON co.id = j.company_id
WHERE p.stage IN ('job_matched', 'job_shared', 'interest_confirmed', 'screening')
ORDER BY c.name, company, role;

Step 2: Batch-read Gmail

To minimize API round trips, the engine combines all in-motion candidate emails into a single Gmail search query.

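// reconciliation/gmail-batch.ts
// `gmail`, Candidate, and GmailThread are assumed to be defined elsewhere
// in the codebase; they are not part of this excerpt.

export async function batchReadGmail(
  candidates: Candidate[],
  windowDays: number = 30
): Promise<GmailThread[]> {
  // An empty list would otherwise produce the malformed query
  // "() newer_than:30d".
  if (candidates.length === 0) return [];

  // Combine all candidate emails into one query.
  // Only candidate addresses are searched here, so a thread between Refery
  // and a client hiring contact is found only if the candidate is on it too.
  const emailClauses = candidates
    .map(c => `to:${c.email} OR from:${c.email}`)
    .join(' OR ');

  const query = `(${emailClauses}) newer_than:${windowDays}d`;

  const threads = await gmail.searchThreads(query);

  // For each thread, fetch FULL_CONTENT.
  // Snippet level misses role names, comp negotiation,
  // and explicit role-interest signals.
  const fullThreads = await Promise.all(
    threads.map(t => gmail.getThread(t.id, { messageFormat: 'FULL_CONTENT' }))
  );

  return fullThreads;
}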

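A single combined search replaces N per-candidate searches with one request, which is approximately 30x more efficient than issuing the N queries individually, and it stays well within Gmail API rate limits even with hundreds of in-motion pipeline rows. The full-content fetches that follow still cost one request per matching thread.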
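
With several hundred addresses, a single query string can become very long. One hedged option is to split the address list into a few chunks and issue one search per chunk. The sketch below shows only the query construction; `buildGmailQueries` is a hypothetical helper, and the chunk size of 50 is an arbitrary placeholder, not a documented Gmail limit.

```typescript
// Builds one or more Gmail search strings from a list of addresses.
// maxAddressesPerQuery is an assumed tuning knob, not a Gmail constant.
function buildGmailQueries(
  emails: string[],
  windowDays: number = 30,
  maxAddressesPerQuery: number = 50
): string[] {
  if (maxAddressesPerQuery < 1) {
    throw new Error('maxAddressesPerQuery must be at least 1');
  }
  // Normalize and de-duplicate: one candidate can sit on several pipeline rows.
  const unique = Array.from(
    new Set(emails.map(e => e.trim().toLowerCase()).filter(e => e.length > 0))
  );

  const queries: string[] = [];
  for (let i = 0; i < unique.length; i += maxAddressesPerQuery) {
    const clauses = unique
      .slice(i, i + maxAddressesPerQuery)
      .map(e => `to:${e} OR from:${e}`)
      .join(' OR ');
    queries.push(`(${clauses}) newer_than:${windowDays}d`);
  }
  return queries;
}
```

An empty input produces no queries at all, which avoids sending a search with empty parentheses.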

Step 3: Decide each transition

For each pipeline row, the engine matches Gmail threads to that specific row and walks the evidence ladder.

// reconciliation/decide-transition.ts

export function decideTransition(
  row: PipelineRow,
  threads: GmailThread[]
): Transition | null {
  const relevantThreads = threads.filter(t =>
    threadInvolvesParty(t, row.candidate_email) ||
    threadInvolvesParty(t, row.client_hiring_contact_email)
  );

  // Walk ladder from current stage. Stop at first matching transition.
  switch (row.current_stage) {
    case 'job_matched':
      const sharedEvidence = findRoleNamedOutbound(
        relevantThreads,
        row.role,
        row.company
      );
      if (sharedEvidence) {
        return {
          to: 'job_shared',
          evidence: `Email ${formatDate(sharedEvidence.date)}: outbound naming "${row.role}" at ${row.company}`,
        };
      }
      return null;

    case 'job_shared':
      const interestEvidence = findExplicitInterestReply(
        relevantThreads,
        row.candidate_email,
        row.role
      );
      if (interestEvidence) {
        return {
          to: 'interest_confirmed',
          evidence: `Email ${formatDate(interestEvidence.date)}: candidate reply "${interestEvidence.snippet}"`,
        };
      }
      return null;

    case 'interest_confirmed':
      const submissionEvidence = findClientSubmission(
        relevantThreads,
        row.client_hiring_contact_email,
        row.candidate_name
      );
      if (submissionEvidence) {
        return {
          to: 'screening',
          evidence: `Email ${formatDate(submissionEvidence.date)}: profile submitted to ${row.client_hiring_contact_email}`,
        };
      }
      return null;

    case 'screening':
      const interviewEvidence = findInterviewScheduled(
        relevantThreads,
        row.candidate_email
      );
      if (interviewEvidence) {
        return {
          to: 'interview',
          evidence: `Calendar invite ${formatDate(interviewEvidence.date)}: ${interviewEvidence.subject}`,
        };
      }
      return null;

    default:
      // Stages outside the default scope have no automatic transition here.
      return null;
  }
}
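
The helper threadInvolvesParty is referenced above but not shown. A minimal sketch of what it might do follows; the `Thread` and `Message` shapes are assumptions (bare, already-parsed addresses rather than "Name <address>" headers), not the actual GmailThread type.

```typescript
// Assumed, simplified shapes for illustration only.
interface Message {
  from: string;
  to: string[];
  cc?: string[];
}
interface Thread {
  id: string;
  messages: Message[];
}

// Email addresses are compared case-insensitively, ignoring stray whitespace.
function sameAddress(a: string, b: string): boolean {
  return a.trim().toLowerCase() === b.trim().toLowerCase();
}

// True when the address appears as sender or recipient on any message.
function threadInvolvesParty(thread: Thread, email: string | undefined): boolean {
  if (!email) return false; // e.g. a row with no hiring contact on file
  return thread.messages.some(m =>
    [m.from, ...m.to, ...(m.cc ?? [])].some(addr => sameAddress(addr, email))
  );
}
```

Treating a missing address as "not involved" keeps rows without a client contact from matching every thread.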

Step 4: Idempotency

The engine is fully idempotent. Re-running it with no new Gmail evidence produces zero writes. This is a load-bearing property: it means the engine can be run on a schedule, on-demand, or after every Gmail sync without risk of duplicate transitions.

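export function buildTransitionList(
  rows: PipelineRow[],
  threads: GmailThread[]
): Transition[] {
  return rows
    .map(row => ({ row, transition: decideTransition(row, threads) }))
    // The type predicate narrows away null so the final map compiles
    // under strictNullChecks.
    .filter(
      (pair): pair is { row: PipelineRow; transition: Transition } =>
        pair.transition !== null &&
        pair.transition.to !== pair.row.current_stage
    )
    .map(({ row, transition }) => ({
      pipelineId: row.pipeline_id,
      from: row.current_stage,
      to: transition.to,
      evidence: transition.evidence,
    }));
}

The check that transition.to differs from row.current_stage is a defensive idempotency guard: a row that is already at its correct stage produces no work. In practice decideTransition only ever proposes the next rung on the ladder, so the stronger guarantee comes from the ladder itself: once a row has advanced, the evidence that moved it no longer matches any transition from its new stage.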
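
The property can be demonstrated with a toy model in which evidence is reduced to "the stages this row's mailbox supports". Everything here (`plan`, `apply`, the three-stage ladder) is a simplified stand-in for the real engine, written only to show that a second run over unchanged evidence plans nothing.

```typescript
type Stage = 'job_matched' | 'job_shared' | 'interest_confirmed';
interface Row { id: string; stage: Stage }
interface Planned { id: string; from: Stage; to: Stage }

// Next rung for each stage in this toy ladder.
const NEXT: Partial<Record<Stage, Stage>> = {
  job_matched: 'job_shared',
  job_shared: 'interest_confirmed',
};

// Plan at most one step per row, and only if evidence supports that step.
function plan(rows: Row[], evidenceFor: Record<string, Stage[]>): Planned[] {
  const planned: Planned[] = [];
  for (const row of rows) {
    const next = NEXT[row.stage];
    const supported = evidenceFor[row.id] || [];
    if (next !== undefined && supported.indexOf(next) !== -1) {
      planned.push({ id: row.id, from: row.stage, to: next });
    }
  }
  return planned;
}

// Apply planned steps, guarded by the stage each step started from.
function apply(rows: Row[], planned: Planned[]): Row[] {
  return rows.map(row => {
    const change = planned.filter(p => p.id === row.id && p.from === row.stage)[0];
    return change ? { id: row.id, stage: change.to } : row;
  });
}

const rows: Row[] = [{ id: 'p1', stage: 'job_matched' }];
const evidence: Record<string, Stage[]> = { p1: ['job_shared'] };

const firstRun = plan(rows, evidence);                    // one transition
const secondRun = plan(apply(rows, firstRun), evidence);  // nothing left to do
```

The first run plans one transition and the second plans none, because the same outbound email is not evidence for the next rung.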

Step 5: Atomic write

Stage updates and history rows are written atomically. The history insert is never allowed to fail silently if the stage update succeeds, and vice versa.

BEGIN;

UPDATE job_candidate_pipeline
SET stage = $1, updated_at = now()
WHERE id = $2 AND stage = $3;  -- optimistic concurrency check
-- The caller must inspect the affected-row count here. If it is 0, another
-- writer changed the stage: issue ROLLBACK and skip the INSERT below.

INSERT INTO pipeline_stage_history (
  id, pipeline_id, job_id, candidate_id,
  previous_stage, new_stage,
  changed_by_user_id, changed_at, notes
)
VALUES (
  gen_random_uuid(), $2, $4, $5,
  $3, $1,
  $6, now(), $7
);

COMMIT;

The optimistic concurrency check on the UPDATE (AND stage = $3) prevents lost updates if two reconciliation runs happen concurrently, which can happen if the engine is invoked from multiple places. The race loser sees zero rows affected and aborts cleanly.
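
The abort path lives in application code, since plain SQL would run the INSERT regardless of the UPDATE's row count. The sketch below shows one way to wire it up. The `Queryable` interface is an assumption modeled on a driver whose query call resolves to an object with a `rowCount` field, as node-postgres does; `writeTransition` and `StageChange` are hypothetical names. All statements must run on the same dedicated connection for the transaction to hold.

```typescript
// Minimal driver surface this sketch depends on (assumed shape).
interface QueryResult { rowCount: number | null }
interface Queryable {
  query(text: string, values?: unknown[]): Promise<QueryResult>;
}

interface StageChange {
  pipelineId: string; jobId: string; candidateId: string;
  from: string; to: string; changedByUserId: string; notes: string;
}

// Returns true if the transition was written, false if another writer won.
async function writeTransition(db: Queryable, c: StageChange): Promise<boolean> {
  await db.query('BEGIN');
  try {
    const updated = await db.query(
      'UPDATE job_candidate_pipeline SET stage = $1, updated_at = now() ' +
        'WHERE id = $2 AND stage = $3',
      [c.to, c.pipelineId, c.from]
    );
    if (updated.rowCount !== 1) {
      // The stage was no longer `from`: lose the race cleanly.
      await db.query('ROLLBACK');
      return false;
    }
    await db.query(
      'INSERT INTO pipeline_stage_history ' +
        '(id, pipeline_id, job_id, candidate_id, previous_stage, new_stage, ' +
        'changed_by_user_id, changed_at, notes) ' +
        'VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, now(), $7)',
      [c.pipelineId, c.jobId, c.candidateId, c.from, c.to, c.changedByUserId, c.notes]
    );
    await db.query('COMMIT');
    return true;
  } catch (err) {
    // Any failure undoes both the stage update and the history insert.
    await db.query('ROLLBACK');
    throw err;
  }
}
```

Returning a boolean rather than throwing on a lost race lets the caller treat "someone else already moved this row" as a normal outcome.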

Append-only history

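The pipeline_stage_history table is append-only by convention and by code path: there is no application code path that updates or deletes rows from this table. This is what makes the system auditable. The one exception visible in the schema is the ON DELETE CASCADE on pipeline_id, which removes history rows if their parent pipeline row is deleted.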

Schema

CREATE TABLE pipeline_stage_history (
  id                  uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  pipeline_id         uuid NOT NULL REFERENCES job_candidate_pipeline(id) ON DELETE CASCADE,
  job_id              uuid NOT NULL,
  candidate_id        uuid NOT NULL,
  previous_stage      text,
  new_stage           text NOT NULL,
  changed_by_user_id  uuid NOT NULL,
  changed_at          timestamptz NOT NULL DEFAULT now(),
  notes               text NOT NULL CHECK (length(notes) > 0)
);

CREATE INDEX idx_history_pipeline ON pipeline_stage_history(pipeline_id);
CREATE INDEX idx_history_candidate ON pipeline_stage_history(candidate_id);
CREATE INDEX idx_history_changed_at ON pipeline_stage_history(changed_at DESC);

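The CHECK (length(notes) > 0) constraint at the SQL layer enforces that every history row carries a non-empty notes value, which is where the evidence string is stored. Empty history rows are useless for audit and are forbidden by the schema. Note that a whitespace-only string still passes this check.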

Reading history

The history table doubles as the source of truth for time-based pipeline metrics: time-in-stage, conversion rates by source, drop-off analysis. Because every transition is preserved with its evidence, these metrics are reproducible from the history table alone, without any time-series logging system.

-- Conversion rate from job_shared → interest_confirmed, last 90 days
WITH stage_transitions AS (
  SELECT
    pipeline_id,
    new_stage,
    LAG(new_stage) OVER (PARTITION BY pipeline_id ORDER BY changed_at) AS previous_in_pipeline
  FROM pipeline_stage_history
  WHERE changed_at > now() - interval '90 days'
)
SELECT
  COUNT(*) FILTER (WHERE new_stage = 'interest_confirmed' AND previous_in_pipeline = 'job_shared')::float /
  NULLIF(COUNT(*) FILTER (WHERE new_stage = 'job_shared'), 0) AS conversion_rate
FROM stage_transitions;
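
Time-in-stage can be derived the same way once the history rows for one pipeline row have been loaded. The sketch below assumes a simplified `HistoryRow` shape (camel-cased fields, already parsed dates); `hoursInStage` is a hypothetical helper, and it counts the latest stage as still running until `now`, which may not be wanted for terminal stages.

```typescript
// Assumed, simplified projection of pipeline_stage_history for one pipeline row.
interface HistoryRow {
  newStage: string;
  changedAt: Date;
}

const MS_PER_HOUR = 60 * 60 * 1000;

// Each stage lasts from its own changedAt until the next transition,
// or until `now` for the most recent entry.
function hoursInStage(history: HistoryRow[], now: Date): Record<string, number> {
  const ordered = [...history].sort(
    (a, b) => a.changedAt.getTime() - b.changedAt.getTime()
  );
  const totals: Record<string, number> = {};
  ordered.forEach((row, i) => {
    const end = i + 1 < ordered.length ? ordered[i + 1].changedAt : now;
    const hours = (end.getTime() - row.changedAt.getTime()) / MS_PER_HOUR;
    totals[row.newStage] = (totals[row.newStage] ?? 0) + hours;
  });
  return totals;
}
```

Sorting a copy keeps the caller's array untouched, and the function needs nothing beyond the history rows themselves.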

Flagged-for-attention

After reconciliation runs, the engine surfaces rows that need human attention but did not produce automatic transitions. Two flags are computed:

  1. Stalled job_shared. A row in job_shared for more than 7 days with no candidate reply. Action: consider follow-up.
  2. Stalled screening. A row in screening for more than 7 days with no client reply. Action: consider nudge.

-- Flagged-for-attention query
SELECT
  c.name AS candidate,
  COALESCE(co.name, j.company_name) AS company,
  j.title AS role,
  p.stage,
  age(now(), p.updated_at) AS time_in_stage,
  CASE p.stage
    WHEN 'job_shared' THEN 'Consider candidate follow-up'
    WHEN 'screening' THEN 'Consider client nudge'
  END AS suggested_action
FROM job_candidate_pipeline p
JOIN candidates c ON c.id = p.candidate_id
JOIN jobs j ON j.id = p.job_id
LEFT JOIN companies co ON co.id = j.company_id
WHERE p.stage IN ('job_shared', 'screening')
  AND p.updated_at < now() - interval '7 days'
ORDER BY p.updated_at ASC;
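
The same rule can be applied in memory, for example when rendering a dashboard from rows that are already loaded. This sketch mirrors the query's logic under stated assumptions: `PipelineSnapshot` and `suggestedAction` are hypothetical names, and `updatedAt` stands in for time in stage exactly as updated_at does in the SQL.

```typescript
// Assumed minimal shape of a loaded pipeline row.
interface PipelineSnapshot {
  stage: string;
  updatedAt: Date;
}

const STALL_THRESHOLD_DAYS = 7;
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Returns the suggested action for a stalled row, or null if none applies.
function suggestedAction(row: PipelineSnapshot, now: Date): string | null {
  const ageDays = (now.getTime() - row.updatedAt.getTime()) / MS_PER_DAY;
  if (ageDays <= STALL_THRESHOLD_DAYS) return null;
  if (row.stage === 'job_shared') return 'Consider candidate follow-up';
  if (row.stage === 'screening') return 'Consider client nudge';
  return null; // other stages are not flagged by this rule
}
```

Like the query, it flags on elapsed time alone and does not inspect the mailbox for replies.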

This is the kind of operational intelligence an ATS produces by accident at best. Refery produces it deterministically, on every reconciliation run, as a first-class output of the pipeline engine.

Why this is novel

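  1. Forward-only state machine with database-enforced invariants. Most recruiting platforms allow free state mutation, which leads to data drift. In Refery, the CHECK constraint is the load-bearing wall for stage validity, and the stage-guarded UPDATE in the atomic write keeps each transition tied to the stage it started from.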
  2. Evidence-bound transitions with FULL_CONTENT verification. Other systems treat email metadata as evidence. Refery requires the role name to actually appear in the message body. This is the difference between 80% accuracy and 99%.
  3. Optimistic concurrency on every transition. Two reconciliation runs cannot produce duplicate transitions, even under aggressive concurrency.
  4. Idempotent reconciliation. The engine can be invoked freely without side-effect risk. This makes it safe to run on every Gmail sync, every scheduled tick, every operator action.
  5. Append-only history as the metric source. Pipeline metrics are computed from the history table itself, not from a separate metrics pipeline. The system has one source of truth.

The pipeline engine is the part of Refery that an experienced enterprise software engineer recognizes immediately as production-grade. It is built like a financial ledger, not like a CRM.