[KV Feedback]: Jobs using KV and Queue with Limits #746

Open
1 task done
CodingXD opened this issue Nov 6, 2024 · 0 comments
Labels
kv Feedback about Deno KV

CodingXD commented Nov 6, 2024

🔍

  • Did you search for existing issues?

Type of feedback

Feature request

Description

Is there a way to enqueue items so that only a limited number of them (for example, 10 or 15) are processed concurrently?

This is what I'm currently doing. Is there a better or native way to do this?

import { Hono } from "https://deno.land/x/hono/mod.ts";
import { z } from "https://deno.land/x/zod/mod.ts";
import { zValidator } from "npm:@hono/zod-validator";

const app = new Hono();
const kv = await Deno.openKv();
const MAX_JOBS = 3;

app.post(
  "/",
  zValidator(
    "json",
    z.object({
      id: z.number(),
      name: z.string(),
    }),
  ),
  async (c) => {
    const body = c.req.valid("json");
    await kv.enqueue(body);
    return c.json({ message: "Added to queue" });
  },
);

Deno.serve(app.fetch);

const stream = kv.watch([["activeJobs"]]);
for await (const entries of stream) {
  // check whether we're already processing MAX_JOBS jobs
  const activeJobs = (entries[0].value as number) || 0;
  if (activeJobs < MAX_JOBS) {
    // if not, listen to the queue for more jobs
    console.log("listening for jobs, active jobs:", activeJobs);
    kv.listenQueue(async (msg) => {
      // once you get a job, increment the activeJobs by 1
      console.log("msg:", msg);
      await kv.atomic().sum(["activeJobs"], 1n).commit();

      // run some logic and, once done, reduce the activeJobs count
      // so another job can be accepted from the queue
      setTimeout(async () => {
        await kv.atomic().sum(["activeJobs"], -1n).commit();
      }, 4000);
    });
  }
}
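
For comparison, here is a minimal sketch of an in-process alternative: register a single queue listener and gate its handler with a small semaphore, so at most a fixed number of jobs run at once. `createLimiter` and `processJob` are hypothetical names, not part of Deno KV. The sketch assumes that a handler waiting for a free slot is acceptable to the queue's delivery and retry behavior, which is not verified here. It only limits concurrency within one process, not across several instances.

```typescript
// Hypothetical helper (not a Deno KV API): limits how many async tasks
// run at the same time inside a single process.
function createLimiter(maxConcurrent: number) {
  let active = 0; // number of slots currently in use
  const waiting: Array<() => void> = []; // resolvers for tasks awaiting a slot

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= maxConcurrent) {
      // No free slot: wait until a finishing task hands its slot over.
      await new Promise<void>((resolve) => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      const next = waiting.shift();
      if (next) {
        next(); // pass the slot directly to the next waiter; `active` is unchanged
      } else {
        active--; // nobody is waiting, so free the slot
      }
    }
  };
}

// Assumed usage with the queue from the snippet above
// (processJob is a hypothetical job handler):
//
//   const limit = createLimiter(3);
//   kv.listenQueue((msg) => limit(() => processJob(msg)));
```

With this shape there is one `listenQueue` call and no `activeJobs` counter stored in KV; messages beyond the limit simply wait inside the handler until a slot frees up.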

Steps to reproduce (if applicable)

No response

Expected behavior (if applicable)

No response

Possible solution (if applicable)

No response

Additional context

No response

@CodingXD CodingXD added the kv Feedback about Deno KV label Nov 6, 2024