Skip to content

Commit

Permalink
feat: add connection creation endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jvallesm committed Aug 30, 2024
1 parent 6b05c6d commit 3e3eb11
Show file tree
Hide file tree
Showing 20 changed files with 1,016 additions and 106 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# version
GOLANG_VERSION=1.22.5
K6_VERSION=0.42.0

# service
SERVICE_NAME=pipeline-backend
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ WORKDIR /${SERVICE_NAME}

# -- install 3rd-party

ARG TARGETOS TARGETARCH K6_VERSION
ARG TARGETOS TARGETARCH

# Install Python, create virtual environment, and install pdfplumber
RUN apt update && \
Expand Down Expand Up @@ -34,8 +34,8 @@ RUN npm install -g @opendocsg/pdf2md
RUN --mount=target=. --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOOS=$TARGETOS GOARCH=$TARGETARCH go install github.com/cosmtrek/[email protected]

# k6
ADD https://github.com/grafana/k6/releases/download/v${K6_VERSION}/k6-v${K6_VERSION}-linux-$TARGETARCH.tar.gz k6-v${K6_VERSION}-linux-$TARGETARCH.tar.gz
RUN tar -xf k6-v${K6_VERSION}-linux-$TARGETARCH.tar.gz --strip-components 1 -C /usr/bin
RUN go install go.k6.io/xk6/cmd/[email protected]
RUN xk6 build --with github.com/grafana/xk6-sql --output /usr/bin/k6

# -- set up Go

Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ build: ## Build dev docker image
@docker build \
--build-arg SERVICE_NAME=${SERVICE_NAME} \
--build-arg GOLANG_VERSION=${GOLANG_VERSION} \
--build-arg K6_VERSION=${K6_VERSION} \
-f Dockerfile.dev -t instill/${SERVICE_NAME}:dev .

.PHONY: run-dev-services
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ database:
host: pg-sql
port: 5432
name: pipeline
version: 25
version: 26
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down
7 changes: 6 additions & 1 deletion integration-test/pipeline/const.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sql from 'k6/x/sql';
import { uuidv4 } from "https://jslib.k6.io/k6-utils/1.4.0/index.js";
import encoding from "k6/encoding";

Expand Down Expand Up @@ -27,9 +28,10 @@ if (__ENV.API_GATEWAY_PROTOCOL) {
export const pipelinePrivateHost = `http://pipeline-backend:3081`;
export const pipelinePublicHost = apiGatewayMode ? `${proto}://${__ENV.API_GATEWAY_URL}` : `http://api-gateway:8080`
export const mgmtPublicHost = apiGatewayMode ? `${proto}://${__ENV.API_GATEWAY_URL}` : `http://api-gateway:8080`
export const pipelineGRPCPrivateHost = apiGatewayMode ? `${__ENV.API_GATEWAY_URL}` : `pipeline-backend:3081`;
export const pipelineGRPCPrivateHost = `pipeline-backend:3081`;
export const pipelineGRPCPublicHost = apiGatewayMode ? `${__ENV.API_GATEWAY_URL}` : `api-gateway:8080`;
export const mgmtGRPCPublicHost = apiGatewayMode ? `${__ENV.API_GATEWAY_URL}` : `api-gateway:8080`;
export const mgmtGRPCPrivateHost = `mgmt-backend:3084`;

export const dogImg = encoding.b64encode(
open(`${__ENV.TEST_FOLDER_ABS_PATH}/integration-test/data/dog.jpg`, "b")
Expand Down Expand Up @@ -141,3 +143,6 @@ export const simplePayload = {
},
],
};

// TODO read from env
export const db = sql.open('postgres', 'postgres://postgres:password@pg-sql:5432/pipeline?sslmode=disable');
34 changes: 32 additions & 2 deletions integration-test/pipeline/rest-integration.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import http from "k6/http";
import { check, group } from "k6";
import { randomString } from "https://jslib.k6.io/k6-utils/1.1.0/index.js";

import {
pipelinePublicHost,
defaultUsername
} from "./const.js";

import { pipelinePublicHost } from "./const.js";
import { deepEqual } from "./helper.js";

export function CheckIntegrations() {
Expand Down Expand Up @@ -69,7 +74,8 @@ export function CheckIntegrations() {
check(http.request("GET", `${pipelinePublicHost}/v1beta/integrations?pageSize=2&pageToken=${tokenPageTwo}`, null, null), {
[`GET /v1beta/integrations?pageSize=2&pageToken=${tokenPageTwo} response status is 200`]: (r) => r.status === 200,
[`GET /v1beta/integrations?pageSize=2&pageToken=${tokenPageTwo} has page size 2"`]: (r) => r.json().integrations.length === 2,
[`GET /v1beta/integrations?pageSize=2&pageToken=${tokenPageTwo} has different elements than page 1"`]: (r) => r.json().integrations[0].id != firstPage.json().integrations[0].id,
[`GET /v1beta/integrations?pageSize=2&pageToken=${tokenPageTwo} has different elements than page 1"`]: (r) =>
r.json().integrations[0].id != firstPage.json().integrations[0].id,
});

// Filter featured
Expand All @@ -96,3 +102,27 @@ export function CheckIntegrations() {
});
});
}

