Skip to content

Commit

Permalink
Merge pull request #14 from bcgov/dev-AA-FOIMOD-3636
Browse files Browse the repository at this point in the history
#foimod-3636 SOLR push updates
  • Loading branch information
abin-aot authored Dec 25, 2024
2 parents 0345385 + 3bd0528 commit 3a9c3fa
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
computingservices/documentextractservices/azuredocextractservice/src/foiazuredocextractservice
.vscode/*
.vscode/*
computingservices/documentextractservices/azuredocextractservice/src/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${fileDirname}"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package azureservices

import (
"azuredocextractservice/types"
"bytes"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -28,95 +29,96 @@ func NewAzureService(subscriptionKey string, baseURL string) *AzureService {
}

// AnalyzeAndExtractDocument initiates the document analysis request and returns the fetched analysis results
func (a *AzureService) AnalyzeAndExtractDocument(jsonPayload []byte) error {
func (a *AzureService) AnalyzeAndExtractDocument(jsonPayload []byte) (types.AnalyzeResults, error) {
var results types.AnalyzeResults
requestURL := fmt.Sprintf("%s/formrecognizer/documentModels/prebuilt-read:analyze?api-version=2023-07-31&stringIndexType=utf16CodeUnit", a.BaseURL)
// Send the POST request
apimRequestID, err := a.createAnalysisRequest(requestURL, jsonPayload)
if err != nil {
return fmt.Errorf("failed to initiate document analysis: %w", err)
return results, fmt.Errorf("failed to initiate document analysis: %w", err)
}
results, err := a.getAnalysisResults(apimRequestID)
results, err = a.getAnalysisResults(apimRequestID)
if err != nil {
return fmt.Errorf("failed to fetch analysis results: %w", err)
return results, fmt.Errorf("failed to fetch analysis results: %w", err)
}
//Print extracted data form document
fmt.Printf("Analysis Results: %v\n", results)
return nil
return results, err
}

// createAnalysisRequest sends the initial analysis request to the Azure API and returns the Apim-Request-Id response header
func (a *AzureService) createAnalysisRequest(requestURL string, jsonPayload []byte) (string, error) {
req, err := http.NewRequest(http.MethodPost, requestURL, bytes.NewBuffer(jsonPayload))
if err != nil {
return "", fmt.Errorf("failed to create POST request: %w", err)
}

req, _ := http.NewRequest(http.MethodPost, requestURL, bytes.NewBuffer(jsonPayload))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Ocp-Apim-Subscription-Key", a.SubscriptionKey)
res, err := a.Client.Do(req)
if err != nil {
return "", fmt.Errorf("error making HTTP request: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("unexpected response status: %s", res.Status)
}
apimRequestID := res.Header.Get("Apim-Request-Id")

if apimRequestID == "" {
return "", fmt.Errorf("missing Apim-Request-Id in response header")
}
return apimRequestID, nil
}

func (a *AzureService) getAnalysisResults(apimRequestID string) (map[string]interface{}, error) {
func (a *AzureService) getAnalysisResults(apimRequestID string) (types.AnalyzeResults, error) {
extractReqURL := fmt.Sprintf(
"%s/formrecognizer/documentModels/prebuilt-read/analyzeResults/%s?api-version=2023-07-31",
a.BaseURL, apimRequestID,
)

for {
time.Sleep(1 * time.Second)
jsonResponse, err := a.getExtractedResults(extractReqURL)
result, err := a.getExtractedResults(extractReqURL)
if err != nil {
return nil, err
return result, err
}
status, ok := jsonResponse["status"].(string)
if !ok {
return nil, fmt.Errorf("missing or invalid 'status' in response")
}
fmt.Printf("Current status: %s\n", status)

status := result.Status
fmt.Printf("Current status: %s\n", result.Status)
switch status {
case "succeeded":
return jsonResponse, nil
return result, nil
case "running":
continue
default:
return nil, fmt.Errorf("analysis failed with status: %s", status)
return result, fmt.Errorf("analysis failed with status: %s", status)
}
}
}

// Helper function to perform the HTTP GET request and parse the JSON response
func (a *AzureService) getExtractedResults(url string) (map[string]interface{}, error) {
func (a *AzureService) getExtractedResults(url string) (types.AnalyzeResults, error) {
var result types.AnalyzeResults
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create GET request: %w", err)
return result, fmt.Errorf("failed to create GET request: %w", err)
}
req.Header.Set("Ocp-Apim-Subscription-Key", a.SubscriptionKey)
res, err := a.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("error making HTTP request: %w", err)
return result, fmt.Errorf("error making HTTP request: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response status: %s", res.Status)
return result, fmt.Errorf("unexpected response status: %s", res.Status)
}
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %w", err)
return result, fmt.Errorf("error reading response body: %w", err)
}
fmt.Println("Response Body starts here:")
fmt.Println(string(bodyBytes))
fmt.Println("Response Body ends here:")
var jsonResponse map[string]interface{}
err = json.Unmarshal(bodyBytes, &jsonResponse)
json.Unmarshal(bodyBytes, &jsonResponse)
err = json.Unmarshal(bodyBytes, &result)
if err != nil {
return nil, fmt.Errorf("error unmarshaling response body: %w", err)
return result, fmt.Errorf("error unmarshaling response body: %w", err)
}
return jsonResponse, nil
return result, nil
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package azureservices

import (
"azuredocextractservice/types"
"azuredocextractservice/utils"
"log"
)

func CallAzureDocument(jsonPayload []byte) {
subscriptionKey := "abcd"
func CallAzureDocument(jsonPayload []byte) (types.AnalyzeResults, error) {
subscriptionKey := utils.ViperEnvVariable("azuresubcriptionkey")
baseURL := "https://foidocintelservice.cognitiveservices.azure.com"

service := NewAzureService(subscriptionKey, baseURL)

// Example JSON payload
//jsonPayload := []byte(`{"url": "https://example.com/sample.pdf"}`)

err := service.AnalyzeAndExtractDocument(jsonPayload)
result, err := service.AnalyzeAndExtractDocument(jsonPayload)
if err != nil {
log.Fatalf("Error calling Azure Document API: %v", err)
}

return result, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
var activeMQBaseURL = utils.ViperEnvVariable("activeMQBaseURL")
var username = utils.ViperEnvVariable("activeMQUserName")
var password = utils.ViperEnvVariable("activeMQPassword")
var queuename = utils.ViperEnvVariable("focdocextractqueue")
var activemqclientid = utils.ViperEnvVariable("activemqclientid")

// ProcessMessage fetches messages from the ActiveMQ queue using HTTP
func ProcessMessage() ([]types.QueueMessage, error) {
queueName := "queuetest"
queueName := queuename
clientid := activemqclientid
// Construct the URL to fetch messages from the queue
url := fmt.Sprintf("%s/%s?type=queue", activeMQBaseURL, queueName)
url := fmt.Sprintf("%s://%s&clientId=%s", activeMQBaseURL, queueName, clientid)
messages := []types.QueueMessage{}
timeoutCounter := 0
maxTimeouts := 1
Expand All @@ -45,7 +48,7 @@ func ProcessMessage() ([]types.QueueMessage, error) {
fmt.Println("No more messages in the queue. Exiting...")
break
}
fmt.Printf("Extracted s3uri: %s\n", message.S3Uri)
fmt.Printf("Extracted s3uri: %s\n", message.BatchID)
messages = append(messages, *message)
}
fmt.Println("All messages processed. Exiting.")
Expand Down Expand Up @@ -86,7 +89,7 @@ func fetchMessageFromQueue(url string) (*types.QueueMessage, error) {
}
fmt.Printf("Response Body: %s\n", string(body))
var message types.QueueMessage
if err := json.Unmarshal(body, &message); err != nil {
if err := json.Unmarshal([]byte(body), &message); err != nil {
return nil, fmt.Errorf("failed to unmarshal message: %w", err)
}
return &message, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"azuredocextractservice/azureservices"
"azuredocextractservice/httpservices"
"azuredocextractservice/s3services"
"azuredocextractservice/solrsearchservices"
"azuredocextractservice/types"
"fmt"
"log"
"net/url"
"strconv"
"strings"
"time"
)
Expand All @@ -28,29 +31,72 @@ func main() {
for _, message := range dequeuedmessages {
fmt.Printf("Received message: %+v\n", message)

parsedURL, err := url.Parse(message.S3Uri)
if err != nil {
fmt.Printf("Error parsing URL: %v\n", err)
return
var requests []types.Requests = message.Requests

for _, request := range requests {
for _, document := range request.Documents {
var parsedURL = document.DocumentS3URL
var jsonStrbytes []byte = getBytesfromDocumentPath(parsedURL)
analysisResults, _analyzeerr := azureservices.CallAzureDocument(jsonStrbytes)
if _analyzeerr == nil && analysisResults.Status == "succeeded" {

searchdocumentpagelines := []types.SOLRSearchDocument{}
//pUSH to solr.
for _, page := range analysisResults.AnalyzeResult.Pages {
for _, line := range page.Lines {
_solrsearchdocuemnt := types.SOLRSearchDocument{
FoiDocumentID: strconv.Itoa(int(document.DocumentID)),
FoiRequestNumber: request.RequestNumber,
FoiMinistryRequestID: request.MinistryRequestID,
FoiMinistryCode: request.MinistryCode,
FoiDocumentFileName: document.DocumentName,
FoiDocumentPageNumber: page.PageNumber,
FoiDocumentSentence: line.Content,
FoiRequestMiscInfo: document.DocumentS3URL,
}
searchdocumentpagelines = append(searchdocumentpagelines, _solrsearchdocuemnt)
fmt.Println(_solrsearchdocuemnt.FoiDocumentFileName)
}

}

solrsearchservices.PushtoSolr(searchdocumentpagelines)

}
}
}

// Get the path after the hostname
path := strings.TrimPrefix(parsedURL.Path, "/")
bucketName, relativePath, found := strings.Cut(path, "/")
if !found {
fmt.Println("Invalid URL format")
return
}
fmt.Printf("Bucket: %s, Key: %s\n", bucketName, relativePath)
var s3url = s3services.GetFilefroms3(relativePath, bucketName)
jsonStr := `{
"urlSource": "` + s3url + `"
}`
var jsonStrbytes = []byte(jsonStr)
azureservices.CallAzureDocument(jsonStrbytes)

fmt.Printf("################-------------------------------####################")
}
end := time.Now()
fmt.Println("End Time :" + end.String())
total := end.Sub(start)
fmt.Println("Total time:" + total.String())
}

// getBytesfromDocumentPath builds the JSON request body for the document
// located at documenturlpath.
//
// The URL path is split as /<bucket>/<key>: the first path segment is used as
// the bucket name and everything after it as the object key. Both are handed to
// s3services.GetFilefroms3, and the string it returns is embedded as the
// "urlSource" value of the payload.
//
// It returns nil when the URL cannot be parsed or when the path does not
// contain both a non-empty bucket and a non-empty key.
func getBytesfromDocumentPath(documenturlpath string) []byte {
	parsedURL, err := url.Parse(documenturlpath)
	if err != nil {
		// Surface the underlying cause; a bare message makes bad queue data
		// impossible to diagnose.
		fmt.Printf("Error parsing URL: %v\n", err)
		return nil
	}

	bucketName, objectKey, found := strings.Cut(strings.TrimPrefix(parsedURL.Path, "/"), "/")
	// A path such as "/bucket/" or "//key" still contains the separator, so the
	// individual parts must be checked as well.
	if !found || bucketName == "" || objectKey == "" {
		fmt.Println("Invalid URL format")
		return nil
	}
	fmt.Printf("Bucket: %s, Key: %s\n", bucketName, objectKey)

	s3url := s3services.GetFilefroms3(objectKey, bucketName)
	// NOTE(review): the value is spliced into the JSON text unescaped; this
	// presumes GetFilefroms3 returns a URL without '"' or '\' — confirm, or
	// switch to encoding/json if that cannot be guaranteed.
	jsonStr := `{
"urlSource": "` + s3url + `"
}`
	return []byte(jsonStr)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package solrsearchservices

import (
"azuredocextractservice/types"
"azuredocextractservice/utils"
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
)

// PushtoSolr posts searchdocs to Solr as a single JSON array and reports
// whether the request was accepted.
//
// The endpoint URL and the basic-auth credentials are read through
// utils.ViperEnvVariable (keys "foisearchsolrpostendpoint", "solradmin" and
// "solrpassword"). The result is true only when the response status is
// 200 OK. Marshaling, request-construction and transport failures are logged
// and reported as false rather than terminating the process, so one failed
// push does not abort the remaining work of the caller.
//
// An empty or nil slice is treated as a successful no-op: nothing is sent.
func PushtoSolr(searchdocs []types.SOLRSearchDocument) bool {
	// A nil slice would marshal to "null" and an empty one to "[]"; neither
	// carries anything to index, so skip the round trip.
	if len(searchdocs) == 0 {
		fmt.Println("No documents to post to Solr")
		return true
	}

	// Convert the documents to JSON; check the error before using the bytes.
	jsonData, err := json.Marshal(searchdocs)
	if err != nil {
		log.Printf("Error marshaling JSON: %v", err)
		return false
	}
	fmt.Println("SOLR Search Data starts here")
	fmt.Println(string(jsonData))
	fmt.Println("SOLR Search Data ends here")

	// Build the POST request against the configured Solr endpoint.
	endpoint := utils.ViperEnvVariable("foisearchsolrpostendpoint")
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(jsonData))
	if err != nil {
		log.Printf("Error creating request: %v", err)
		return false
	}
	req.Header.Set("Content-Type", "application/json")
	req.SetBasicAuth(
		utils.ViperEnvVariable("solradmin"),
		utils.ViperEnvVariable("solrpassword"),
	)

	// NOTE(review): this client has no timeout, so a stalled connection blocks
	// the caller indefinitely; consider a shared client with Timeout set.
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("Error sending request: %v", err)
		return false
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Printf("Failed to post to Solr. Status: %s\n", resp.Status)
		return false
	}
	fmt.Println("Successfully posted to Solr")
	return true
}
Loading

0 comments on commit 3a9c3fa

Please sign in to comment.