Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for pub-sub message ordering issue #1618

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2f210bb
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
3164241
Fix for ordering issues with Pub-Sub messages to the same node - Form…
renjithalexander Mar 28, 2020
9dda906
Fix for ordering issues with Pub-Sub messages to the same node - Form…
renjithalexander Mar 28, 2020
a937463
Fix for ordering issues with Pub-Sub messages to the same node - JUnits
renjithalexander Mar 28, 2020
9b4b122
Fix for ordering issues with Pub-Sub messages to the same node - JUnits
renjithalexander Mar 28, 2020
b54cea0
Fix for ordering issues with Pub-Sub messages to the same node - Fix …
renjithalexander Mar 28, 2020
08c9729
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
48fbb0e
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
df15841
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
74272dc
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
bbd53dc
Fix for ordering issues with Pub-Sub messages to the same node - Docu…
renjithalexander Mar 28, 2020
54d7ba5
Fix for ordering issues with Pub-Sub messages to the same node - A pe…
renjithalexander Mar 30, 2020
2c511a2
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
92d3c6f
Fix for ordering issues with Pub-Sub messages to the same node - Form…
renjithalexander Mar 28, 2020
2b52dd3
Fix for ordering issues with Pub-Sub messages to the same node - Form…
renjithalexander Mar 28, 2020
084b2b4
Fix for ordering issues with Pub-Sub messages to the same node - JUnits
renjithalexander Mar 28, 2020
05151d4
Fix for ordering issues with Pub-Sub messages to the same node - JUnits
renjithalexander Mar 28, 2020
1aa58d3
Fix for ordering issues with Pub-Sub messages to the same node - Fix …
renjithalexander Mar 28, 2020
cf6e5b5
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
37a4de0
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
942c94a
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
c6a318a
Fix for ordering issues with Pub-Sub messages to the same node
renjithalexander Mar 28, 2020
749e62f
Fix for ordering issues with Pub-Sub messages to the same node - Docu…
renjithalexander Mar 28, 2020
420e668
Fix for ordering issues with Pub-Sub messages to the same node - A pe…
renjithalexander Mar 30, 2020
7e72a88
Merge branch 'master' of https://github.com/renjithalexander/Openfire
renjithalexander Apr 14, 2020
d2a9b1c
Non-null check for tasks submitted, and the ordering keys
renjithalexander Apr 14, 2020
e260738
Added shutdown logic for OrderedExecutors
renjithalexander Apr 27, 2020
4fa4cd4
Fixed issue where Future.get() getting blocked for deferred tasks, if…
renjithalexander Apr 27, 2020
41d60c4
Naming ordered executor
renjithalexander May 7, 2020
bee1e79
Naming ordered executor
renjithalexander May 7, 2020
5d80639
Merge https://github.com/igniterealtime/Openfire
renjithalexander Nov 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.OrderedExecutor;
import org.jivesoftware.util.SystemProperty;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.openfire.archive.ArchiveManager;
Expand Down Expand Up @@ -1319,6 +1320,9 @@ private void shutdownServer() {
// Shutdown the task engine.
TaskEngine.getInstance().shutdown();

// Cleanup all instances created of OrderedExecutor.
OrderedExecutor.shutdownInstances();

// hack to allow safe stopping
logger.info("Openfire stopped");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.ImmediateFuture;
import org.jivesoftware.util.OrderedExecutor;
import org.jivesoftware.util.OrderedRunnable;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.cache.CacheFactory;
Expand Down Expand Up @@ -56,6 +58,49 @@ public class PubSubEngine
*/
private PacketRouter router = null;

/**
* The ordered executor used to publish IQ packets to their respective nodes.
* This is specifically used to publish the messages to any particular node, in
* the same order as they were submitted. The messages to separate nodes are
* published in parallel.
*/
private static final OrderedExecutor publishToNodePool = new OrderedExecutor("PubSub");
/**
* Used to generate ordering key for publish to node, if node id is null.
*/
private static int defaultOrderingKeyGenPubSub = 0;

private class PublishToNodeTask implements OrderedRunnable {

private final PubSubService service;

private final IQ iq;

private final Element finalAction;

private final String orderingKey;

PublishToNodeTask(final PubSubService service, final IQ iq, final Element finalAction) {
this.service = service;
this.iq = iq;
this.finalAction = finalAction;
String nodeID = finalAction.attributeValue("node");
this.orderingKey = nodeID != null ? nodeID : String.valueOf(++defaultOrderingKeyGenPubSub);

}

@Override
public void run() {
publishItemsToNode(service, iq, finalAction);
}

@Override
public Object getOrderingKey() {
return this.orderingKey;
}

}

public PubSubEngine(PacketRouter router) {
this.router = router;
}
Expand Down Expand Up @@ -88,13 +133,7 @@ public Future process(final PubSubService service, final IQ iq) {
if (action != null) {
// Entity publishes an item
// Complete this asynchronously, as UserManager::isRegisteredUser(JID) blocks, waiting for a result which may come in on this thread
final Element finalAction = action;
return TaskEngine.getInstance().submit(new Runnable() {
@Override
public void run() {
publishItemsToNode(service, iq, finalAction);
}
});
return publishToNodePool.submit(new PublishToNodeTask(service, iq, action));
}
action = childElement.element("subscribe");
if (action != null) {
Expand Down
Loading