Skip to content

Commit

Permalink
update: usage of new scheduler APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
calebpitan committed Nov 10, 2024
1 parent db6ad29 commit 9ab801d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
10 changes: 10 additions & 0 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type { MasterMessageEventData, WorkerMessageEventData } from './types'
export interface SchedulerController {
add(schedule: TaskSchedule): void
add(schedule: TaskSchedule[]): void
update(schedule: TaskSchedule): void
remove(id: string): void
observer: Observable<MasterMessageEventData>
}

Expand All @@ -17,10 +19,18 @@ function msgFactory(msg: WorkerMessageEventData) {
export function SchedulerWorker(): SchedulerController {
const worker = new Worker(new URL('./worker.ts', import.meta.url), { type: 'module' })

worker.addEventListener('error', (error) => console.error(error))

const controller: SchedulerController = {
observer: undefined!,
add(schedule) {
worker.postMessage(msgFactory({ command: 'add', data: schedule }))
},
update(schedule) {
worker.postMessage(msgFactory({ command: 'update', data: schedule }))
},
remove(id) {
worker.postMessage(msgFactory({ command: 'drop', data: id }))
}
}

Expand Down
45 changes: 26 additions & 19 deletions src/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,39 +122,46 @@ function subDataFactory(id: string): MasterMessageEventData {
}

async function main() {
const runner = sch.get_scheduler_runner()
const scheduler = sch.get_scheduler()

self.addEventListener('message', async (msg: MessageEvent<WorkerMessageEventData>) => {
switch (msg.data.command) {
case 'add':
if (Array.isArray(msg.data.data)) {
msg.data.data.forEach((item) => {
!item.timestamp ? void 0 : scheduler.add_schedule(create_st_schedule(item))
})
case 'add': {
const data = Array.isArray(msg.data.data) ? msg.data.data : [msg.data.data]
data
.filter((v) => !!v.timestamp)
.map((v) => create_st_schedule(v))
.forEach((v) => scheduler.add_schedule(v))
break
}

break
}
case 'subscribe':
scheduler.subscribe((id: string) => self.postMessage(subDataFactory(id)))
break

if (!msg.data.data.timestamp) break
scheduler.add_schedule(create_st_schedule(msg.data.data))
case 'run':
await runner.run(scheduler)
break

case 'update':
case 'drop':
case 'drop_all':
if (msg.data.data.timestamp === null) break
await runner.update_scheduler_with(create_st_schedule(msg.data.data))
break

case 'abort':
scheduler.abort()
case 'drop': {
const data = Array.isArray(msg.data.data) ? msg.data.data : [msg.data.data]
await Promise.all(data.map(async (v) => runner.remove_from_scheduler(v)))
break
case 'run':
await scheduler.run()
}

case 'drop_all':
break
case 'subscribe':
scheduler.subscribe((st_schedule_id: string) =>
self.postMessage(subDataFactory(st_schedule_id))
)

case 'abort':
await runner.quit()
break

default:
// no default
}
Expand Down

0 comments on commit 9ab801d

Please sign in to comment.