コンテンツにスキップ

Worker ランタイム基盤 (graphile-worker)

Worker ランタイム基盤 (graphile-worker)

Section titled “Worker ランタイム基盤 (graphile-worker)”

Document ID: SRS-WRK-001 Parent: SRS-ROOT-001 v0.5 Version: 0.4 Status: Implemented Last Updated: 2026-05-06 Depends on: SRS-TEN-001 v0.4, SRS-TEN-002 v0.2 依存される: SRS-RES-004 v0.3, SRS-REG-002 v0.2, SRS-CUS-001 v0.2, Phase 1 全体

本書は SRS-ROOT-001 v0.5 に従う。graphile_worker.* schema は業務 ERD 外の運用 schema として扱い、Drizzle 管理に混ぜない。


ジョブ投入・cron・worker runtime を Phase 1 で実運用可能にする。現状の packages/jobsping smoke のみでは不足しており、以下を整備する:

  • 型安全 enqueue (enqueueJob<K>)
  • cron 宣言(code declaration)
  • store fan-out 規律(system scheduler → store-scoped task)
  • Sentry 連携
  • worker_runtime ロール分離 (bootstrap)
  • /healthz, /readyz 分離

DATABASE_URL_MIGRATOR を worker runtime で使う運用は禁止。


  • As a developer, I want to API から型安全に job を enqueue したい, so that payload 破壊を compile error にしたい
  • As a manager, I want to 期限切れ仮予約や日次集計を自動処理したい
  • As an operator, I want to worker 障害時に health/ready で即座に検知したい

  1. API または cron が enqueueJob<K>() を呼ぶ
  2. payload 型が JobPayloadMap[K] で検証される
  3. worker が taskList から該当 handler を起動する
  4. handler は withTenant(storeId)SET LOCAL app.current_store_id を適用する(store-scoped task のみ)
  5. 成功で完了、失敗で retry、最終失敗で Sentry
  • system scheduler task は store 一覧を列挙し、store-scoped fan-out task を enqueue する(親 §7.1.7)
    • worker_runtimestore テーブルへの cross-tenant SELECT 権を持たない
    • 列挙は SECURITY DEFINER 関数 public.list_store_ids_for_fan_out() 経由のみで許可する
    • 関数の所有者は専用 worker_fanout_owner (NOLOGIN BYPASSRLS) ロール
    • 関数は引数なし・返り値は store_id uuid のみで、汎用 cross-tenant API に育てない
    • system scheduler task は fan-out enqueue 以外の業務 mutation を行ってはならない
  • graceful shutdown 時は新規取得を止め、実行中 job の完了を待つ
  • /readyz は DB ping + graphile_worker schema 可視性まで確認する
  • payload 不整合は compile error。runtime zod fallback でも弾く
  • task 名未登録は enqueue 時に禁止
  • DB 接続失敗時 /healthz は 200、/readyz は 503
  • retry 上限超過時は Sentry に task 名・storeId・attempt を送る

Phase 1 では operator 向け UI は持たない。運用は /healthz, /readyz, logs, Sentry のみ。


種別名前用途
TS APIenqueueJob<K>()型安全 enqueue
Worker HTTPGET /healthzprocess liveness
Worker HTTPGET /readyzDB / schema readiness
type JobPayloadMap = {
// system schedulers (storeId なし、fan-out のみ)
'expire-tentative-reservations': { reason: 'cron' };
'aggregate-daily-sales': { targetDate: string; reason: 'cron' };
'cleanup-deleted-customers': { reason: 'cron' };
'cleanup-expired-operator-sessions': { reason: 'cron' };
'cleanup-expired-operator-invitations': { reason: 'cron' };
'purge-deleted-customer-notes': { reason: 'cron' };
// store-scoped task (storeId 必須)
'expire-tentative-for-store': { storeId: StoreId };
'aggregate-daily-sales-for-store': { storeId: StoreId; targetDate: string };
'cleanup-deleted-customers-for-store': { storeId: StoreId; cutoffIso: string };
'purge-deleted-customer-notes-for-store': { storeId: StoreId; cutoffIso: string };
'send-operator-invitation-email': { storeId: StoreId; invitationId: string };
};
declare function enqueueJob<K extends keyof JobPayloadMap>(
name: K,
payload: JobPayloadMap[K],
opts?: {
jobKey?: string;
jobKeyMode?: 'replace' | 'preserve_run_at' | 'unsafe_dedupe';
maxAttempts?: number;
runAt?: Date;
}
): Promise<void>;
  • JOB.UNKNOWN_TASK
  • JOB.PAYLOAD_INVALID
  • WORKER.NOT_READY
  • WORKER.SCHEMA_MISSING

  • graphile_worker.*bun run db:worker:migrate で管理する
  • 業務 schema migration は持たない
  • cluster-scope role は Drizzle migration ではなく bootstrap で管理する

