Skip to content

Commit

Permalink
unclicked by mistake (again)
Browse files Browse the repository at this point in the history
  • Loading branch information
sorin-costea committed Oct 25, 2016
1 parent 254893d commit 61da0c7
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions src/main/java/org/sorincos/mytumble/MongoConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ private void resetUsers(Message<String> msg) {
}

private void saveUser(Message<JsonObject> msg) {
vertx.<JsonObject>executeBlocking(future -> {
vertx.<JsonObject>executeBlocking(fiberHandler(future -> {
try {
MongoClient client = vertx.getOrCreateContext().get("mongoclient");
JsonObject user = msg.body();
client.save("users", user, h -> {
JsonObject newUser = hydrateOldUser(client, (JsonObject) user);
client.save("users", newUser, h -> {
if (h.failed())
logger.info("User upsert failed: " + h.cause().getLocalizedMessage());
});
Expand All @@ -90,7 +91,7 @@ private void saveUser(Message<JsonObject> msg) {
ex.printStackTrace();
future.fail(ex.getLocalizedMessage());
}
}, result -> {
}), result -> {
if (result.succeeded())
msg.reply(result.result());
else
Expand All @@ -104,26 +105,12 @@ private void saveUsers(Message<JsonArray> msg) {
try {
MongoClient client = vertx.getOrCreateContext().get("mongoclient");
ArrayList<Object> users = Lists.newArrayList(msg.body());
int total = users.size();
logger.info("Saving users: " + total);
int followsme = 0;
int ifollow = 0;
for (Object user : users) {
if (((JsonObject) user).getBoolean("ifollow") != null && ((JsonObject) user).getBoolean("ifollow"))
ifollow++;
if (((JsonObject) user).getBoolean("followsme") != null
&& ((JsonObject) user).getBoolean("followsme"))
followsme++;
// await, to not kill Mongo's thread pool
String res = awaitResult(h -> client.save("users", (JsonObject) user, h));
JsonObject newUser = hydrateOldUser(client, (JsonObject) user);
String res = awaitResult(h -> client.save("users", newUser, h));
if (res != null) // not sure what that means
logger.error(res);
List<JsonObject> found = awaitResult(h -> client.find("users", (JsonObject) user, h));
if (found.size() != 1)
logger.error(found.size() + " for user " + ((JsonObject) user).getString("name"));
}
logger.info("ifollow " + ifollow);
logger.info("followsme " + followsme);
future.complete();
} catch (Exception ex) {
logger.error("Exception updating users: ", ex);
Expand All @@ -137,14 +124,26 @@ private void saveUsers(Message<JsonArray> msg) {
});
}

@Suspendable
private JsonObject hydrateOldUser(MongoClient client, JsonObject newUser) {
JsonObject query = new JsonObject().put("name", ((JsonObject) newUser).getString("name"));
List<JsonObject> oldUsers = awaitResult(h -> client.find("users", query, h));
if (oldUsers.size() < 1)
return newUser;
JsonObject oldUser = oldUsers.get(0);
return oldUser.mergeIn(newUser);
}

private void getUsers(Message<String> msg) {
JsonObject query = createQueryFromFilters(msg.body());
vertx.<JsonArray>executeBlocking(future -> {
MongoClient client = vertx.getOrCreateContext().get("mongoclient");
FindOptions options = new FindOptions().setSort(new JsonObject().put("name", 1));
client.findWithOptions("users", query, options, res -> {
if (res.failed())
if (res.failed()) {
future.fail(res.cause().getLocalizedMessage());
return;
}
final JsonArray users = new JsonArray();
res.result().forEach(users::add);
future.complete(users);
Expand All @@ -162,8 +161,10 @@ private void getUser(Message<JsonArray> msg) {
vertx.<JsonArray>executeBlocking(future -> {
MongoClient client = vertx.getOrCreateContext().get("mongoclient");
client.find("users", query, res -> {
if (res.failed())
if (res.failed()) {
future.fail(res.cause().getLocalizedMessage());
return;
}
final JsonArray users = new JsonArray();
res.result().forEach(users::add); // expected max one
future.complete(users);
Expand Down

0 comments on commit 61da0c7

Please sign in to comment.