Skip to content

Commit

Permalink
Merge pull request #116 from samagra-comms/development
Browse files Browse the repository at this point in the history
Development Merge
  • Loading branch information
chinmoy12c authored Dec 4, 2023
2 parents bd721d1 + 8cd4b9d commit cc49383
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 104 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</parent>
<groupId>com.uci</groupId>
<artifactId>transformer</artifactId>
<version>2.4.0</version>
<version>2.4.1</version>
<name>transformer</name>
<description>Demo project for Spring Boot</description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,64 +57,9 @@ private void postConstruct() {
if(!(downloadFormsFlag != null && downloadFormsFlag.equalsIgnoreCase("False"))) {
downloadForms();
}
// testFormManager();
}

private void testFormManager() {
String formPath = ODKConsumerReactive.getFormPath("samagra_workflows_form");
ServiceResponse response1 = new FormManager(null, null, null, formPath).start();
log.debug("First response");
log.debug(response1.getCurrentIndex(), response1.getNextMessage().getText());
}

private void downloadForms() {
//Empty the database and folder
FormsDao dao;
try{
File directoryToDelete = new File("/tmp/forms2");
FileSystemUtils.deleteRecursively(directoryToDelete);
dao = new FormsDao(JsonDB.getInstance().getDB());
dao.deleteFormsDatabase();
}catch (Exception e){}

//Create a folder /tmp/forms
new File("/tmp/forms2").mkdirs();

//Download fresh
OpenRosaHttpInterface openRosaHttpInterface = new OkHttpConnection(
new OkHttpOpenRosaServerClientProvider(new OkHttpClient()),
null,
"userAgent"
);
WebCredentialsUtils webCredentialsUtils = new WebCredentialsUtils();
OpenRosaAPIClient openRosaAPIClient = new OpenRosaAPIClient(openRosaHttpInterface, webCredentialsUtils);
FormListDownloader formListDownloader = new FormListDownloader(
openRosaAPIClient,
webCredentialsUtils);
HashMap<String, FormDetails> formList = formListDownloader.downloadFormList(false);
int count = 0;
if (formList.size() > 0) {
ArrayList<FormDetails> forms = new ArrayList<>();
for (Map.Entry<String, FormDetails> form : formList.entrySet()) {
forms.add(form.getValue());
count += 1;
}
FormDownloader formDownloader = null;
dao = new FormsDao(JsonDB.getInstance().getDB());
formDownloader = new FormDownloader(dao, openRosaAPIClient);
formDownloader.downloadForms(forms);
List<Form> downloadedForms = dao.getForms();
log.info("Total downloaded forms: " + downloadedForms.size());
}
new FormDownloader().downloadFormsDelta();
}
// @Bean
// CommandLineRunner executeOnStartup(Scheduler scheduler, Task<Void> sampleOneTimeTask) {
// log.info("Scheduling one time task to now!");
//
// return ignored -> scheduler.schedule(
// sampleOneTimeTask.instance("command-line-runner"),
// Instant.now()
// );
// }
}

18 changes: 18 additions & 0 deletions src/main/java/com/uci/transformer/controllers/ODKController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.uci.transformer.controllers;

