Skip to content

Commit

Permalink
Merge branch 'lavanet:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
dogonovm authored Oct 2, 2024
2 parents ba20874 + 7dda9b6 commit c2df302
Show file tree
Hide file tree
Showing 67 changed files with 2,603 additions and 664 deletions.
7 changes: 6 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ go.work.sum

Dockerfile
docker/docker-compose.*
cmd/**/Dockerfile
cmd/**/Dockerfile

.dockerignore
.gcloudignore

Makefile
4 changes: 3 additions & 1 deletion .github/workflows/lava.yml
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ jobs:
contents: write
packages: write
id-token: write
needs: [test-consensus, test-protocol]
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -423,6 +422,9 @@ jobs:
uses: docker/build-push-action@v5
continue-on-error: true
with:
provenance: false
sbom: false
context: .
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
file: cmd/${{ matrix.binary }}/Dockerfile
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Publish Lava Release
on:
push:
tags:
- 'v[0-9]+.[0-9]+.[0-9]+'
- 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)?'
workflow_dispatch:
inputs:
release_tag:
Expand Down
7 changes: 4 additions & 3 deletions cmd/lavad/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ RUN apk add --no-cache \

WORKDIR /lava

COPY go.mod go.sum ./

ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/root/go/pkg/mod \
go mod download
--mount=type=bind,source=go.sum,target=go.sum \
--mount=type=bind,source=go.mod,target=go.mod \
go mod download -x

COPY . .

Expand Down
5 changes: 4 additions & 1 deletion cmd/lavad/Dockerfile.Cosmovisor
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ WORKDIR /lava

COPY go.mod go.sum ./

ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/root/go/pkg/mod \
go mod download
--mount=type=bind,source=go.sum,target=go.sum \
--mount=type=bind,source=go.mod,target=go.mod \
go mod download -x

COPY . .

Expand Down
9 changes: 5 additions & 4 deletions cmd/lavap/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ RUN apk add --no-cache \

WORKDIR /lava

COPY go.mod go.sum ./

ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/root/go/pkg/mod \
go mod download

--mount=type=bind,source=go.sum,target=go.sum \
--mount=type=bind,source=go.mod,target=go.mod \
go mod download -x

COPY . .

