Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Firebase now supports mass transfer of data #18

Merged
merged 2 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.env
myenv/
firebaseConfig.json
tests/firebaseConfig.json
tests/firebaseConfig.json
pbweb.json
mongo.txt
11 changes: 6 additions & 5 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
inputconfig:
url: sdfxghjuk
inputmethod: WebSocket
topic: dsfdgfh
url: sdfgbh
inputmethod: Kafka
outputconfig:
topic: bfhtd
url: dfghjkl
outputmethod: Kafka
queuename: sfdgrfthgyj
url: sefrgdthfy
outputmethod: RabbitMQ
71 changes: 31 additions & 40 deletions integrations/firebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package integrations
import (
"context"
"encoding/json"
"errors"
_ "errors"
"fmt"
"strings"
_ "strings"
"sync"
"time"

firebase "firebase.google.com/go"
_ "google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/SkySingh04/fractal/interfaces"
Expand All @@ -30,8 +31,7 @@ type FirebaseDestination struct {
}

func (f FirebaseSource) FetchData(req interfaces.Request) (interface{}, error) {
logger.Infof("Connecting to Firebase Source: Collection=%s, Document=%s, using Service Account=%s",
req.Collection, req.Document, req.CredentialFileAddr)
logger.Infof("Connecting to Firebase Source: Collection=%s, using Service Account=%s", req.Collection, req.CredentialFileAddr)

opt := option.WithCredentialsFile(req.CredentialFileAddr)
app, err := firebase.NewApp(context.Background(), nil, opt)
Expand All @@ -45,41 +45,30 @@ func (f FirebaseSource) FetchData(req interfaces.Request) (interface{}, error) {
}
defer client.Close()

dataChan := make(chan map[string]interface{}, 1)
errChan := make(chan error, 1)
var wg sync.WaitGroup
docs, err := client.Collection(req.Collection).Documents(context.Background()).GetAll()
if err != nil {
return nil, fmt.Errorf("failed to fetch documents: %w", err)
}

wg.Add(1)
go func() {
defer wg.Done()
dsnap, err := client.Collection(req.Collection).Doc(req.Document).Get(context.Background())
if err != nil {
errChan <- fmt.Errorf("failed to fetch document from Firestore: %w", err)
return
}
logger.Infof("Fetched documents from Firebase: %d documents", len(docs))
for i, doc := range docs {
logger.Infof("Document %d ID: %s, Data: %v", i, doc.Ref.ID, doc.Data())
}

if !dsnap.Exists() {
errChan <- fmt.Errorf("document not found: Collection=%s, Document=%s", req.Collection, req.Document)
return
}
var allData []map[string]interface{}
for _, doc := range docs {
data := doc.Data()
logger.Infof("Fetched data from Firebase: %v", data)

dataChan <- dsnap.Data()
}()
data["_id"] = doc.Ref.ID

wg.Wait()
close(dataChan)
close(errChan)
validatedData := data
transformedData := validatedData

select {
case data := <-dataChan:
validatedData, err := validateFirebaseData(data)
if err != nil {
return nil, err
}
return transformFirebaseData(validatedData), nil
case err := <-errChan:
return nil, err
allData = append(allData, transformedData)
}

return allData, nil
}

func (f FirebaseDestination) SendData(data interface{}, req interfaces.Request) error {
Expand Down Expand Up @@ -127,6 +116,8 @@ func (f FirebaseDestination) SendData(data interface{}, req interfaces.Request)
}

func convertToMap(data interface{}, result *map[string]interface{}) error {
logger.Infof("Firebase data to map: %v", data)

temp, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal data to JSON: %w", err)
Expand All @@ -140,18 +131,18 @@ func convertToMap(data interface{}, result *map[string]interface{}) error {

func validateFirebaseData(data map[string]interface{}) (map[string]interface{}, error) {
logger.Infof("Validating Firebase data: %v", data)
message, ok := data["data"].(string)
if !ok || strings.TrimSpace(message) == "" {
return nil, errors.New("invalid or missing 'data' field")
}
// // message, ok := data;
// if !ok || strings.TrimSpace(message) == "" {
// return nil, errors.New("invalid or missing 'data' field")
// }
return data, nil
}

func transformFirebaseData(data map[string]interface{}) map[string]interface{} {
logger.Infof("Transforming Firebase data: %v", data)
if message, ok := data["data"].(string); ok {
data["data"] = strings.ToUpper(message)
}
// if message, ok := data["data"].(string); ok {
// data["data"] = strings.ToUpper(message)
// }
data["processed"] = time.Now().Format(time.RFC3339)
return data
}
Expand Down
1 change: 1 addition & 0 deletions integrations/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func init() {
}

func TransformDataToBSON(data interface{}) ([]bson.M, error) {
logger.Infof("Data received for BSON conversion insertion: %+v", data)
switch v := data.(type) {
case map[string]interface{}: // Single document
return []bson.M{v}, nil
Expand Down
Loading