import com.uci.transformer.odk.FormDownloader;
import lombok.extern.java.Log;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class ODKController {

@GetMapping(path = "/odk/updateAll")
public void updateAllODKForms() {
log.info("Updating forms....");
new FormDownloader().downloadFormsDelta();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.uci.transformer.controllers;

import com.uci.transformer.odk.ODKConsumerReactive;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import javax.ws.rs.BadRequestException;

@RestController()
public class XMessageController {

@Autowired
private ODKConsumerReactive odkConsumerReactive;

@PostMapping("/xmsg/processXMessage")
public Mono<String> processXMessage(@RequestBody String xMessage) {
if (xMessage == null || xMessage.isEmpty() || xMessage.isBlank()) {
throw new BadRequestException();
}
return odkConsumerReactive.processMessage(xMessage);
}
}
57 changes: 51 additions & 6 deletions src/main/java/com/uci/transformer/odk/FormDownloader.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
import com.uci.transformer.odk.model.Form;
import com.uci.transformer.odk.model.FormDetails;
import com.uci.transformer.odk.openrosa.OpenRosaAPIClient;
import com.uci.transformer.odk.openrosa.OpenRosaHttpInterface;
import com.uci.transformer.odk.openrosa.okhttp.OkHttpConnection;
import com.uci.transformer.odk.openrosa.okhttp.OkHttpOpenRosaServerClientProvider;
import com.uci.transformer.odk.persistance.FormsDao;
import com.uci.transformer.odk.utilities.DocumentFetchResult;
import com.uci.transformer.odk.utilities.FileUtils;
import com.uci.transformer.odk.utilities.MediaFile;
import com.uci.transformer.odk.persistance.JsonDB;
import com.uci.transformer.odk.utilities.*;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.javarosa.core.reference.ReferenceManager;
import org.javarosa.core.reference.RootTranslator;
import org.javarosa.xform.parse.XFormParser;
import org.kxml2.kdom.Element;
import org.springframework.util.FileSystemUtils;

import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -23,6 +27,7 @@
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -44,9 +49,14 @@ public FormDownloader(OpenRosaAPIClient openRosaAPIClient) {
private static final String NAMESPACE_OPENROSA_ORG_XFORMS_XFORMS_MANIFEST =
"http:openrosa.org/xforms/xformsManifest";

public FormDownloader(FormsDao formsDao, OpenRosaAPIClient openRosaAPIClient) {
this.formsDao = formsDao;
this.openRosaAPIClient = openRosaAPIClient;
public FormDownloader() {
this.formsDao = new FormsDao(JsonDB.getInstance().getDB());
OpenRosaHttpInterface openRosaHttpInterface = new OkHttpConnection(
new OkHttpOpenRosaServerClientProvider(new OkHttpClient()),
null,
"userAgent"
);
this.openRosaAPIClient = new OpenRosaAPIClient(openRosaHttpInterface, new WebCredentialsUtils());
}

public static boolean isXformsManifestNamespacedElement(Element e) {
Expand All @@ -67,6 +77,41 @@ private static class TaskCancelledException extends Exception {
}
}

public void resetForms() {
File directoryToDelete = new File("/tmp/forms2");
FileSystemUtils.deleteRecursively(directoryToDelete);
}

public void downloadFormsDelta() {
File formsDir = new File("/tmp/forms2");
if (!formsDir.exists()) {
//Create a folder /tmp/forms2
new File("/tmp/forms2").mkdirs();
}

//Download fresh
WebCredentialsUtils webCredentialsUtils = new WebCredentialsUtils();
FormListDownloader formListDownloader = new FormListDownloader(
openRosaAPIClient,
webCredentialsUtils);
HashMap<String, FormDetails> formList = formListDownloader.downloadFormList(false);
int count = 0;
if (formList.size() > 0) {
ArrayList<FormDetails> forms = new ArrayList<>();
for (Map.Entry<String, FormDetails> form : formList.entrySet()) {
File existingForm = new File(formsDir.getAbsolutePath(), FormNameUtils.formatFilenameFromFormName(form.getValue().getFormName()) + ".xml");
if (!existingForm.exists()) {
forms.add(form.getValue());
count += 1;
}
}
downloadForms(forms);
List<Form> downloadedForms = formsDao.getForms();
log.info("Total forms on disk: " + downloadedForms.size());
log.info("Total forms downloaded: " + count);
}
}

public void downloadForms(List<FormDetails> toDownload) {
int total = toDownload.size();
int count = 1;
Expand Down
82 changes: 41 additions & 41 deletions src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,47 +163,7 @@ public void onMessage() {
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> stringMessage) {
final long startTime = System.currentTimeMillis();
final Date startDateTime = new Date();
try {
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));
logTimeTaken(startTime, 1);
Mono.just(msg)
.flatMap(message -> transform(message))
.subscribeOn(Schedulers.parallel())
.subscribe(transformedMessage -> {
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime);
log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date());
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null
&& transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null
&& transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) {
log.info("CP-04" + transformedMessage.toXML());
kafkaProducer.send(genericTransformer, transformedMessage.toXML());

} else {
log.info("CP-05");
kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
}
} catch (JAXBException e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
}
});
} catch (JAXBException e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
} catch (NullPointerException e) {
log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber()
+ " in class : " + e.getStackTrace()[0].getClassName());
} catch (Exception e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
processMessage(stringMessage.value()).subscribe();
}
})
.doOnError(new Consumer<Throwable>() {
Expand All @@ -216,6 +176,45 @@ public void accept(Throwable e) {

}

public Mono<String> processMessage(String stringMessage) {
return Mono.defer(() -> {
final long startTime = System.currentTimeMillis();
final Date startDateTime = new Date();
return Mono.fromCallable(() -> {
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes()));
logTimeTaken(startTime, 1);
return msg;
})
.flatMap(this::transform)
.map(transformedMessage -> {
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime);
log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date());
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null
&& transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null
&& transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) {
log.info("CP-04" + transformedMessage.toXML());
kafkaProducer.send(genericTransformer, transformedMessage.toXML());
} else {
log.info("CP-05");
kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
}
return transformedMessage.toXML();
} catch (JAXBException e) {
log.error("An error occurred : " + e.getMessage());
e.printStackTrace();
return "";
}
}
return "";
})
.subscribeOn(Schedulers.parallel());
});
}

@Override
public Mono<XMessage> transform(XMessage xMessage) {
ArrayList<Transformer> transformers = xMessage.getTransformers();
Expand All @@ -232,6 +231,7 @@ public Mono<XMessage> transform(XMessage xMessage) {
String formPath = getFormPath(formID);
log.info("current form path:" + formPath);
if (formPath == null) {
new FormDownloader().downloadFormsDelta();
log.error("formPath null found return null value : " + formID);
return Mono.empty();
}
Expand Down

0 comments on commit cc49383

Please sign in to comment.