Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address balance listener worker #819

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "webhooks" ADD COLUMN "config" JSONB;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "configuration" ADD COLUMN "addressBalanceListenerCronSchedule" TEXT;
3 changes: 3 additions & 0 deletions src/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ model Configuration {
maxRetriesPerTx Int @map("maxRetriesPerTx")
// Contract Indexer Updates
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
// cron to track balance of address
addressBalanceListenerCronSchedule String? @map("addressBalanceListenerCronSchedule")
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
Expand Down Expand Up @@ -181,6 +183,7 @@ model Webhooks {
url String @map("url")
secret String @map("secret")
eventType String @map("evenType")
config Json? @map("config")
createdAt DateTime @default(now()) @map("createdAt")
updatedAt DateTime @updatedAt @map("updatedAt")
revokedAt DateTime? @map("revokedAt")
Expand Down
5 changes: 4 additions & 1 deletion src/server/routes/system/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ type EngineFeature =
| "CONTRACT_SUBSCRIPTIONS"
| "IP_ALLOWLIST"
| "HETEROGENEOUS_WALLET_TYPES"
| "SMART_BACKEND_WALLETS";
| "SMART_BACKEND_WALLETS"
| "WEBHOOK_CONFIG";

const ReplySchemaOk = Type.Object({
status: Type.String(),
Expand All @@ -25,6 +26,7 @@ const ReplySchemaOk = Type.Object({
Type.Literal("IP_ALLOWLIST"),
Type.Literal("HETEROGENEOUS_WALLET_TYPES"),
Type.Literal("SMART_BACKEND_WALLETS"),
Type.Literal("WEBHOOK_CONFIG")
]),
),
clientId: Type.String(),
Expand Down Expand Up @@ -89,6 +91,7 @@ const getFeatures = (): EngineFeature[] => {
"HETEROGENEOUS_WALLET_TYPES",
"CONTRACT_SUBSCRIPTIONS",
"SMART_BACKEND_WALLETS",
"WEBHOOK_CONFIG",
];

if (env.ENABLE_KEYPAIR_AUTH) features.push("KEYPAIR_AUTH");
Expand Down
4 changes: 3 additions & 1 deletion src/server/routes/webhooks/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const requestBodySchema = Type.Object({
}),
),
eventType: Type.Enum(WebhooksEventTypes),
config: Type.Optional(Type.Any()),
});

