Skip to content

Commit

Permalink
feat: XMTP service
Browse files Browse the repository at this point in the history
  • Loading branch information
karanpargal committed Sep 6, 2024
1 parent e3ec640 commit 068baef
Show file tree
Hide file tree
Showing 10 changed files with 1,081 additions and 19 deletions.
109 changes: 109 additions & 0 deletions packages/backend/microservices/xmtp/xmtp-broadcast.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { XmtpClientService } from "../../services";
import {
base64ToBytes,
addBroadcast,
startBroadcast,
} from "../../utils/functions";
import { broadcastEntities } from "../../utils/functions/start-broadcast";
import { invitation } from "@xmtp/proto";
import { broadCastConfigEntities } from "common/xmtp/brodcaster-config";

export const lookupAddress = async (
address: string,
broadcastAddress: string,
) => {
if (typeof address !== "string") {
throw new Error("Address must be a string");
}
if (typeof broadcastAddress !== "string") {
throw new Error("Broadcast address must be a string");
}
const client = await XmtpClientService.getXmtpClient(broadcastAddress);
if (!client) {
console.log("Client not initialized " + broadcastAddress);
throw new Error("Client not initialized");
}
const canMessage = await client.canMessage(address);
return {
onNetwork: canMessage,
};
};

export const subscribeBroadcast = async (
address: string,
broadcastAddress: string,
consentProofString: string,
) => {
if (typeof address !== "string") {
throw new Error("Address must be a string");
}

if (typeof broadcastAddress !== "string") {
throw new Error("Broadcast address must be a string");
}

if (typeof consentProofString !== "string") {
throw new Error("Consent proof must be a string");
}

const consentProofUint8Array = base64ToBytes(consentProof);

const client = await XmtpClientService.getXmtpClient(broadcastAddress);
const { greeting } = broadCastConfigEntities.map[broadcastAddress];
if (!client) {
console.log("Client not initialized " + broadcastAddress);
throw new Error("Client not initialized");
}
const consentProof = invitation.ConsentProofPayload.decode(
consentProofUint8Array,
);
console.log("Creating conversation with: ", {
consentProof,
});
const conversation = await client.conversations.newConversation(
address,
undefined,
consentProof,
);
console.log("Conversation created: ", conversation.topic);
await conversation.send(greeting);
return {
success: true,
topic: conversation.topic,
};
};

export const sendBroadcast = async (text: string, address: string) => {
if (typeof text !== "string") {
throw new Error("Text must be a string");
}
if (typeof address !== "string") {
throw new Error("Address must be a string");
}
const client = await XmtpClientService.getXmtpClient(address);
if (!client) {
throw new Error("Client not initialized");
}

const subscribers = getDevWalletAddresses();
const broadcastId = addBroadcast(client.address, subscribers, text);
startBroadcast(client, subscribers, text, broadcastId);

return {
success: true,
broadcastId,
};
};

export const getBroadcastLink = (broadcastId: string) => {
if (broadcastEntities.entities[broadcastId] === undefined) {
console.log(broadcastEntities.ids);
throw new Error("Broadcast not found");
}
const broadcast = broadcastEntities.entities[broadcastId];
return {
success: true,
broadcastId,
broadcast,
};
};
7 changes: 7 additions & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
"license": "ISC",
"dependencies": {
"@supabase/supabase-js": "^2.45.3",
"@xmtp/grpc-api-client": "^0.2.7",
"@xmtp/proto": "^3.68.0",
"@xmtp/redis-persistence": "^0.0.4",
"@xmtp/xmtp-js": "^12.1.0",
"common": "workspace:*",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
"ethers": "^6.13.2",
"express": "^4.19.2",
"ioredis": "^5.4.1",
"jsonwebtoken": "^9.0.2",
"proto": "link:@types/@xmtp/proto",
"socket.io": "^4.7.5",
"yup": "^1.4.0"
},
Expand All @@ -42,7 +47,9 @@
"@typescript-eslint/parser": "^8.0.1",
"eslint": "^9.8.0",
"eslint-plugin-prettier": "^5.2.1",
"grpc-api-client": "link:@types/@xmtp/grpc-api-client",
"prettier": "^3.3.3",
"redis-persistence": "link:@types/@xmtp/redis-persistence",
"ts-node": "^10.9.2",
"tsc-watch": "^6.2.0",
"typescript": "^5.5.4"
Expand Down
240 changes: 240 additions & 0 deletions packages/backend/services/broadcast.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import { type Conversation, type Client } from "@xmtp/xmtp-js";