infra/sql/bootstrap/002_worker_runtime_role.sql

Section titled “infra/sql/bootstrap/002_worker_runtime_role.sql”
-- worker_runtime ロール作成
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'worker_runtime') THEN
CREATE ROLE worker_runtime LOGIN PASSWORD 'worker_runtime';
END IF;
END $$;
GRANT CONNECT ON DATABASE salon_dev TO worker_runtime;
GRANT USAGE ON SCHEMA public TO worker_runtime;
GRANT USAGE ON SCHEMA graphile_worker TO worker_runtime;
-- graphile_worker schema へのフル権限
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA graphile_worker TO worker_runtime;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA graphile_worker TO worker_runtime;
ALTER DEFAULT PRIVILEGES IN SCHEMA graphile_worker GRANT ALL PRIVILEGES ON TABLES TO worker_runtime;
ALTER DEFAULT PRIVILEGES IN SCHEMA graphile_worker GRANT ALL PRIVILEGES ON SEQUENCES TO worker_runtime;
-- public schema は app と同じ tenant-safe 経路のみ
-- worker_runtime も tenant-scoped task 実行時は app と同等の RLS 適用を受ける
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO worker_runtime;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO worker_runtime;
-- 全テーブルで RLS が適用されるため、worker_runtime も SET LOCAL app.current_store_id 必須

infra/sql/bootstrap/003_worker_fanout_owner_role.sql

Section titled “infra/sql/bootstrap/003_worker_fanout_owner_role.sql”

system scheduler の store 列挙を SECURITY DEFINER 関数 1 本に閉じ込めるために、 専用 owner ロールを bootstrap で作る (Codex co-design Round 3)。 worker_runtime 自体には cross-tenant SELECT を与えない。

-- LOGIN 不可、worker_runtime / app への membership 付与禁止
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'worker_fanout_owner') THEN
CREATE ROLE worker_fanout_owner NOLOGIN BYPASSRLS;
END IF;
END
$$;

packages/db/migrations/0021_worker_fanout_helpers.sql

Section titled “packages/db/migrations/0021_worker_fanout_helpers.sql”
GRANT USAGE ON SCHEMA public TO worker_fanout_owner;
GRANT SELECT (id) ON TABLE public.store TO worker_fanout_owner;
CREATE OR REPLACE FUNCTION public.list_store_ids_for_fan_out()
RETURNS TABLE (store_id uuid)
LANGUAGE sql
STABLE
SECURITY DEFINER
SET search_path = pg_catalog
AS $$
SELECT s.id FROM public.store AS s ORDER BY s.id
$$;
ALTER FUNCTION public.list_store_ids_for_fan_out() OWNER TO worker_fanout_owner;
REVOKE ALL ON FUNCTION public.list_store_ids_for_fan_out() FROM PUBLIC;
GRANT EXECUTE ON FUNCTION public.list_store_ids_for_fan_out() TO worker_runtime;

注意点:

  • db:bootstrapdb:migrate の順序を守る。逆順だと role 不在で migration 失敗
  • 関数は引数なし・返り値は store_id のみ。汎用 cross-tenant API に育てない
  • search_path = pg_catalog 固定 + public.store 完全修飾。未修飾名は hijack 余地
