Files
swarm/compose/firecrawl/nuq.sql

55 lines
2.9 KiB
SQL
Executable File

CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE EXTENSION IF NOT EXISTS pg_cron;
CREATE SCHEMA IF NOT EXISTS nuq;
DO $$ BEGIN
CREATE TYPE nuq.job_status AS ENUM ('queued', 'active', 'completed', 'failed');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
CREATE TABLE IF NOT EXISTS nuq.queue_scrape (
id uuid NOT NULL DEFAULT gen_random_uuid(),
status nuq.job_status NOT NULL DEFAULT 'queued'::nuq.job_status,
data jsonb,
created_at timestamp with time zone NOT NULL DEFAULT now(),
priority int NOT NULL DEFAULT 0,
lock uuid,
locked_at timestamp with time zone,
stalls integer,
finished_at timestamp with time zone,
returnvalue jsonb, -- only for selfhost
failedreason text, -- only for selfhost
CONSTRAINT queue_scrape_pkey PRIMARY KEY (id)
);
ALTER TABLE nuq.queue_scrape
SET (autovacuum_vacuum_scale_factor = 0.01,
autovacuum_analyze_scale_factor = 0.01,
autovacuum_vacuum_cost_limit = 2000,
autovacuum_vacuum_cost_delay = 2);
CREATE INDEX IF NOT EXISTS queue_scrape_active_locked_at_idx ON nuq.queue_scrape USING btree (locked_at) WHERE (status = 'active'::nuq.job_status);
CREATE INDEX IF NOT EXISTS nuq_queue_scrape_queued_optimal_2_idx ON nuq.queue_scrape (priority ASC, created_at ASC, id) WHERE (status = 'queued'::nuq.job_status);
CREATE INDEX IF NOT EXISTS nuq_queue_scrape_failed_created_at_idx ON nuq.queue_scrape USING btree (created_at) WHERE (status = 'failed'::nuq.job_status);
CREATE INDEX IF NOT EXISTS nuq_queue_scrape_completed_created_at_idx ON nuq.queue_scrape USING btree (created_at) WHERE (status = 'completed'::nuq.job_status);
SELECT cron.schedule('nuq_queue_scrape_clean_completed', '*/5 * * * *', $$
DELETE FROM nuq.queue_scrape WHERE nuq.queue_scrape.status = 'completed'::nuq.job_status AND nuq.queue_scrape.created_at < now() - interval '1 hour';
$$);
SELECT cron.schedule('nuq_queue_scrape_clean_failed', '*/5 * * * *', $$
DELETE FROM nuq.queue_scrape WHERE nuq.queue_scrape.status = 'failed'::nuq.job_status AND nuq.queue_scrape.created_at < now() - interval '6 hours';
$$);
SELECT cron.schedule('nuq_queue_scrape_lock_reaper', '15 seconds', $$
UPDATE nuq.queue_scrape SET status = 'queued'::nuq.job_status, lock = null, locked_at = null, stalls = COALESCE(stalls, 0) + 1 WHERE nuq.queue_scrape.locked_at <= now() - interval '1 minute' AND nuq.queue_scrape.status = 'active'::nuq.job_status AND COALESCE(nuq.queue_scrape.stalls, 0) < 9;
WITH stallfail AS (UPDATE nuq.queue_scrape SET status = 'failed'::nuq.job_status, lock = null, locked_at = null, stalls = COALESCE(stalls, 0) + 1 WHERE nuq.queue_scrape.locked_at <= now() - interval '1 minute' AND nuq.queue_scrape.status = 'active'::nuq.job_status AND COALESCE(nuq.queue_scrape.stalls, 0) >= 9 RETURNING id)
SELECT pg_notify('nuq.queue_scrape', (id::text || '|' || 'failed'::text)) FROM stallfail;
$$);
SELECT cron.schedule('nuq_queue_scrape_reindex', '0 9 * * *', $$
REINDEX TABLE CONCURRENTLY nuq.queue_scrape;
$$);