Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Flexible Data Cleansing Functionality for Enhanced Text Processing #760

Closed
wants to merge 76 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
95feb6d
Update main.go
NailaRais Oct 18, 2024
ed1d3d1
Update main.go
NailaRais Oct 18, 2024
a001d5f
Merge branch 'instill-ai:main' into data-cleaning-function
NailaRais Nov 3, 2024
baec4bb
Update main.go
NailaRais Nov 4, 2024
8e3b26b
Merge branch 'instill-ai:main' into data-cleaning-function
NailaRais Nov 4, 2024
eb0af19
Merge branch 'instill-ai:main' into data-cleaning-function
NailaRais Nov 4, 2024
b8fbec9
Update main_test.go
NailaRais Nov 4, 2024
f009321
Update main_test.go
NailaRais Nov 4, 2024
005ec8c
Update main.go
NailaRais Nov 4, 2024
9fd2ea9
Update main.go
NailaRais Nov 4, 2024
637347d
Update main_test.go
NailaRais Nov 4, 2024
e2d3c90
Update main.go
NailaRais Nov 4, 2024
01104d0
Update main_test.go
NailaRais Nov 4, 2024
431cc97
Update main.go
NailaRais Nov 4, 2024
fd09ce7
Update main.go
NailaRais Nov 4, 2024
442a5d9
Update main_test.go
NailaRais Nov 4, 2024
be71a87
Update main_test.go
NailaRais Nov 4, 2024
fc4c2a9
Update main_test.go
NailaRais Nov 4, 2024
6401a68
Update main_test.go
NailaRais Nov 4, 2024
15aa276
Update main_test.go
NailaRais Nov 4, 2024
f6cfbba
Update main_test.go
NailaRais Nov 4, 2024
35f1e92
Update main_test.go
NailaRais Nov 4, 2024
f34955e
Update main_test.go
NailaRais Nov 4, 2024
2d2e570
Update main_test.go
NailaRais Nov 4, 2024
3daa8a7
Update main_test.go
NailaRais Nov 4, 2024
fccf4fb
Update main_test.go
NailaRais Nov 4, 2024
daec046
Update main_test.go
NailaRais Nov 4, 2024
e2322dc
Update main_test.go
NailaRais Nov 4, 2024
6686ead
Update main_test.go
NailaRais Nov 4, 2024
b81bed1
Update main_test.go
NailaRais Nov 4, 2024
ae4aa6b
Update main_test.go
NailaRais Nov 4, 2024
b010db8
Update main_test.go
NailaRais Nov 4, 2024
7834b12
Update main_test.go
NailaRais Nov 4, 2024
cc6686a
Update main_test.go
NailaRais Nov 4, 2024
17c2513
Update main_test.go
NailaRais Nov 4, 2024
1def477
Update main_test.go
NailaRais Nov 4, 2024
085be39
Update main_test.go
NailaRais Nov 4, 2024
0f59887
Update main_test.go
NailaRais Nov 4, 2024
9872db4
Update main_test.go
NailaRais Nov 4, 2024
609822b
Update main_test.go
NailaRais Nov 4, 2024
68225dc
Update main_test.go
NailaRais Nov 4, 2024
baa20b5
Update main_test.go
NailaRais Nov 4, 2024
531363d
Update main_test.go
NailaRais Nov 4, 2024
00c35db
Update main.go
NailaRais Nov 4, 2024
c39576e
Update main_test.go
NailaRais Nov 4, 2024
49fc57d
Update main_test.go
NailaRais Nov 4, 2024
157a54e
Update main.go
NailaRais Nov 4, 2024
0e87e36
Update main_test.go
NailaRais Nov 4, 2024
4d9a87f
Update main_test.go
NailaRais Nov 4, 2024
8806464
Update main_test.go
NailaRais Nov 4, 2024
bde265d
Update main_test.go
NailaRais Nov 4, 2024
a9f5760
Update main_test.go
NailaRais Nov 4, 2024
1f13ae0
Update main.go
NailaRais Nov 4, 2024
fccc3f3
Update main_test.go
NailaRais Nov 4, 2024
37fe3c6
Update main_test.go
NailaRais Nov 5, 2024
85406e6
Update main_test.go
NailaRais Nov 5, 2024
fe356db
Update main_test.go
NailaRais Nov 5, 2024
38d105a
Update main_test.go
NailaRais Nov 5, 2024
cf35974
Update chunk_text_test.go
NailaRais Nov 5, 2024
206edd0
Update main_test.go
NailaRais Nov 6, 2024
6e5947e
Update main_test.go
NailaRais Nov 6, 2024
5861612
Update main_test.go
NailaRais Nov 6, 2024
226bd66
Update main_test.go
NailaRais Nov 6, 2024
fc8b5a2
Update main_test.go
NailaRais Nov 6, 2024
708b054
Update main_test.go
NailaRais Nov 6, 2024
549aa66
Update main_test.go
NailaRais Nov 6, 2024
5181da6
Update main_test.go
NailaRais Nov 6, 2024
9a98a50
Update main_test.go
NailaRais Nov 6, 2024
716bbb6
Update main_test.go
NailaRais Nov 6, 2024
54d7252
Update main_test.go
NailaRais Nov 6, 2024
74338ac
Update main_test.go
NailaRais Nov 6, 2024
efbb226
Update main_test.go
NailaRais Nov 6, 2024
5b97d54
Update main_test.go
NailaRais Nov 6, 2024
aca408f
Update chunk_text_test.go
NailaRais Nov 6, 2024
bcc2d90
Update main_test.go
NailaRais Nov 6, 2024
0fae83f
Update main_test.go
NailaRais Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/component/operator/text/v0/chunk_text_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,7 @@ func Test_ChunkPositions(t *testing.T) {

}
}