packages/jobs/src/
index.ts # taskList export
types.ts # JobPayloadMap
enqueue.ts # enqueueJob<K>
reservation/
expire-tentative-reservations.ts
expire-tentative-for-store.ts
register/
aggregate-daily-sales.ts
aggregate-daily-sales-for-store.ts
customer/
cleanup-deleted-customers.ts
cleanup-deleted-customers-for-store.ts
purge-deleted-customer-notes.ts
purge-deleted-customer-notes-for-store.ts
operator/
cleanup-expired-operator-sessions.ts
cleanup-expired-operator-invitations.ts
send-operator-invitation-email.ts
_shared/
with-tenant-job.ts
sentry.ts
logger.ts

  • ファイル名: kebab-case verb-noun
  • task 名: kebab-case verb-noun
  • BC 識別は directory で行い、task 名に冗長 prefix を付けない
  • system scheduler task は store 一覧を列挙するだけ
  • 業務 mutation は必ず *-for-store task で行う
  • expire-tentative-for-storejob_keyexpire_tentative:${storeId} + mode=preserve_run_at
  • aggregate-daily-sales-for-storejob_keyaggregate_daily_sales:${storeId}:${targetDate} + mode=preserve_run_at
  • purge-deleted-customer-notes-for-storejob_keypurge_customer_notes:${storeId}:${cutoffIso} + mode=preserve_run_at
  • cleanup-deleted-customers-for-storejob_keycleanup_customers:${storeId}:${cutoffIso} + mode=preserve_run_at
  • mode=preserve_run_at は graphile-worker の API 仕様に従い、同じ jobKey のジョブが queue 上に存在するなら新規追加せず既存の run_at を維持する (= 重複 enqueue を抑制)
  • retention 判定は JST 日付丸めではなく 絶対時刻 cutoffIso = now - intervalDays を scheduler 側で算出し、全 store に同一値を配布する (Codex co-design 2026-05-05)
    • purge-deleted-customer-notes: 7 日 (CUS-002 §7)
    • cleanup-deleted-customers: 90 日 (CUS-002 AF-4)

code declaration(apps/worker/src/index.ts 内):

ジョブcron
expire-tentative-reservations*/5 * * * *(5 分ごと)
aggregate-daily-sales0 18 * * *(UTC 18:00 = JST 03:00)
cleanup-deleted-customers0 17 * * *(UTC 17:00 = JST 02:00)
cleanup-expired-operator-sessions0 16 * * *
cleanup-expired-operator-invitations0 16 * * *
purge-deleted-customer-notes0 15 * * *
  • expire-tentative-for-store: maxAttempts=10
  • aggregate-daily-sales-for-store: maxAttempts=25
  • cleanup-deleted-customers-for-store: maxAttempts=5
  • purge-deleted-customer-notes-for-store: maxAttempts=5
  • scheduler fan-out tasks: maxAttempts=3
  • 非冪等な job は Phase 1 に入れない
  • pino JSON log 必須
  • 各 job log に taskName, jobId, storeId, attempt を付与
  • 最終失敗は Sentry capture
  • queue lag は Phase 1 では /readyz と log 監視で代替

worker service は API と分離。runtime env:

  • WORKER_DATABASE_URLworker_runtime 接続文字列)
  • WORKER_CONCURRENCY
  • SENTRY_DSN

DATABASE_URL_MIGRATOR は migration job 専用、runtime に渡さない。

各 store-scoped task の handler は冒頭で:

async function handler(payload: { storeId: StoreId }, helpers) {
await helpers.withPgClient(async (client) => {
await client.query('BEGIN');
await client.query('SELECT set_config($1, $2, true)', [
'app.current_store_id',
payload.storeId,
]);
try {
// 実業務処理
await client.query('COMMIT');
} catch (e) {
await client.query('ROLLBACK');
throw e;
}
});
}