requestBodySchema.examples = [
Expand Down Expand Up @@ -57,7 +58,7 @@ export async function createWebhookRoute(fastify: FastifyInstance) {
},
},
handler: async (req, res) => {
const { url, name, eventType } = req.body;
const { url, name, eventType, config } = req.body;

if (!isValidWebhookUrl(url)) {
throw createCustomError(
Expand All @@ -71,6 +72,7 @@ export async function createWebhookRoute(fastify: FastifyInstance) {
url,
name,
eventType,
config,
});

res.status(StatusCodes.OK).send({
Expand Down
2 changes: 2 additions & 0 deletions src/server/schemas/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const WebhookSchema = Type.Object({
eventType: Type.String(),
active: Type.Boolean(),
createdAt: Type.String(),
config: Type.Optional(Type.Any()),
});

export const toWebhookSchema = (
Expand All @@ -17,6 +18,7 @@ export const toWebhookSchema = (
url: webhook.url,
name: webhook.name,
eventType: webhook.eventType,
config: webhook.config,
secret: webhook.secret,
createdAt: webhook.createdAt.toISOString(),
active: !webhook.revokedAt,
Expand Down
4 changes: 4 additions & 0 deletions src/shared/db/configuration/get-configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ export const getConfiguration = async (): Promise<ParsedConfig> => {
config = await updateConfiguration({
indexerListenerCronSchedule: "*/5 * * * * *",
});
} else if (!config.addressBalanceListenerCronSchedule) {
config = await updateConfiguration({
addressBalanceListenerCronSchedule: "0 */5 * * * *",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 5 minutes a good poll interval?

});
}

const result = await toParsedConfig(config);
Expand Down
7 changes: 5 additions & 2 deletions src/shared/db/webhooks/create-webhook.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import type { Webhooks } from "@prisma/client";
import type { Prisma, Webhooks } from "@prisma/client";
import { createHash, randomBytes } from "crypto";
import type { WebhooksEventTypes } from "../../schemas/webhooks";
import { prisma } from "../client";

interface CreateWebhooksParams {
export interface CreateWebhooksParams {
url: string;
name?: string;
eventType: WebhooksEventTypes;
config?: Prisma.InputJsonValue | undefined;
}

export const insertWebhook = async ({
url,
name,
eventType,
config,
}: CreateWebhooksParams): Promise<Webhooks> => {
// generate random bytes
const bytes = randomBytes(4096);
Expand All @@ -25,6 +27,7 @@ export const insertWebhook = async ({
name,
eventType,
secret,
config,
},
});
};
4 changes: 4 additions & 0 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
newWebhooksListener,
updatedWebhooksListener,
} from "./listeners/webhook-listener";
import { addressBalanceListener } from "./listeners/address-balance-listener";
import { initCancelRecycledNoncesWorker } from "./tasks/cancel-recycled-nonces-worker";
import { initMineTransactionWorker } from "./tasks/mine-transaction-worker";
import { initNonceHealthCheckWorker } from "./tasks/nonce-health-check-worker";
Expand Down Expand Up @@ -40,4 +41,7 @@ export const initWorker = async () => {

// Contract subscriptions.
await chainIndexerListener();

// Notify address balance
await addressBalanceListener();
};
46 changes: 46 additions & 0 deletions src/worker/listeners/address-balance-listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import cron from "node-cron";
import { getConfig } from "../../shared/utils/cache/get-config";
import { logger } from "../../shared/utils/logger";
import { trackAddressBalance } from "../tasks/track-address-balance";

let isLocked = false;
let task: cron.ScheduledTask;

/**
* Tracks balance of address and calls webhook if reaches a threshold.
* Caveat: high chance of balance going under threshold for fast chains. Doesn't scale well for large amount of addresses.
*
* todo optimization: stream transactions via websocket and filter by from/to address for balance transfers.
*/
export const addressBalanceListener = async (): Promise<void> => {
const config = await getConfig();
if (!config.addressBalanceListenerCronSchedule) {
return;
}
if (task) {
task.stop();
}

task = cron.schedule(config.addressBalanceListenerCronSchedule, async () => {
if (!isLocked) {
isLocked = true;
try {
await trackAddressBalance();
} catch (e) {
logger({
service: "worker",
level: "warn",
message: "error on trackAddressBalance",
error: e,
});
}
isLocked = false;
} else {
logger({
service: "worker",
level: "warn",
message: "trackAddressBalance already running, skipping",
});
}
});
};
112 changes: 112 additions & 0 deletions src/worker/tasks/track-address-balance.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { Prisma } from "@prisma/client";
import { prisma } from "../../shared/db/client";
import { WebhooksEventTypes } from "../../shared/schemas/webhooks";
import { thirdwebClient } from "../../shared/utils/sdk";
import { getChain } from "../../shared/utils/chain";
import { SendWebhookQueue } from "../queues/send-webhook-queue";
import { logger } from "../../shared/utils/logger";
import { getWalletBalance } from "thirdweb/wallets";

type WebhookDetail = {
id: number;
name: string | null;
url: string;
secret: string;
eventType: string;
config: Prisma.JsonValue;
createdAt: Date;
updatedAt: Date;
revokedAt: Date | null;
};

type WebhookConfig = {
address: string;
chainId: number;
threshold: number;
};

export const trackAddressBalance = async () => {
const today = Date.now();
const oneDayAgo = today - 24 * 60 * 60 * 1000;

// returns new webhooks that have not been notified at all or was last notified 1 day ago
const webhookDetails = await prisma.webhooks.findMany({
where: {
eventType: WebhooksEventTypes.BACKEND_WALLET_BALANCE,
config: { path: ["address"], not: Prisma.AnyNull },
OR: [
{ config: { path: ["lastNotify"], equals: Prisma.AnyNull } },
{ config: { path: ["lastNotify"], lt: oneDayAgo } },
],
},
});

let promises = [];
let ids = [];
for (const webhookDetail of webhookDetails) {
const config = webhookDetail.config as WebhookConfig | null;
if (!config?.address) continue;
if (!config?.chainId) continue;

ids.push(webhookDetail.id);
promises.push(
_checkBalanceAndEnqueueWebhook(webhookDetail).catch((e) =>
logger({
service: "worker",
level: "warn",
message: `errored while _checkBalanceAndEnqueueWebhook for ${webhookDetail.id}`,
error: e,
}),
),
);

if (ids.length >= 10) {
await Promise.allSettled(promises);
await _updateLastNotify(ids, today);
promises = [];
ids = [];
}
}
if (ids.length) {
await Promise.allSettled(promises);
await _updateLastNotify(ids, today);
}
};

const _checkBalanceAndEnqueueWebhook = async (webhookDetail: WebhookDetail) => {
const { address, chainId, threshold } = webhookDetail.config as WebhookConfig;

// get native balance of address
const balanceData = await getWalletBalance({
client: thirdwebClient,
address,
chain: await getChain(chainId),
});
const currentBalance = balanceData.displayValue;

// dont do anything if has enough balance
if (Number.parseFloat(currentBalance) > threshold) return;

await SendWebhookQueue.enqueueWebhook({
type: WebhooksEventTypes.BACKEND_WALLET_BALANCE,
body: {
chainId,
walletAddress: address,
minimumBalance: threshold.toString(),
currentBalance: currentBalance,
message: `LowBalance: The address ${address} on chain ${chainId} has ${Number.parseFloat(
currentBalance,
)
.toFixed(2)
.toString()}/${threshold} gas remaining.`,
},
});
};

const _updateLastNotify = async (webhookIds: number[], time: number) => {
// using query as only want to update a single field in config json and not replace the entire object
await prisma.$executeRaw`
update webhooks
set config=jsonb_set(config, '{lastNotify}', ${time.toString()}::jsonb, true)
where id = any(array[${webhookIds}]);`;
};
Loading
Loading