ARG GIT_VERSION
Expand Down
21 changes: 21 additions & 0 deletions cookbook/specs/solana.json
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,27 @@
"expected_value": "*"
}
]
},
{
"name": "tokens-owner-indexed",
"parse_directive": {
"function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}",
"function_tag": "VERIFICATION",
"result_parsing": {
"parser_arg": [
"0",
"value"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getTokenAccountsByOwner"
},
"values": [
{
"expected_value": "*",
"severity": "Warning"
}
]
}
]
}
Expand Down
10 changes: 5 additions & 5 deletions ecosystem/cache/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,34 +107,34 @@ func (cs *CacheServer) Serve(ctx context.Context,
if strings.HasPrefix(listenAddr, unixPrefix) { // Unix socket
host, port, err := net.SplitHostPort(listenAddr)
if err != nil {
utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock: %v\n", err)
utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock", err)
return
}

syscall.Unlink(port)

addr, err := net.ResolveUnixAddr(host, port)
if err != nil {
utils.LavaFormatFatal("Failed to resolve unix socket address: %v\n", err)
utils.LavaFormatFatal("Failed to resolve unix socket address", err)
return
}

lis, err = net.ListenUnix(host, addr)
if err != nil {
utils.LavaFormatFatal("Faild to listen to unix socket listener: %v\n", err)
utils.LavaFormatFatal("Failed to listen to unix socket listener", err)
return
}

// Set permissions for the Unix socket
err = os.Chmod(port, 0o600)
if err != nil {
utils.LavaFormatFatal("Failed to set permissions for Unix socket: %v\n", err)
utils.LavaFormatFatal("Failed to set permissions for Unix socket", err)
return
}
} else {
lis, err = net.Listen("tcp", listenAddr)
if err != nil {
utils.LavaFormatFatal("Cache server failure setting up TCP listener: %v\n", err)
utils.LavaFormatFatal("Cache server failure setting up TCP listener", err)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/fullstorydev/grpcurl v1.8.5
github.com/goccy/go-json v0.10.2
github.com/gogo/status v1.1.0
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
github.com/itchyny/gojq v0.12.16
github.com/jhump/protoreflect v1.15.1
Expand Down Expand Up @@ -85,7 +86,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
Expand Down
80 changes: 72 additions & 8 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ package chainlib
import (
"context"
"strconv"
"sync/atomic"
"time"

gojson "github.com/goccy/go-json"
"github.com/goccy/go-json"
"github.com/gofiber/websocket/v2"
formatter "github.com/lavanet/lava/v3/ecosystem/cache/format"
"github.com/lavanet/lava/v3/protocol/common"
"github.com/lavanet/lava/v3/protocol/metrics"
"github.com/lavanet/lava/v3/utils"
"github.com/lavanet/lava/v3/utils/rand"
spectypes "github.com/lavanet/lava/v3/x/spec/types"
"github.com/tidwall/gjson"
)

var (
WebSocketRateLimit = -1 // rate limit requests per second on websocket connection
WebSocketBanDuration = time.Duration(0) // once rate limit is reached, will not allow new incoming message for a duration
)

type ConsumerWebsocketManager struct {
Expand Down Expand Up @@ -67,6 +74,27 @@ func (cwm *ConsumerWebsocketManager) GetWebSocketConnectionUniqueId(dappId, user
return dappId + "__" + userIp + "__" + cwm.WebsocketConnectionUID
}

func (cwm *ConsumerWebsocketManager) handleRateLimitReached(inpData []byte) ([]byte, error) {
rateLimitError := common.JsonRpcRateLimitError
id := 0
result := gjson.GetBytes(inpData, "id")
switch result.Type {
case gjson.Number:
id = int(result.Int())
case gjson.String:
idParsed, err := strconv.Atoi(result.Raw)
if err == nil {
id = idParsed
}
}
rateLimitError.Id = id
bytesRateLimitError, err := json.Marshal(rateLimitError)
if err != nil {
return []byte{}, utils.LavaFormatError("failed marshalling jsonrpc rate limit error", err)
}
return bytesRateLimitError, nil
}

func (cwm *ConsumerWebsocketManager) ListenToMessages() {
var (
messageType int
Expand Down Expand Up @@ -110,6 +138,33 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}()

// rate limit routine
requestsPerSecond := &atomic.Uint64{}
go func() {
if WebSocketRateLimit <= 0 {
return
}
ticker := time.NewTicker(time.Second) // rate limit per second.
defer ticker.Stop()
for {
select {
case <-webSocketCtx.Done():
return
case <-ticker.C:
// check if rate limit reached, and ban is required
if WebSocketBanDuration > 0 && requestsPerSecond.Load() > uint64(WebSocketRateLimit) {
// wait the ban duration before resetting the store.
select {
case <-webSocketCtx.Done():
return
case <-time.After(WebSocketBanDuration): // just continue
}
}
requestsPerSecond.Store(0)
}
}
}()

for {
startTime := time.Now()
msgSeed := guidString + "_" + strconv.Itoa(rand.Intn(10000000000)) // use message seed with original guid and new int
Expand All @@ -125,6 +180,15 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
break
}

// Check rate limit is met
if WebSocketRateLimit > 0 && requestsPerSecond.Add(1) > uint64(WebSocketRateLimit) {
rateLimitResponse, err := cwm.handleRateLimitReached(msg)
if err == nil {
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: rateLimitResponse}
}
continue
}

dappID, ok := websocketConn.Locals("dapp-id").(string)
if !ok {
// Log and remove the analyze
Expand Down Expand Up @@ -160,18 +224,17 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
continue
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
// check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("error unsubscribing from subscription", err, utils.LogAttr("GUID", webSocketCtx))
if err == common.SubscriptionNotFoundError {
msgData, err := gojson.Marshal(common.JsonRpcSubscriptionNotFoundError)
msgData, err := json.Marshal(common.JsonRpcSubscriptionNotFoundError)
if err != nil {
continue
}

websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: msgData}
}
}
Expand All @@ -189,17 +252,18 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: formatterMsg}
continue
}
continue
}

relayResultReply := relayResult.GetReply()
if relayResultReply != nil {
// No need to verify signature since this is already happening inside the SendParsedRelay flow
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: relayResult.GetReply().Data}
continue
} else {
utils.LavaFormatError("Relay result is nil over websocket normal request flow, should not happen", err, utils.LogAttr("messageType", messageType))
}
utils.LavaFormatError("Relay result is nil over websocket normal request flow, should not happen", err, utils.LogAttr("messageType", messageType))
continue
}
}

Expand All @@ -224,7 +288,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

// Handle the case when the error is a method not found error
if common.APINotSupportedError.Is(err) {
msgData, err := gojson.Marshal(common.JsonRpcMethodNotFoundError)
msgData, err := json.Marshal(common.JsonRpcMethodNotFoundError)
if err != nil {
continue
}
Expand Down
Loading

0 comments on commit c2df302

Please sign in to comment.