Skip to content

Commit

Permalink
feat: support webhook retries
Browse files Browse the repository at this point in the history
  • Loading branch information
sweatybridge committed Aug 21, 2024
1 parent 1fd3dc3 commit e6c0474
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 89 deletions.
126 changes: 77 additions & 49 deletions migrations/db/migrations/20240822021428_enable_webhooks_by_default.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,106 +4,136 @@
CREATE EXTENSION IF NOT EXISTS pg_net SCHEMA extensions;

-- Create supabase_functions schema
CREATE SCHEMA supabase_functions AUTHORIZATION supabase_admin;
CREATE SCHEMA IF NOT EXISTS supabase_functions AUTHORIZATION supabase_admin;

GRANT USAGE ON SCHEMA supabase_functions TO postgres, anon, authenticated, service_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA supabase_functions GRANT ALL ON TABLES TO postgres, anon, authenticated, service_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA supabase_functions GRANT ALL ON FUNCTIONS TO postgres, anon, authenticated, service_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA supabase_functions GRANT ALL ON SEQUENCES TO postgres, anon, authenticated, service_role;

-- supabase_functions.migrations definition
CREATE TABLE supabase_functions.migrations (
CREATE TABLE IF NOT EXISTS supabase_functions.migrations (
version text PRIMARY KEY,
inserted_at timestamptz NOT NULL DEFAULT NOW()
);

-- Initial supabase_functions migration
INSERT INTO supabase_functions.migrations (version) VALUES ('initial');
INSERT INTO supabase_functions.migrations (version) VALUES
('initial'),
('20210809183423_update_grants'),
('20240125163000_add_retry_to_http_request')
ON CONFLICT DO NOTHING;

-- supabase_functions.hooks definition
CREATE TABLE supabase_functions.hooks (
CREATE TABLE IF NOT EXISTS supabase_functions.hooks (
id bigserial PRIMARY KEY,
hook_table_id integer NOT NULL,
hook_name text NOT NULL,
created_at timestamptz NOT NULL DEFAULT NOW(),
request_id bigint
);
CREATE INDEX supabase_functions_hooks_request_id_idx ON supabase_functions.hooks USING btree (request_id);
CREATE INDEX supabase_functions_hooks_h_table_id_h_name_idx ON supabase_functions.hooks USING btree (hook_table_id, hook_name);
CREATE INDEX IF NOT EXISTS supabase_functions_hooks_request_id_idx ON supabase_functions.hooks USING btree (request_id);
CREATE INDEX IF NOT EXISTS supabase_functions_hooks_h_table_id_h_name_idx ON supabase_functions.hooks USING btree (hook_table_id, hook_name);
COMMENT ON TABLE supabase_functions.hooks IS 'Supabase Functions Hooks: Audit trail for triggered hooks.';

CREATE FUNCTION supabase_functions.http_request()
CREATE OR REPLACE FUNCTION supabase_functions.http_request()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path TO 'supabase_functions'
AS $function$
DECLARE
request_id bigint;
local_request_id bigint;
payload jsonb;
url text := TG_ARGV[0]::text;
method text := TG_ARGV[1]::text;
headers jsonb DEFAULT '{}'::jsonb;
params jsonb DEFAULT '{}'::jsonb;
timeout_ms integer DEFAULT 1000;
timeout_ms integer;
retry_count integer DEFAULT 0;
max_retries integer := COALESCE(TG_ARGV[5]::integer, 0);
succeeded boolean := FALSE;
retry_delays double precision[] := ARRAY[0, 0.250, 0.500, 1.000, 2.500, 5.000];
status_code integer := 0;
BEGIN
IF url IS NULL OR url = 'null' THEN
RAISE EXCEPTION 'url argument is missing';
END IF;

IF method IS NULL OR method = 'null' THEN
RAISE EXCEPTION 'method argument is missing';
END IF;

IF TG_ARGV[2] IS NULL OR TG_ARGV[2] = 'null' THEN
headers = '{"Content-Type": "application/json"}'::jsonb;
ELSE
headers = TG_ARGV[2]::jsonb;
END IF;

IF TG_ARGV[3] IS NULL OR TG_ARGV[3] = 'null' THEN
params = '{}'::jsonb;
ELSE
params = TG_ARGV[3]::jsonb;
END IF;

IF TG_ARGV[4] IS NULL OR TG_ARGV[4] = 'null' THEN
timeout_ms = 1000;
ELSE
IF TG_ARGV[4] IS NOT NULL OR TG_ARGV[4] <> 'null' THEN
timeout_ms = TG_ARGV[4]::integer;
END IF;

CASE
WHEN method = 'GET' THEN
SELECT http_get INTO request_id FROM net.http_get(
url,
params,
headers,
timeout_ms
);
WHEN method = 'POST' THEN
payload = jsonb_build_object(
'old_record', OLD,
'record', NEW,
'type', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA
);

SELECT http_post INTO request_id FROM net.http_post(
url,
payload,
params,
headers,
timeout_ms
);
ELSE
RAISE EXCEPTION 'method argument % is invalid', method;
END CASE;

-- Retry loop
WHILE NOT succeeded AND retry_count <= max_retries LOOP
PERFORM pg_sleep(retry_delays[retry_count + 1]);
IF retry_delays[retry_count + 1] > 0 THEN
RAISE WARNING 'Retrying HTTP request: {retry_attempt: %, url: "%", timeout_ms: %, retry_delay_ms: %}',
retry_count, url, timeout_ms, retry_delays[retry_count + 1] * 1000;
END IF;
retry_count := retry_count + 1;
BEGIN
CASE
WHEN method = 'GET' THEN
SELECT http_get INTO local_request_id FROM net.http_get(
url,
params,
headers,
timeout_ms
);
WHEN method = 'POST' THEN
payload = jsonb_build_object(
'old_record', OLD,
'record', NEW,
'type', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA
);
SELECT http_post INTO local_request_id FROM net.http_post(
url,
payload,
params,
headers,
timeout_ms
);
ELSE
RAISE EXCEPTION 'method argument % is invalid', method;
END CASE;
IF local_request_id IS NOT NULL THEN
SELECT (response).status_code::integer
INTO status_code
FROM net._http_collect_response(local_request_id);
IF status_code < 500 THEN
succeeded := TRUE;
END IF;
END IF;
-- Exit loop on successful request
EXIT WHEN succeeded;
EXCEPTION
WHEN OTHERS THEN
IF retry_count > max_retries THEN
-- If retries exhausted, re-raise exception
RAISE EXCEPTION 'HTTP request failed after % retries. SQL Error: { %, % }',
max_retries, SQLERRM, SQLSTATE;
END IF;
END;
END LOOP;
-- Failed retries are not logged
INSERT INTO supabase_functions.hooks
(hook_table_id, hook_name, request_id)
VALUES
(TG_RELID, TG_NAME, request_id);

(TG_RELID, TG_NAME, local_request_id);
RETURN NEW;
END
$function$;
Expand Down Expand Up @@ -231,8 +261,6 @@ BEGIN
END
$$;

INSERT INTO supabase_functions.migrations (version) VALUES ('20210809183423_update_grants');

ALTER function supabase_functions.http_request() SECURITY DEFINER;
ALTER function supabase_functions.http_request() SET search_path = supabase_functions;
REVOKE ALL ON FUNCTION supabase_functions.http_request() FROM PUBLIC;
Expand Down
104 changes: 64 additions & 40 deletions migrations/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -604,73 +604,97 @@ CREATE FUNCTION supabase_functions.http_request() RETURNS trigger
SET search_path TO 'supabase_functions'
AS $$
DECLARE
request_id bigint;
local_request_id bigint;
payload jsonb;
url text := TG_ARGV[0]::text;
method text := TG_ARGV[1]::text;
headers jsonb DEFAULT '{}'::jsonb;
params jsonb DEFAULT '{}'::jsonb;
timeout_ms integer DEFAULT 1000;
timeout_ms integer;
retry_count integer DEFAULT 0;
max_retries integer := COALESCE(TG_ARGV[5]::integer, 0);
succeeded boolean := FALSE;
retry_delays double precision[] := ARRAY[0, 0.250, 0.500, 1.000, 2.500, 5.000];
status_code integer := 0;
BEGIN
IF url IS NULL OR url = 'null' THEN
RAISE EXCEPTION 'url argument is missing';
END IF;

IF method IS NULL OR method = 'null' THEN
RAISE EXCEPTION 'method argument is missing';
END IF;

IF TG_ARGV[2] IS NULL OR TG_ARGV[2] = 'null' THEN
headers = '{"Content-Type": "application/json"}'::jsonb;
ELSE
headers = TG_ARGV[2]::jsonb;
END IF;

IF TG_ARGV[3] IS NULL OR TG_ARGV[3] = 'null' THEN
params = '{}'::jsonb;
ELSE
params = TG_ARGV[3]::jsonb;
END IF;

IF TG_ARGV[4] IS NULL OR TG_ARGV[4] = 'null' THEN
timeout_ms = 1000;
ELSE
IF TG_ARGV[4] IS NOT NULL OR TG_ARGV[4] <> 'null' THEN
timeout_ms = TG_ARGV[4]::integer;
END IF;

CASE
WHEN method = 'GET' THEN
SELECT http_get INTO request_id FROM net.http_get(
url,
params,
headers,
timeout_ms
);
WHEN method = 'POST' THEN
payload = jsonb_build_object(
'old_record', OLD,
'record', NEW,
'type', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA
);

SELECT http_post INTO request_id FROM net.http_post(
url,
payload,
params,
headers,
timeout_ms
);
ELSE
RAISE EXCEPTION 'method argument % is invalid', method;
END CASE;

-- Retry loop
WHILE NOT succeeded AND retry_count <= max_retries LOOP
PERFORM pg_sleep(retry_delays[retry_count + 1]);
IF retry_delays[retry_count + 1] > 0 THEN
RAISE WARNING 'Retrying HTTP request: {retry_attempt: %, url: "%", timeout_ms: %, retry_delay_ms: %}',
retry_count, url, timeout_ms, retry_delays[retry_count + 1] * 1000;
END IF;
retry_count := retry_count + 1;
BEGIN
CASE
WHEN method = 'GET' THEN
SELECT http_get INTO local_request_id FROM net.http_get(
url,
params,
headers,
timeout_ms
);
WHEN method = 'POST' THEN
payload = jsonb_build_object(
'old_record', OLD,
'record', NEW,
'type', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA
);
SELECT http_post INTO local_request_id FROM net.http_post(
url,
payload,
params,
headers,
timeout_ms
);
ELSE
RAISE EXCEPTION 'method argument % is invalid', method;
END CASE;
IF local_request_id IS NOT NULL THEN
SELECT (response).status_code::integer
INTO status_code
FROM net._http_collect_response(local_request_id);
IF status_code < 500 THEN
succeeded := TRUE;
END IF;
END IF;
-- Exit loop on successful request
EXIT WHEN succeeded;
EXCEPTION
WHEN OTHERS THEN
IF retry_count > max_retries THEN
-- If retries exhausted, re-raise exception
RAISE EXCEPTION 'HTTP request failed after % retries. SQL Error: { %, % }',
max_retries, SQLERRM, SQLSTATE;
END IF;
END;
END LOOP;
-- Failed retries are not logged
INSERT INTO supabase_functions.hooks
(hook_table_id, hook_name, request_id)
VALUES
(TG_RELID, TG_NAME, request_id);

(TG_RELID, TG_NAME, local_request_id);
RETURN NEW;
END
$$;
Expand Down

0 comments on commit e6c0474

Please sign in to comment.