Files
supabase/apps/studio/data/database-queues/database-queues-toggle-postgrest-mutation.ts
Ivan Vasilov 8b657165b5 chore: Migrate to use custom type for ReactQuery queries and mutations (#40073)
* Add custom types for queries, mutations and infinite queries.

* Migrate all queries to use the new type.

* Migrate all infinite queries to useCustomInfiniteQueryOptions.

* Migrate all mutations to use useCustomMutationOptions.

* Add type to all imports in `types` folder.
2025-11-03 13:18:13 +01:00

295 lines
9.5 KiB
TypeScript

import { useMutation, useQueryClient } from '@tanstack/react-query'
import minify from 'pg-minify'
import { toast } from 'sonner'
import { databaseKeys } from 'data/database/keys'
import { executeSql } from 'data/sql/execute-sql-query'
import { isGreaterThanOrEqual } from 'lib/semver'
import type { ResponseError, UseCustomMutationOptions } from 'types'
import { databaseQueuesKeys } from './keys'
export type DatabaseQueueExposePostgrestVariables = {
projectRef: string
enable: boolean
pgmqVersion: string
connectionString?: string | null
}
const CONDITIONAL_READ_SIGNATURE_PGMQ_VERSION = '1.5.0'
export const QUEUES_SCHEMA = 'pgmq_public'
const getExposeQueuesSQL = (pgmqVersion: string) => {
const conditionalJsonb = isGreaterThanOrEqual(
pgmqVersion,
CONDITIONAL_READ_SIGNATURE_PGMQ_VERSION
)
? `, conditional := '{}'::jsonb`
: ''
const jsonBArg = isGreaterThanOrEqual(pgmqVersion, CONDITIONAL_READ_SIGNATURE_PGMQ_VERSION)
? `, jsonb`
: ''
return minify(/* SQL */ `
create schema if not exists ${QUEUES_SCHEMA};
grant usage on schema ${QUEUES_SCHEMA} to postgres, anon, authenticated, service_role;
create or replace function ${QUEUES_SCHEMA}.pop(
queue_name text
)
returns setof pgmq.message_record
language plpgsql
set search_path = ''
as $$
begin
return query
select *
from pgmq.pop(
queue_name := queue_name
);
end;
$$;
comment on function ${QUEUES_SCHEMA}.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.';
create or replace function ${QUEUES_SCHEMA}.send(
queue_name text,
message jsonb,
sleep_seconds integer default 0 -- renamed from 'delay'
)
returns setof bigint
language plpgsql
set search_path = ''
as $$
begin
return query
select *
from pgmq.send(
queue_name := queue_name,
msg := message,
delay := sleep_seconds
);
end;
$$;
comment on function ${QUEUES_SCHEMA}.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.';
create or replace function ${QUEUES_SCHEMA}.send_batch(
queue_name text,
messages jsonb[],
sleep_seconds integer default 0 -- renamed from 'delay'
)
returns setof bigint
language plpgsql
set search_path = ''
as $$
begin
return query
select *
from pgmq.send_batch(
queue_name := queue_name,
msgs := messages,
delay := sleep_seconds
);
end;
$$;
comment on function ${QUEUES_SCHEMA}.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.';
create or replace function ${QUEUES_SCHEMA}.archive(
queue_name text,
message_id bigint
)
returns boolean
language plpgsql
set search_path = ''
as $$
begin
return
pgmq.archive(
queue_name := queue_name,
msg_id := message_id
);
end;
$$;
comment on function ${QUEUES_SCHEMA}.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.';
create or replace function ${QUEUES_SCHEMA}.delete(
queue_name text,
message_id bigint
)
returns boolean
language plpgsql
set search_path = ''
as $$
begin
return
pgmq.delete(
queue_name := queue_name,
msg_id := message_id
);
end;
$$;
comment on function ${QUEUES_SCHEMA}.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.';
create or replace function ${QUEUES_SCHEMA}.read(
queue_name text,
sleep_seconds integer,
n integer
)
returns setof pgmq.message_record
language plpgsql
set search_path = ''
as $$
begin
return query
select *
from pgmq.read(
queue_name := queue_name,
vt := sleep_seconds,
qty := n ${conditionalJsonb}
);
end;
$$;
comment on function ${QUEUES_SCHEMA}.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).';
-- Grant execute permissions on wrapper functions to roles
grant execute on function ${QUEUES_SCHEMA}.pop(text) to postgres, service_role, anon, authenticated;
grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated;
grant execute on function ${QUEUES_SCHEMA}.send(text, jsonb, integer) to postgres, service_role, anon, authenticated;
grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated;
grant execute on function ${QUEUES_SCHEMA}.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated;
grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated;
grant execute on function ${QUEUES_SCHEMA}.archive(text, bigint) to postgres, service_role, anon, authenticated;
grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated;
grant execute on function ${QUEUES_SCHEMA}.delete(text, bigint) to postgres, service_role, anon, authenticated;
grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated;
grant execute on function ${QUEUES_SCHEMA}.read(text, integer, integer) to postgres, service_role, anon, authenticated;
grant execute on function pgmq.read(text, integer, integer ${jsonBArg}) to postgres, service_role, anon, authenticated;
-- For the service role, we want full access
-- Grant permissions on existing tables
grant all privileges on all tables in schema pgmq to postgres, service_role;
-- Ensure service_role has permissions on future tables
alter default privileges in schema pgmq grant all privileges on tables to postgres, service_role;
grant usage on schema pgmq to postgres, anon, authenticated, service_role;
/*
Grant access to sequences to API roles by default. Existing table permissions
continue to enforce insert restrictions. This is necessary to accommodate the
on-backup hook that rebuild queue table primary keys to avoid a pg_dump segfault.
This can be removed once logical backups are completely retired.
*/
grant usage, select, update
on all sequences in schema pgmq
to anon, authenticated, service_role;
alter default privileges in schema pgmq
grant usage, select, update
on sequences
to anon, authenticated, service_role;
`)
}
const HIDE_QUEUES_FROM_POSTGREST_SQL = minify(/* SQL */ `
drop function if exists
${QUEUES_SCHEMA}.pop(queue_name text),
${QUEUES_SCHEMA}.send(queue_name text, message jsonb, sleep_seconds integer),
${QUEUES_SCHEMA}.send_batch(queue_name text, message jsonb[], sleep_seconds integer),
${QUEUES_SCHEMA}.archive(queue_name text, message_id bigint),
${QUEUES_SCHEMA}.delete(queue_name text, message_id bigint),
${QUEUES_SCHEMA}.read(queue_name text, sleep integer, n integer)
;
-- Revoke execute permissions on inner pgmq functions to roles (inverse of enabling)
do $$
begin
if exists (select 1 from pg_namespace where nspname = 'pgmq') then
-- Revoke privileges on the schema itself
revoke all on schema pgmq from anon, authenticated, service_role;
-- Revoke default privileges for future objects
alter default privileges in schema pgmq revoke all on tables from anon, authenticated, service_role;
alter default privileges in schema pgmq revoke all on sequences from anon, authenticated, service_role;
alter default privileges in schema pgmq revoke all on functions from anon, authenticated, service_role;
end if;
end $$;
drop schema if exists ${QUEUES_SCHEMA};
`)
export async function toggleQueuesExposurePostgrest({
projectRef,
pgmqVersion,
connectionString,
enable,
}: DatabaseQueueExposePostgrestVariables) {
const sql = enable ? getExposeQueuesSQL(pgmqVersion) : HIDE_QUEUES_FROM_POSTGREST_SQL
const { result } = await executeSql({
projectRef,
connectionString,
sql,
queryKey: ['toggle-queues-exposure'],
})
return result
}
type DatabaseQueueExposePostgrestData = Awaited<ReturnType<typeof toggleQueuesExposurePostgrest>>
export const useDatabaseQueueToggleExposeMutation = ({
onSuccess,
onError,
...options
}: Omit<
UseCustomMutationOptions<
DatabaseQueueExposePostgrestData,
ResponseError,
DatabaseQueueExposePostgrestVariables
>,
'mutationFn'
> = {}) => {
const queryClient = useQueryClient()
return useMutation<
DatabaseQueueExposePostgrestData,
ResponseError,
DatabaseQueueExposePostgrestVariables
>({
mutationFn: (vars) => toggleQueuesExposurePostgrest(vars),
async onSuccess(data, variables, context) {
const { projectRef } = variables
await queryClient.invalidateQueries({
queryKey: databaseQueuesKeys.exposePostgrestStatus(projectRef),
})
// [Joshen] Schemas can be invalidated without waiting
queryClient.invalidateQueries({ queryKey: databaseKeys.schemas(projectRef) })
await onSuccess?.(data, variables, context)
},
async onError(data, variables, context) {
if (onError === undefined) {
toast.error(`Failed to toggle queue exposure via PostgREST: ${data.message}`)
} else {
onError(data, variables, context)
}
},
...options,
})
}