type MessageType = string;
type MessagesType = MessageType[];

interface BroadcastOptions {
client: Client;
addresses: string[];
cachedCanMessageAddresses: string[];
messages: MessagesType;
rateLimitAmount?: number;
rateLimitTime?: number;

onBatchStart?: (addresses: string[]) => void;
onBatchComplete?: (addresses: string[]) => void;
onBroadcastComplete?: () => void;
onCantMessageAddress?: (address: string) => void;
onCanMessageAddreses?: (addresses: string[]) => void;
onMessageSending?: (address: string) => void;
onMessageFailed?: (address: string) => void;
onMessageSent?: (address: string) => void;
onCanMessageAddressesUpdate?: (addresses: string[]) => void;
onDelay?: (ms: number) => void;
}

const GENERAL_RATE_LIMIT = 10000;

export class Broadcast {
client: Client;
addresses: string[];
cachedCanMessageAddresses: Set<string>;
rateLimitAmount: number;
rateLimitTime: number;
batches: string[][] = [];
errorBatch: string[] = [];
conversationMapping: Map<string, Conversation> = new Map();
messages: MessagesType = [];

onBatchStart?: (addresses: string[]) => void;
onBatchComplete?: (addresses: string[]) => void;
onBroadcastComplete?: () => void;
onCantMessageAddress?: (address: string) => void;
onCanMessageAddreses?: (addresses: string[]) => void;
onMessageSending?: (address: string) => void;
onMessageFailed?: (address: string) => void;
onMessageSent?: (address: string) => void;
onCanMessageAddressesUpdate?: (addresses: string[]) => void;
onDelay?: (ms: number) => void;

constructor({
client,
addresses,
cachedCanMessageAddresses,
messages,
rateLimitAmount = 1000,
rateLimitTime = 1000 * 60 * 5,
onBatchStart,
onBatchComplete,
onBroadcastComplete,
onCantMessageAddress,
onCanMessageAddreses,
onMessageSending,
onMessageFailed,
onMessageSent,
onCanMessageAddressesUpdate,
onDelay,
}: BroadcastOptions) {
this.client = client;
this.addresses = addresses;
this.cachedCanMessageAddresses = new Set(cachedCanMessageAddresses);
this.messages = messages;
this.rateLimitAmount = rateLimitAmount;
this.rateLimitTime = rateLimitTime;
this.onBatchStart = onBatchStart;
this.onBatchComplete = onBatchComplete;
this.onBroadcastComplete = onBroadcastComplete;
this.onCantMessageAddress = onCantMessageAddress;
this.onCanMessageAddreses = onCanMessageAddreses;
this.onMessageSending = onMessageSending;
this.onMessageFailed = onMessageFailed;
this.onMessageSent = onMessageSent;
this.onCanMessageAddressesUpdate = onCanMessageAddressesUpdate;
this.onDelay = onDelay;
}

public broadcast = async () => {
const conversations = await this.client.conversations.list();
for (const conversation of conversations) {
this.conversationMapping.set(
conversation.peerAddress,
conversation,
);
}
console.log("delaying after list");
if (
conversations.length / 2 >
GENERAL_RATE_LIMIT - this.rateLimitAmount
) {
await this.delay(this.rateLimitTime);
}

this.batches = this.getBatches();
for (
let batchIndex = 0;
batchIndex < this.batches.length;
batchIndex++
) {
await this.handleBatch({
addresses: this.batches[batchIndex],
});
if (batchIndex !== this.batches.length - 1) {
await this.delay(this.rateLimitTime);
} else {
await this.sendErrorBatch();
}
}
this.onBroadcastComplete?.();
};

private handleBatch = async ({ addresses }: { addresses: string[] }) => {
this.onBatchStart?.(addresses);
await this.canMessageAddresses(
addresses,
this.onCanMessageAddressesUpdate,
);
for (const address of addresses) {
if (!this.cachedCanMessageAddresses.has(address)) {
this.onCantMessageAddress?.(address);
continue;
}
try {
let conversation = this.conversationMapping.get(address);
if (!conversation) {
conversation =
await this.client.conversations.newConversation(
address,
);
this.conversationMapping.set(address, conversation);
}

for (const message of this.messages) {
await conversation.send(message);
}
this.onMessageSent?.(address);
// Clear up some memory after we are done with the conversation
this.cachedCanMessageAddresses.delete(address);
this.conversationMapping.delete(address);
} catch (err) {
this.onMessageFailed?.(address);
this.errorBatch.push(address);
await this.delay(this.rateLimitTime);
}
}
this.onBatchComplete?.(addresses);
};

private sendErrorBatch = async () => {
if (this.errorBatch.length === 0) {
return;
}
const finalErrors: string[] = [];
for (const address of this.errorBatch) {
try {
const conversation =
await this.client.conversations.newConversation(address);
for (const message of this.messages) {
await conversation.send(message);
}
this.onMessageSent?.(address);
} catch (err) {
this.onMessageFailed?.(address);
this.errorBatch.push(address);
await this.delay(this.rateLimitTime);
}
}
this.errorBatch = finalErrors;
};

private canMessageAddresses = async (
addresses: string[],
onCanMessageAddressesUpdate?: (newAddresses: string[]) => void,
) => {
const unknownStateAddresses: string[] = [];
for (let i = 0; i < addresses.length; i++) {
if (!this.cachedCanMessageAddresses.has(addresses[i])) {
unknownStateAddresses.push(addresses[i]);
}
}
const canMessageAddresses = await this.client.canMessage(
unknownStateAddresses,
);
const newCanMessageAddresses: string[] = [];
for (let i = 0; i < addresses.length; i++) {
if (canMessageAddresses[i]) {
newCanMessageAddresses.push(addresses[i]);
this.cachedCanMessageAddresses.add(addresses[i]);
}
}
onCanMessageAddressesUpdate?.(newCanMessageAddresses);
};

private delay = async (ms: number) => {
this.onDelay?.(ms);
return new Promise<void>((resolve) => setTimeout(resolve, ms));
};

private getBatches = (): string[][] => {
let batch: string[] = [];
const batches: string[][] = [];
let batchCount = 0;
for (const address of this.addresses) {
let addressWeight = 0;
addressWeight += this.messages.length;
if (!this.conversationMapping.has(address)) {
addressWeight += 3;
} else {
addressWeight += 1;
}
const newBatchCount = batchCount + addressWeight;
if (newBatchCount === this.rateLimitAmount) {
batch.push(address);
batches.push(batch);
batch = [];
batchCount = 0;
} else if (newBatchCount > this.rateLimitAmount) {
batches.push(batch);
batch = [];
batch.push(address);
batchCount = addressWeight;
} else {
batch.push(address);
batchCount = newBatchCount;
}
}
if (batch.length > 0) {
batches.push(batch);
}
return batches;
};
}
1 change: 1 addition & 0 deletions packages/backend/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export { EthersService } from "./ethers.service";
export { RedisService } from "./redis.service";
export { SupabaseService } from "./supabase.service";
export { WSService } from "./ws.service";
export { XmtpClientService } from "./xmtp.service";
Loading

0 comments on commit 068baef

Please sign in to comment.