Skip to content

Commit

Permalink
Custom Threadpool for gRPC calls (#110)
Browse files Browse the repository at this point in the history
* issue #108: Add custom dispatcher for handlers ✨
* issue #108: custom Akka gRPC client implementation
* issue #108: change package name
* issue #108: cleanup config file from unnecessary settings
* issue #108: remove lazy val for settings

Co-authored-by: zen <[email protected]>
  • Loading branch information
Tochemey and zen authored Oct 14, 2020
1 parent e58ec1d commit a81875a
Show file tree
Hide file tree
Showing 23 changed files with 924 additions and 253 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import play.grpc.gen.scaladsl.{PlayScalaClientCodeGenerator, PlayScalaServerCodeGenerator}
import play.grpc.gen.scaladsl.PlayScalaServerCodeGenerator

enablePlugins(DockerComposePlugin)
dockerImageCreationTask := (Docker / publishLocal in `chiefofstate`).value
Expand Down Expand Up @@ -42,7 +42,7 @@ lazy val protogen: Project = project
// Using Scala
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala),
akkaGrpcExtraGenerators in Compile += PlayScalaServerCodeGenerator,
akkaGrpcExtraGenerators in Compile += PlayScalaClientCodeGenerator,
akkaGrpcGeneratedSources := Seq(AkkaGrpc.Server),
akkaGrpcCodeGeneratorSettings += "server_power_apis",
akkaGrpcCodeGeneratorSettings := akkaGrpcCodeGeneratorSettings.value.filterNot(_ == "flat_package")
)
130 changes: 124 additions & 6 deletions code/service/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ akka {
deadline = infinite
override-authority = ""
user-agent = ""
# Pulls default configuration from ssl-config-core's reference.conf
ssl-config = ${ssl-config}
use-tls = false

creation {
Expand Down Expand Up @@ -140,8 +138,6 @@ akka {
deadline = infinite
override-authority = ""
user-agent = ""
# Pulls default configuration from ssl-config-core's reference.conf
ssl-config = ${ssl-config}
use-tls = false

creation {
Expand Down Expand Up @@ -211,7 +207,6 @@ akka {
}
}


lagompb {
service-name = "chiefofstate"
service-name = ${?COS_SERVICE_NAME}
Expand Down Expand Up @@ -257,7 +252,6 @@ lagompb {

}


chief-of-state {

# Helps switch on/off the readSide model
Expand All @@ -281,6 +275,130 @@ chief-of-state {
# example: "namely.org_units.OrgUnitTypeCreated", "namely.org_units.OrgUnitTypeUpdated"
events-protos = ""
events-protos = ${?HANDLER_SERVICE_EVENTS_PROTOS}
# custom dispatcher for command hander
# Since the command handler is blocking call it is recommended
# to use a custom dispatcher to handle this type of calls.
# The thread pool size should be fine tuned depending on the workload you’re expecting to run on this dispatcher.
# https://doc.akka.io/docs/akka/current/typed/dispatchers.html#solution-dedicated-dispatcher-for-blocking-operations
# https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf
writeside-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
# Keep alive time for threads
keep-alive-time = 60s

# Define a fixed thread pool size with this property. The corePoolSize
# and the maximumPoolSize of the ThreadPoolExecutor will be set to this
# value, if it is defined. Then the other pool-size properties will not
# be used.
#
# Valid values are: `off` or a positive integer.
fixed-pool-size = off

# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 8

# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0

# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 64

# Minimum number of threads to cap factor-based maximumPoolSize number to
max-pool-size-min = 8

# The max-pool-size-factor is used to determine maximumPoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor)
# The maximumPoolSize will not be less than corePoolSize.
# It is only used if using a bounded task queue.
max-pool-size-factor = 3.0

# Max number of threads to cap factor-based maximumPoolSize number to
max-pool-size-max = 64

# Specifies the bounded capacity of the task queue (< 1 == unbounded)
task-queue-size = -1

# Specifies which type of task queue will be used, can be "array" or
# "linked" (default)
task-queue-type = "linked"

# Allow core threads to time out
allow-core-timeout = on
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 1
}
# custom dispatcher for read hander
# Since the read handler is blocking call it is recommended
# to use a custom dispatcher to handle this type of calls.
# The thread pool size should be fine tuned depending on the workload you’re expecting to run on this dispatcher.
# https://doc.akka.io/docs/akka/current/typed/dispatchers.html#solution-dedicated-dispatcher-for-blocking-operations
# https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf
readside-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
# Keep alive time for threads
keep-alive-time = 60s

# Define a fixed thread pool size with this property. The corePoolSize
# and the maximumPoolSize of the ThreadPoolExecutor will be set to this
# value, if it is defined. Then the other pool-size properties will not
# be used.
#
# Valid values are: `off` or a positive integer.
fixed-pool-size = off

# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 8

# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0

# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 64

# Minimum number of threads to cap factor-based maximumPoolSize number to
max-pool-size-min = 8

# The max-pool-size-factor is used to determine maximumPoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor)
# The maximumPoolSize will not be less than corePoolSize.
# It is only used if using a bounded task queue.
max-pool-size-factor = 3.0

# Max number of threads to cap factor-based maximumPoolSize number to
max-pool-size-max = 64

# Specifies the bounded capacity of the task queue (< 1 == unbounded)
task-queue-size = -1

# Specifies which type of task queue will be used, can be "array" or
# "linked" (default)
task-queue-type = "linked"

# Allow core threads to time out
allow-core-timeout = on
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 1
}
}

send-command {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package com.namely.chiefofstate
import akka.actor.ActorSystem
import com.google.protobuf.empty.Empty
import com.typesafe.config.Config
import io.superflat.lagompb.{AggregateRoot, CommandHandler, EventHandler}
import io.superflat.lagompb.encryption.EncryptionAdapter
import scalapb.GeneratedMessageCompanion
import io.superflat.lagompb.{AggregateRoot, CommandHandler, EventHandler}

/**
* ChiefOfStateAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,30 @@
package com.namely.chiefofstate

import akka.actor.ActorSystem
import akka.grpc.GrpcServiceException
import com.google.protobuf.any.Any
import com.namely.chiefofstate.config.HandlerSetting
import com.namely.chiefofstate.grpc.client.WriteSideHandlerServiceClient
import com.namely.protobuf.chiefofstate.v1.common.{MetaData => _}
import com.namely.protobuf.chiefofstate.v1.internal.RemoteCommand
import com.namely.protobuf.chiefofstate.v1.service.GetStateRequest
import com.namely.protobuf.chiefofstate.v1.writeside.{
HandleCommandRequest,
HandleCommandResponse,
WriteSideHandlerServiceClient
}
import com.namely.chiefofstate.config.HandlerSetting
import com.namely.protobuf.chiefofstate.v1.writeside.{HandleCommandRequest, HandleCommandResponse}
import io.grpc.{Status, StatusRuntimeException}
import io.superflat.lagompb.{CommandHandler, ProtosRegistry}
import io.superflat.lagompb.protobuf.v1.core._
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

/**
* ChiefOfStateCommandHandler
*
* @param actorSystem the actor system
* @param writeSideHandlerServiceClient the gRpcClient used to connect to the actual command handler
* @param handlerSetting the command handler setting
*/
class AggregateCommandHandler(
actorSystem: ActorSystem,
writeSideHandlerServiceClient: WriteSideHandlerServiceClient,
val writeSideHandlerServiceClient: WriteSideHandlerServiceClient,
handlerSetting: HandlerSetting
) extends CommandHandler {

Expand All @@ -42,23 +36,21 @@ class AggregateCommandHandler(
* entrypoint command handler that unpacks the command proto and calls
* the typed parameter
*
* @param command
* @param priorState
* @param priorEventMeta
* @return
* @param command the command to handle
* @param priorState the priorState before the handled command
* @param priorEventMeta the priorEventMeta before the handled command
* @return a command handler response
*/
final def handle(command: Any, priorState: Any, priorEventMeta: MetaData): Try[CommandHandlerResponse] = {
ProtosRegistry.unpackAny(command) match {
case Failure(exception) =>
Failure(exception)

case Success(innerCommand) =>
ProtosRegistry
.unpackAny(command)
.flatMap(innerCommand => {
handleTyped(
command = innerCommand,
priorState = priorState,
priorEventMeta = priorEventMeta
)
}
})
}

/**
Expand All @@ -67,7 +59,7 @@ class AggregateCommandHandler(
* @param command the actual command to handle
* @param priorState the priorState
* @param priorEventMeta the priorEventMeta
* @return
* @return a command handler response
*/
def handleTyped(command: scalapb.GeneratedMessage,
priorState: Any,
Expand Down Expand Up @@ -145,7 +137,7 @@ class AggregateCommandHandler(
)
.invoke(handleCommandRequest)

// await response and return
// await response and return since grpc calls do timeout
Await.result(futureResponse, Duration.Inf)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package com.namely.chiefofstate

import akka.actor.ActorSystem
import com.google.protobuf.any.Any
import com.namely.chiefofstate.config.HandlerSetting
import com.namely.protobuf.chiefofstate.v1.writeside.{
HandleEventRequest,
HandleEventResponse,
WriteSideHandlerServiceClient
}
import com.namely.chiefofstate.grpc.client.WriteSideHandlerServiceClient
import com.namely.protobuf.chiefofstate.v1.writeside.{HandleEventRequest, HandleEventResponse}
import io.superflat.lagompb.EventHandler
import io.superflat.lagompb.protobuf.v1.core.MetaData
import org.slf4j.{Logger, LoggerFactory}
Expand All @@ -19,12 +15,10 @@ import scala.util.{Failure, Success, Try}
/**
* ChiefOfStateEventHandler
*
* @param actorSystem the actor system
* @param writeSideHandlerServiceClient the gRpcClient used to connect to the actual event handler
* @param handlerSetting the event handler setting
*/
class AggregateEventHandler(
actorSystem: ActorSystem,
writeSideHandlerServiceClient: WriteSideHandlerServiceClient,
handlerSetting: HandlerSetting
) extends EventHandler {
Expand Down
Loading

0 comments on commit a81875a

Please sign in to comment.