Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve schedule metrics #179

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object Dependencies {
}

object Kamon {
private val version = "2.5.1"
private val version = "2.5.10"
val core = "io.kamon" %% "kamon-core" % version
val akka = "io.kamon" %% "kamon-akka" % version
val prometheus = "io.kamon" %% "kamon-prometheus" % version
Expand Down
6 changes: 4 additions & 2 deletions scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.scaladsl.Consumer.Control
import com.sky.kms.actors._
import com.sky.kms.config.Configured
import com.sky.kms.monitoring.ScheduleGauge
import com.sky.kms.streams.{ScheduleReader, ScheduledMessagePublisher}
import kamon.Kamon
import kamon.jmx.collector.KamonJmxMetricCollector
Expand All @@ -21,8 +22,9 @@ object SchedulerApp {
case class Running(reader: ScheduleReader.Running[Future[Control]], publisher: ScheduledMessagePublisher.Running)

def configure(implicit system: ActorSystem): Configured[SchedulerApp] = {
val publisherActor = PublisherActor.create
val schedulingActor = SchedulingActor.create(publisherActor)
val scheduleGauge = ScheduleGauge.kamon()
val publisherActor = PublisherActor.create(scheduleGauge)
val schedulingActor = SchedulingActor.create(publisherActor, scheduleGauge)
TerminatorActor.create(schedulingActor, publisherActor)

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import com.sky.kms.Start
import com.sky.kms.actors.PublisherActor.{DownstreamFailure, Init, ScheduleQueue, Trigger}
import com.sky.kms.domain.PublishableMessage.ScheduledMessage
import com.sky.kms.domain.{ScheduleEvent, ScheduleId, ScheduleQueueOfferResult}
import com.sky.kms.monitoring.ScheduleGauge

import scala.util.{Failure, Success}

class PublisherActor extends Actor with ActorLogging {
class PublisherActor(scheduleGauge: ScheduleGauge) extends Actor with ActorLogging {

implicit val ec = context.dispatcher

Expand All @@ -25,8 +26,10 @@ class PublisherActor extends Actor with ActorLogging {
private def receiveWithQueue(queue: ScheduleQueue): Receive = { case Trigger(scheduleId, schedule) =>
queue.offer((scheduleId, messageFrom(schedule))) onComplete {
case Success(QueueOfferResult.Enqueued) =>
scheduleGauge.onDelete()
log.debug(ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Enqueued).show)
case Success(res) =>
scheduleGauge.onDelete()
log.warning(ScheduleQueueOfferResult(scheduleId, res).show)
case Failure(t) =>
log.error(t, s"Failed to enqueue $scheduleId")
Expand All @@ -53,8 +56,8 @@ object PublisherActor {

case class DownstreamFailure(t: Throwable)

def create(implicit system: ActorSystem): ActorRef =
system.actorOf(Props[PublisherActor](), "publisher-actor")
def create(scheduleGauge: ScheduleGauge)(implicit system: ActorSystem): ActorRef =
system.actorOf(Props(new PublisherActor(scheduleGauge)), "publisher-actor")

def init(queue: ScheduleQueue): Start[Unit] =
Start(_.publisherActor ! Init(queue))
Expand Down
19 changes: 12 additions & 7 deletions scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import monix.execution.{Cancelable, Scheduler => MonixScheduler}

import scala.collection.mutable

class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monitoring: Monitoring)
class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, scheduleGauge: ScheduleGauge)
extends Actor
with ActorLogging {

Expand All @@ -29,7 +29,7 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit
val finishInitialisation: Receive = { case Initialised =>
log.debug("State initialised - scheduling stored schedules")
val scheduled = schedules.map { case (scheduleId, schedule) =>
monitoring.scheduleReceived()
scheduleGauge.onUpdate()
scheduleId -> scheduleOnce(scheduleId, schedule)
}
log.info("Reloaded state has been scheduled")
Expand All @@ -45,19 +45,24 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit

val handleSchedulingMessage: Receive = {
case CreateOrUpdate(scheduleId: ScheduleId, schedule: ScheduleEvent) =>
scheduled.get(scheduleId).foreach(_.cancel())
scheduled.get(scheduleId).foreach { schedule =>
schedule.cancel()
scheduleGauge.onDelete()
log.info(s"Cancelled and updated $scheduleId")
}

val cancellable = scheduleOnce(scheduleId, schedule)
log.info(
s"Scheduled $scheduleId from ${schedule.inputTopic} to ${schedule.outputTopic} in ${schedule.delay.toMillis} millis"
)

monitoring.scheduleReceived()
scheduled += (scheduleId -> cancellable)
scheduleGauge.onUpdate()

case Cancel(scheduleId: String) =>
scheduled.get(scheduleId).foreach { schedule =>
schedule.cancel()
monitoring.scheduleDone()
scheduleGauge.onDelete()
log.info(s"Cancelled $scheduleId")
}
scheduled -= scheduleId
Expand Down Expand Up @@ -98,9 +103,9 @@ object SchedulingActor {

case class UpstreamFailure(t: Throwable)

def create(publisherActor: ActorRef)(implicit system: ActorSystem): ActorRef =
def create(publisherActor: ActorRef, scheduleGauge: ScheduleGauge)(implicit system: ActorSystem): ActorRef =
system.actorOf(
Props(new SchedulingActor(publisherActor, MonixScheduler(system.dispatcher), new KamonMonitoring())),
Props(new SchedulingActor(publisherActor, MonixScheduler(system.dispatcher), scheduleGauge)),
"scheduling-actor"
)
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.sky.kms.monitoring

import kamon.Kamon

trait ScheduleGauge {

def onUpdate(): Unit

def onDelete(): Unit

}

object ScheduleGauge {
def kamon(): ScheduleGauge = new ScheduleGauge {

private val gauge = Kamon.gauge("scheduler-messages").withTag("status", "scheduled")

override def onUpdate(): Unit = gauge.increment()

override def onDelete(): Unit = gauge.decrement()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package com.sky.kms.unit

import java.util.UUID

import akka.stream.QueueOfferResult
import akka.testkit.TestActorRef
import com.sky.kms.actors.PublisherActor
import com.sky.kms.actors.PublisherActor._
import com.sky.kms.base.AkkaSpecBase
import com.sky.kms.domain.{ScheduleEvent, ScheduleId}
import com.sky.kms.utils.MockGauge
import com.sky.kms.utils.TestDataUtils._
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar
Expand Down Expand Up @@ -53,11 +55,37 @@ class PublisherActorSpec extends AkkaSpecBase with MockitoSugar {
expectTerminated(publisherActor)
}

"decrement schedule counter when message is queued" in new TestContext {
val (scheduleId, schedule) = generateSchedule

when(mockSourceQueue.offer((scheduleId, schedule.toScheduledMessage)))
.thenReturn(Future.successful(QueueOfferResult.Enqueued))

publisherActor ! Trigger(scheduleId, schedule)

eventually {
mockGauge.counter.get() shouldBe -1L
}
}

"decrement schedule counter when message is dropped" in new TestContext {
val (scheduleId, schedule) = generateSchedule

when(mockSourceQueue.offer((scheduleId, schedule.toScheduledMessage)))
.thenReturn(Future.successful(QueueOfferResult.Dropped))

publisherActor ! Trigger(scheduleId, schedule)

eventually {
mockGauge.counter.get() shouldBe -1L
}
}
}

private class TestContext {
val mockSourceQueue = mock[ScheduleQueue]
val publisherActor = TestActorRef(new PublisherActor)
val mockGauge = new MockGauge()
val publisherActor = TestActorRef(new PublisherActor(mockGauge))

when(mockSourceQueue.watchCompletion()).thenReturn(Future.never)
publisherActor ! Init(mockSourceQueue)
Expand Down
45 changes: 33 additions & 12 deletions scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.sky.kms.actors.SchedulingActor
import com.sky.kms.actors.SchedulingActor._
import com.sky.kms.base.AkkaSpecBase
import com.sky.kms.domain._
import com.sky.kms.utils.SimpleCounterMonitoring
import com.sky.kms.utils.MockGauge
import com.sky.kms.utils.TestDataUtils._
import monix.execution.schedulers.TestScheduler
import org.scalatest.concurrent.Eventually
Expand Down Expand Up @@ -96,35 +96,60 @@ class SchedulingActorSpec extends AkkaSpecBase with ImplicitSender with MockitoS
expectTerminated(schedulingActor)
}

"update monitoring when new schedule is received" in new Initialised {
"increment schedule counter when new schedule is received" in new Initialised {
val (scheduleId, schedule) = generateSchedule

createSchedule(scheduleId, schedule)

eventually {
scheduleReceivedCounter shouldBe 1L
mockGauge.counter.get() shouldBe 1L
}
}

"update monitoring when a cancel message is received" in new Initialised {
"keep the schedule counter the same when updating a previous schedule" in new Initialised {
val (scheduleId, schedule) = generateSchedule
createSchedule(scheduleId, schedule)

val updatedSchedule = schedule.copy(delay = schedule.delay + 5.minutes)
createSchedule(scheduleId, updatedSchedule)

eventually {
mockGauge.counter.get() shouldBe 1L
}

advanceToTimeFrom(schedule)
probe.expectNoMessage(NoMsgTimeout)

advanceToTimeFrom(updatedSchedule)
probe.expectMsg(Trigger(scheduleId, updatedSchedule))

eventually {
mockGauge.counter.get() shouldBe 1L
}
}

"decrement schedule counter when a cancel message is received" in new Initialised {
val (scheduleId, schedule) = generateSchedule
createSchedule(scheduleId, schedule)

eventually {
mockGauge.counter.get() shouldBe 1L
}

cancelSchedule(scheduleId)

eventually {
scheduleDoneCounter shouldBe 1L
mockGauge.counter.get() shouldBe 0L
}
}
}

private class TestContext {

val monitoring = new SimpleCounterMonitoring()
val mockGauge = new MockGauge()
val testScheduler = TestScheduler()
val probe = TestProbe()
val schedulingActor = TestActorRef(new SchedulingActor(probe.ref, testScheduler, monitoring))
val now = System.currentTimeMillis()
val schedulingActor = TestActorRef(new SchedulingActor(probe.ref, testScheduler, mockGauge))

def advanceToTimeFrom(schedule: ScheduleEvent): Unit =
testScheduler.tick(schedule.delay)
Expand All @@ -138,10 +163,6 @@ class SchedulingActorSpec extends AkkaSpecBase with ImplicitSender with MockitoS
schedulingActor ! Cancel(scheduleId)
expectMsg(Ack)
}

def scheduleReceivedCounter: Long = monitoring.scheduleReceivedCounter.get()

def scheduleDoneCounter: Long = monitoring.scheduleDoneCounter.get()
}

private class Initialised extends TestContext {
Expand Down
15 changes: 15 additions & 0 deletions scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.sky.kms.utils

import java.util.concurrent.atomic.AtomicLong

import com.sky.kms.monitoring.ScheduleGauge

class MockGauge extends ScheduleGauge {

val counter: AtomicLong = new AtomicLong()

override def onUpdate(): Unit = counter.incrementAndGet()

override def onDelete(): Unit = counter.decrementAndGet()

}

This file was deleted.