Skip to content

Commit

Permalink
Merge pull request #9 from pop-team/tfc
Browse files Browse the repository at this point in the history
Merge upstream revisions.
  • Loading branch information
MDosky authored Oct 6, 2017
2 parents 87a3590 + 0a4f9c8 commit 4ef86ab
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 23 deletions.
22 changes: 13 additions & 9 deletions workspace/popjava/src/popjava/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public enum State{
// thread unique callers
private static ThreadLocal<POPRemoteCaller> remoteCaller = new InheritableThreadLocal<>();

/**
* Request queue shared by all comboxes of this broker
*/
private final RequestQueue requestQueue = new RequestQueue();

private State state;
private ComboxServer[] comboxServers;
private POPBuffer buffer;
Expand Down Expand Up @@ -838,15 +843,10 @@ public POPAccessPoint getAccessPoint() {
public void treatRequests() throws InterruptedException {
setState(State.Running);
while (getState() == State.Running) {
for (ComboxServer comboxServer : comboxServers) {
Request request = comboxServer.getRequestQueue().peek(REQUEST_QUEUE_TIMEOUT_MS,
TimeUnit.MILLISECONDS);

if (request != null) {
serveRequest(request);
}else {
Thread.sleep(100);
}
Request request = requestQueue.peek(REQUEST_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

if (request != null) {
serveRequest(request);
}
}
LogWriter.writeDebugInfo("[Broker] Close broker "+popInfo.getClassName());
Expand Down Expand Up @@ -1179,4 +1179,8 @@ public String getLogPrefix() {
+ popInfo.getClass().getName() + ":";
}
}

public RequestQueue getRequestQueue() {
return requestQueue;
}
}
5 changes: 2 additions & 3 deletions workspace/popjava/src/popjava/combox/ComboxServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public abstract class ComboxServer {
static public final int ABORT = 2;

protected int status = EXIT;
protected RequestQueue requestQueue = new RequestQueue();
protected Broker broker;
protected int timeOut = 0;
protected AccessPoint accessPoint;
Expand All @@ -35,7 +34,7 @@ public ComboxServer(AccessPoint accessPoint, int timeout, Broker broker) {
* @return The associated request queue
*/
public RequestQueue getRequestQueue() {
return requestQueue;
}
return broker.getRequestQueue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import popjava.baseobject.AccessPoint;
import popjava.broker.Broker;
import popjava.broker.RequestQueue;
import popjava.combox.ComboxServer;

/**
Expand All @@ -20,13 +19,5 @@ public ComboxServerPlugin(AccessPoint accessPoint, int timeout,
Broker broker) {
super(accessPoint, timeout, broker);
}

/**
* Get the associated request queue
* @return The associated request queue
*/
public RequestQueue getRequestQueue() {
return requestQueue;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void createServer() {
serverSocket = new ServerSocket();
serverSocket.setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
serverSocket.bind(new InetSocketAddress(accessPoint.getPort()));
serverCombox = new ComboxAcceptSocket(broker, requestQueue,
serverCombox = new ComboxAcceptSocket(broker, getRequestQueue(),
serverSocket);
serverCombox.setStatus(RUNNING);
Thread thread = new Thread(serverCombox, "Server combox acception thread");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void createServer() {

serverSocket.setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
serverSocket.bind(new InetSocketAddress(accessPoint.getPort()));
serverCombox = new ComboxAcceptSecureSocket(broker, requestQueue, serverSocket);
serverCombox = new ComboxAcceptSecureSocket(broker, getRequestQueue(), serverSocket);
serverCombox.setStatus(RUNNING);
Thread thread = new Thread(serverCombox, "Server combox acception thread");
thread.start();
Expand Down

0 comments on commit 4ef86ab

Please sign in to comment.