export function CheckConnections(data) {
group("Integration API: Create connection", () => {
var okReq = http.request(
"POST",
`${pipelinePublicHost}/v1beta/namespaces/${defaultUsername}/connections`,
JSON.stringify({
id: randomString(32),
integrationId: "bar",
method: "METHOD_DICTIONARY",
setup: {},
}),
data.header
);
check(okReq, {
[`POST /v1beta/namespaces/${defaultUsername}/connections response status is 201`]: (r) =>
r.status === 201,
[`POST /v1beta/namespaces/${defaultUsername}/connections has a UID`]: (r) =>
r.json().connection.uid.length > 0,
[`POST /v1beta/namespaces/${defaultUsername}/connections has a creation time`]: (r) =>
new Date(r.json().connection.createTime).getTime() > new Date().setTime(0),
});
});
}
30 changes: 26 additions & 4 deletions integration-test/pipeline/rest.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import http from "k6/http";
import grpc from "k6/net/grpc";

import {
check,
group,
} from "k6";

import {
pipelinePublicHost,
} from "./const.js";
import { pipelinePublicHost } from "./const.js";

import * as componentDefinition from "./rest-component-definition.js";
import * as connectorDefinition from "./rest-connector-definition.js";
Expand All @@ -20,6 +19,12 @@ import * as pipelinePrivate from './rest-pipeline-private.js';
import * as trigger from './rest-trigger.js';
import * as triggerAsync from './rest-trigger-async.js';

const mgmtPrivateClient = new grpc.Client();
mgmtPrivateClient.load(
["../proto/core/mgmt/v1beta"],
"mgmt_private_service.proto"
);

export let options = {
setupTimeout: '300s',
insecureSkipTLSVerify: true,
Expand Down Expand Up @@ -96,16 +101,33 @@ export default function (data) {
componentDefinition.CheckList(data);

integration.CheckIntegrations();
integration.CheckConnections(data);
}
}

export function teardown(data) {

group("Pipeline API: Delete all pipelines created by this test", () => {
for (const pipeline of http.request("GET", `${pipelinePublicHost}/v1beta/${constant.namespace}/pipelines?pageSize=100`, null, data.header).json("pipelines")) {
check(http.request("DELETE", `${pipelinePublicHost}/v1beta/${constant.namespace}/pipelines/${pipeline.id}`, null, data.header), {
[`DELETE /v1beta/${constant.namespace}/pipelines response status is 204`]: (r) => r.status === 204,
});
}
});

group("Integration API: Delete all connections created by this test", () => {
mgmtPrivateClient.connect(constant.mgmtGRPCPrivateHost, {
plaintext: true,
timeout: "300s",
});

var namespaceCheck = mgmtPrivateClient.invoke(
"core.mgmt.v1beta.MgmtPrivateService/CheckNamespaceAdmin",
{ id: constant.defaultUsername },
{}
);
mgmtPrivateClient.close();

constant.db.exec(`DELETE FROM connection WHERE namespace_uid = '${namespaceCheck.message.uid}';`);//1b8d3394-aaef-4321-94b3-84d2b153e4e6';`); // TODO read from mgmt
constant.db.close();
});
}
24 changes: 24 additions & 0 deletions pkg/datamodel/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,27 @@ type Secret struct {
NamespaceID string `gorm:"type:namespace_id"`
NamespaceType string `gorm:"type:namespace_type"`
}

// ConnectionMethod is an alias type for the proto enum that allows us to use its string value in the database.
type ConnectionMethod pb.Role

// Scan function for custom GORM type ConnectionMethod
func (m *ConnectionMethod) Scan(value interface{}) error {
*m = ConnectionMethod(pb.Connection_Method_value[value.(string)])
return nil
}

// Value function for custom GORM type ConnectionMethod
func (m ConnectionMethod) Value() (driver.Value, error) {
return pb.Connection_Method(m).String(), nil
}

// Connection is the data model for the `integration` table
type Connection struct {
BaseDynamic
ID string
NamespaceUID uuid.UUID
IntegrationUID uuid.UUID
Method ConnectionMethod
Setup datatypes.JSON `gorm:"type:jsonb"`
}
8 changes: 8 additions & 0 deletions pkg/db/migration/000026_add_connections.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
BEGIN;

DROP INDEX IF EXISTS unique_connection_id_namespace;
DROP TABLE IF EXISTS connection;
DROP TYPE valid_connection_method;

COMMIT;

22 changes: 22 additions & 0 deletions pkg/db/migration/000026_add_connections.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
BEGIN;

CREATE TYPE valid_connection_method AS ENUM (
'METHOD_DICTIONARY',
'METHOD_OAUTH'
);

