Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/SkySingh04/fractal
Browse files Browse the repository at this point in the history
  • Loading branch information
SkySingh04 committed Jan 1, 2025
2 parents 7cd3226 + f708526 commit a2212fc
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 46 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.env
myenv/
firebaseConfig.json
tests/firebaseConfig.json
tests/firebaseConfig.json
pbweb.json
mongo.txt
File renamed without changes.
11 changes: 6 additions & 5 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
errorhandling:
strategy: LOG_AND_CONTINUE
inputconfig:
url: sdfxghjuk
inputmethod: WebSocket
topic: dsfdgfh
url: sdfgbh
inputmethod: Kafka
outputconfig:
topic: bfhtd
url: dfghjkl
outputmethod: Kafka
queuename: sfdgrfthgyj
url: sefrgdthfy
outputmethod: RabbitMQ
71 changes: 31 additions & 40 deletions integrations/firebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package integrations
import (
"context"
"encoding/json"
"errors"
_ "errors"
"fmt"
"strings"
_ "strings"
"sync"
"time"

firebase "firebase.google.com/go"
_ "google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/SkySingh04/fractal/interfaces"
Expand All @@ -30,8 +31,7 @@ type FirebaseDestination struct {
}

func (f FirebaseSource) FetchData(req interfaces.Request) (interface{}, error) {
logger.Infof("Connecting to Firebase Source: Collection=%s, Document=%s, using Service Account=%s",
req.Collection, req.Document, req.CredentialFileAddr)
logger.Infof("Connecting to Firebase Source: Collection=%s, using Service Account=%s", req.Collection, req.CredentialFileAddr)

opt := option.WithCredentialsFile(req.CredentialFileAddr)
app, err := firebase.NewApp(context.Background(), nil, opt)
Expand All @@ -45,41 +45,30 @@ func (f FirebaseSource) FetchData(req interfaces.Request) (interface{}, error) {
}
defer client.Close()

dataChan := make(chan map[string]interface{}, 1)
errChan := make(chan error, 1)
var wg sync.WaitGroup
docs, err := client.Collection(req.Collection).Documents(context.Background()).GetAll()
if err != nil {
return nil, fmt.Errorf("failed to fetch documents: %w", err)
}

wg.Add(1)
go func() {
defer wg.Done()
dsnap, err := client.Collection(req.Collection).Doc(req.Document).Get(context.Background())
if err != nil {
errChan <- fmt.Errorf("failed to fetch document from Firestore: %w", err)
return
}
logger.Infof("Fetched documents from Firebase: %d documents", len(docs))
for i, doc := range docs {
logger.Infof("Document %d ID: %s, Data: %v", i, doc.Ref.ID, doc.Data())
}

if !dsnap.Exists() {
errChan <- fmt.Errorf("document not found: Collection=%s, Document=%s", req.Collection, req.Document)
return
}
var allData []map[string]interface{}
for _, doc := range docs {
data := doc.Data()
logger.Infof("Fetched data from Firebase: %v", data)

dataChan <- dsnap.Data()
}()
data["_id"] = doc.Ref.ID

wg.Wait()
close(dataChan)
close(errChan)
validatedData := data
transformedData := validatedData

select {
case data := <-dataChan:
validatedData, err := validateFirebaseData(data)
if err != nil {
return nil, err
}
return transformFirebaseData(validatedData), nil
case err := <-errChan:
return nil, err
allData = append(allData, transformedData)
}

return allData, nil
}

func (f FirebaseDestination) SendData(data interface{}, req interfaces.Request) error {
Expand Down Expand Up @@ -127,6 +116,8 @@ func (f FirebaseDestination) SendData(data interface{}, req interfaces.Request)
}

func convertToMap(data interface{}, result *map[string]interface{}) error {
logger.Infof("Firebase data to map: %v", data)

temp, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal data to JSON: %w", err)
Expand All @@ -140,18 +131,18 @@ func convertToMap(data interface{}, result *map[string]interface{}) error {

func validateFirebaseData(data map[string]interface{}) (map[string]interface{}, error) {

Check failure on line 132 in integrations/firebase.go

View workflow job for this annotation

GitHub Actions / Linting

func `validateFirebaseData` is unused (unused)
logger.Infof("Validating Firebase data: %v", data)
message, ok := data["data"].(string)
if !ok || strings.TrimSpace(message) == "" {
return nil, errors.New("invalid or missing 'data' field")
}
// // message, ok := data;
// if !ok || strings.TrimSpace(message) == "" {
// return nil, errors.New("invalid or missing 'data' field")
// }
return data, nil
}

func transformFirebaseData(data map[string]interface{}) map[string]interface{} {

Check failure on line 141 in integrations/firebase.go

View workflow job for this annotation

GitHub Actions / Linting

func `transformFirebaseData` is unused (unused)
logger.Infof("Transforming Firebase data: %v", data)
if message, ok := data["data"].(string); ok {
data["data"] = strings.ToUpper(message)
}
// if message, ok := data["data"].(string); ok {
// data["data"] = strings.ToUpper(message)
// }
data["processed"] = time.Now().Format(time.RFC3339)
return data
}
Expand Down
1 change: 1 addition & 0 deletions integrations/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func init() {
}

func TransformDataToBSON(data interface{}) ([]bson.M, error) {
logger.Infof("Data received for BSON conversion insertion: %+v", data)
switch v := data.(type) {
case map[string]interface{}: // Single document
return []bson.M{v}, nil
Expand Down

0 comments on commit a2212fc

Please sign in to comment.