diff --git a/src/main/java/org/sorincos/mytumble/MongoConnector.java b/src/main/java/org/sorincos/mytumble/MongoConnector.java index 71fc2ff..4f3649f 100644 --- a/src/main/java/org/sorincos/mytumble/MongoConnector.java +++ b/src/main/java/org/sorincos/mytumble/MongoConnector.java @@ -77,11 +77,12 @@ private void resetUsers(Message msg) { } private void saveUser(Message msg) { - vertx.executeBlocking(future -> { + vertx.executeBlocking(fiberHandler(future -> { try { MongoClient client = vertx.getOrCreateContext().get("mongoclient"); JsonObject user = msg.body(); - client.save("users", user, h -> { + JsonObject newUser = hydrateOldUser(client, (JsonObject) user); + client.save("users", newUser, h -> { if (h.failed()) logger.info("User upsert failed: " + h.cause().getLocalizedMessage()); }); @@ -90,7 +91,7 @@ private void saveUser(Message msg) { ex.printStackTrace(); future.fail(ex.getLocalizedMessage()); } - }, result -> { + }), result -> { if (result.succeeded()) msg.reply(result.result()); else @@ -104,26 +105,12 @@ private void saveUsers(Message msg) { try { MongoClient client = vertx.getOrCreateContext().get("mongoclient"); ArrayList users = Lists.newArrayList(msg.body()); - int total = users.size(); - logger.info("Saving users: " + total); - int followsme = 0; - int ifollow = 0; for (Object user : users) { - if (((JsonObject) user).getBoolean("ifollow") != null && ((JsonObject) user).getBoolean("ifollow")) - ifollow++; - if (((JsonObject) user).getBoolean("followsme") != null - && ((JsonObject) user).getBoolean("followsme")) - followsme++; - // await, to not kill Mongo's thread pool - String res = awaitResult(h -> client.save("users", (JsonObject) user, h)); + JsonObject newUser = hydrateOldUser(client, (JsonObject) user); + String res = awaitResult(h -> client.save("users", newUser, h)); if (res != null) // not sure what that means logger.error(res); - List found = awaitResult(h -> client.find("users", (JsonObject) user, h)); - if (found.size() != 1) - logger.error(found.size() + " for user " + ((JsonObject) user).getString("name")); } - logger.info("ifollow " + ifollow); - logger.info("followsme " + followsme); future.complete(); } catch (Exception ex) { logger.error("Exception updating users: ", ex); @@ -137,14 +124,26 @@ private void saveUsers(Message msg) { }); } + @Suspendable + private JsonObject hydrateOldUser(MongoClient client, JsonObject newUser) { + JsonObject query = new JsonObject().put("name", ((JsonObject) newUser).getString("name")); + List oldUsers = awaitResult(h -> client.find("users", query, h)); + if (oldUsers.size() < 1) + return newUser; + JsonObject oldUser = oldUsers.get(0); + return oldUser.mergeIn(newUser); + } + private void getUsers(Message msg) { JsonObject query = createQueryFromFilters(msg.body()); vertx.executeBlocking(future -> { MongoClient client = vertx.getOrCreateContext().get("mongoclient"); FindOptions options = new FindOptions().setSort(new JsonObject().put("name", 1)); client.findWithOptions("users", query, options, res -> { - if (res.failed()) + if (res.failed()) { future.fail(res.cause().getLocalizedMessage()); + return; + } final JsonArray users = new JsonArray(); res.result().forEach(users::add); future.complete(users); @@ -162,8 +161,10 @@ private void getUser(Message msg) { vertx.executeBlocking(future -> { MongoClient client = vertx.getOrCreateContext().get("mongoclient"); client.find("users", query, res -> { - if (res.failed()) + if (res.failed()) { future.fail(res.cause().getLocalizedMessage()); + return; + } final JsonArray users = new JsonArray(); res.result().forEach(users::add); // expected max one future.complete(users);