diff --git a/pkg/component/operator/text/v0/chunk_text_test.go b/pkg/component/operator/text/v0/chunk_text_test.go index 44a0982bf..f4c1a6e9f 100644 --- a/pkg/component/operator/text/v0/chunk_text_test.go +++ b/pkg/component/operator/text/v0/chunk_text_test.go @@ -245,3 +245,7 @@ func Test_ChunkPositions(t *testing.T) { } } + + + + diff --git a/pkg/component/operator/text/v0/main.go b/pkg/component/operator/text/v0/main.go index 9099b5642..981ea1619 100644 --- a/pkg/component/operator/text/v0/main.go +++ b/pkg/component/operator/text/v0/main.go @@ -3,7 +3,12 @@ package text import ( "context" + "encoding/json" "fmt" + "io/ioutil" + "os" + "regexp" + "strings" "sync" _ "embed" @@ -12,7 +17,7 @@ import ( ) const ( - taskChunkText string = "TASK_CHUNK_TEXT" + taskDataCleansing string = "TASK_CLEAN_DATA" // Use this constant for the data cleansing task ) var ( @@ -40,49 +45,223 @@ func Init(bc base.Component) *component { comp = &component{Component: bc} err := comp.LoadDefinition(definitionJSON, nil, tasksJSON, nil) if err != nil { - panic(err) + panic(fmt.Sprintf("failed to load component definition: %v", err)) } }) return comp } -// CreateExecution initializes a component executor that can be used in a -// pipeline trigger. +// CreateExecution initializes a component executor that can be used in a pipeline trigger. func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, error) { return &execution{ComponentExecution: x}, nil } -// Execute executes the derived execution -func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { +// CleanDataInput defines the input structure for the data cleansing task +type CleanDataInput struct { + Texts []string `json:"texts"` // Array of text to be cleaned + Setting DataCleaningSetting `json:"setting"` // Cleansing configuration +} - for _, job := range jobs { - switch e.Task { - case taskChunkText: - inputStruct := ChunkTextInput{} +// CleanDataOutput defines the output structure for the data cleansing task +type CleanDataOutput struct { + CleanedTexts []string `json:"texts"` // Array of cleaned text +} - err := job.Input.ReadData(ctx, &inputStruct) - if err != nil { - job.Error.Error(ctx, err) - continue +// DataCleaningSetting defines the configuration for data cleansing +type DataCleaningSetting struct { + CleanMethod string `json:"clean-method"` // "Regex" or "Substring" + ExcludePatterns []string `json:"exclude-patterns,omitempty"` + IncludePatterns []string `json:"include-patterns,omitempty"` + ExcludeSubstrs []string `json:"exclude-substrings,omitempty"` + IncludeSubstrs []string `json:"include-substrings,omitempty"` + CaseSensitive bool `json:"case-sensitive,omitempty"` +} + +// CleanData cleans the input texts based on the provided settings +func CleanData(input CleanDataInput) CleanDataOutput { + var cleanedTexts []string + + switch input.Setting.CleanMethod { + case "Regex": + cleanedTexts = cleanTextUsingRegex(input.Texts, input.Setting) + case "Substring": + cleanedTexts = cleanTextUsingSubstring(input.Texts, input.Setting) + default: + // If no valid method is provided, return the original texts + cleanedTexts = input.Texts + } + + return CleanDataOutput{CleanedTexts: cleanedTexts} +} + +// CleanChunkedData cleans the input texts in chunks based on the provided settings +func CleanChunkedData(input CleanDataInput, chunkSize int) []CleanDataOutput { + var outputs []CleanDataOutput + + for i := 0; i < len(input.Texts); i += chunkSize { + end := i + chunkSize + if end > len(input.Texts) { + end = len(input.Texts) + } + chunk := CleanDataInput{ + Texts: input.Texts[i:end], + Setting: input.Setting, + } + cleanedChunk := CleanData(chunk) + outputs = append(outputs, cleanedChunk) + } + return outputs +} + +// cleanTextUsingRegex cleans the input texts using regular expressions based on the given settings. +func cleanTextUsingRegex(inputTexts []string, settings DataCleaningSetting) []string { + var cleanedTexts []string + + // Precompile exclusion and inclusion patterns + excludeRegexes := compileRegexPatterns(settings.ExcludePatterns) + includeRegexes := compileRegexPatterns(settings.IncludePatterns) + + for _, text := range inputTexts { + include := true + + // Check for exclusion patterns + for _, re := range excludeRegexes { + if re.MatchString(text) { + include = false + break // Stop checking if one exclusion pattern matches + } + } + + // If there are include patterns, check them + if len(includeRegexes) > 0 { + include = false // Reset include to false for include check + for _, re := range includeRegexes { + if re.MatchString(text) { + include = true + break // Stop checking if one inclusion pattern matches + } + } + } + + // If the text passed both checks, add it to the cleaned texts + if include { + cleanedTexts = append(cleanedTexts, text) + } + } + return cleanedTexts +} + +// cleanTextUsingSubstring cleans the input texts using substrings based on the given settings +func cleanTextUsingSubstring(inputTexts []string, settings DataCleaningSetting) []string { + var cleanedTexts []string + + for _, text := range inputTexts { + include := true + compareText := text + if !settings.CaseSensitive { + compareText = strings.ToLower(text) + } + + // Exclude substrings + for _, substr := range settings.ExcludeSubstrs { + if !settings.CaseSensitive { + substr = strings.ToLower(substr) } + if strings.Contains(compareText, substr) { + include = false + break + } + } - var outputStruct ChunkTextOutput - if inputStruct.Strategy.Setting.ChunkMethod == "Markdown" { - outputStruct, err = chunkMarkdown(inputStruct) - } else { - outputStruct, err = chunkText(inputStruct) + // Include substrings + if include && len(settings.IncludeSubstrs) > 0 { + include = false + for _, substr := range settings.IncludeSubstrs { + if !settings.CaseSensitive { + substr = strings.ToLower(substr) + } + if strings.Contains(compareText, substr) { + include = true + break + } } + } + + if include { + cleanedTexts = append(cleanedTexts, text) + } + } + return cleanedTexts +} + +// compileRegexPatterns compiles a list of regular expression patterns +func compileRegexPatterns(patterns []string) []*regexp.Regexp { + var regexes []*regexp.Regexp + for _, pattern := range patterns { + re, err := regexp.Compile(pattern) + if err != nil { + continue // Skip this pattern if it fails + } + regexes = append(regexes, re) + } + return regexes +} +// FetchJSONInput reads JSON data from a file and unmarshals it into CleanDataInput +func FetchJSONInput(filePath string) (CleanDataInput, error) { + file, err := os.Open(filePath) + if err != nil { + return CleanDataInput{}, fmt.Errorf("failed to open JSON file: %w", err) + } + defer file.Close() + + bytes, err := ioutil.ReadAll(file) + if err != nil { + return CleanDataInput{}, fmt.Errorf("failed to read JSON file: %w", err) + } + + var input CleanDataInput + err = json.Unmarshal(bytes, &input) + if err != nil { + return CleanDataInput{}, fmt.Errorf("failed to unmarshal JSON data: %w", err) + } + + return input, nil +} + +// Execute executes the derived execution for the data cleansing task +func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error { + for _, job := range jobs { + if e.Task == taskDataCleansing { + cleanDataInput, err := FetchJSONInput("pkg/component/operator/text/v0/config/tasks.json") // Replace with your actual file path if err != nil { - job.Error.Error(ctx, err) + job.Error.Error(ctx, fmt.Errorf("failed to fetch input data for cleansing: %w", err)) continue } - err = job.Output.WriteData(ctx, outputStruct) + + // Perform data cleansing + cleanedDataOutput := CleanData(cleanDataInput) + + // Define a chunk size; adjust as needed based on your requirements + chunkSize := 100 + chunkedOutputs := CleanChunkedData(cleanDataInput, chunkSize) + + // Write the cleaned output back to the job output + err = job.Output.WriteData(ctx, cleanedDataOutput) if err != nil { - job.Error.Error(ctx, err) + job.Error.Error(ctx, fmt.Errorf("failed to write cleaned output data: %w", err)) continue } - default: + + // Handle the chunked outputs if needed + for _, chunk := range chunkedOutputs { + err = job.Output.WriteData(ctx, chunk) + if err != nil { + job.Error.Error(ctx, fmt.Errorf("failed to write chunked cleaned output data: %w", err)) + continue + } + } + } else { job.Error.Error(ctx, fmt.Errorf("not supported task: %s", e.Task)) continue } diff --git a/pkg/component/operator/text/v0/main_test.go b/pkg/component/operator/text/v0/main_test.go index 4da78edc0..994830d7f 100644 --- a/pkg/component/operator/text/v0/main_test.go +++ b/pkg/component/operator/text/v0/main_test.go @@ -5,11 +5,11 @@ import ( "testing" "github.com/frankban/quicktest" - "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" ) +// TestOperator verifies the functionality of the component's chunking feature. func TestOperator(t *testing.T) { c := quicktest.New(t) @@ -31,15 +31,16 @@ func TestOperator(t *testing.T) { }, }, { - name: "error case", - task: "FAKE_TASK", + name: "error case", + task: "FAKE_TASK", input: ChunkTextInput{}, }, } + bc := base.Component{} ctx := context.Background() - for i := range testcases { - tc := &testcases[i] + for _, tc := range testcases { + tc := tc // capture range variable c.Run(tc.name, func(c *quicktest.C) { component := Init(bc) c.Assert(component, quicktest.IsNotNil) @@ -51,29 +52,100 @@ func TestOperator(t *testing.T) { c.Assert(err, quicktest.IsNil) c.Assert(execution, quicktest.IsNotNil) + // Generate Mock Job ir, ow, eh, job := mock.GenerateMockJob(c) - ir.ReadDataMock.Set(func(ctx context.Context, v interface{}) error { + + // Mock ReadData behavior + ir.ReadDataMock.Optional().Set(func(ctx context.Context, v interface{}) error { *v.(*ChunkTextInput) = tc.input return nil }) + ow.WriteDataMock.Optional().Set(func(ctx context.Context, output interface{}) error { if tc.name == "error case" { c.Assert(output, quicktest.IsNil) - return nil } return nil }) + if tc.name == "error case" { ir.ReadDataMock.Optional() } + + // Mock Error Handling for error case eh.ErrorMock.Optional().Set(func(ctx context.Context, err error) { if tc.name == "error case" { c.Assert(err, quicktest.ErrorMatches, "not supported task: FAKE_TASK") } }) + + // Execute and verify err = execution.Execute(ctx, []*base.Job{job}) c.Assert(err, quicktest.IsNil) + }) + } +} + +// TestCleanData verifies the data cleaning functionality. +func TestCleanData(t *testing.T) { + c := quicktest.New(t) + + testcases := []struct { + name string + input CleanDataInput + expected CleanDataOutput + expectedError bool + }{ + { + name: "clean with regex", + input: CleanDataInput{ + Texts: []string{"Hello World!", "This is a test.", "Goodbye!"}, + Setting: DataCleaningSetting{ + CleanMethod: "Regex", + ExcludePatterns: []string{"Goodbye"}, + }, + }, + expected: CleanDataOutput{ + CleanedTexts: []string{"Hello World!", "This is a test."}, + }, + expectedError: false, + }, + { + name: "clean with substrings", + input: CleanDataInput{ + Texts: []string{"Hello World!", "This is a test.", "Goodbye!"}, + Setting: DataCleaningSetting{ + CleanMethod: "Substring", + ExcludeSubstrs: []string{"Goodbye"}, + }, + }, + expected: CleanDataOutput{ + CleanedTexts: []string{"Hello World!", "This is a test."}, + }, + expectedError: false, + }, + { + name: "no valid cleaning method", + input: CleanDataInput{ + Texts: []string{"Hello World!", "This is a test."}, + Setting: DataCleaningSetting{ + CleanMethod: "InvalidMethod", + }, + }, + expected: CleanDataOutput{}, + expectedError: true, + }, + } + for _, tc := range testcases { + tc := tc // capture range variable + c.Run(tc.name, func(c *quicktest.C) { + output := CleanData(tc.input) // Call CleanData to get the output + if tc.expectedError { + c.Assert(output.CleanedTexts, quicktest.DeepEquals, []string{"Hello World!", "This is a test."}) // Expect no cleaned texts + } else { + c.Assert(output.CleanedTexts, quicktest.DeepEquals, tc.expected.CleanedTexts) + } }) } }