Worker ランタイム基盤 (graphile-worker)
Worker ランタイム基盤 (graphile-worker)
Section titled “Worker ランタイム基盤 (graphile-worker)”Document ID: SRS-WRK-001 Parent: SRS-ROOT-001 v0.5 Version: 0.4 Status: Implemented Last Updated: 2026-05-06 Depends on: SRS-TEN-001 v0.4, SRS-TEN-002 v0.2 依存される: SRS-RES-004 v0.3, SRS-REG-002 v0.2, SRS-CUS-001 v0.2, Phase 1 全体
本書は SRS-ROOT-001 v0.5 に従う。graphile_worker.* schema は業務 ERD 外の運用 schema として扱い、Drizzle 管理に混ぜない。
ジョブ投入・cron・worker runtime を Phase 1 で実運用可能にする。現状の packages/jobs の ping smoke のみでは不足しており、以下を整備する:
- 型安全 enqueue (
enqueueJob<K>) - cron 宣言(code declaration)
- store fan-out 規律(system scheduler → store-scoped task)
- Sentry 連携
worker_runtimeロール分離 (bootstrap)/healthz,/readyz分離
DATABASE_URL_MIGRATOR を worker runtime で使う運用は禁止。
2. ユーザーストーリー
Section titled “2. ユーザーストーリー”- As a developer, I want to API から型安全に job を enqueue したい, so that payload 破壊を compile error にしたい
- As a manager, I want to 期限切れ仮予約や日次集計を自動処理したい
- As an operator, I want to worker 障害時に health/ready で即座に検知したい
3. ユースケース
Section titled “3. ユースケース”3.1 主シナリオ
Section titled “3.1 主シナリオ”- API または cron が
enqueueJob<K>()を呼ぶ - payload 型が
JobPayloadMap[K]で検証される - worker が
taskListから該当 handler を起動する - handler は
withTenant(storeId)でSET LOCAL app.current_store_idを適用する(store-scoped task のみ) - 成功で完了、失敗で retry、最終失敗で Sentry
3.2 代替フロー
Section titled “3.2 代替フロー”- system scheduler task は store 一覧を列挙し、store-scoped fan-out task を enqueue する(親 §7.1.7)
worker_runtimeはstoreテーブルへの cross-tenant SELECT 権を持たない- 列挙は SECURITY DEFINER 関数
public.list_store_ids_for_fan_out()経由のみで許可する - 関数の所有者は専用
worker_fanout_owner(NOLOGIN BYPASSRLS) ロール - 関数は引数なし・返り値は
store_id uuidのみで、汎用 cross-tenant API に育てない - system scheduler task は fan-out enqueue 以外の業務 mutation を行ってはならない
- graceful shutdown 時は新規取得を止め、実行中 job の完了を待つ
/readyzは DB ping +graphile_workerschema 可視性まで確認する
3.3 例外フロー
Section titled “3.3 例外フロー”- payload 不整合は compile error。runtime zod fallback でも弾く
- task 名未登録は enqueue 時に禁止
- DB 接続失敗時
/healthzは 200、/readyzは 503 - retry 上限超過時は Sentry に task 名・storeId・attempt を送る
4. UI仕様
Section titled “4. UI仕様”Phase 1 では operator 向け UI は持たない。運用は /healthz, /readyz, logs, Sentry のみ。
5. API仕様
Section titled “5. API仕様”5.1 公開面
Section titled “5.1 公開面”| 種別 | 名前 | 用途 |
|---|---|---|
| TS API | enqueueJob<K>() | 型安全 enqueue |
| Worker HTTP | GET /healthz | process liveness |
| Worker HTTP | GET /readyz | DB / schema readiness |
5.2 TypeScript 契約
Section titled “5.2 TypeScript 契約”type JobPayloadMap = { // system schedulers (storeId なし、fan-out のみ) 'expire-tentative-reservations': { reason: 'cron' }; 'aggregate-daily-sales': { targetDate: string; reason: 'cron' }; 'cleanup-deleted-customers': { reason: 'cron' }; 'cleanup-expired-operator-sessions': { reason: 'cron' }; 'cleanup-expired-operator-invitations': { reason: 'cron' }; 'purge-deleted-customer-notes': { reason: 'cron' };
// store-scoped task (storeId 必須) 'expire-tentative-for-store': { storeId: StoreId }; 'aggregate-daily-sales-for-store': { storeId: StoreId; targetDate: string }; 'cleanup-deleted-customers-for-store': { storeId: StoreId; cutoffIso: string }; 'purge-deleted-customer-notes-for-store': { storeId: StoreId; cutoffIso: string }; 'send-operator-invitation-email': { storeId: StoreId; invitationId: string };};
declare function enqueueJob<K extends keyof JobPayloadMap>( name: K, payload: JobPayloadMap[K], opts?: { jobKey?: string; jobKeyMode?: 'replace' | 'preserve_run_at' | 'unsafe_dedupe'; maxAttempts?: number; runAt?: Date; }): Promise<void>;5.3 エラー / 運用コード
Section titled “5.3 エラー / 運用コード”JOB.UNKNOWN_TASKJOB.PAYLOAD_INVALIDWORKER.NOT_READYWORKER.SCHEMA_MISSING
6. データモデル影響
Section titled “6. データモデル影響”6.1 スキーマ / ロール
Section titled “6.1 スキーマ / ロール”graphile_worker.*はbun run db:worker:migrateで管理する- 業務 schema migration は持たない
- cluster-scope role は Drizzle migration ではなく bootstrap で管理する
infra/sql/bootstrap/002_worker_runtime_role.sql
Section titled “infra/sql/bootstrap/002_worker_runtime_role.sql”-- worker_runtime ロール作成DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'worker_runtime') THEN CREATE ROLE worker_runtime LOGIN PASSWORD 'worker_runtime'; END IF;END $$;
GRANT CONNECT ON DATABASE salon_dev TO worker_runtime;GRANT USAGE ON SCHEMA public TO worker_runtime;GRANT USAGE ON SCHEMA graphile_worker TO worker_runtime;
-- graphile_worker schema へのフル権限GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA graphile_worker TO worker_runtime;GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA graphile_worker TO worker_runtime;ALTER DEFAULT PRIVILEGES IN SCHEMA graphile_worker GRANT ALL PRIVILEGES ON TABLES TO worker_runtime;ALTER DEFAULT PRIVILEGES IN SCHEMA graphile_worker GRANT ALL PRIVILEGES ON SEQUENCES TO worker_runtime;
-- public schema は app と同じ tenant-safe 経路のみ-- worker_runtime も tenant-scoped task 実行時は app と同等の RLS 適用を受けるGRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO worker_runtime;ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO worker_runtime;
-- 全テーブルで RLS が適用されるため、worker_runtime も SET LOCAL app.current_store_id 必須infra/sql/bootstrap/003_worker_fanout_owner_role.sql
Section titled “infra/sql/bootstrap/003_worker_fanout_owner_role.sql”system scheduler の store 列挙を SECURITY DEFINER 関数 1 本に閉じ込めるために、
専用 owner ロールを bootstrap で作る (Codex co-design Round 3)。
worker_runtime 自体には cross-tenant SELECT を与えない。
-- LOGIN 不可、worker_runtime / app への membership 付与禁止DO $$BEGIN IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'worker_fanout_owner') THEN CREATE ROLE worker_fanout_owner NOLOGIN BYPASSRLS; END IF;END$$;packages/db/migrations/0021_worker_fanout_helpers.sql
Section titled “packages/db/migrations/0021_worker_fanout_helpers.sql”GRANT USAGE ON SCHEMA public TO worker_fanout_owner;GRANT SELECT (id) ON TABLE public.store TO worker_fanout_owner;
CREATE OR REPLACE FUNCTION public.list_store_ids_for_fan_out()RETURNS TABLE (store_id uuid)LANGUAGE sqlSTABLESECURITY DEFINERSET search_path = pg_catalogAS $$ SELECT s.id FROM public.store AS s ORDER BY s.id$$;
ALTER FUNCTION public.list_store_ids_for_fan_out() OWNER TO worker_fanout_owner;REVOKE ALL ON FUNCTION public.list_store_ids_for_fan_out() FROM PUBLIC;GRANT EXECUTE ON FUNCTION public.list_store_ids_for_fan_out() TO worker_runtime;注意点:
db:bootstrap→db:migrateの順序を守る。逆順だと role 不在で migration 失敗- 関数は引数なし・返り値は
store_idのみ。汎用 cross-tenant API に育てない search_path = pg_catalog固定 +public.store完全修飾。未修飾名は hijack 余地
6.2 ディレクトリ構成
Section titled “6.2 ディレクトリ構成”packages/jobs/src/ index.ts # taskList export types.ts # JobPayloadMap enqueue.ts # enqueueJob<K> reservation/ expire-tentative-reservations.ts expire-tentative-for-store.ts register/ aggregate-daily-sales.ts aggregate-daily-sales-for-store.ts customer/ cleanup-deleted-customers.ts cleanup-deleted-customers-for-store.ts purge-deleted-customer-notes.ts purge-deleted-customer-notes-for-store.ts operator/ cleanup-expired-operator-sessions.ts cleanup-expired-operator-invitations.ts send-operator-invitation-email.ts _shared/ with-tenant-job.ts sentry.ts logger.ts7. 業務ルール
Section titled “7. 業務ルール”7.1 命名規約
Section titled “7.1 命名規約”- ファイル名: kebab-case verb-noun
- task 名: kebab-case verb-noun
- BC 識別は directory で行い、task 名に冗長 prefix を付けない
7.2 fan-out 規律(親 §7.1.7)
Section titled “7.2 fan-out 規律(親 §7.1.7)”- system scheduler task は store 一覧を列挙するだけ
- 業務 mutation は必ず
*-for-storetask で行う expire-tentative-for-storeのjob_keyはexpire_tentative:${storeId}+mode=preserve_run_ataggregate-daily-sales-for-storeのjob_keyはaggregate_daily_sales:${storeId}:${targetDate}+mode=preserve_run_atpurge-deleted-customer-notes-for-storeのjob_keyはpurge_customer_notes:${storeId}:${cutoffIso}+mode=preserve_run_atcleanup-deleted-customers-for-storeのjob_keyはcleanup_customers:${storeId}:${cutoffIso}+mode=preserve_run_atmode=preserve_run_atは graphile-worker の API 仕様に従い、同じjobKeyのジョブが queue 上に存在するなら新規追加せず既存の run_at を維持する (= 重複 enqueue を抑制)- retention 判定は JST 日付丸めではなく 絶対時刻
cutoffIso = now - intervalDaysを scheduler 側で算出し、全 store に同一値を配布する (Codex co-design 2026-05-05)purge-deleted-customer-notes: 7 日 (CUS-002 §7)cleanup-deleted-customers: 90 日 (CUS-002 AF-4)
7.3 cron 宣言
Section titled “7.3 cron 宣言”code declaration(apps/worker/src/index.ts 内):
| ジョブ | cron |
|---|---|
expire-tentative-reservations | */5 * * * *(5 分ごと) |
aggregate-daily-sales | 0 18 * * *(UTC 18:00 = JST 03:00) |
cleanup-deleted-customers | 0 17 * * *(UTC 17:00 = JST 02:00) |
cleanup-expired-operator-sessions | 0 16 * * * |
cleanup-expired-operator-invitations | 0 16 * * * |
purge-deleted-customer-notes | 0 15 * * * |
7.4 リトライ規約
Section titled “7.4 リトライ規約”expire-tentative-for-store:maxAttempts=10aggregate-daily-sales-for-store:maxAttempts=25cleanup-deleted-customers-for-store:maxAttempts=5purge-deleted-customer-notes-for-store:maxAttempts=5- scheduler fan-out tasks:
maxAttempts=3 - 非冪等な job は Phase 1 に入れない
7.5 observability
Section titled “7.5 observability”pinoJSON log 必須- 各 job log に
taskName,jobId,storeId,attemptを付与 - 最終失敗は Sentry capture
- queue lag は Phase 1 では
/readyzと log 監視で代替
7.6 docker-compose
Section titled “7.6 docker-compose”worker service は API と分離。runtime env:
WORKER_DATABASE_URL(worker_runtime接続文字列)WORKER_CONCURRENCYSENTRY_DSN
DATABASE_URL_MIGRATOR は migration job 専用、runtime に渡さない。
7.7 store-scoped task の SET LOCAL
Section titled “7.7 store-scoped task の SET LOCAL”各 store-scoped task の handler は冒頭で:
async function handler(payload: { storeId: StoreId }, helpers) { await helpers.withPgClient(async (client) => { await client.query('BEGIN'); await client.query('SELECT set_config($1, $2, true)', [ 'app.current_store_id', payload.storeId, ]); try { // 実業務処理 await client.query('COMMIT'); } catch (e) { await client.query('ROLLBACK'); throw e; } });}SET LOCAL で tenant 分離を確保。
8. 非機能要件
Section titled “8. 非機能要件”- 平常時の job pickup は 5 秒以内(親 §6.1)
- worker 起動後 10 秒以内に
/readyz=200 - graceful shutdown 30 秒以内
- store fan-out 100 店舗規模で 1 分以内に enqueue 完了
9. セキュリティ・認可
Section titled “9. セキュリティ・認可”- store-scoped task は payload に
storeId必須 - handler は
SET LOCAL app.current_store_id必須 - system-scope scheduler task は tenant table を直接 mutate しない
worker_runtimeは superuser 不可graphile_workerschema ownership は migrator、runtime は実行専用
10. 受け入れ基準(Given-When-Then)
Section titled “10. 受け入れ基準(Given-When-Then)”- GWT-1 型安全 enqueue:
enqueueJob('aggregate-daily-sales-for-store', {storeId, targetDate})→ compile 成功 - GWT-2 payload 型破壊:
storeId欠落 payload → compile error - GWT-3 health/ready 分離: DB down で
/healthz=200,/readyz=503 - GWT-4 worker schema migrate: 空 DB で
bun run db:worker:migrate→graphile_workerschema 作成 - GWT-5 fan-out enqueue: 3 店舗で
expire-tentative-reservations実行 → 3 件のexpire-tentative-for-storeenqueue - GWT-6 preserve dedupe: 同一 storeId で scheduler 再実行 → duplicate job 増えない
- GWT-7 aggregate cron: JST 2026-05-06 03:00 で起動 →
targetDate=2026-05-05を集計 - GWT-8 cleanup cron:
deleted_at < now()-90dの customer → 物理削除 - GWT-9 retry: transient DB failure → configured attempts まで retry
- GWT-10 final failure alert: retry 上限超過 → Sentry event 1 件
- GWT-11 runtime role 分離: worker runtime →
worker_runtimeで接続、DATABASE_URL_MIGRATORは使わない - GWT-12 smoke:
addJob('ping')→ pickup される(既存 smoke 保持) - GWT-13 SET LOCAL: store-scoped task が tenant 分離を効かせていることを RLS テストで検証
11. テスト計画
Section titled “11. テスト計画”- compile-time:
enqueueJob型テスト - integration:
runOnce、fan-out、preserve dedupe、tenant 分離 - runtime:
/healthz,/readyz - ops: bootstrap + worker migrate の冪等性
- regression:
pingsmoke 保持
12. Phase 1 タスクカタログ
Section titled “12. Phase 1 タスクカタログ”| Task | 種別 | cron | 起動元 |
|---|---|---|---|
expire-tentative-reservations | system scheduler | 5 分 | RES-004 |
expire-tentative-for-store | store fan-out | - | scheduler |
aggregate-daily-sales | system scheduler | daily UTC 18:00 | REG-002 |
aggregate-daily-sales-for-store | store fan-out | - | scheduler |
cleanup-deleted-customers | system scheduler | daily | CUS-001 |
cleanup-deleted-customers-for-store | store fan-out | - | scheduler |
purge-deleted-customer-notes | system scheduler | daily | CUS-002 |
purge-deleted-customer-notes-for-store | store fan-out | - | scheduler |
cleanup-expired-operator-sessions | system scheduler | daily | TEN-002 |
cleanup-expired-operator-invitations | system scheduler | daily | TEN-002 |
send-operator-invitation-email | store-scoped | on demand | TEN-002 / TEN-003 |
ping | smoke | - | - |
13. Open Questions
Section titled “13. Open Questions”| # | 内容 | 扱い |
|---|---|---|
| OQ-WRK-001-01 | graphile_worker metrics の Prometheus 化 | Phase 2 |
| OQ-WRK-001-02 | scheduler task を graphile-worker 外へ逃す案 | Phase 3 検討 |
| OQ-WRK-001-03 | 失敗 job 可視化 UI | Phase 2 |
14. 変更履歴
Section titled “14. 変更履歴”| Version | Date | Author | Change |
|---|---|---|---|
| 0.1 | 2026-05-05 | Codex / yudai | 初版 |
| 0.2 | 2026-05-05 | yudai (with Codex co-design) | Round 2 反映: worker_runtime ロール分離を infra/sql/bootstrap/002_worker_runtime_role.sql に固定、DATABASE_URL_MIGRATOR を runtime で使う運用を明示禁止、Phase 1 タスクカタログ確定(11 タスク)、SET LOCAL pattern を §7.7 に明記 |
| 0.3 | 2026-05-05 | yudai (with Codex co-design) | Round 3 反映: system scheduler fan-out の RLS 設計を確定。専用 worker_fanout_owner (NOLOGIN BYPASSRLS) ロール + public.list_store_ids_for_fan_out() SECURITY DEFINER 関数で store 列挙を最小権限化。worker_runtime は store 直接 SELECT 不可。bootstrap 003_worker_fanout_owner_role.sql + migration 0021_worker_fanout_helpers.sql で実装 |