55 lines
2.9 KiB
SQL
Executable File
55 lines
2.9 KiB
SQL
Executable File
-- Prerequisites for the nuq job queue. Every statement is idempotent, so the
-- script can be re-run against an already-initialised database.

-- pgcrypto: supplies gen_random_uuid() (the default for nuq.queue_scrape.id)
-- on servers where that function is not built in.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- pg_cron: supplies cron.schedule(), used by the maintenance jobs that this
-- script registers.
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- All queue objects live in their own schema.
CREATE SCHEMA IF NOT EXISTS nuq;
|
|
|
|
-- Job lifecycle states. CREATE TYPE has no IF NOT EXISTS form, so the
-- statement is wrapped in a DO block that ignores the duplicate_object error
-- raised when the type already exists, keeping the script re-runnable.
DO $$
BEGIN
    CREATE TYPE nuq.job_status AS ENUM (
        'queued',
        'active',
        'completed',
        'failed'
    );
EXCEPTION
    WHEN duplicate_object THEN
        NULL;  -- type already present: nothing to do
END
$$;
|
|
|
|
-- One row per scrape job. Rows move between the nuq.job_status states; the
-- pg_cron jobs registered by this script requeue stalled rows and delete old
-- finished ones.
CREATE TABLE IF NOT EXISTS nuq.queue_scrape (
    id           uuid           NOT NULL DEFAULT gen_random_uuid(),
    status       nuq.job_status NOT NULL DEFAULT 'queued'::nuq.job_status,
    data         jsonb,                               -- job payload (nullable)
    created_at   timestamptz    NOT NULL DEFAULT now(),
    priority     integer        NOT NULL DEFAULT 0,   -- indexed ascending for queued rows
    lock         uuid,                                -- cleared by the lock reaper
    locked_at    timestamptz,                         -- compared against now() by the lock reaper
    stalls       integer,                             -- nullable; readers treat NULL as 0
    finished_at  timestamptz,
    returnvalue  jsonb,                               -- only for selfhost
    failedreason text,                                -- only for selfhost
    CONSTRAINT queue_scrape_pkey PRIMARY KEY (id)
);
|
|
|
|
-- Per-table autovacuum overrides for nuq.queue_scrape. The scale factors make
-- vacuum and analyze trigger once roughly 1% of the table has changed (on top
-- of the base thresholds); the cost limit and cost delay are set explicitly
-- to control how hard each autovacuum run is throttled.
ALTER TABLE nuq.queue_scrape SET (
    autovacuum_vacuum_scale_factor  = 0.01,
    autovacuum_analyze_scale_factor = 0.01,
    autovacuum_vacuum_cost_limit    = 2000,
    autovacuum_vacuum_cost_delay    = 2
);
|
|
|
|
-- Partial indexes, one per job status, so each only covers the rows in that
-- state.

-- Active jobs by lock time: matches the lock reaper's predicate
-- (status = 'active' AND locked_at <= ...).
CREATE INDEX IF NOT EXISTS queue_scrape_active_locked_at_idx
    ON nuq.queue_scrape
    USING btree (locked_at)
    WHERE (status = 'active'::nuq.job_status);

-- Queued jobs ordered by priority, then age, then id.
-- NOTE(review): presumably serves the dequeue query, which is not in this file.
CREATE INDEX IF NOT EXISTS nuq_queue_scrape_queued_optimal_2_idx
    ON nuq.queue_scrape
    USING btree (priority ASC, created_at ASC, id)
    WHERE (status = 'queued'::nuq.job_status);

-- Failed jobs by creation time: matches the failed-job cleanup predicate.
CREATE INDEX IF NOT EXISTS nuq_queue_scrape_failed_created_at_idx
    ON nuq.queue_scrape
    USING btree (created_at)
    WHERE (status = 'failed'::nuq.job_status);

-- Completed jobs by creation time: matches the completed-job cleanup predicate.
CREATE INDEX IF NOT EXISTS nuq_queue_scrape_completed_created_at_idx
    ON nuq.queue_scrape
    USING btree (created_at)
    WHERE (status = 'completed'::nuq.job_status);
|
|
|
|
-- Retention for completed jobs: every 5 minutes, delete rows in the
-- 'completed' state that were created more than one hour ago. Age is measured
-- from created_at, not finished_at.
SELECT cron.schedule(
    'nuq_queue_scrape_clean_completed',  -- job name
    '*/5 * * * *',                       -- schedule: every 5 minutes
    $$
DELETE FROM nuq.queue_scrape WHERE nuq.queue_scrape.status = 'completed'::nuq.job_status AND nuq.queue_scrape.created_at < now() - interval '1 hour';
$$
);
|
|
|
|
-- Retention for failed jobs: every 5 minutes, delete rows in the 'failed'
-- state that were created more than six hours ago. Age is measured from
-- created_at, not finished_at.
SELECT cron.schedule(
    'nuq_queue_scrape_clean_failed',  -- job name
    '*/5 * * * *',                    -- schedule: every 5 minutes
    $$
DELETE FROM nuq.queue_scrape WHERE nuq.queue_scrape.status = 'failed'::nuq.job_status AND nuq.queue_scrape.created_at < now() - interval '6 hours';
$$
);
|
|
|
|
-- Lock reaper: every 15 seconds, recover 'active' jobs whose locked_at is at
-- least one minute old. The command runs two statements:
--   1. Jobs with fewer than 9 recorded stalls (NULL counts as 0) go back to
--      'queued' with lock and locked_at cleared and stalls incremented.
--   2. Jobs with 9 or more stalls are marked 'failed' (lock and locked_at
--      cleared, stalls incremented), and for each one a notification is sent
--      on channel 'nuq.queue_scrape' with payload '<id>|failed'.
SELECT cron.schedule(
    'nuq_queue_scrape_lock_reaper',  -- job name
    '15 seconds',                    -- schedule: pg_cron interval syntax
    $$
UPDATE nuq.queue_scrape SET status = 'queued'::nuq.job_status, lock = null, locked_at = null, stalls = COALESCE(stalls, 0) + 1 WHERE nuq.queue_scrape.locked_at <= now() - interval '1 minute' AND nuq.queue_scrape.status = 'active'::nuq.job_status AND COALESCE(nuq.queue_scrape.stalls, 0) < 9;
WITH stallfail AS (UPDATE nuq.queue_scrape SET status = 'failed'::nuq.job_status, lock = null, locked_at = null, stalls = COALESCE(stalls, 0) + 1 WHERE nuq.queue_scrape.locked_at <= now() - interval '1 minute' AND nuq.queue_scrape.status = 'active'::nuq.job_status AND COALESCE(nuq.queue_scrape.stalls, 0) >= 9 RETURNING id)
SELECT pg_notify('nuq.queue_scrape', (id::text || '|' || 'failed'::text)) FROM stallfail;
$$
);
|
|
|
|
-- Daily index maintenance: at 09:00 (in pg_cron's configured time zone),
-- rebuild every index on nuq.queue_scrape. CONCURRENTLY is used so the
-- rebuild does not block writes to the table while it runs.
SELECT cron.schedule(
    'nuq_queue_scrape_reindex',  -- job name
    '0 9 * * *',                 -- schedule: once a day at 09:00
    $$
REINDEX TABLE CONCURRENTLY nuq.queue_scrape;
$$
);
|