CREATE TABLE IF NOT EXISTS connection (
uid UUID PRIMARY KEY,
id VARCHAR(255) NOT NULL,
namespace_uid UUID NOT NULL,
integration_uid UUID NOT NULL REFERENCES component_definition_index,
method valid_connection_method NOT NULL,
setup JSONB NOT NULL,
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
delete_time TIMESTAMPTZ
);

CREATE UNIQUE INDEX unique_connection_id_namespace ON connection (id, namespace_uid) WHERE delete_time IS NULL;

COMMIT;
5 changes: 5 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package errors

import (
"fmt"

"github.com/instill-ai/x/errmsg"
)

var (
Expand All @@ -24,4 +26,7 @@ var (
// ErrUnauthorized is used when a request can't be performed due to
// insufficient permissions.
ErrUnauthorized = fmt.Errorf("unauthorized")
// ErrAlreadyExists is used when a resource can't be created because it
// already exists.
ErrAlreadyExists = errmsg.AddMessage(fmt.Errorf("resource already exists"), "Resource already exists.")
)
35 changes: 35 additions & 0 deletions pkg/handler/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package handler

import (
"context"
"net/http"
"strconv"

"github.com/instill-ai/pipeline-backend/pkg/logger"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)
Expand Down Expand Up @@ -46,3 +50,34 @@ func (h *PublicHandler) ListIntegrations(ctx context.Context, req *pb.ListIntegr
logger.Info("ListIntegrations")
return resp, nil
}

// CreateNamespaceConnection creates a connection under the ownership of
// a namespace.
func (h *PublicHandler) CreateNamespaceConnection(ctx context.Context, req *pb.CreateNamespaceConnectionRequest) (*pb.CreateNamespaceConnectionResponse, error) {
ctx, span := tracer.Start(ctx, "CreateNamespaceConnection", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logger, _ := logger.GetZapLogger(ctx)

if err := authenticateUser(ctx, false); err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

conn, err := h.service.CreateNamespaceConnection(ctx, req.GetConnection())
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

// Manually set the custom header to have a StatusCreated http response for
// REST endpoint.
err = grpc.SetHeader(ctx, metadata.Pairs("x-http-code", strconv.Itoa(http.StatusCreated)))
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

logger.Info("CreateNamespaceConnection")
return &pb.CreateNamespaceConnectionResponse{Connection: conn}, nil
}
7 changes: 1 addition & 6 deletions pkg/middleware/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"gorm.io/gorm"

grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
Expand Down Expand Up @@ -83,14 +82,10 @@ func AsGRPCError(err error) error {

var code codes.Code
switch {
case
errors.Is(err, gorm.ErrDuplicatedKey),
errors.Is(err, repository.ErrNameExists):

case errors.Is(err, errdomain.ErrAlreadyExists):
code = codes.AlreadyExists
case
errors.Is(err, errdomain.ErrNotFound),
errors.Is(err, gorm.ErrRecordNotFound),
errors.Is(err, repository.ErrNoDataDeleted),
errors.Is(err, repository.ErrNoDataUpdated),
errors.Is(err, acl.ErrMembershipNotFound):
Expand Down
35 changes: 14 additions & 21 deletions pkg/middleware/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"fmt"
"testing"

qt "github.com/frankban/quicktest"
errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"
"github.com/instill-ai/x/errmsg"
"github.com/jackc/pgconn"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gorm.io/gorm"

qt "github.com/frankban/quicktest"

"github.com/instill-ai/x/errmsg"

errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"
)

func TestAsGRPCError(t *testing.T) {
Expand Down Expand Up @@ -38,23 +40,8 @@ func TestAsGRPCError(t *testing.T) {
wantMessage: ".*FATAL.*connection_failure.*",
},
{
name: "pq unique constraint",
in: &pgconn.PgError{
Severity: "FATAL",
Code: "23505",
Message: "unique_violation",
Detail: "unique_violation",
ConstraintName: "idx_mytable_mycolumn",
},
wantCode: codes.Unknown,
wantMessage: ".*FATAL.*unique_violation.*",
},
{
name: "with end-user message",
in: errmsg.AddMessage(
fmt.Errorf("already exists: %w", gorm.ErrDuplicatedKey),
"Resource already exists.",
),
name: "resource exists",
in: errdomain.ErrAlreadyExists,
wantCode: codes.AlreadyExists,
wantMessage: "Resource already exists.",
},
Expand All @@ -73,6 +60,12 @@ func TestAsGRPCError(t *testing.T) {
wantCode: codes.FailedPrecondition,
wantMessage: "Invalid recipe in pipeline",
},
{
name: "not found",
in: fmt.Errorf("finding item: %w", errdomain.ErrNotFound),
wantCode: codes.NotFound,
wantMessage: "finding item: not found",
},
{
name: "unauthorized",
in: fmt.Errorf("checking requester permission: %w", errdomain.ErrUnauthorized),
Expand Down
Loading

0 comments on commit 3e3eb11

Please sign in to comment.