225 changes: 202 additions & 23 deletions pkg/component/operator/text/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package text

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
"sync"

_ "embed"
Expand All @@ -12,7 +17,7 @@ import (
)

const (
taskChunkText string = "TASK_CHUNK_TEXT"
taskDataCleansing string = "TASK_CLEAN_DATA" // Use this constant for the data cleansing task
)

var (
Expand Down Expand Up @@ -40,49 +45,223 @@ func Init(bc base.Component) *component {
comp = &component{Component: bc}
err := comp.LoadDefinition(definitionJSON, nil, tasksJSON, nil)
if err != nil {
panic(err)
panic(fmt.Sprintf("failed to load component definition: %v", err))
}
})
return comp
}

// CreateExecution initializes a component executor that can be used in a
// pipeline trigger.
// CreateExecution builds the executor for this component by wrapping the
// supplied base.ComponentExecution in the package's execution type. The
// returned executor is the one used when a pipeline trigger runs the
// component. The error result is always nil.
func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, error) {
	exec := &execution{ComponentExecution: x}
	return exec, nil
}

// Execute executes the derived execution
func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error {
// CleanDataInput is the input payload of the data cleansing task: the
// candidate texts plus the settings that decide which of them are kept.
type CleanDataInput struct {
// Texts holds the candidate strings. Each entry is kept or dropped as a
// whole; the cleansing functions never modify an entry.
Texts []string `json:"texts"`
// Setting selects the cleansing method and supplies its patterns or
// substrings.
Setting DataCleaningSetting `json:"setting"`
}

for _, job := range jobs {
switch e.Task {
case taskChunkText:
inputStruct := ChunkTextInput{}
// CleanDataOutput is the result of the data cleansing task.
type CleanDataOutput struct {
// CleanedTexts lists the input texts that passed filtering, in their
// original order. It is serialized under the same "texts" key as the input.
CleanedTexts []string `json:"texts"`
}

err := job.Input.ReadData(ctx, &inputStruct)
if err != nil {
job.Error.Error(ctx, err)
continue
// DataCleaningSetting configures how CleanData filters texts.
type DataCleaningSetting struct {
// CleanMethod is "Regex" or "Substring". Any other value (including the
// empty string) makes CleanData return the texts unfiltered.
CleanMethod string `json:"clean-method"`
// ExcludePatterns are regular expressions used by the "Regex" method; a
// text matching any of them is a candidate for removal.
ExcludePatterns []string `json:"exclude-patterns,omitempty"`
// IncludePatterns are regular expressions used by the "Regex" method; when
// non-empty, a text must match at least one of them to be kept.
IncludePatterns []string `json:"include-patterns,omitempty"`
// ExcludeSubstrs are used by the "Substring" method; a text containing any
// of them is dropped.
ExcludeSubstrs []string `json:"exclude-substrings,omitempty"`
// IncludeSubstrs are used by the "Substring" method; when non-empty, a text
// must contain at least one of them to be kept.
IncludeSubstrs []string `json:"include-substrings,omitempty"`
// CaseSensitive controls substring comparison: when false, texts and
// substrings are lower-cased before matching. The "Regex" method does not
// read this field; use an inline (?i) flag in the pattern instead.
CaseSensitive bool `json:"case-sensitive,omitempty"`
}

// CleanData filters input.Texts with the method named by
// input.Setting.CleanMethod and wraps the surviving texts in a
// CleanDataOutput.
//
// "Regex" delegates to cleanTextUsingRegex and "Substring" to
// cleanTextUsingSubstring. Any other method name is not an error: the texts
// are handed back untouched.
func CleanData(input CleanDataInput) CleanDataOutput {
	setting := input.Setting

	if setting.CleanMethod == "Regex" {
		return CleanDataOutput{CleanedTexts: cleanTextUsingRegex(input.Texts, setting)}
	}
	if setting.CleanMethod == "Substring" {
		return CleanDataOutput{CleanedTexts: cleanTextUsingSubstring(input.Texts, setting)}
	}

	// Unknown or missing method: pass the original slice through as-is.
	return CleanDataOutput{CleanedTexts: input.Texts}
}

// CleanChunkedData splits input.Texts into consecutive batches of at most
// chunkSize texts, runs CleanData on each batch with the same settings, and
// returns one CleanDataOutput per batch in input order.
//
// A non-positive chunkSize is treated as "no chunking": all texts are
// processed as a single batch. (Previously a chunkSize of 0 never advanced
// the loop index and a negative one produced an out-of-range slice.)
// The result is nil when input.Texts is empty.
func CleanChunkedData(input CleanDataInput, chunkSize int) []CleanDataOutput {
	total := len(input.Texts)
	if chunkSize <= 0 {
		// Guard against an infinite loop / slice-bounds panic.
		chunkSize = total
	}

	var outputs []CleanDataOutput
	for start := 0; start < total; start += chunkSize {
		end := start + chunkSize
		if end > total {
			end = total
		}
		batch := CleanDataInput{
			Texts:   input.Texts[start:end],
			Setting: input.Setting,
		}
		outputs = append(outputs, CleanData(batch))
	}
	return outputs
}

// cleanTextUsingRegex returns the input texts that pass the regular-expression
// filters in settings, preserving input order.
//
// A text is dropped when it matches any of settings.ExcludePatterns. If
// settings.IncludePatterns yields at least one valid expression, a text that
// survived exclusion is kept only when it matches one of them. Exclusion
// always wins: a text matching both an exclude and an include pattern is
// dropped, which is the same precedence cleanTextUsingSubstring applies.
//
// Patterns that fail to compile are skipped by compileRegexPatterns. The
// result is nil when no text is kept.
func cleanTextUsingRegex(inputTexts []string, settings DataCleaningSetting) []string {
	// Compile once up front rather than per text.
	excludeRegexes := compileRegexPatterns(settings.ExcludePatterns)
	includeRegexes := compileRegexPatterns(settings.IncludePatterns)

	var cleanedTexts []string

texts:
	for _, text := range inputTexts {
		// Exclusion pass: the first match rejects the text outright, so the
		// include pass below can never re-admit it.
		for _, re := range excludeRegexes {
			if re.MatchString(text) {
				continue texts
			}
		}

		// Inclusion pass: only applies when include patterns were supplied.
		if len(includeRegexes) > 0 {
			matched := false
			for _, re := range includeRegexes {
				if re.MatchString(text) {
					matched = true
					break
				}
			}
			if !matched {
				continue
			}
		}

		cleanedTexts = append(cleanedTexts, text)
	}
	return cleanedTexts
}

// cleanTextUsingSubstring returns the input texts that pass the substring
// filters in settings, preserving input order.
//
// A text is dropped when it contains any entry of settings.ExcludeSubstrs.
// If settings.IncludeSubstrs is non-empty, a text that survived exclusion is
// kept only when it contains at least one of those entries. When
// settings.CaseSensitive is false, both the text and the substrings are
// lower-cased before comparison; kept texts are always returned in their
// original form. The result is nil when no text is kept.
func cleanTextUsingSubstring(inputTexts []string, settings DataCleaningSetting) []string {
var cleanedTexts []string

for _, text := range inputTexts {
include := true
// compareText is the form used for matching only; text is what gets returned.
compareText := text
if !settings.CaseSensitive {
compareText = strings.ToLower(text)
}

// Exclusion pass: the first substring found rejects the text. Note that an
// empty exclude substring is contained in every text and so rejects all.
for _, substr := range settings.ExcludeSubstrs {
if !settings.CaseSensitive {
substr = strings.ToLower(substr)
}
if strings.Contains(compareText, substr) {
include = false
break
}
}

var outputStruct ChunkTextOutput
if inputStruct.Strategy.Setting.ChunkMethod == "Markdown" {
outputStruct, err = chunkMarkdown(inputStruct)
} else {
outputStruct, err = chunkText(inputStruct)
// Inclusion pass: runs only for texts that were not excluded, and only when
// include substrings were supplied; one match is enough to keep the text.
if include && len(settings.IncludeSubstrs) > 0 {
include = false
for _, substr := range settings.IncludeSubstrs {
if !settings.CaseSensitive {
substr = strings.ToLower(substr)
}
if strings.Contains(compareText, substr) {
include = true
break
}
}
}

if include {
cleanedTexts = append(cleanedTexts, text)
}
}
return cleanedTexts
}

// compileRegexPatterns turns each pattern string into a compiled
// *regexp.Regexp, in input order.
//
// Compilation is best-effort: a pattern that regexp.Compile rejects is
// silently left out, so the returned slice may be shorter than patterns. The
// result is nil when no pattern compiles (including when patterns is empty).
func compileRegexPatterns(patterns []string) []*regexp.Regexp {
	var compiled []*regexp.Regexp
	for i := range patterns {
		// Keep only patterns that compile; invalid ones are dropped on purpose.
		if re, err := regexp.Compile(patterns[i]); err == nil {
			compiled = append(compiled, re)
		}
	}
	return compiled
}

// FetchJSONInput loads the file at filePath and decodes its JSON content into
// a CleanDataInput.
//
// On any failure it returns the zero CleanDataInput together with an error
// that wraps the underlying cause and says which step failed: opening the
// file, reading it, or unmarshalling the JSON.
func FetchJSONInput(filePath string) (CleanDataInput, error) {
	var empty CleanDataInput

	f, openErr := os.Open(filePath)
	if openErr != nil {
		return empty, fmt.Errorf("failed to open JSON file: %w", openErr)
	}
	defer f.Close()

	raw, readErr := ioutil.ReadAll(f)
	if readErr != nil {
		return empty, fmt.Errorf("failed to read JSON file: %w", readErr)
	}

	var parsed CleanDataInput
	if decodeErr := json.Unmarshal(raw, &parsed); decodeErr != nil {
		// Return the zero value rather than a partially populated struct.
		return empty, fmt.Errorf("failed to unmarshal JSON data: %w", decodeErr)
	}
	return parsed, nil
}

// Execute executes the derived execution for the data cleansing task
func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error {
for _, job := range jobs {
if e.Task == taskDataCleansing {
cleanDataInput, err := FetchJSONInput("pkg/component/operator/text/v0/config/tasks.json") // Replace with your actual file path
if err != nil {
job.Error.Error(ctx, err)
job.Error.Error(ctx, fmt.Errorf("failed to fetch input data for cleansing: %w", err))
continue
}
err = job.Output.WriteData(ctx, outputStruct)

// Perform data cleansing
cleanedDataOutput := CleanData(cleanDataInput)

// Define a chunk size; adjust as needed based on your requirements
chunkSize := 100
chunkedOutputs := CleanChunkedData(cleanDataInput, chunkSize)

// Write the cleaned output back to the job output
err = job.Output.WriteData(ctx, cleanedDataOutput)
if err != nil {
job.Error.Error(ctx, err)
job.Error.Error(ctx, fmt.Errorf("failed to write cleaned output data: %w", err))
continue
}
default:

// Handle the chunked outputs if needed
for _, chunk := range chunkedOutputs {
err = job.Output.WriteData(ctx, chunk)
if err != nil {
job.Error.Error(ctx, fmt.Errorf("failed to write chunked cleaned output data: %w", err))
continue
}
}
} else {
job.Error.Error(ctx, fmt.Errorf("not supported task: %s", e.Task))
continue
}
Expand Down
Loading
Loading