Skip to content

Commit

Permalink
force functions and connector using v3 api admin client (#126)
Browse files Browse the repository at this point in the history
* deprecate api_version in pulsar provider

* fix docs

* fix ci

* better err log

* more logs

* print config

* fix client

* fix client in test

---------

Co-authored-by: Benjamin Nelson <[email protected]>
  • Loading branch information
freeznet and illegalnumbers authored Oct 1, 2024
1 parent ded6267 commit 96d1e22
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 58 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ A resource for creating and managing Apache Pulsar Functions.
```hcl
provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}
resource "pulsar_function" "function-1" {
Expand Down Expand Up @@ -462,7 +461,6 @@ A resource for creating and managing Apache Pulsar Sources.
```hcl
provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}
resource "pulsar_source" "source-1" {
Expand Down Expand Up @@ -514,7 +512,6 @@ A resource for creating and managing Apache Pulsar Sinks.
```hcl
provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}
resource "pulsar_sink" "sample-sink-1" {
Expand Down
1 change: 0 additions & 1 deletion examples/functions/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ terraform {

provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

// Note: function resource requires v3 api.
Expand Down
1 change: 0 additions & 1 deletion examples/sinks/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ terraform {

provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

// Note: sink resource requires v3 api.
Expand Down
1 change: 0 additions & 1 deletion examples/sources/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ terraform {

provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

resource "pulsar_source" "source-1" {
Expand Down
6 changes: 5 additions & 1 deletion pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ import (
)

func getClientFromMeta(meta interface{}) admin.Client {
return meta.(admin.Client)
return meta.(PulsarClientBundle).Client
}

func getV3ClientFromMeta(meta interface{}) admin.Client {
return meta.(PulsarClientBundle).V3Client
}
49 changes: 45 additions & 4 deletions pulsar/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"os"
"strconv"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
pulsaradmin "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
adminconfig "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/pkg/errors"
Expand Down Expand Up @@ -69,6 +70,12 @@ func init() {
}
}

// PulsarClientBundle is a struct that holds the pulsar admin client for both v2 and v3 api versions
type PulsarClientBundle struct {
Client pulsaradmin.Client
V3Client pulsaradmin.Client
}

// Provider returns a schema.Provider
func Provider() *schema.Provider {
provider := &schema.Provider{
Expand Down Expand Up @@ -204,10 +211,17 @@ func providerConfigure(d *schema.ResourceData, tfVersion string) (interface{}, d
return nil, diag.FromErr(fmt.Errorf("ERROR_PULSAR_CONFIG_tls_TRUST_FILE_NOTEXIST: %q", TLSTrustCertsFilePath))
}

config := &config.Config{
configVersion := adminconfig.APIVersion(apiVersion)
// for backward compatibility, if user state api_version as 3
// we will use v2 as the default client version because we have v3 as individual client
if configVersion == adminconfig.V3 {
configVersion = adminconfig.APIVersion(0) // v2 will be the default client version
}

config := &adminconfig.Config{
WebServiceURL: clusterURL,
Token: token,
PulsarAPIVersion: config.APIVersion(apiVersion),
PulsarAPIVersion: configVersion,
TLSTrustCertsFilePath: TLSTrustCertsFilePath,
TLSAllowInsecureConnection: TLSAllowInsecureConnection,
IssuerEndpoint: issuerEndpoint,
Expand All @@ -226,7 +240,34 @@ func providerConfigure(d *schema.ResourceData, tfVersion string) (interface{}, d
return nil, diag.FromErr(err)
}

return client, nil
configV3 := &adminconfig.Config{
WebServiceURL: clusterURL,
Token: token,
PulsarAPIVersion: adminconfig.V3,
TLSTrustCertsFilePath: TLSTrustCertsFilePath,
TLSAllowInsecureConnection: TLSAllowInsecureConnection,
IssuerEndpoint: issuerEndpoint,
ClientID: clientID,
Audience: audience,
Scope: scope,
KeyFile: keyFilePath,
TLSKeyFile: TLSKeyFilePath,
TLSCertFile: TLSCertFilePath,
}

clientV3, err := admin.NewPulsarAdminClient(&admin.PulsarAdminConfig{
Config: configV3,
})
if err != nil {
return nil, diag.FromErr(err)
}

clientBundle := PulsarClientBundle{
Client: client,
V3Client: clientV3,
}

return clientBundle, nil
}

// Exists reports whether the named file or directory exists.
Expand Down
70 changes: 53 additions & 17 deletions pulsar/resource_pulsar_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/hashicorp/terraform-plugin-log/tflog"
Expand Down Expand Up @@ -380,7 +379,7 @@ func resourcePulsarFunction() *schema.Resource {
}

func resourcePulsarFunctionRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

tenant := d.Get(resourceFunctionTenantKey).(string)
namespace := d.Get(resourceFunctionNamespaceKey).(string)
Expand All @@ -396,13 +395,17 @@ func resourcePulsarFunctionRead(ctx context.Context, d *schema.ResourceData, met
return diag.FromErr(errors.Wrapf(err, "failed to get function %s", d.Id()))
}

unmarshalFunctionConfig(functionConfig, d)
err = unmarshalFunctionConfig(functionConfig, d)
if err != nil {
tflog.Debug(ctx, fmt.Sprintf("@@@Read function: %v", err))
return diag.Errorf("ERROR_UNMARSHAL_FUNCTION_CONFIG: %v", err)
}

return nil
}

func resourcePulsarFunctionCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

functionConfig, err := marshalFunctionConfig(d)
if err != nil {
Expand Down Expand Up @@ -434,7 +437,7 @@ func resourcePulsarFunctionCreate(ctx context.Context, d *schema.ResourceData, m
}

func resourcePulsarFunctionUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

functionConfig, err := marshalFunctionConfig(d)
if err != nil {
Expand Down Expand Up @@ -465,7 +468,7 @@ func resourcePulsarFunctionUpdate(ctx context.Context, d *schema.ResourceData, m
}

func resourcePulsarFunctionDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

tenant := d.Get(resourceFunctionTenantKey).(string)
namespace := d.Get(resourceFunctionNamespaceKey).(string)
Expand Down Expand Up @@ -676,19 +679,31 @@ func marshalFunctionConfig(d *schema.ResourceData) (*utils.FunctionConfig, error

func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.ResourceData) error {
if functionConfig.Jar != nil {
d.Set(resourceFunctionJarKey, *functionConfig.Jar)
err := d.Set(resourceFunctionJarKey, *functionConfig.Jar)
if err != nil {
return err
}
}

if functionConfig.Py != nil {
d.Set(resourceFunctionPyKey, *functionConfig.Py)
err := d.Set(resourceFunctionPyKey, *functionConfig.Py)
if err != nil {
return err
}
}

if functionConfig.Go != nil {
d.Set(resourceFunctionGoKey, *functionConfig.Go)
err := d.Set(resourceFunctionGoKey, *functionConfig.Go)
if err != nil {
return err
}
}

if functionConfig.ClassName != "" {
d.Set(resourceFunctionClassNameKey, functionConfig.ClassName)
err := d.Set(resourceFunctionClassNameKey, functionConfig.ClassName)
if err != nil {
return err
}
}

if len(functionConfig.Inputs) != 0 {
Expand All @@ -702,31 +717,52 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso
}

if functionConfig.TopicsPattern != nil {
d.Set(resourceFunctionTopicsPatternKey, *functionConfig.TopicsPattern)
err := d.Set(resourceFunctionTopicsPatternKey, *functionConfig.TopicsPattern)
if err != nil {
return err
}
}

if functionConfig.Parallelism != 0 {
d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
err := d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
if err != nil {
return err
}
}

if functionConfig.Output != "" {
d.Set(resourceFunctionOutputKey, functionConfig.Output)
err := d.Set(resourceFunctionOutputKey, functionConfig.Output)
if err != nil {
return err
}
}

if functionConfig.Parallelism != 0 {
d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
err := d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
if err != nil {
return err
}
}

if functionConfig.ProcessingGuarantees != "" {
d.Set(resourceFunctionProcessingGuaranteesKey, functionConfig.ProcessingGuarantees)
err := d.Set(resourceFunctionProcessingGuaranteesKey, functionConfig.ProcessingGuarantees)
if err != nil {
return err
}
}

if functionConfig.SubName != "" {
d.Set(resourceFunctionSubscriptionNameKey, functionConfig.SubName)
err := d.Set(resourceFunctionSubscriptionNameKey, functionConfig.SubName)
if err != nil {
return err
}
}

if functionConfig.SubscriptionPosition != "" {
d.Set(resourceFunctionSubscriptionPositionKey, functionConfig.SubscriptionPosition)
err := d.Set(resourceFunctionSubscriptionPositionKey, functionConfig.SubscriptionPosition)
if err != nil {
return err
}
}

err := d.Set(resourceFunctionCleanupSubscriptionKey, functionConfig.CleanupSubscription)
Expand Down
11 changes: 6 additions & 5 deletions pulsar/resource_pulsar_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ package pulsar
import (
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
Expand All @@ -37,13 +36,13 @@ func init() {
}

func TestFunction(t *testing.T) {
configBytes, err := ioutil.ReadFile("testdata/function/main.tf")
configBytes, err := os.ReadFile("testdata/function/main.tf")
if err != nil {
t.Fatal(err)
}

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheckWithAPIVersion(t, config.V3) },
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
PreventPostDestroyRefresh: false,
CheckDestroy: testPulsarFunctionDestroy,
Expand All @@ -65,11 +64,13 @@ func TestFunction(t *testing.T) {
if config == nil {
return fmt.Errorf("failed to create %s function", rs.Primary.ID)
}
fmt.Printf("config: %v\n", config)

assert.Equal(t, "function-1", config.Name)
assert.Equal(t, "public", config.Tenant)
assert.Equal(t, "default", config.Namespace)
assert.Equal(t, ProcessingGuaranteesAtLeastOnce, config.ProcessingGuarantees)
assert.NotNil(t, config.TimeoutMs)
assert.Equal(t, int64(6666), *config.TimeoutMs)
assert.NotNil(t, config.Resources)

Expand Down Expand Up @@ -100,7 +101,7 @@ func testPulsarFunctionDestroy(s *terraform.State) error {
}

func getPulsarFunctionByResourceID(id string) (*utils.FunctionConfig, error) {
client := getClientFromMeta(testAccProvider.Meta()).Functions()
client := getV3ClientFromMeta(testAccProvider.Meta()).Functions()

parts := strings.Split(id, "/")
if len(parts) != 3 {
Expand Down
9 changes: 4 additions & 5 deletions pulsar/resource_pulsar_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -360,7 +359,7 @@ func resourcePulsarSink() *schema.Resource {
}

func resourcePulsarSinkCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

sinkConfig, err := marshalSinkConfig(d)
if err != nil {
Expand All @@ -380,7 +379,7 @@ func resourcePulsarSinkCreate(ctx context.Context, d *schema.ResourceData, meta
}

func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

tenant := d.Get(resourceSinkTenantKey).(string)
namespace := d.Get(resourceSinkNamespaceKey).(string)
Expand Down Expand Up @@ -599,7 +598,7 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in
}

func resourcePulsarSinkUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

sinkConfig, err := marshalSinkConfig(d)
if err != nil {
Expand All @@ -620,7 +619,7 @@ func resourcePulsarSinkUpdate(ctx context.Context, d *schema.ResourceData, meta
}

func resourcePulsarSinkDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

tenant := d.Get(resourceSinkTenantKey).(string)
namespace := d.Get(resourceSinkNamespaceKey).(string)
Expand Down
Loading

0 comments on commit 96d1e22

Please sign in to comment.