Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.5 scala 2.8 #4

Open
wants to merge 7 commits into
base: 0.5_scala_2.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package network

import com.google.protobuf.Message
import cluster.BaseClusterClient
import com.linkedin.norbert.cluster.ClusterClient

class NettyNetworkServer(config: NetworkServerConfig) extends NetworkServer {
val c = new com.linkedin.norbert.network.netty.NetworkServerConfig
if (config.getClusterClient != null) c.clusterClient = config.getClusterClient.asInstanceOf[BaseClusterClient].underlying
c.serviceName = config.getServiceName
c.zooKeeperConnectString = config.getZooKeeperConnectString

c.clusterClient = if (config.clusterClient != null)
config.getClusterClient.asInstanceOf[BaseClusterClient].underlying
else new ClusterClient(config.serviceName, config.zooKeeperConnectString, config.zooKeeperSessionTimeoutMillis)

c.zooKeeperSessionTimeoutMillis = config.getZooKeeperSessionTimeoutMillis
c.requestThreadCorePoolSize = config.getRequestThreadCorePoolSize
c.requestThreadMaxPoolSize = config.getRequestThreadMaxPoolSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,43 @@ package netty
import java.util.UUID
import org.jboss.netty.channel._
import com.google.protobuf.InvalidProtocolBufferException
import java.util.concurrent.{TimeUnit, ConcurrentHashMap}
import common.MessageRegistry
import protos.NorbertProtos
import logging.Logging
import jmx.JMX.MBean
import jmx.JMX
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit, ConcurrentHashMap}

@ChannelPipelineCoverage("all")
class ClientChannelHandler(serviceName: String, messageRegistry: MessageRegistry, staleRequestTimeoutMins: Int,
staleRequestCleanupFrequencyMins: Int) extends SimpleChannelHandler with Logging {
private val requestMap = new ConcurrentHashMap[UUID, Request]

private val cleanupThread = new Thread("stale-request-cleanup-thread") {
val cleanupTask = new Runnable() {
val staleRequestTimeoutMillis = TimeUnit.MILLISECONDS.convert(staleRequestTimeoutMins, TimeUnit.MINUTES)

override def run = {
while (true) {
TimeUnit.MINUTES.sleep(staleRequestCleanupFrequencyMins)

try {
import collection.JavaConversions._
var expiredEntryCount = 0

requestMap.keySet.foreach { uuid =>
val request = requestMap.get(uuid)
if ((System.currentTimeMillis - request.timestamp) > staleRequestTimeoutMillis) requestMap.remove(uuid)
if ((System.currentTimeMillis - request.timestamp) > staleRequestTimeoutMillis) {
requestMap.remove(uuid)
expiredEntryCount += 1
}
}

log.info("Expired %d stale entries from the request map".format(expiredEntryCount))
} catch {
case e: Exception => log.error("Exception caught in cleanup task, ignoring " + e)
}
}
}
cleanupThread.setDaemon(true)

val cleanupExecutor = new ScheduledThreadPoolExecutor(1)
cleanupExecutor.scheduleAtFixedRate(cleanupTask, staleRequestCleanupFrequencyMins, staleRequestCleanupFrequencyMins, TimeUnit.MINUTES)

private val statsActor = new NetworkStatisticsActor(100)
statsActor.start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class NettyNetworkServer(serverConfig: NetworkServerConfig) extends NetworkServe

override def shutdown = {
if (serverConfig.clusterClient == null) clusterClient.shutdown else super.shutdown
messageExecutor.shutdown
requestContextEncoder.shutdown
}
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
project.organization=com.linkedin.norbert
project.name=norbert
sbt.version=0.7.3
project.version=0.5-SNAPSHOT
project.version=0.5.1
build.scala.versions=2.8.0
project.initialize=false
6 changes: 3 additions & 3 deletions project/build/NorbertProject.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sbt._

class NorbertProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.com/maven2")
override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.org/nexus/content/groups/public/")

lazy val cluster = project("cluster", "Norbert Cluster", new ClusterProject(_))
lazy val network = project("network", "Norbert Network", new NetworkProject(_), cluster)
Expand All @@ -14,14 +14,14 @@ class NorbertProject(info: ProjectInfo) extends ParentProject(info) with IdeaPro
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0"
val log4j = "log4j" % "log4j" % "1.2.14"

val specs = "org.scala-tools.testing" %% "specs" % "1.6.5-SNAPSHOT" % "test"
val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test"
val mockito = "org.mockito" % "mockito-all" % "1.8.4" % "test"
val cglib = "cglib" % "cglib" % "2.1_3" % "test"
val objenesis = "org.objenesis" % "objenesis" % "1.0" % "test"
}

class NetworkProject(info: ProjectInfo) extends DefaultProject(info) with IdeaProject {
val netty = "org.jboss.netty" % "netty" % "3.1.5.GA"
val netty = "org.jboss.netty" % "netty" % "3.2.3.Final"
val slf4j = "org.slf4j" % "slf4j-api" % "1.5.6"
val slf4jLog4j = "org.slf4j" % "slf4j-log4j12" % "1.5.6"
}
Expand Down