SET LOCAL で tenant 分離を確保。


  • 平常時の job pickup は 5 秒以内(親 §6.1)
  • worker 起動後 10 秒以内に /readyz=200
  • graceful shutdown 30 秒以内
  • store fan-out 100 店舗規模で 1 分以内に enqueue 完了

  • store-scoped task は payload に storeId 必須
  • handler は SET LOCAL app.current_store_id 必須
  • system-scope scheduler task は tenant table を直接 mutate しない
  • worker_runtime は superuser 不可
  • graphile_worker schema ownership は migrator、runtime は実行専用

10. 受け入れ基準(Given-When-Then)

Section titled “10. 受け入れ基準(Given-When-Then)”
  • GWT-1 型安全 enqueue: enqueueJob('aggregate-daily-sales-for-store', {storeId, targetDate}) → compile 成功
  • GWT-2 payload 型破壊: storeId 欠落 payload → compile error
  • GWT-3 health/ready 分離: DB down で /healthz=200, /readyz=503
  • GWT-4 worker schema migrate: 空 DB で bun run db:worker:migrategraphile_worker schema 作成
  • GWT-5 fan-out enqueue: 3 店舗で expire-tentative-reservations 実行 → 3 件の expire-tentative-for-store enqueue
  • GWT-6 preserve dedupe: 同一 storeId で scheduler 再実行 → duplicate job 増えない
  • GWT-7 aggregate cron: JST 2026-05-06 03:00 で起動 → targetDate=2026-05-05 を集計
  • GWT-8 cleanup cron: deleted_at < now()-90d の customer → 物理削除
  • GWT-9 retry: transient DB failure → configured attempts まで retry
  • GWT-10 final failure alert: retry 上限超過 → Sentry event 1 件
  • GWT-11 runtime role 分離: worker runtime → worker_runtime で接続、DATABASE_URL_MIGRATOR は使わない
  • GWT-12 smoke: addJob('ping') → pickup される(既存 smoke 保持)
  • GWT-13 SET LOCAL: store-scoped task が tenant 分離を効かせていることを RLS テストで検証

  • compile-time: enqueueJob 型テスト
  • integration: runOnce、fan-out、preserve dedupe、tenant 分離
  • runtime: /healthz, /readyz
  • ops: bootstrap + worker migrate の冪等性
  • regression: ping smoke 保持

Task種別cron起動元
expire-tentative-reservationssystem scheduler5 分RES-004
expire-tentative-for-storestore fan-out-scheduler
aggregate-daily-salessystem schedulerdaily UTC 18:00REG-002
aggregate-daily-sales-for-storestore fan-out-scheduler
cleanup-deleted-customerssystem schedulerdailyCUS-001
cleanup-deleted-customers-for-storestore fan-out-scheduler
purge-deleted-customer-notessystem schedulerdailyCUS-002
purge-deleted-customer-notes-for-storestore fan-out-scheduler
cleanup-expired-operator-sessionssystem schedulerdailyTEN-002
cleanup-expired-operator-invitationssystem schedulerdailyTEN-002
send-operator-invitation-emailstore-scopedon demandTEN-002 / TEN-003
pingsmoke--

#内容扱い
OQ-WRK-001-01graphile_worker metrics の Prometheus 化Phase 2
OQ-WRK-001-02scheduler task を graphile-worker 外へ逃す案Phase 3 検討
OQ-WRK-001-03失敗 job 可視化 UIPhase 2

VersionDateAuthorChange
0.12026-05-05Codex / yudai初版
0.22026-05-05yudai (with Codex co-design)Round 2 反映: worker_runtime ロール分離を infra/sql/bootstrap/002_worker_runtime_role.sql に固定、DATABASE_URL_MIGRATOR を runtime で使う運用を明示禁止、Phase 1 タスクカタログ確定(11 タスク)、SET LOCAL pattern を §7.7 に明記
0.32026-05-05yudai (with Codex co-design)Round 3 反映: system scheduler fan-out の RLS 設計を確定。専用 worker_fanout_owner (NOLOGIN BYPASSRLS) ロール + public.list_store_ids_for_fan_out() SECURITY DEFINER 関数で store 列挙を最小権限化。worker_runtime は store 直接 SELECT 不可。bootstrap 003_worker_fanout_owner_role.sql + migration 0021_worker_fanout_helpers.sql で実装