diff --git a/.github/actions/gen_tag_name/action.yml b/.github/actions/gen_tag_name/action.yml index 85d909a1..36bb09ea 100644 --- a/.github/actions/gen_tag_name/action.yml +++ b/.github/actions/gen_tag_name/action.yml @@ -11,13 +11,13 @@ runs: VMIN=0 VPAT=0 set +o pipefail - VLAST=$(git describe --tags --abbrev=0 --match='v[1-9]*' 2>/dev/null | cut -c2-) + VLAST=$(git describe --tags --abbrev=0 --match='v[1-9]*' refs/remotes/origin/main 2>/dev/null | cut -c2-) [ $VLAST ] && declare $(echo $VLAST | awk -F '.' '{print "VMAJ="$1" VMIN="$2" VPAT="$3}') if [ "$GITHUB_REF_NAME" = "main" ] then VPAT=0 VMIN=$((VMIN+1)) - VFULL=${VMAJ}.${VMIN} + VFULL=${VMAJ}.${VMIN}.${VPAT} VTAG=v$VFULL else MB=$(git merge-base refs/remotes/origin/main HEAD) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5b62cac8..982f6573 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,6 +12,7 @@ on: branches: - main - test + - concom - dev paths: ['.github/workflows/**', '**/Makefile', '**/*.go', '**/*.json', '**/*.yml', '**/*.ts', '**/*.js'] @@ -72,6 +73,7 @@ jobs: --form "ref=${{ env.gitlab_branch }}" \ --form "variables[SOURCE_REPO]=${{ github.repository }}" \ --form "variables[SOURCE_BRANCH]=${{ github.ref_name }}" \ + --form "variables[GITHUB_VFULL]=${{ env.VFULL }}" \ --form "variables[GITHUB_TAG]=${{ env.TAG_NAME }}") # Parse JSON response using jq @@ -150,6 +152,8 @@ jobs: - name: Pack artifacts id: pack_artifacts run: | + VFULL=${VFULL:-0.0.1} + echo "VFULL: $VFULL" ARTIFACT="mor-launch-$TAG_NAME-ubuntu-x64.zip" echo "Artifact: $ARTIFACT" LLAMACPP=llama-b3256-bin-ubuntu-x64.zip @@ -157,9 +161,11 @@ jobs: wget -nv https://github.com/ggerganov/llama.cpp/releases/download/b3256/$LLAMACPP wget -nv https://huggingface.co/TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF/resolve/main/$MODEL unzip -o -j $LLAMACPP build/bin/llama-server - echo '{"run":["./llama-server -m ./'$MODEL'","./proxy-router","./morpheus-ui-${VFULL}-x86_64-linux.AppImage"]}' > mor-launch.json + echo '{"run":["./llama-server -m ./'"$MODEL"'","./proxy-router","./morpheus-ui-'"$VFULL"'-x86_64-linux.AppImage"]}' > mor-launch.json + echo "Contents of mor-launch.json: " + cat mor-launch.json mv ./cli/mor-cli mor-cli - zip -j $ARTIFACT ./LICENSE ./launcher/mor-launch llama-server ./proxy-router/bin/proxy-router .env $MODEL mor-launch.json ./ui-desktop/dist/morpheus-ui-${VFULL}-x86_64-linux.AppImage models-config.json rating-config.json mor-cli + zip -j $ARTIFACT ./LICENSE ./launcher/mor-launch llama-server ./proxy-router/bin/proxy-router .env $MODEL mor-launch.json ./ui-desktop/dist/morpheus-ui-$VFULL-x86_64-linux.AppImage models-config.json rating-config.json mor-cli - name: Upload artifacts uses: actions/upload-artifact@v4 @@ -235,6 +241,8 @@ jobs: wget -nv https://huggingface.co/TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF/resolve/main/$MODEL unzip -o -j $LLAMACPP build/bin/llama-server echo '{"run":["./llama-server -m ./'$MODEL'","./proxy-router","./MorpheusUI.app/Contents/MacOS/MorpheusUI"]}' > mor-launch.json + echo "Contents of mor-launch.json: " + cat mor-launch.json mv ./cli/mor-cli mor-cli unzip ./ui-desktop/dist/morpheus-ui-${VFULL}-x64-mac.zip zip -j $ARTIFACT ./LICENSE ./launcher/mor-launch ./proxy-router/bin/proxy-router .env llama-server $MODEL mor-launch.json models-config.json rating-config.json mor-cli @@ -314,6 +322,8 @@ jobs: wget -nv https://huggingface.co/TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF/resolve/main/$MODEL unzip -o -j $LLAMACPP build/bin/llama-server echo '{"run":["./llama-server -m ./'$MODEL'","./proxy-router","./MorpheusUI.app/Contents/MacOS/MorpheusUI"]}' > mor-launch.json + echo "Contents of mor-launch.json: " + cat mor-launch.json mv ./cli/mor-cli mor-cli unzip ./ui-desktop/dist/morpheus-ui-${VFULL}-arm64-mac.zip zip -j $ARTIFACT ./LICENSE ./launcher/mor-launch ./proxy-router/bin/proxy-router .env llama-server $MODEL mor-launch.json models-config.json rating-config.json mor-cli @@ -389,6 +399,8 @@ jobs: - name: Pack artifacts id: pack_artifacts run: | + VFULL=${VFULL:-0.0.1} + echo "VFULL: $VFULL" ARTIFACT="mor-launch-$TAG_NAME-win-x64.zip" echo "Artifact: $ARTIFACT" LLAMACPP=llama-b3256-bin-win-avx2-x64.zip @@ -396,14 +408,16 @@ jobs: wget -nv https://github.com/ggerganov/llama.cpp/releases/download/b3256/$LLAMACPP wget -nv https://huggingface.co/TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF/resolve/main/$MODEL unzip -o -j $LLAMACPP llama-server.exe llama.dll ggml.dll - echo '{"run":["./llama-server.exe -m ./'$MODEL'","./proxy-router.exe","./morpheus-ui-${VFULL}-x64-win.exe"]}' > mor-launch.json + echo '{"run":["./llama-server.exe -m ./'"$MODEL"'","./proxy-router.exe","./morpheus-ui-'"$VFULL"'-x64-win.exe"]}' > mor-launch.json + echo "Contents of mor-launch.json: " + cat mor-launch.json mv .env .env.tmp sed 's|\./data/|.\\data\\|g' .env.tmp > .env mv ./proxy-router/bin/proxy-router proxy-router.exe mv ./cli/mor-cli mor-cli.exe mv ./launcher/mor-launch mor-launch.exe - mv "./ui-desktop/dist/morpheus-ui-${VFULL}-x64-win" morpheus-ui-${VFULL}-x64-win.exe - 7z a $ARTIFACT LICENSE mor-launch.exe proxy-router.exe .env llama-server.exe llama.dll ggml.dll $MODEL mor-launch.json morpheus-ui-${VFULL}-x64-win.exe models-config.json rating-config.json mor-cli.exe + mv "./ui-desktop/dist/morpheus-ui-$VFULL-x64-win" morpheus-ui-$VFULL-x64-win.exe + 7z a $ARTIFACT LICENSE mor-launch.exe proxy-router.exe .env llama-server.exe llama.dll ggml.dll $MODEL mor-launch.json morpheus-ui-$VFULL-x64-win.exe models-config.json rating-config.json mor-cli.exe - name: Upload artifacts uses: actions/upload-artifact@v4 @@ -412,7 +426,7 @@ jobs: name: mor-launch-win-x64.zip release: - if: ${{ github.repository != 'MorpheusAIs/Morpheus-Lumerin-Node' && (github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/test' )) || github.event.inputs.create_release == 'true' }} + if: ${{ github.repository != 'MorpheusAIs/Morpheus-Lumerin-Node' && (github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/test' || github.ref == 'refs/heads/concom' )) || github.event.inputs.create_release == 'true' }} runs-on: ubuntu-latest needs: - Ubuntu-22-x64 diff --git a/.github/workflows/proxy-router.main.env b/.github/workflows/proxy-router.main.env index 31e5eaa8..d786c36e 100644 --- a/.github/workflows/proxy-router.main.env +++ b/.github/workflows/proxy-router.main.env @@ -23,6 +23,7 @@ PROXY_ADDRESS=0.0.0.0:3333 WEB_ADDRESS=0.0.0.0:8082 WEB_PUBLIC_URL=http://localhost:8082 MODELS_CONFIG_PATH= +RATING_CONFIG_PATH= ETH_NODE_USE_SUBSCRIPTIONS=false ETH_NODE_ADDRESS= ETH_NODE_LEGACY_TX=false diff --git a/.github/workflows/proxy-router.test.env b/.github/workflows/proxy-router.test.env index 69950f53..5adf136f 100644 --- a/.github/workflows/proxy-router.test.env +++ b/.github/workflows/proxy-router.test.env @@ -23,6 +23,7 @@ PROXY_ADDRESS=0.0.0.0:3333 WEB_ADDRESS=0.0.0.0:8082 WEB_PUBLIC_URL=http://localhost:8082 MODELS_CONFIG_PATH= +RATING_CONFIG_PATH= ETH_NODE_USE_SUBSCRIPTIONS=false ETH_NODE_ADDRESS= ETH_NODE_LEGACY_TX=false diff --git a/docs/02a-proxy-router-docker.md b/docs/02a-proxy-router-docker.md new file mode 100644 index 00000000..8446e2de --- /dev/null +++ b/docs/02a-proxy-router-docker.md @@ -0,0 +1,76 @@ + +# Proxy-Router Docker Setup: + +**This document describes setting up the proxy-router component of the Morpheus AI Network in a Docker container ONLY and accessing it via the Swagger API Interface...no GUI or Wallet components are included in this setup** + +## Overview: +* The Proxy-Router is a critical component of the Morpheus AI Network. It is responsible for monitoring the blockchain for events and managing the AI models and providers that are available to the network. +* The Proxy-Router is a standalone application that can be run on any server that has access to the blockchain and the AI models. +* This document walks through using Docker to build and run the proxy-router image on your server. + +## Pre-Requisites: +* Your AI model has been configured, started and made available to the proxy-router server via a private endpoint (IP:PORT or DNS:PORT) eg: `http://mycoolaimodel.domain.com:8080` +* You have an existing funded wallet with MOR and ETH and also have the `private key` for the wallet (this will be needed for the .env file configuration) +* Your proxy-router must have a **publicly accessible endpoint** for the provider (ip:port or fqdn:port no protocol) eg: `mycoolmornode.domain.com:3333` - this will be used when creating the provider on the blockchain +* Docker and Git are installed and current on your server +* The three key configuration files are located in the same directory as the proxy-router code: + * .env + * models-config.json + * rating-config.json + +## Installation & Configuration Steps: +1. Clone the source code from the repository: + * `git clone -b main https://github.com/Lumerin-protocol/Morpheus-Lumerin-Node.git` +1. Change to the proxy-router directory: + * `cd Morpheus-Lumerin-Node\proxy-router` +1. Configure the 3 key files for your environment: + 1. Environment file configuration + 1. Copy the example file to .env: + * Linux/Mac: `cp .env.example .env` + * Windows: `copy .env.example.win .env` + 1. Edit values within the .env as desired + * Add your private key to`WALLET_PRIVATE_KEY=` + * Modify the following values to ensure that those files remain "outside of the running container" for persistence and are mounted by the docker-compose.yml file's `volume` directive + * `MODELS_CONFIG_PATH=/app/data/models-config.json` + * `RATING_CONFIG_PATH=/app/data/rating-config.json` + * `PROXY_STORAGE_PATH=/app/data/` + 1. Choose the **blockchain** you'd like to work on...**Arbitrum MAINNET is the default** + * To operate on the Sepolia Arbitrum TESTNET, + * Uncomment the `TESTNET VALUES` and comment the `MAINNET VALUES` lines & save the file + 1. Models Configuration file + 1. Copy the example file to models-config.json: + * Linux/Mac: `cp models-config.json.example models-config.json` + * Windows: `copy models-config.json.example models-config.json` + 1. Edit the models-config.json file to include the models you'd like to use. + 1. Details here: [models-config.json.md](models-config.json.md) + 1. Once your provider is up and running, deploy a new model and model bid via the API interface (you will need to update the `modelId` for the configuration) + 1. Rating Configuration file + 1. Copy the example file to rating-config.json: + * Linux/Mac: `cp rating-config.json.example rating-config.json` + * Windows: `copy rating-config.json.example rating-config.json` + 1. Edit the rating-config.json file to include the weights and preferred providers you'd like to use. + 1. Details here: [rating-config.json.md](rating-config.json.md) + +## Build the proxy-router Docker Image: +1. Build the Docker image using the provided `docker_build.sh` script + * `./docker_build.sh --build` + * This script pulls the current Git tag and version, builds the docker image with all the port and defaults defined in the docker-compose.yml file + +## Running the proxy-router Docker Container: +1. Run the Docker container using the provided `docker_build.sh` script + * `./docker_build.sh --run` + * This script runs the docker container with the port and volume mappings defined in the docker-compose.yml file + * The proxy-router will start and begin listening for blockchain events + +## Validating Steps: +1. Once the proxy-router is running, you can navigate to the Swagger API Interface (http://localhost:8082/swagger/index.html as example) to validate that the proxy-router is running and listening for blockchain events +1. Once validated, you can move on and create your provider, model and bid on the blockchain [03-provider-offer.md](03-provider-offer.md) + +## NOTES: +* We have created docker-compose.yml, Dockerfile and docker_build.sh scripts to ease configuration and deployment of the proxy-router in a containerized environment +* Use these files as guides for applying to your system needs and configurations, private key, eth_node, ports, endpoints, volumes, .env, models-config.json and rating-config.json will need to be adjusted to your specific needs +* In most cases, the default .env file will work for the proxy-router...In some cases you will want to modify the .env file with advanced capability (log entries, private keys, private endpoints, etc) +* Please see the following for more information on these key config files: + * [proxy-router.all.env](proxy-router.all.env) + * [models-config.json.md](models-config.json.md) + * [rating-config.json.md](rating-config.json.md) \ No newline at end of file diff --git a/proxy-router/.env.example b/proxy-router/.env.example index c15ead37..d7aa6ebe 100644 --- a/proxy-router/.env.example +++ b/proxy-router/.env.example @@ -24,6 +24,7 @@ PROXY_ADDRESS=0.0.0.0:3333 WEB_ADDRESS=0.0.0.0:8082 WEB_PUBLIC_URL=http://localhost:8082 MODELS_CONFIG_PATH= +RATING_CONFIG_PATH= ETH_NODE_USE_SUBSCRIPTIONS=false ETH_NODE_ADDRESS= ETH_NODE_LEGACY_TX=false diff --git a/proxy-router/.env.example.win b/proxy-router/.env.example.win index 45a0ece4..452bd4a6 100644 --- a/proxy-router/.env.example.win +++ b/proxy-router/.env.example.win @@ -24,6 +24,7 @@ PROXY_ADDRESS=0.0.0.0:3333 WEB_ADDRESS=0.0.0.0:8082 WEB_PUBLIC_URL=http://localhost:8082 MODELS_CONFIG_PATH= +RATING_CONFIG_PATH= ETH_NODE_USE_SUBSCRIPTIONS=false ETH_NODE_ADDRESS= ETH_NODE_LEGACY_TX=false diff --git a/proxy-router/.gitignore b/proxy-router/.gitignore index e77303d8..381e8305 100644 --- a/proxy-router/.gitignore +++ b/proxy-router/.gitignore @@ -10,4 +10,14 @@ bin data* -models-config.json \ No newline at end of file +models-config.json +rating-config.json + +#Badger DB +*.vlog +*.mem +DISCARD +KEYREGISTRY +LOCK +MANIFEST +chats/ \ No newline at end of file diff --git a/proxy-router/Dockerfile b/proxy-router/Dockerfile index f3581e27..0e558363 100644 --- a/proxy-router/Dockerfile +++ b/proxy-router/Dockerfile @@ -1,5 +1,5 @@ # Stage 1: Build -FROM golang:1.22.3-alpine as builder +FROM golang:1.22.3-alpine AS builder # Capture the Git tag and commit hash during build ARG TAG_NAME @@ -10,16 +10,11 @@ ENV COMMIT=$COMMIT WORKDIR /app COPY . . -# Build the Go binary (recommended for linux/amd64...for MacARM use buildx) -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ +# Build the Go binary (Docker will build automatically to the OS and Arch that is hosting) +RUN CGO_ENABLED=0 \ TAG_NAME=$TAG_NAME COMMIT=$COMMIT ./build.sh && \ cp /bin/sh /app/sh && chmod +x /app/sh - # Multiplatform Build Notes: -# to support both amd64 and arm64, use Docker’s Buildx to create a multi-architecture image -# docker buildx create --use -# docker buildx build --platform linux/amd64,linux/arm64 -t proxy-router:latest . - # Stage 2: Final Image FROM scratch WORKDIR /app @@ -29,8 +24,8 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /app/bin/proxy-router /usr/bin/ # Optional Copy utilities from busybox image -# COPY --from=busybox /bin /bin -# COPY --from=busybox /lib /lib +COPY --from=busybox /bin /bin +COPY --from=busybox /lib /lib SHELL ["/bin/sh", "-c"] EXPOSE 3333 8082 diff --git a/proxy-router/cmd/main.go b/proxy-router/cmd/main.go index 8465e09c..5556ecfc 100644 --- a/proxy-router/cmd/main.go +++ b/proxy-router/cmd/main.go @@ -86,11 +86,11 @@ func start() error { docs.SwaggerInfo.Host = hostWithoutProtocol } else if cfg.Web.Address != "" { docs.SwaggerInfo.Host = cfg.Web.Address - } else { - docs.SwaggerInfo.Host = "localhost:8082" } docs.SwaggerInfo.Version = config.BuildVersion + docs.SwaggerInfo.Version = config.BuildVersion + log, err := lib.NewLogger(cfg.Log.LevelApp, cfg.Log.Color, cfg.Log.IsProd, cfg.Log.JSON, mainLogFilePath) if err != nil { return err @@ -227,11 +227,6 @@ func start() error { } appLog.Infof("connected to ethereum node: %s, chainID: %d", cfg.Blockchain.EthNodeAddress, chainID) - publicUrl, err := url.Parse(cfg.Web.PublicUrl) - if err != nil { - return err - } - storage := storages.NewStorage(badgerLog, cfg.Proxy.StoragePath) sessionStorage := storages.NewSessionStorage(storage) @@ -246,19 +241,13 @@ func start() error { var logWatcher contracts.LogWatcher if cfg.Blockchain.UseSubscriptions { - logWatcher = contracts.NewLogWatcherSubscription(ethClient, cfg.Blockchain.MaxReconnects, log) + logWatcher = contracts.NewLogWatcherSubscription(ethClient, cfg.Blockchain.MaxReconnects, rpcLog) appLog.Infof("using websocket log subscription for blockchain events") } else { - logWatcher = contracts.NewLogWatcherPolling(ethClient, cfg.Blockchain.PollingInterval, cfg.Blockchain.MaxReconnects, log) + logWatcher = contracts.NewLogWatcherPolling(ethClient, cfg.Blockchain.PollingInterval, cfg.Blockchain.MaxReconnects, rpcLog) appLog.Infof("using polling for blockchain events") } - modelConfigLoader := config.NewModelConfigLoader(cfg.Proxy.ModelsConfigPath, log) - err = modelConfigLoader.Init() - if err != nil { - log.Warnf("failed to load model config: %s, run with empty", err) - } - scorer, err := config.LoadRating(cfg.Proxy.RatingConfigPath, log) if err != nil { return err @@ -271,10 +260,17 @@ func start() error { sessionRouter := registries.NewSessionRouter(*cfg.Marketplace.DiamondContractAddress, ethClient, multicallBackend, log) marketplace := registries.NewMarketplace(*cfg.Marketplace.DiamondContractAddress, ethClient, multicallBackend, log) sessionRepo := sessionrepo.NewSessionRepositoryCached(sessionStorage, sessionRouter, marketplace) - proxyRouterApi := proxyapi.NewProxySender(chainID, publicUrl, wallet, contractLogStorage, sessionStorage, sessionRepo, log) + proxyRouterApi := proxyapi.NewProxySender(chainID, wallet, contractLogStorage, sessionStorage, sessionRepo, log) explorer := blockchainapi.NewExplorerClient(cfg.Blockchain.ExplorerApiUrl, *cfg.Marketplace.MorTokenAddress, cfg.Blockchain.ExplorerRetryDelay, cfg.Blockchain.ExplorerMaxRetries) blockchainApi := blockchainapi.NewBlockchainService(ethClient, multicallBackend, *cfg.Marketplace.DiamondContractAddress, *cfg.Marketplace.MorTokenAddress, explorer, wallet, proxyRouterApi, sessionRepo, scorer, proxyLog, cfg.Blockchain.EthLegacyTx) proxyRouterApi.SetSessionService(blockchainApi) + + modelConfigLoader := config.NewModelConfigLoader(cfg.Proxy.ModelsConfigPath, valid, blockchainApi, &aiengine.ConnectionChecker{}, log) + err = modelConfigLoader.Init() + if err != nil { + log.Warnf("failed to load model config, running with empty: %s", err) + } + aiEngine := aiengine.NewAiEngine(proxyRouterApi, chatStorage, modelConfigLoader, log) eventListener := blockchainapi.NewEventsListener(sessionRepo, sessionRouter, wallet, logWatcher, log) @@ -299,6 +295,8 @@ func start() error { cancel() }() + log.Infof("API docs available at %s/swagger/index.html", cfg.Web.PublicUrl) + proxy := proxyctl.NewProxyCtl(eventListener, wallet, chainID, log, connLog, cfg.Proxy.Address, schedulerLogFactory, sessionStorage, modelConfigLoader, valid, aiEngine, blockchainApi, sessionRepo, sessionExpiryHandler) err = proxy.Run(ctx) diff --git a/proxy-router/docker-compose.yml b/proxy-router/docker-compose.yml index e4beebfa..2819466a 100644 --- a/proxy-router/docker-compose.yml +++ b/proxy-router/docker-compose.yml @@ -1,8 +1,15 @@ services: proxy-router: - build: . + build: + context: . + args: + COMMIT: ${COMMIT:-unknown} + TAG_NAME: ${TAG_NAME:-latest} + image: proxy-router:${TAG_NAME} env_file: - .env ports: - - 8082:8082 - - 3333:3333 \ No newline at end of file + - "8082:8082" + - "3333:3333" + volumes: + - .:/app/data \ No newline at end of file diff --git a/proxy-router/docker_build.sh b/proxy-router/docker_build.sh new file mode 100755 index 00000000..282b98c1 --- /dev/null +++ b/proxy-router/docker_build.sh @@ -0,0 +1,43 @@ +#!/bin/sh + +# This script assumes you've cloned the repository and want to build with proper commit and version numbers +# It will use the latest Git tag as the version number and the latest short commit hash as the commit number +# It also leverages the docker-compose.yml file to build the Docker image + +# Pre-requisites: Docker & Git installed + +# Assumptions: +# - properly formatted .env in current directory +# - properly formatted models-config.json in current directory +# - properly formatted rating-config.json in current directory + +# Check if TAG_NAME is set; if not, use the latest Git tag +if [ -z "$TAG_NAME" ]; then + VLAST=$(git describe --tags --abbrev=0 --match='v[1-9]*' refs/remotes/origin/main 2>/dev/null | cut -c2-) + [ $VLAST ] && declare $(echo $VLAST | awk -F '.' '{print "VMAJ="$1" VMIN="$2" VPAT="$3}') + MB=$(git merge-base refs/remotes/origin/main HEAD) + VPAT=$(git rev-list --count --no-merges ${MB}..HEAD) + TAG_NAME=${VMAJ}.${VMIN}.${VPAT} +fi +VERSION=$TAG_NAME +echo VERSION=$VERSION + +# if commit is not set, use the latest commit +if [ -z "$COMMIT" ]; then + SHORT_COMMIT=$(git rev-parse --short HEAD) +fi +COMMIT=$SHORT_COMMIT +echo COMMIT=$COMMIT +export VERSION COMMIT TAG_NAME + +# Check if the user wants to build or run the Docker image +if [ "$1" = "--build" ]; then + echo "Building Docker image..." + docker-compose build + docker tag proxy-router:$VERSION proxy-router:latest +elif [ "$1" = "--run" ]; then + echo "Running Docker container..." + docker-compose up +else + echo "Usage: $0 [--build | --run]" +fi \ No newline at end of file diff --git a/proxy-router/docs/docs.go b/proxy-router/docs/docs.go index 930af9a0..e6e04388 100644 --- a/proxy-router/docs/docs.go +++ b/proxy-router/docs/docs.go @@ -1192,6 +1192,37 @@ const docTemplate = `{ } } }, + "/proxy/provider/ping": { + "post": { + "description": "sends a ping to the provider on the RPC level", + "produces": [ + "application/json" + ], + "tags": [ + "chat" + ], + "summary": "Ping Provider", + "parameters": [ + { + "description": "Ping Request", + "name": "pingReq", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/proxyapi.PingReq" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/proxyapi.PingRes" + } + } + } + } + }, "/proxy/sessions/initiate": { "post": { "description": "sends a handshake to the provider", @@ -1844,6 +1875,29 @@ const docTemplate = `{ } } }, + "proxyapi.PingReq": { + "type": "object", + "required": [ + "providerAddr", + "providerUrl" + ], + "properties": { + "providerAddr": { + "type": "string" + }, + "providerUrl": { + "type": "string" + } + } + }, + "proxyapi.PingRes": { + "type": "object", + "properties": { + "ping": { + "type": "integer" + } + } + }, "proxyapi.ResultResponse": { "type": "object", "properties": { diff --git a/proxy-router/docs/swagger.json b/proxy-router/docs/swagger.json index 837cebc2..10440a9e 100644 --- a/proxy-router/docs/swagger.json +++ b/proxy-router/docs/swagger.json @@ -1184,6 +1184,37 @@ } } }, + "/proxy/provider/ping": { + "post": { + "description": "sends a ping to the provider on the RPC level", + "produces": [ + "application/json" + ], + "tags": [ + "chat" + ], + "summary": "Ping Provider", + "parameters": [ + { + "description": "Ping Request", + "name": "pingReq", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/proxyapi.PingReq" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/proxyapi.PingRes" + } + } + } + } + }, "/proxy/sessions/initiate": { "post": { "description": "sends a handshake to the provider", @@ -1836,6 +1867,29 @@ } } }, + "proxyapi.PingReq": { + "type": "object", + "required": [ + "providerAddr", + "providerUrl" + ], + "properties": { + "providerAddr": { + "type": "string" + }, + "providerUrl": { + "type": "string" + } + } + }, + "proxyapi.PingRes": { + "type": "object", + "properties": { + "ping": { + "type": "integer" + } + } + }, "proxyapi.ResultResponse": { "type": "object", "properties": { diff --git a/proxy-router/docs/swagger.yaml b/proxy-router/docs/swagger.yaml index c99974f1..e448fe1c 100644 --- a/proxy-router/docs/swagger.yaml +++ b/proxy-router/docs/swagger.yaml @@ -195,6 +195,21 @@ definitions: - spend - user type: object + proxyapi.PingReq: + properties: + providerAddr: + type: string + providerUrl: + type: string + required: + - providerAddr + - providerUrl + type: object + proxyapi.PingRes: + properties: + ping: + type: integer + type: object proxyapi.ResultResponse: properties: result: @@ -1375,6 +1390,26 @@ paths: summary: Healthcheck example tags: - system + /proxy/provider/ping: + post: + description: sends a ping to the provider on the RPC level + parameters: + - description: Ping Request + in: body + name: pingReq + required: true + schema: + $ref: '#/definitions/proxyapi.PingReq' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/proxyapi.PingRes' + summary: Ping Provider + tags: + - chat /proxy/sessions/{id}/providerClaim: post: description: Claim provider balance from session diff --git a/proxy-router/internal/aiengine/ai_engine.go b/proxy-router/internal/aiengine/ai_engine.go index 886e41c3..6063330c 100644 --- a/proxy-router/internal/aiengine/ai_engine.go +++ b/proxy-router/internal/aiengine/ai_engine.go @@ -69,17 +69,13 @@ func (a *AiEngine) GetAdapter(ctx context.Context, chatID, modelID, sessionID co return engine, nil } -func (a *AiEngine) GetModelsConfig() ([]string, []config.ModelConfig) { - return a.modelsConfigLoader.GetAll() -} - func (a *AiEngine) GetLocalModels() ([]LocalModel, error) { models := []LocalModel{} IDs, modelsFromConfig := a.modelsConfigLoader.GetAll() for i, model := range modelsFromConfig { models = append(models, LocalModel{ - Id: IDs[i], + Id: IDs[i].Hex(), Name: model.ModelName, Model: model.ModelName, ApiType: model.ApiType, diff --git a/proxy-router/internal/aiengine/conn_checker.go b/proxy-router/internal/aiengine/conn_checker.go new file mode 100644 index 00000000..9298fe74 --- /dev/null +++ b/proxy-router/internal/aiengine/conn_checker.go @@ -0,0 +1,53 @@ +package aiengine + +import ( + "context" + "errors" + "net" + "net/url" + "time" + + "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/lib" +) + +const ( + TimeoutConnectDefault = 3 * time.Second +) + +var ( + ErrCannotParseURL = errors.New("cannot parse URL") + ErrCannotConnect = errors.New("cannot connect") + ErrConnectTimeout = errors.New("connection timeout") +) + +type ConnectionChecker struct{} + +func (*ConnectionChecker) TryConnect(ctx context.Context, URL string) error { + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeoutCause(ctx, TimeoutConnectDefault, ErrConnectTimeout) + defer cancel() + } + u, err := url.Parse(URL) + if err != nil { + return lib.WrapError(ErrCannotParseURL, err) + } + + var host string + if u.Port() == "" { + host = net.JoinHostPort(u.Hostname(), "443") + } else { + host = u.Host + } + + dialer := net.Dialer{} + + conn, err := dialer.DialContext(ctx, "tcp", host) + if err != nil { + return lib.WrapError(ErrCannotConnect, err) + } + + defer conn.Close() + + return nil +} diff --git a/proxy-router/internal/blockchainapi/service.go b/proxy-router/internal/blockchainapi/service.go index 521000f6..605903ca 100644 --- a/proxy-router/internal/blockchainapi/service.go +++ b/proxy-router/internal/blockchainapi/service.go @@ -1,10 +1,14 @@ package blockchainapi import ( + "bytes" "context" + "encoding/json" "errors" "fmt" "math/big" + "net" + "net/http" "sort" "strconv" "time" @@ -21,7 +25,6 @@ import ( "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/multicall" r "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/registries" sessionrepo "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/session" - "github.com/gin-gonic/gin" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -30,6 +33,9 @@ import ( "github.com/ethereum/go-ethereum/crypto" ) +// basefeeWiggleMultiplier is a multiplier for the basefee to set the maxFeePerGas +const basefeeWiggleMultiplier = 2 + type BlockchainService struct { ethClient i.EthClient providerRegistry *r.ProviderRegistry @@ -134,6 +140,24 @@ func (s *BlockchainService) GetProviders(ctx context.Context, offset *big.Int, l return mapProviders(addrs, providers), nil } +// GetMyAddress returns the provider by its wallet address, returns nil if provider is not registered +func (s *BlockchainService) GetProvider(ctx context.Context, providerAddr common.Address) (*structs.Provider, error) { + provider, err := s.providerRegistry.GetProviderById(ctx, providerAddr) + if err != nil { + return nil, err + } + + if provider.IsDeleted { + return nil, nil + } + + if provider.CreatedAt.Cmp(big.NewInt(0)) == 0 { + return nil, nil + } + + return mapProvider(providerAddr, *provider), nil +} + func (s *BlockchainService) GetAllModels(ctx context.Context) ([]*structs.Model, error) { ids, models, err := s.modelRegistry.GetAllModels(ctx) if err != nil { @@ -179,6 +203,10 @@ func (s *BlockchainService) GetActiveBidsByModel(ctx context.Context, modelId co return mapBids(ids, bids), nil } +func (s *BlockchainService) GetActiveBidsByProviderCount(ctx context.Context, provider common.Address) (*big.Int, error) { + return s.marketplace.GetActiveBidsByProviderCount(ctx, provider) +} + func (s *BlockchainService) GetActiveBidsByProvider(ctx context.Context, provider common.Address, offset *big.Int, limit uint8, order r.Order) ([]*structs.Bid, error) { ids, bids, err := s.marketplace.GetActiveBidsByProvider(ctx, provider, offset, limit, order) if err != nil { @@ -270,6 +298,11 @@ func (s *BlockchainService) OpenSession(ctx context.Context, approval, approvalS return common.Hash{}, lib.WrapError(ErrPrKey, err) } + _, err = s.Approve(ctx, s.diamonContractAddr, stake) + if err != nil { + return common.Hash{}, lib.WrapError(ErrApprove, err) + } + transactOpt, err := s.getTransactOpts(ctx, prKey) if err != nil { return common.Hash{}, lib.WrapError(ErrTxOpts, err) @@ -294,6 +327,11 @@ func (s *BlockchainService) CreateNewProvider(ctx context.Context, stake *lib.Bi return nil, lib.WrapError(ErrTxOpts, err) } + _, err = s.Approve(ctx, s.diamonContractAddr, &stake.Int) + if err != nil { + return nil, lib.WrapError(ErrApprove, err) + } + err = s.providerRegistry.CreateNewProvider(transactOpt, stake, endpoint) if err != nil { return nil, lib.WrapError(ErrSendTx, err) @@ -324,6 +362,11 @@ func (s *BlockchainService) CreateNewModel(ctx context.Context, modelID common.H return nil, lib.WrapError(ErrTxOpts, err) } + _, err = s.Approve(ctx, s.diamonContractAddr, &stake.Int) + if err != nil { + return nil, lib.WrapError(ErrApprove, err) + } + err = s.modelRegistry.CreateNewModel(transactOpt, modelID, ipfsID, fee, stake, name, tags) if err != nil { return nil, lib.WrapError(ErrSendTx, err) @@ -371,7 +414,33 @@ func (s *BlockchainService) DeregisterModel(ctx context.Context, modelId common. return tx, nil } +func (s *BlockchainService) ModelExists(ctx context.Context, modelID common.Hash) (bool, error) { + m, err := s.modelRegistry.GetModelById(ctx, modelID) + + // cannot pull blockchain data + if err != nil { + return false, err + } + + // model never existed + if m.CreatedAt.Cmp(big.NewInt(0)) == 0 { + return false, nil + } + + // model was deleted + if m.IsDeleted { + return false, nil + } + + return true, nil +} + func (s *BlockchainService) CreateNewBid(ctx context.Context, modelID common.Hash, pricePerSecond *lib.BigInt) (*structs.Bid, error) { + fee, err := s.marketplace.GetBidFee(ctx) + if err != nil { + return nil, err + } + prKey, err := s.privateKey.GetPrivateKey() if err != nil { return nil, lib.WrapError(ErrPrKey, err) @@ -382,14 +451,17 @@ func (s *BlockchainService) CreateNewBid(ctx context.Context, modelID common.Has return nil, lib.WrapError(ErrTxOpts, err) } + _, err = s.Approve(ctx, s.diamonContractAddr, fee) + if err != nil { + return nil, lib.WrapError(ErrApprove, err) + } + newBidId, err := s.marketplace.PostModelBid(transactOpt, modelID, &pricePerSecond.Int) if err != nil { return nil, lib.WrapError(ErrSendTx, err) } - s.log.Infof("Created new Bid with Id %s", newBidId) bid, err := s.GetBidByID(ctx, newBidId) - if err != nil { return nil, lib.WrapError(ErrBid, err) } @@ -483,11 +555,11 @@ func (s *BlockchainService) GetSession(ctx context.Context, sessionID common.Has return mapSession(sessionID, *ses, *bid), nil } -func (s *BlockchainService) GetProviderClaimableBalance(ctx *gin.Context, sessionID common.Hash) (*big.Int, error) { +func (s *BlockchainService) GetProviderClaimableBalance(ctx context.Context, sessionID common.Hash) (*big.Int, error) { return s.sessionRouter.GetProviderClaimableBalance(ctx, sessionID) } -func (s *BlockchainService) GetBalance(ctx *gin.Context) (*big.Int, *big.Int, error) { +func (s *BlockchainService) GetBalance(ctx context.Context) (eth *big.Int, mor *big.Int, err error) { prKey, err := s.privateKey.GetPrivateKey() if err != nil { return nil, nil, lib.WrapError(ErrPrKey, err) @@ -511,51 +583,87 @@ func (s *BlockchainService) GetBalance(ctx *gin.Context) (*big.Int, *big.Int, er return ethBalance, morBalance, nil } -func (s *BlockchainService) SendETH(ctx *gin.Context, to common.Address, amount *big.Int) (common.Hash, error) { +func (s *BlockchainService) SendETH(ctx context.Context, to common.Address, amount *big.Int) (common.Hash, error) { + signedTx, err := s.createSignedTransaction(ctx, &types.DynamicFeeTx{ + To: &to, + Value: amount, + }) + + err = s.ethClient.SendTransaction(ctx, signedTx) + if err != nil { + return common.Hash{}, lib.WrapError(ErrSendTx, err) + } + + _, err = bind.WaitMined(ctx, s.ethClient, signedTx) + if err != nil { + return common.Hash{}, lib.WrapError(ErrWaitMined, err) + } + + return signedTx.Hash(), nil +} + +func (s *BlockchainService) createSignedTransaction(ctx context.Context, txdata *types.DynamicFeeTx) (*types.Transaction, error) { prKey, err := s.privateKey.GetPrivateKey() if err != nil { - return common.Hash{}, lib.WrapError(ErrPrKey, err) + return nil, lib.WrapError(ErrPrKey, err) + } + addr, err := lib.PrivKeyBytesToAddr(prKey) + if err != nil { + return nil, err } - transactOpt, err := s.getTransactOpts(ctx, prKey) + gasTipCap, err := s.ethClient.SuggestGasTipCap(ctx) if err != nil { - return common.Hash{}, lib.WrapError(ErrTxOpts, err) + return nil, err } - nonce, err := s.ethClient.PendingNonceAt(ctx, transactOpt.From) + head, err := s.ethClient.HeaderByNumber(ctx, nil) if err != nil { - return common.Hash{}, lib.WrapError(ErrNonce, err) + return nil, err } - estimatedGas, err := s.ethClient.EstimateGas(ctx, ethereum.CallMsg{ - From: transactOpt.From, - To: &to, - Value: amount, - }) + nonce, err := s.ethClient.PendingNonceAt(ctx, addr) if err != nil { - return common.Hash{}, lib.WrapError(ErrEstimateGas, err) + return nil, err } - //TODO: check if this is the right way to calculate gas - gas := float64(estimatedGas) * 1.5 - tx := types.NewTransaction(nonce, to, amount, uint64(gas), transactOpt.GasPrice, nil) - signedTx, err := s.signTx(ctx, tx, prKey) + gasFeeCap := new(big.Int).Add( + gasTipCap, + new(big.Int).Mul(head.BaseFee, big.NewInt(basefeeWiggleMultiplier)), + ) + + gas, err := s.ethClient.EstimateGas(ctx, ethereum.CallMsg{ + From: addr, + To: txdata.To, + Value: txdata.Value, + }) + + chainID, err := s.ethClient.ChainID(ctx) if err != nil { - return common.Hash{}, lib.WrapError(ErrSignTx, err) + return nil, err } - err = s.ethClient.SendTransaction(ctx, signedTx) + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: chainID, + Nonce: nonce, + GasTipCap: gasTipCap, + GasFeeCap: gasFeeCap, + Gas: gas, + To: txdata.To, + Value: txdata.Value, + }) + + privateKey, err := crypto.ToECDSA(prKey) if err != nil { - return common.Hash{}, lib.WrapError(ErrSendTx, err) + return nil, err } - // Wait for the transaction receipt - _, err = bind.WaitMined(ctx, s.ethClient, signedTx) + signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(tx.ChainId()), privateKey) if err != nil { - return common.Hash{}, lib.WrapError(ErrWaitMined, err) + return nil, lib.WrapError(ErrSignTx, err) } - return signedTx.Hash(), nil + return signedTx, nil } func (s *BlockchainService) SendMOR(ctx context.Context, to common.Address, amount *big.Int) (common.Hash, error) { @@ -642,17 +750,17 @@ func (s *BlockchainService) GetTodaysBudget(ctx context.Context) (*big.Int, erro return s.sessionRouter.GetTodaysBudget(ctx, big.NewInt(time.Now().Unix())) } -func (s *BlockchainService) GetSessions(ctx *gin.Context, user, provider common.Address, offset *big.Int, limit uint8, order r.Order) ([]*structs.Session, error) { +func (s *BlockchainService) GetSessions(ctx context.Context, user, provider common.Address, offset *big.Int, limit uint8, order r.Order) ([]*structs.Session, error) { var ( ids [][32]byte sessions []sr.ISessionStorageSession err error ) if (user != common.Address{}) { - ids, sessions, err = s.sessionRouter.GetSessionsByUser(ctx, common.HexToAddress(ctx.Query("user")), offset, limit, order) + ids, sessions, err = s.sessionRouter.GetSessionsByUser(ctx, user, offset, limit, order) } else { // hasProvider - ids, sessions, err = s.sessionRouter.GetSessionsByProvider(ctx, common.HexToAddress(ctx.Query("provider")), offset, limit, order) + ids, sessions, err = s.sessionRouter.GetSessionsByProvider(ctx, provider, offset, limit, order) } if err != nil { return nil, err @@ -738,39 +846,21 @@ func (s *BlockchainService) openSessionByBid(ctx context.Context, bidID common.H return common.Hash{}, lib.WrapError(ErrBudget, err) } - bid, err := s.marketplace.GetBidById(ctx, bidID) + bid, err := s.GetBidByID(ctx, bidID) if err != nil { return common.Hash{}, lib.WrapError(ErrBid, err) } - totalCost := duration.Mul(bid.PricePerSecond, duration) - stake := totalCost.Div(totalCost.Mul(supply, totalCost), budget) - userAddr, err := s.GetMyAddress(ctx) if err != nil { return common.Hash{}, lib.WrapError(ErrMyAddress, err) } if bid.Provider == userAddr { - return common.Hash{}, lib.WrapError(ErrOpenOwnBid, fmt.Errorf("failed to open session")) - } - - provider, err := s.providerRegistry.GetProviderById(ctx, bid.Provider) - if err != nil { - return common.Hash{}, lib.WrapError(ErrProvider, err) - } - - initRes, err := s.proxyService.InitiateSession(ctx, userAddr, bid.Provider, stake, bidID, provider.Endpoint) - if err != nil { - return common.Hash{}, lib.WrapError(ErrInitSession, err) + return common.Hash{}, ErrOpenOwnBid } - _, err = s.Approve(ctx, s.diamonContractAddr, stake) - if err != nil { - return common.Hash{}, lib.WrapError(ErrApprove, err) - } - - return s.OpenSession(ctx, initRes.Approval, initRes.ApprovalSig, stake, false) + return s.tryOpenSession(ctx, bid, duration, supply, budget, userAddr, false, false) } func (s *BlockchainService) OpenSessionByModelId(ctx context.Context, modelID common.Hash, duration *big.Int, directPayment bool, isFailoverEnabled bool, omitProvider common.Address) (common.Hash, error) { @@ -824,7 +914,7 @@ func (s *BlockchainService) OpenSessionByModelId(ctx context.Context, modelID co s.log.Infof("trying to open session with provider #%d %s", i, bid.Bid.Provider.String()) durationCopy := new(big.Int).Set(duration) - hash, err := s.tryOpenSession(ctx, bid, durationCopy, supply, budget, userAddr, directPayment, isFailoverEnabled) + hash, err := s.tryOpenSession(ctx, &bid.Bid, durationCopy, supply, budget, userAddr, directPayment, isFailoverEnabled) if err != nil { s.log.Errorf("failed to open session with provider %s: %s", bid.Bid.Provider.String(), err.Error()) continue @@ -881,12 +971,12 @@ func (s *BlockchainService) GetAllBidsWithRating(ctx context.Context, modelAgent return ids, bids, providerModelStats, providers, nil } -func (s *BlockchainService) tryOpenSession(ctx context.Context, bid structs.ScoredBid, duration, supply, budget *big.Int, userAddr common.Address, directPayment bool, failoverEnabled bool) (common.Hash, error) { - provider, err := s.providerRegistry.GetProviderById(ctx, bid.Bid.Provider) +func (s *BlockchainService) tryOpenSession(ctx context.Context, bid *structs.Bid, duration, supply, budget *big.Int, userAddr common.Address, directPayment bool, failoverEnabled bool) (common.Hash, error) { + provider, err := s.providerRegistry.GetProviderById(ctx, bid.Provider) if err != nil { return common.Hash{}, lib.WrapError(ErrProvider, err) } - sessionCost := (&big.Int{}).Mul(&bid.Bid.PricePerSecond.Int, duration) + sessionCost := (&big.Int{}).Mul(&bid.PricePerSecond.Int, duration) var amountTransferred = new(big.Int) if directPayment { @@ -899,15 +989,15 @@ func (s *BlockchainService) tryOpenSession(ctx context.Context, bid structs.Scor } s.log.Infof("attempting to initiate session %s", map[string]string{ - "provider": bid.Bid.Provider.String(), + "provider": bid.Provider.String(), "directPayment": strconv.FormatBool(directPayment), "duration": duration.String(), - "bid": bid.Bid.Id.String(), + "bid": bid.Id.String(), "endpoint": provider.Endpoint, "amountTransferred": amountTransferred.String(), }) - initRes, err := s.proxyService.InitiateSession(ctx, userAddr, bid.Bid.Provider, amountTransferred, bid.Bid.Id, provider.Endpoint) + initRes, err := s.proxyService.InitiateSession(ctx, userAddr, bid.Provider, amountTransferred, bid.Id, provider.Endpoint) if err != nil { return common.Hash{}, lib.WrapError(ErrInitSession, err) } @@ -946,6 +1036,69 @@ func (s *BlockchainService) GetMyAddress(ctx context.Context) (common.Address, e return lib.PrivKeyBytesToAddr(prKey) } +func (s *BlockchainService) CheckConnectivity(ctx context.Context, url string, addr common.Address) (time.Duration, error) { + return s.proxyService.Ping(ctx, url, addr) +} + +func (s *BlockchainService) CheckPortOpen(ctx context.Context, urlStr string) (bool, error) { + host, port, err := net.SplitHostPort(urlStr) + if err != nil { + return false, err + } + portInt, err := strconv.ParseInt(port, 10, 0) + if err != nil { + return false, err + } + + body, _ := json.Marshal(struct { + Host string `json:"host"` + Ports []int64 `json:"ports"` + }{ + Host: host, + Ports: []int64{portInt}, + }) + + req, err := http.NewRequestWithContext(ctx, "POST", "https://portchecker.io/api/query", bytes.NewBuffer(body)) + if err != nil { + return false, err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return false, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return false, fmt.Errorf("unexpected status code: %d", res.StatusCode) + } + + var response struct { + Error bool + Msg string + Check []struct { + Port int + Status bool + } + Host string + } + + err = json.NewDecoder(res.Body).Decode(&response) + if err != nil { + return false, err + } + + if response.Error { + return false, fmt.Errorf("portchecker.io error: %s", response.Msg) + } + + if len(response.Check) != 1 { + return false, fmt.Errorf("unexpected response from portchecker.io") + } + + return response.Check[0].Status, nil +} + func (s *BlockchainService) getTransactOpts(ctx context.Context, privKey lib.HexString) (*bind.TransactOpts, error) { privateKey, err := crypto.ToECDSA(privKey) if err != nil { diff --git a/proxy-router/internal/blockchainapi/session_expiry_handler.go b/proxy-router/internal/blockchainapi/session_expiry_handler.go index a94872da..c5d4906b 100644 --- a/proxy-router/internal/blockchainapi/session_expiry_handler.go +++ b/proxy-router/internal/blockchainapi/session_expiry_handler.go @@ -21,7 +21,7 @@ func NewSessionExpiryHandler(blockchainService *BlockchainService, sessionStorag blockchainService: blockchainService, sessionStorage: sessionStorage, wallet: wallet, - log: log, + log: log.Named("SESSION_CLOSER"), } } diff --git a/proxy-router/internal/config/models_config.go b/proxy-router/internal/config/models_config.go index 4678804b..420c1980 100644 --- a/proxy-router/internal/config/models_config.go +++ b/proxy-router/internal/config/models_config.go @@ -1,24 +1,48 @@ package config import ( + "context" "encoding/json" + "errors" "fmt" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/lib" + "github.com/ethereum/go-ethereum/common" ) +const ( + ConfigPathDefault = "models-config.json" +) + +var ( + ErrModelNotFound = errors.New("model not found in blockchain, local-only") + ErrValidate = errors.New("cannot perform validation") + ErrConnect = errors.New("cannot connect to the model") +) + +type BlockchainChecker interface { + ModelExists(ctx context.Context, ID common.Hash) (bool, error) +} + +type ConnectionChecker interface { + TryConnect(ctx context.Context, url string) error +} + type ModelConfigLoader struct { - log *lib.Logger - modelConfigsMap ModelConfigs - path string + log lib.ILogger + modelConfigs ModelConfigs + validator Validator + blockchainChecker BlockchainChecker + connectionChecker ConnectionChecker + configPath string } type ModelConfig struct { - ModelName string `json:"modelName"` - ApiType string `json:"apiType"` - ApiURL string `json:"apiUrl"` + ModelName string `json:"modelName" validate:"required"` + ApiType string `json:"apiType" validate:"required"` + ApiURL string `json:"apiUrl" validate:"required,url"` ApiKey string `json:"apiKey"` - ConcurrentSlots int `json:"concurrentSlots"` + ConcurrentSlots int `json:"concurrentSlots" validate:"number"` CapacityPolicy string `json:"capacityPolicy"` } @@ -30,26 +54,29 @@ type ModelConfigsV2 struct { } `json:"models"` } -func NewModelConfigLoader(path string, log *lib.Logger) *ModelConfigLoader { +func NewModelConfigLoader(configPath string, validator Validator, blockchainChecker BlockchainChecker, connectionChecker ConnectionChecker, log lib.ILogger) *ModelConfigLoader { return &ModelConfigLoader{ - log: log, - modelConfigsMap: ModelConfigs{}, - path: path, + log: log.Named("MODEL_LOADER"), + modelConfigs: ModelConfigs{}, + validator: validator, + blockchainChecker: blockchainChecker, + connectionChecker: connectionChecker, + configPath: configPath, } } func (e *ModelConfigLoader) Init() error { - filePath := "models-config.json" - if e.path != "" { - filePath = e.path + filePath := ConfigPathDefault + if e.configPath != "" { + filePath = e.configPath } modelsConfig, err := lib.ReadJSONFile(filePath) if err != nil { e.log.Errorf("failed to read models config file: %s", err) - e.log.Warn("trying to load models config from persistent storage") // TODO: load models config from persistent storage + // e.log.Warn("trying to load models config from persistent storage") return err } @@ -68,8 +95,8 @@ func (e *ModelConfigLoader) Init() error { return fmt.Errorf("invalid models config V2 format: %s", err) } for _, v := range modelConfigsV2.Models { - e.modelConfigsMap[v.ID] = v.ModelConfig - e.log.Infof("local model: %s", v.ModelName) + e.modelConfigs[v.ID] = v.ModelConfig + _ = e.Validate(context.Background(), common.HexToHash(v.ID), v.ModelConfig) } return nil } @@ -80,11 +107,15 @@ func (e *ModelConfigLoader) Init() error { var modelConfigs ModelConfigs err = json.Unmarshal([]byte(modelsConfig), &modelConfigs) if err != nil { - e.log.Errorf("failed to unmarshal models config: %s", err) - return err + return fmt.Errorf("invalid models config: %w", err) } - e.modelConfigsMap = modelConfigs + err = e.validator.Struct(modelConfigs) + if err != nil { + return fmt.Errorf("invalid models config: %w", err) + } + + e.modelConfigs = modelConfigs return nil } @@ -93,7 +124,7 @@ func (e *ModelConfigLoader) ModelConfigFromID(ID string) *ModelConfig { return &ModelConfig{} } - modelConfig := e.modelConfigsMap[ID] + modelConfig := e.modelConfigs[ID] if modelConfig == (ModelConfig{}) { e.log.Warnf("model config not found for ID: %s", ID) return &ModelConfig{} @@ -102,13 +133,45 @@ func (e *ModelConfigLoader) ModelConfigFromID(ID string) *ModelConfig { return &modelConfig } -func (e *ModelConfigLoader) GetAll() ([]string, []ModelConfig) { +func (e *ModelConfigLoader) GetAll() ([]common.Hash, []ModelConfig) { var modelConfigs []ModelConfig - var modelIDs []string - for ID, v := range e.modelConfigsMap { + var modelIDs []common.Hash + for ID, v := range e.modelConfigs { modelConfigs = append(modelConfigs, v) - modelIDs = append(modelIDs, ID) + modelIDs = append(modelIDs, common.HexToHash(ID)) } return modelIDs, modelConfigs } + +func (e *ModelConfigLoader) Validate(ctx context.Context, modelID common.Hash, cfg ModelConfig) error { + // check if model exists + exists, err := e.blockchainChecker.ModelExists(ctx, modelID) + if err != nil { + err = lib.WrapError(ErrValidate, err) + } else if !exists { + err = ErrModelNotFound + } + + if err != nil { + e.log.Warnf(e.formatLogPrefix(modelID, cfg)+"%s", err) + } + + // try to connect to the model + err = e.connectionChecker.TryConnect(ctx, cfg.ApiURL) + if err != nil { + err = lib.WrapError(ErrConnect, err) + e.log.Warnf(e.formatLogPrefix(modelID, cfg)+"%s", err) + } + + if exists && err == nil { + e.log.Infof(e.formatLogPrefix(modelID, cfg) + "loaded and validated") + } + + return nil +} + +func (e *ModelConfigLoader) formatLogPrefix(modelID common.Hash, config ModelConfig) string { + return fmt.Sprintf("modelID %s, name %s: ", + lib.Short(modelID), config.ModelName) +} diff --git a/proxy-router/internal/config/rating_config.go b/proxy-router/internal/config/rating_config.go index 3648a456..65aa512a 100644 --- a/proxy-router/internal/config/rating_config.go +++ b/proxy-router/internal/config/rating_config.go @@ -1,17 +1,33 @@ package config import ( - "fmt" - "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/lib" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/rating" ) const ( DefaultRatingConfigPath = "rating-config.json" + RatingConfigDefault = ` + { + "$schema": "./internal/rating/rating-config-schema.json", + "algorithm": "default", + "providerAllowlist": [], + "params": { + "weights": { + "tps": 0.24, + "ttft": 0.08, + "duration": 0.24, + "success": 0.32, + "stake": 0.12 + } + } + } + ` ) func LoadRating(path string, log lib.ILogger) (*rating.Rating, error) { + log = log.Named("RATING_LOADER") + filePath := DefaultRatingConfigPath if path != "" { filePath = path @@ -19,10 +35,11 @@ func LoadRating(path string, log lib.ILogger) (*rating.Rating, error) { config, err := lib.ReadJSONFile(filePath) if err != nil { - return nil, fmt.Errorf("failed to rating config file: %s", err) + log.Warnf("failed to load rating config file, using defaults") + config = RatingConfigDefault + } else { + log.Infof("rating config loaded from file: %s", filePath) } - log.Infof("rating config loaded from file: %s", filePath) - return rating.NewRatingFromConfig([]byte(config), log) } diff --git a/proxy-router/internal/handlers/tcphandlers/tcp.go b/proxy-router/internal/handlers/tcphandlers/tcp.go index 855d2f6e..37cfeec4 100644 --- a/proxy-router/internal/handlers/tcphandlers/tcp.go +++ b/proxy-router/internal/handlers/tcphandlers/tcp.go @@ -22,13 +22,13 @@ func NewTCPHandler( sourceLog := connLog.Named("SRC").With("SrcAddr", addr) defer func() { - sourceLog.Info("Closing connection") + sourceLog.Debugf("closing connection") conn.Close() }() msg, err := getMessage(conn) if err != nil { - sourceLog.Error("Error reading message", err) + sourceLog.Error("error reading message", err) return } diff --git a/proxy-router/internal/lib/addr.go b/proxy-router/internal/lib/addr.go index 5d49f3df..4535decc 100644 --- a/proxy-router/internal/lib/addr.go +++ b/proxy-router/internal/lib/addr.go @@ -21,6 +21,15 @@ func AddrShort(addr string) string { return StrShortConf(addr, 5, 3) } +type Hexable interface { + Hex() string +} + +// Short returns a short representation of a Hexable in "0x123..567" format +func Short(s Hexable) string { + return StrShortConf(s.Hex(), 5, 3) +} + func RemoveHexPrefix(s string) string { if len(s) >= 2 && s[0:2] == "0x" { return s[2:] diff --git a/proxy-router/internal/lib/crypto.go b/proxy-router/internal/lib/crypto.go index 4361a41a..be1f6caf 100644 --- a/proxy-router/internal/lib/crypto.go +++ b/proxy-router/internal/lib/crypto.go @@ -151,3 +151,24 @@ func VerifySignature(params []byte, signature []byte, publicKeyBytes []byte) boo signatureNoRecoverID := signature[:len(signature)-1] // remove recovery ID return crypto.VerifySignature(publicKeyBytes, hash.Bytes(), signatureNoRecoverID) } + +func VerifySignatureAddr(params []byte, signature []byte, addr common.Address) bool { + hash := crypto.Keccak256Hash(params) + if len(signature) == 0 { + return false + } + // signature = signature[:len(signature)-1] // remove recovery ID + + recoveredPubKey, err := crypto.Ecrecover(hash.Bytes(), signature) + if err != nil { + return false + } + + pubKey, err := crypto.UnmarshalPubkey(recoveredPubKey) + if err != nil { + return false + } + + recoveredAddress := crypto.PubkeyToAddress(*pubKey) + return recoveredAddress == addr +} diff --git a/proxy-router/internal/lib/ethclient.go b/proxy-router/internal/lib/ethclient.go index 7fa5f2f8..7c4f4abb 100644 --- a/proxy-router/internal/lib/ethclient.go +++ b/proxy-router/internal/lib/ethclient.go @@ -2,6 +2,7 @@ package lib import ( "fmt" + "reflect" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/contracts/bindings/lumerintoken" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/contracts/bindings/marketplace" @@ -36,6 +37,23 @@ func (e EVMError) Error() string { return fmt.Sprintf("EVM error: %s %+v", e.Abi.Sig, e.Args) } +// Implement As() method to check if EVMError can be converted to another type. +func (e EVMError) As(target interface{}) bool { + // Ensure that the target is a pointer. + if reflect.TypeOf(target).Kind() != reflect.Ptr { + // As target should be a pointer + return false + } + + switch v := target.(type) { + case *EVMError: + *v = e // Assign the concrete EVMError to the target + return true + default: + return false + } +} + // TryConvertGethError attempts to convert geth error to an EVMError, otherwise just returns original error func TryConvertGethError(err error) error { evmErr, ok := ConvertGethError(err, allContractsMeta) diff --git a/proxy-router/internal/lib/number.go b/proxy-router/internal/lib/number.go index 44d7fae6..ced5475d 100644 --- a/proxy-router/internal/lib/number.go +++ b/proxy-router/internal/lib/number.go @@ -32,3 +32,7 @@ func NewRat(numerator, denominator *big.Int) *big.Rat { bFloat := new(big.Rat).SetInt(denominator) return new(big.Rat).Quo(aFloat, bFloat) } + +func Exp10(exp int) *big.Int { + return big.NewInt(0).Exp(big.NewInt(10), big.NewInt(int64(exp)), nil) +} diff --git a/proxy-router/internal/proxyapi/controller_http.go b/proxy-router/internal/proxyapi/controller_http.go index 38eb34cf..3516008e 100644 --- a/proxy-router/internal/proxyapi/controller_http.go +++ b/proxy-router/internal/proxyapi/controller_http.go @@ -45,6 +45,7 @@ func NewProxyController(service *ProxyServiceSender, aiEngine AIEngine, chatStor } func (s *ProxyController) RegisterRoutes(r interfaces.Router) { + r.POST("/proxy/provider/ping", s.Ping) r.POST("/proxy/sessions/initiate", s.InitiateSession) r.POST("/v1/chat/completions", s.Prompt) r.GET("/v1/models", s.Models) @@ -54,6 +55,31 @@ func (s *ProxyController) RegisterRoutes(r interfaces.Router) { r.POST("/v1/chats/:id", s.UpdateChatTitle) } +// Ping godoc +// +// @Summary Ping Provider +// @Description sends a ping to the provider on the RPC level +// @Tags chat +// @Produce json +// @Param pingReq body proxyapi.PingReq true "Ping Request" +// @Success 200 {object} proxyapi.PingRes +// @Router /proxy/provider/ping [post] +func (s *ProxyController) Ping(ctx *gin.Context) { + var req *PingReq + if err := ctx.ShouldBindJSON(&req); err != nil { + ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + ping, err := s.service.Ping(ctx, req.ProviderURL, req.ProviderAddr) + if err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + ctx.JSON(http.StatusOK, &PingRes{PingMs: ping.Milliseconds()}) +} + // InitiateSession godoc // // @Summary Initiate Session with Provider diff --git a/proxy-router/internal/proxyapi/controller_morrpc.go b/proxy-router/internal/proxyapi/controller_morrpc.go index 14af93a3..b695fc05 100644 --- a/proxy-router/internal/proxyapi/controller_morrpc.go +++ b/proxy-router/internal/proxyapi/controller_morrpc.go @@ -20,6 +20,7 @@ type MORRPCController struct { validator *validator.Validate sessionStorage *storages.SessionStorage morRpc *m.MORRPCMessage + prKey lib.HexString } type SendResponse func(*msg.RpcResponse) error @@ -28,13 +29,14 @@ var ( ErrUnknownMethod = fmt.Errorf("unknown method") ) -func NewMORRPCController(service *ProxyReceiver, validator *validator.Validate, sessionRepo *sessionrepo.SessionRepositoryCached, sessionStorage *storages.SessionStorage) *MORRPCController { +func NewMORRPCController(service *ProxyReceiver, validator *validator.Validate, sessionRepo *sessionrepo.SessionRepositoryCached, sessionStorage *storages.SessionStorage, prKey lib.HexString) *MORRPCController { c := &MORRPCController{ service: service, validator: validator, sessionStorage: sessionStorage, sessionRepo: sessionRepo, morRpc: m.NewMorRpc(), + prKey: prKey, } return c @@ -42,6 +44,8 @@ func NewMORRPCController(service *ProxyReceiver, validator *validator.Validate, func (s *MORRPCController) Handle(ctx context.Context, msg m.RPCMessage, sourceLog lib.ILogger, sendResponse SendResponse) error { switch msg.Method { + case "network.ping": + return s.networkPing(ctx, msg, sendResponse, sourceLog) case "session.request": return s.sessionRequest(ctx, msg, sendResponse, sourceLog) case "session.prompt": @@ -59,6 +63,26 @@ var ( ErrGenerateReport = fmt.Errorf("failed to generate report") ) +func (s *MORRPCController) networkPing(_ context.Context, msg m.RPCMessage, sendResponse SendResponse, sourceLog lib.ILogger) error { + var req m.PingReq + err := json.Unmarshal(msg.Params, &req) + if err != nil { + return lib.WrapError(ErrUnmarshal, err) + } + + if err := s.validator.Struct(req); err != nil { + return lib.WrapError(ErrValidation, err) + } + + res, err := s.morRpc.PongResponce(msg.ID, s.prKey, req.Nonce) + if err != nil { + sourceLog.Error(err) + return err + } + + return sendResponse(res) +} + func (s *MORRPCController) sessionRequest(ctx context.Context, msg m.RPCMessage, sendResponse SendResponse, sourceLog lib.ILogger) error { var req m.SessionReq err := json.Unmarshal(msg.Params, &req) diff --git a/proxy-router/internal/proxyapi/morrpcmessage/mor_rpc.go b/proxy-router/internal/proxyapi/morrpcmessage/mor_rpc.go index d9733119..3bbf3a5b 100644 --- a/proxy-router/internal/proxyapi/morrpcmessage/mor_rpc.go +++ b/proxy-router/internal/proxyapi/morrpcmessage/mor_rpc.go @@ -18,6 +18,30 @@ func NewMorRpc() *MORRPCMessage { // RESPONSES +func (m *MORRPCMessage) PongResponce(requestId string, providerPrKey lib.HexString, nonce lib.HexString) (*RpcResponse, error) { + params := PongRes{ + Nonce: nonce, + } + signature, err := m.generateSignature(params, providerPrKey) + if err != nil { + return &RpcResponse{}, err + } + + params.Signature = signature + + paramsBytes, err := json.Marshal(params) + if err != nil { + return &RpcResponse{}, err + } + + paramsJSON := json.RawMessage(paramsBytes) + + return &RpcResponse{ + ID: requestId, + Result: ¶msJSON, + }, nil +} + func (m *MORRPCMessage) InitiateSessionResponse(providerPubKey lib.HexString, userAddr common.Address, bidID common.Hash, providerPrivateKeyHex lib.HexString, requestID string, chainID *big.Int) (*RpcResponse, error) { timestamp := m.generateTimestamp() @@ -195,6 +219,28 @@ func (m *MORRPCMessage) SpendLimitError(privateKeyHex lib.HexString, requestId s // REQUESTS +func (m *MORRPCMessage) PingRequest(requestId string, userPrivateKeyHex lib.HexString, nonce lib.HexString) (*RPCMessage, error) { + params := PingReq{ + Nonce: nonce, + } + signature, err := m.generateSignature(params, userPrivateKeyHex) + if err != nil { + return &RPCMessage{}, err + } + params.Signature = signature + + serializedParams, err := json.Marshal(params) + if err != nil { + return &RPCMessage{}, err + } + + return &RPCMessage{ + ID: requestId, + Method: "network.ping", + Params: serializedParams, + }, nil +} + func (m *MORRPCMessage) InitiateSessionRequest(user common.Address, provider common.Address, spend *big.Int, bidID common.Hash, userPrivateKeyHex lib.HexString, requestId string) (*RPCMessage, error) { method := "session.request" pbKey, err := lib.PubKeyFromPrivate(userPrivateKeyHex) @@ -290,6 +336,16 @@ func (m *MORRPCMessage) VerifySignature(params any, signature lib.HexString, pub return lib.VerifySignature(paramsBytes, signature, publicKey) } +func (m *MORRPCMessage) VerifySignatureAddr(params any, signature lib.HexString, addr common.Address, sourceLog lib.ILogger) bool { + paramsBytes, err := json.Marshal(params) + if err != nil { + sourceLog.Error("Error marshalling params", err) + return false + } + + return lib.VerifySignatureAddr(paramsBytes, signature, addr) +} + func (m *MORRPCMessage) generateTimestamp() uint64 { now := time.Now() return uint64(now.UnixMilli()) @@ -310,5 +366,6 @@ func (m *MORRPCMessage) generateSignature(params any, privateKeyHex lib.HexStrin if err != nil { return nil, err } + return signature, nil } diff --git a/proxy-router/internal/proxyapi/morrpcmessage/request.go b/proxy-router/internal/proxyapi/morrpcmessage/request.go index 46e6f289..95f01619 100644 --- a/proxy-router/internal/proxyapi/morrpcmessage/request.go +++ b/proxy-router/internal/proxyapi/morrpcmessage/request.go @@ -96,3 +96,13 @@ type ReqObject struct { Res uint `json:"res"` Toks uint `json:"toks"` } + +type PingReq struct { + Nonce lib.HexString `json:"nonce" validate:"required,hexadecimal"` + Signature lib.HexString `json:"signature" validate:"required,hexadecimal"` +} + +type PongRes struct { + Nonce lib.HexString `json:"nonce" validate:"required,hexadecimal"` + Signature lib.HexString `json:"signature" validate:"required,hexadecimal"` +} diff --git a/proxy-router/internal/proxyapi/proxy_sender.go b/proxy-router/internal/proxyapi/proxy_sender.go index 0bc97de1..b6040c7e 100644 --- a/proxy-router/internal/proxyapi/proxy_sender.go +++ b/proxy-router/internal/proxyapi/proxy_sender.go @@ -4,13 +4,13 @@ import ( "bufio" "bytes" "context" + "crypto/rand" "encoding/json" "fmt" "io" "math/big" "net" "net/http" - "net/url" "strconv" "time" @@ -44,9 +44,12 @@ var ( ErrWriteProvider = fmt.Errorf("failed to write to provider") ) +const ( + TimeoutPingDefault = 5 * time.Second +) + type ProxyServiceSender struct { chainID *big.Int - publicUrl *url.URL privateKey interfaces.PrKeyProvider logStorage *lib.Collection[*interfaces.LogStorage] sessionStorage *storages.SessionStorage @@ -56,10 +59,9 @@ type ProxyServiceSender struct { log lib.ILogger } -func NewProxySender(chainID *big.Int, publicUrl *url.URL, privateKey interfaces.PrKeyProvider, logStorage *lib.Collection[*interfaces.LogStorage], sessionStorage *storages.SessionStorage, sessionRepo *sessionrepo.SessionRepositoryCached, log lib.ILogger) *ProxyServiceSender { +func NewProxySender(chainID *big.Int, privateKey interfaces.PrKeyProvider, logStorage *lib.Collection[*interfaces.LogStorage], sessionStorage *storages.SessionStorage, sessionRepo *sessionrepo.SessionRepositoryCached, log lib.ILogger) *ProxyServiceSender { return &ProxyServiceSender{ chainID: chainID, - publicUrl: publicUrl, privateKey: privateKey, logStorage: logStorage, sessionStorage: sessionStorage, @@ -73,6 +75,58 @@ func (p *ProxyServiceSender) SetSessionService(service SessionService) { p.sessionService = service } +func (p *ProxyServiceSender) Ping(ctx context.Context, providerURL string, providerAddr common.Address) (time.Duration, error) { + prKey, err := p.privateKey.GetPrivateKey() + if err != nil { + return 0, ErrMissingPrKey + } + + // check if context has timeout set + if _, ok := ctx.Deadline(); !ok { + subCtx, cancel := context.WithTimeout(ctx, TimeoutPingDefault) + defer cancel() + ctx = subCtx + } + + nonce := make([]byte, 8) + _, err = rand.Read(nonce) + if err != nil { + return 0, lib.WrapError(ErrCreateReq, err) + } + + msg, err := p.morRPC.PingRequest("0", prKey, nonce) + if err != nil { + return 0, lib.WrapError(ErrCreateReq, err) + } + + reqStartTime := time.Now() + res, code, err := p.rpcRequest(providerURL, msg) + if err != nil { + return 0, lib.WrapError(ErrProvider, fmt.Errorf("code: %d, msg: %v, error: %s", code, res, err)) + } + pingDuration := time.Since(reqStartTime) + + var typedMsg *msgs.PongRes + err = json.Unmarshal(*res.Result, &typedMsg) + if err != nil { + return pingDuration, lib.WrapError(ErrInvalidResponse, fmt.Errorf("expected PongRes, got %s", res.Result)) + } + + err = binding.Validator.ValidateStruct(typedMsg) + if err != nil { + return pingDuration, lib.WrapError(ErrInvalidResponse, err) + } + + signature := typedMsg.Signature + typedMsg.Signature = lib.HexString{} + + if !p.morRPC.VerifySignatureAddr(typedMsg, signature, providerAddr, p.log) { + return pingDuration, ErrInvalidSig + } + + return pingDuration, nil +} + func (p *ProxyServiceSender) InitiateSession(ctx context.Context, user common.Address, provider common.Address, spend *big.Int, bidID common.Hash, providerURL string) (*msgs.SessionRes, error) { requestID := "1" @@ -243,6 +297,9 @@ func (p *ProxyServiceSender) GetSessionReportFromUser(ctx context.Context, sessi } func (p *ProxyServiceSender) rpcRequest(url string, rpcMessage *msgs.RPCMessage) (*msgs.RpcResponse, int, error) { + // TODO: enable request-response matching by using requestID + // TODO: add context cancellation + TIMEOUT_TO_ESTABLISH_CONNECTION := time.Second * 3 dialer := net.Dialer{Timeout: TIMEOUT_TO_ESTABLISH_CONNECTION} @@ -293,6 +350,10 @@ func (p *ProxyServiceSender) GetModelIdSession(ctx context.Context, sessionID co return session.ModelID(), nil } +func (p *ProxyServiceSender) validateMsgSignatureAddr(result any, signature lib.HexString, providerAddr common.Address) bool { + return p.morRPC.VerifySignatureAddr(result, signature, providerAddr, p.log) +} + func (p *ProxyServiceSender) SendPromptV2(ctx context.Context, sessionID common.Hash, prompt *openai.ChatCompletionRequest, cb gcs.CompletionCallback) (interface{}, error) { session, err := p.sessionRepo.GetSession(ctx, sessionID) if err != nil { diff --git a/proxy-router/internal/proxyapi/requests.go b/proxy-router/internal/proxyapi/requests.go index bf8fc47f..60fcb0af 100644 --- a/proxy-router/internal/proxyapi/requests.go +++ b/proxy-router/internal/proxyapi/requests.go @@ -7,6 +7,15 @@ import ( "github.com/ethereum/go-ethereum/common" ) +type PingReq struct { + ProviderAddr common.Address `json:"providerAddr" validate:"required,eth_addr"` + ProviderURL string `json:"providerUrl" validate:"required,hostname_port"` +} + +type PingRes struct { + PingMs int64 `json:"ping,omitempty"` +} + type InitiateSessionReq struct { User common.Address `json:"user" validate:"required,eth_addr"` Provider common.Address `json:"provider" validate:"required,eth_addr"` @@ -17,14 +26,14 @@ type InitiateSessionReq struct { type PromptReq struct { Signature string `json:"signature" validate:"required,hexadecimal"` - Message json.RawMessage `json:"message" validate:"required"` + Message json.RawMessage `json:"message" validate:"required"` Timestamp string `json:"timestamp" validate:"required,timestamp"` } type PromptHead struct { SessionID lib.Hash `header:"session_id" validate:"hex32"` - ModelID lib.Hash `header:"model_id" validate:"hex32"` - ChatID lib.Hash `header:"chat_id" validate:"hex32"` + ModelID lib.Hash `header:"model_id" validate:"hex32"` + ChatID lib.Hash `header:"chat_id" validate:"hex32"` } type InferenceRes struct { diff --git a/proxy-router/internal/proxyctl/proxyctl.go b/proxy-router/internal/proxyctl/proxyctl.go index 1fa6e738..93cc6743 100644 --- a/proxy-router/internal/proxyctl/proxyctl.go +++ b/proxy-router/internal/proxyctl/proxyctl.go @@ -2,18 +2,23 @@ package proxyctl import ( "context" + "errors" + "fmt" "math/big" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/aiengine" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/blockchainapi" + "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/blockchainapi/structs" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/config" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/handlers/tcphandlers" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/interfaces" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/lib" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/proxyapi" + "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/registries" sessionrepo "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/session" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/repositories/transport" "github.com/MorpheusAIs/Morpheus-Lumerin-Node/proxy-router/internal/storages" + "github.com/ethereum/go-ethereum/common" "github.com/go-playground/validator/v10" "golang.org/x/sync/errgroup" ) @@ -27,6 +32,16 @@ const ( StateStopping = 3 ) +const ( + MORDecimals = 18 + ETHDecimals = 18 +) + +var ( + MORBalanceThreshold = *lib.Exp10(MORDecimals) // 1 MOR the balance to show a warning + ETHBalanceThreshold = *lib.Exp10(ETHDecimals - 1) // 0.1 ETH the balance to show a warning +) + func (s ProxyState) String() string { switch s { case StateStopped: @@ -60,8 +75,9 @@ type Proxy struct { blockchainService *blockchainapi.BlockchainService sessionExpiryHandler *blockchainapi.SessionExpiryHandler - state lib.AtomicValue[ProxyState] - tsk *lib.Task + state lib.AtomicValue[ProxyState] + tsk *lib.Task + serverStarted <-chan struct{} } // NewProxyCtl creates a new Proxy controller instance @@ -81,6 +97,7 @@ func NewProxyCtl(eventListerer *blockchainapi.EventsListener, wallet interfaces. blockchainService: blockchainService, sessionRepo: sessionRepo, sessionExpiryHandler: sessionExpiryHandler, + serverStarted: make(chan struct{}), } } @@ -118,7 +135,13 @@ func (p *Proxy) Run(ctx context.Context) error { case <-tsk.Done(): err := tsk.Err() if err != nil { - p.log.Errorf("proxy stopped with error: %s", err) + var logFunc func(string, ...interface{}) + if errors.Is(err, context.Canceled) { + logFunc = p.log.Warnf + } else { + logFunc = p.log.Errorf + } + logFunc("proxy stopped with error: %s", err) return err } } @@ -138,14 +161,37 @@ func (p *Proxy) run(ctx context.Context, prKey lib.HexString) error { } p.log.Infof("Wallet address: %s", walletAddr.String()) + ethBalance, morBalance, err := p.blockchainService.GetBalance(ctx) + if err != nil { + return err + } + + if ethBalance.Cmp(ÐBalanceThreshold) < 0 { + p.log.Warnf( + "ETH balance is too low: %s (< %s)", + formatETH(ethBalance), + formatETH(ÐBalanceThreshold), + ) + } else { + p.log.Infof("ETH balance: %s", formatETH(ethBalance)) + } + if morBalance.Cmp(&MORBalanceThreshold) < 0 { + p.log.Warnf( + "MOR balance is too low: %s (< %s)", + formatMOR(morBalance), + formatMOR(&MORBalanceThreshold), + ) + } else { + p.log.Infof("MOR balance: %s", formatMOR(morBalance)) + } + pubKey, err := lib.PubKeyFromPrivate(prKey) if err != nil { return err } proxyReceiver := proxyapi.NewProxyReceiver(prKey, pubKey, p.sessionStorage, p.aiEngine, p.chainID, p.modelConfigLoader, p.blockchainService, p.sessionRepo) - - morTcpHandler := proxyapi.NewMORRPCController(proxyReceiver, p.validator, p.sessionRepo, p.sessionStorage) + morTcpHandler := proxyapi.NewMORRPCController(proxyReceiver, p.validator, p.sessionRepo, p.sessionStorage, prKey) tcpHandler := tcphandlers.NewTCPHandler( p.log, p.connLog, p.schedulerLogFactory, morTcpHandler, ) @@ -156,6 +202,11 @@ func (p *Proxy) run(ctx context.Context, prKey lib.HexString) error { return tcpServer.Run(errCtx) }) + g.Go(func() error { + <-tcpServer.Started() + return p.afterStart(errCtx, walletAddr) + }) + g.Go(func() error { return p.eventListener.Run(errCtx) }) @@ -167,6 +218,69 @@ func (p *Proxy) run(ctx context.Context, prKey lib.HexString) error { return g.Wait() } +func (p *Proxy) afterStart(ctx context.Context, walletAddr common.Address) error { + log := p.log.Named("PROVIDER_CHECK") + + // check if provider exists + pr, err := p.blockchainService.GetProvider(ctx, walletAddr) + if err != nil { + return fmt.Errorf("cannot get provider %s: %w", walletAddr, err) + } else if pr == nil { + log.Warnf("provider is not registered under this wallet address: %s", walletAddr) + } else { + log.Infof("provider is registered in blockchain, url: %s", pr.Endpoint) + } + + if pr != nil { + // check provider connectivity from localhost + pingDuration, err := p.blockchainService.CheckConnectivity(ctx, pr.Endpoint, pr.Address) + if err != nil { + log.Warnf("provider %s is not reachable from localhost by address %s: %s", pr.Address, pr.Endpoint, err) + } else { + log.Infof("provider is reachable from localhost, ping: %s", pingDuration) + } + + // check connectivity from outer network + ok, err := p.blockchainService.CheckPortOpen(ctx, pr.Endpoint) + if err != nil { + log.Warnf("cannot check if port open for %s %s %s", pr.Address, pr.Endpoint, err) + } else if !ok { + log.Warnf("provider is not reachable from internet by %s", pr.Endpoint) + } else { + log.Infof("provider is reachable from internet") + } + + // check if provider has active bids + bids, err := p.blockchainService.GetActiveBidsByProvider(ctx, walletAddr, big.NewInt(0), 100, registries.OrderDESC) + if err != nil { + log.Warnf("cannot get active bids by provider: %s", walletAddr, err) + } else if len(bids) == 0 { + log.Warnf("provider has no bids available") + } else { + log.Infof("provider has %d bids available", len(bids)) + } + + bidsByModelID := make(map[common.Hash][]*structs.Bid) + for _, bid := range bids { + bidsByModelID[bid.ModelAgentId] = append(bidsByModelID[bid.ModelAgentId], bid) + } + + // check if provider has active bids for each model + modelIDs, _ := p.modelConfigLoader.GetAll() + + for _, modelID := range modelIDs { + count := len(bidsByModelID[modelID]) + if count == 0 { + log.Warnf("model %s, no active bids", lib.Short(modelID)) + } else { + log.Infof("model %s, active bids %d", lib.Short(modelID), count) + } + } + } + + return nil +} + func (p *Proxy) GetState() ProxyState { return p.state.Load() } @@ -175,3 +289,19 @@ func (p *Proxy) setState(s ProxyState) { p.state.Store(s) p.log.Infof("proxy state: %s", s) } + +func (p *Proxy) ServerStarted() <-chan struct{} { + return p.serverStarted +} + +func formatDecimal(n *big.Int, decimals int) string { + return lib.NewRat(n, lib.Exp10(decimals)).FloatString(3) +} + +func formatMOR(n *big.Int) string { + return formatDecimal(n, MORDecimals) + " MOR" +} + +func formatETH(n *big.Int) string { + return formatDecimal(n, ETHDecimals) + " ETH" +} diff --git a/proxy-router/internal/rating/rating_factory.go b/proxy-router/internal/rating/rating_factory.go index 0522eaf8..58f3081d 100644 --- a/proxy-router/internal/rating/rating_factory.go +++ b/proxy-router/internal/rating/rating_factory.go @@ -26,7 +26,7 @@ func NewRatingFromConfig(config json.RawMessage, log lib.ILogger) (*Rating, erro return nil, fmt.Errorf("failed to unmarshal rating config: %w", err) } - log.Infof("rating algorithm: %s, params %s", cfg.Algorithm, string(cfg.Params)) + log.Infof("rating algorithm: %s", cfg.Algorithm) scorer, err := factory(cfg.Algorithm, cfg.Params) if err != nil { @@ -58,13 +58,13 @@ func NewRating(scorer Scorer, providerAllowList []common.Address, log lib.ILogge } if len(allowList) == 0 { - log.Infof("providerAllowList is disabled, all providers are allowed") + log.Infof("provider filtering is disabled") } else { keys := maps.Keys(allowList) sort.Slice(keys, func(i, j int) bool { return keys[i].Hex() < keys[j].Hex() }) - log.Infof("providerAllowList: %v", keys) + log.Warnf("provider filtering is enabled, allowList: %v", keys) } return &Rating{ diff --git a/proxy-router/internal/repositories/ethclient/ethclient.go b/proxy-router/internal/repositories/ethclient/ethclient.go index 842e8689..cfd44d6f 100644 --- a/proxy-router/internal/repositories/ethclient/ethclient.go +++ b/proxy-router/internal/repositories/ethclient/ethclient.go @@ -16,12 +16,13 @@ import ( // Client defines typed wrappers for the Ethereum RPC API. type Client struct { - c RPCClient + c RPCClient + chainID *big.Int } // NewClient creates a client that uses the given RPC client. func NewClient(c RPCClient) *Client { - return &Client{c} + return &Client{c, nil} } // Close closes the underlying RPC connection. @@ -38,12 +39,16 @@ func (ec *Client) Client() RPCClient { // ChainID retrieves the current chain ID for transaction replay protection. func (ec *Client) ChainID(ctx context.Context) (*big.Int, error) { - var result hexutil.Big - err := ec.c.CallContext(ctx, &result, "eth_chainId") - if err != nil { - return nil, err + if ec.chainID == nil { + var result hexutil.Big + err := ec.c.CallContext(ctx, &result, "eth_chainId") + if err != nil { + return nil, err + } + ec.chainID = (*big.Int)(&result) } - return (*big.Int)(&result), err + + return new(big.Int).Set(ec.chainID), nil } // BlockNumber returns the most recent block number diff --git a/proxy-router/internal/repositories/registries/marketplace.go b/proxy-router/internal/repositories/registries/marketplace.go index bcdc224a..8bfdbdbc 100644 --- a/proxy-router/internal/repositories/registries/marketplace.go +++ b/proxy-router/internal/repositories/registries/marketplace.go @@ -2,6 +2,7 @@ package registries import ( "context" + "errors" "fmt" "math/big" @@ -26,6 +27,10 @@ type Marketplace struct { log lib.ILogger } +var ( + ErrBidPricePerSecondInvalid = errors.New("Invalid bid price per second") +) + func NewMarketplace(marketplaceAddr common.Address, client i.ContractBackend, multicall mc.MulticallBackend, log lib.ILogger) *Marketplace { mp, err := marketplace.NewMarketplace(marketplaceAddr, client) if err != nil { @@ -49,6 +54,20 @@ func NewMarketplace(marketplaceAddr common.Address, client i.ContractBackend, mu func (g *Marketplace) PostModelBid(opts *bind.TransactOpts, model common.Hash, pricePerSecond *big.Int) (common.Hash, error) { tx, err := g.marketplace.PostModelBid(opts, opts.From, model, pricePerSecond) if err != nil { + err = lib.TryConvertGethError(err) + + evmErr := lib.EVMError{} + if errors.As(err, &evmErr) { + if evmErr.Abi.Name == "MarketplaceBidPricePerSecondInvalid" { + min, max, err := g.GetMinMaxBidPricePerSecond(opts.Context) + if err != nil { + return common.Hash{}, lib.WrapError(ErrBidPricePerSecondInvalid, err) + } + + return common.Hash{}, lib.WrapError(ErrBidPricePerSecondInvalid, fmt.Errorf("must be between %s and %s, %w", min.String(), max.String(), evmErr)) + } + } + return common.Hash{}, lib.TryConvertGethError(err) } @@ -158,8 +177,16 @@ func (g *Marketplace) GetBidsByModelAgent(ctx context.Context, modelAgentId comm return g.GetMultipleBids(ctx, bidIDs) } -func (g *Marketplace) GetActiveBidsByProvider(ctx context.Context, provider common.Address, offset *big.Int, limit uint8, order Order) ([][32]byte, []marketplace.IBidStorageBid, error) { +func (g *Marketplace) GetActiveBidsByProviderCount(ctx context.Context, provider common.Address) (*big.Int, error) { _, len, err := g.marketplace.GetProviderActiveBids(&bind.CallOpts{Context: ctx}, provider, big.NewInt(0), big.NewInt(0)) + if err != nil { + return nil, err + } + return len, nil +} + +func (g *Marketplace) GetActiveBidsByProvider(ctx context.Context, provider common.Address, offset *big.Int, limit uint8, order Order) ([][32]byte, []marketplace.IBidStorageBid, error) { + len, err := g.GetActiveBidsByProviderCount(ctx, provider) if err != nil { return nil, nil, err } @@ -199,3 +226,19 @@ func (g *Marketplace) GetMultipleBids(ctx context.Context, IDs [][32]byte) ([][3 } return IDs, bids, nil } + +func (g *Marketplace) GetBidFee(ctx context.Context) (*big.Int, error) { + fee, err := g.marketplace.GetBidFee(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, lib.TryConvertGethError(err) + } + return fee, nil +} + +func (g *Marketplace) GetMinMaxBidPricePerSecond(ctx context.Context) (*big.Int, *big.Int, error) { + min, max, err := g.marketplace.GetMinMaxBidPricePerSecond(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, nil, lib.TryConvertGethError(err) + } + return min, max, nil +} diff --git a/proxy-router/internal/repositories/registries/session_router.go b/proxy-router/internal/repositories/registries/session_router.go index a637130b..366a79e1 100644 --- a/proxy-router/internal/repositories/registries/session_router.go +++ b/proxy-router/internal/repositories/registries/session_router.go @@ -112,6 +112,7 @@ func (g *SessionRouter) GetSessionsByUser(ctx context.Context, userAddr common.A if err != nil { return nil, nil, lib.TryConvertGethError(err) } + _offset, _limit := adjustPagination(order, length, offset, limit) ids, _, err := g.sessionRouter.GetUserSessions(&bind.CallOpts{Context: ctx}, userAddr, _offset, _limit) if err != nil { diff --git a/proxy-router/internal/repositories/transport/tcp_server.go b/proxy-router/internal/repositories/transport/tcp_server.go index ba92ea20..5a19d0f4 100644 --- a/proxy-router/internal/repositories/transport/tcp_server.go +++ b/proxy-router/internal/repositories/transport/tcp_server.go @@ -14,12 +14,14 @@ import ( type TCPServer struct { serverAddr string handler Handler + started chan struct{} log lib.ILogger } func NewTCPServer(serverAddr string, log lib.ILogger) *TCPServer { return &TCPServer{ serverAddr: serverAddr, + started: make(chan struct{}), log: log, } } @@ -35,10 +37,10 @@ func (p *TCPServer) Run(ctx context.Context) error { } listener, err := net.Listen("tcp", add.String()) - if err != nil { return fmt.Errorf("listener error %s %w", p.serverAddr, err) } + close(p.started) p.log.Infof("tcp server is listening: %s", p.serverAddr) @@ -64,6 +66,10 @@ func (p *TCPServer) Run(ctx context.Context) error { return err } +func (p *TCPServer) Started() <-chan struct{} { + return p.started +} + func (p *TCPServer) startAccepting(ctx context.Context, listener net.Listener) error { wg := sync.WaitGroup{} // waits for all handlers to finish to ensure proper cleanup defer wg.Wait() diff --git a/proxy-router/rating-config.json b/proxy-router/rating-config.json.example similarity index 100% rename from proxy-router/rating-config.json rename to proxy-router/rating-config.json.example diff --git a/ui-desktop/.env.example b/ui-desktop/.env.example index 42d956a3..8f68da8d 100644 --- a/ui-desktop/.env.example +++ b/ui-desktop/.env.example @@ -1,16 +1,32 @@ +# This file is used to set the environment variables for the cicd workflow +# Contract and Token current as of 11/5/2024 + +# MAINNET VALUES +CHAIN_ID=42161 +DIAMOND_ADDRESS=0xDE819AaEE474626E3f34Ef0263373357e5a6C71b +DISPLAY_NAME=Arbitrum +EXPLORER_URL=https://arbiscan.io/tx/{{hash}} +TOKEN_ADDRESS=0x092bAaDB7DEf4C3981454dD9c0A0D7FF07bCFc86 +DEV_TOOLS=false +SYMBOL_ETH=ETH +SYMBOL_COIN=MOR + +# TESTNET VALUES +#CHAIN_ID=421614 +#DIAMOND_ADDRESS=0xb8C55cD613af947E73E262F0d3C54b7211Af16CF +#DISPLAY_NAME=Sepolia Arbitrum +#EXPLORER_URL=https://sepolia.arbiscan.io/tx/{{hash}} +#TOKEN_ADDRESS=0x34a285a1b1c166420df5b6630132542923b5b27e +#DEV_TOOLS=true +#SYMBOL_ETH=ETH +#SYMBOL_COIN=MOR + +# COMMON BYPASS_AUTH=false -CHAIN_ID=421614 DEBUG=false DEFAULT_SELLER_CURRENCY=BTC -DEV_TOOLS=true -DIAMOND_ADDRESS=0xb8C55cD613af947E73E262F0d3C54b7211Af16CF -DISPLAY_NAME=Sepolia Arbitrum -EXPLORER_URL=https://sepolia.arbiscan.io/tx/{{hash}} IGNORE_DEBUG_LOGS=false PROXY_WEB_DEFAULT_PORT=8082 SENTRY_DSN= -SYMBOL_ETH=ETH -SYMBOL_COIN=MOR -TOKEN_ADDRESS=0x34a285a1b1c166420df5b6630132542923b5b27e TRACKING_ID= -FAILOVER_ENABLED= \ No newline at end of file +FAILOVER_ENABLED= diff --git a/ui-desktop/src/main/src/client/apiGateway.js b/ui-desktop/src/main/src/client/apiGateway.js index 2728519e..dda44599 100644 --- a/ui-desktop/src/main/src/client/apiGateway.js +++ b/ui-desktop/src/main/src/client/apiGateway.js @@ -216,6 +216,35 @@ const updateChatHistoryTitle = async ({ id, title}) => { } } + /** + * @param {string} address + * @param {string} endpoint + * @returns {Promise} +*/ +const checkProviderConnectivity = async ({ address, endpoint}) => { + try { + const path = `${config.chain.localProxyRouterUrl}/proxy/provider/ping`; + const response = await fetch(path, { + method: "POST", + body: JSON.stringify({ + providerAddr: address, + providerUrl: endpoint + }), + }); + + if(!response.ok) { + return false; + } + + const body = await response.json(); + return !!body.ping; + } + catch (e) { + console.log("checkProviderConnectivity: Error", e) + return false; + } + } + export default { getAllModels, getBalances, @@ -229,4 +258,5 @@ export default { getChatHistory, updateChatHistoryTitle, deleteChatHistory, + checkProviderConnectivity } \ No newline at end of file diff --git a/ui-desktop/src/main/src/client/subscriptions/no-core.js b/ui-desktop/src/main/src/client/subscriptions/no-core.js index cc03149d..f12bf403 100644 --- a/ui-desktop/src/main/src/client/subscriptions/no-core.js +++ b/ui-desktop/src/main/src/client/subscriptions/no-core.js @@ -36,7 +36,8 @@ const listeners = { "update-chat-history-title": handlers.updateChatHistoryTitle, // Failover "get-failover-setting": handlers.isFailoverEnabled, - "set-failover-setting": handlers.setFailoverSetting + "set-failover-setting": handlers.setFailoverSetting, + "check-provider-connectivity": handlers.checkProviderConnectivity } // Subscribe to messages where no core has to react diff --git a/ui-desktop/src/renderer/src/client/index.jsx b/ui-desktop/src/renderer/src/client/index.jsx index d966bbe2..deb438f6 100644 --- a/ui-desktop/src/renderer/src/client/index.jsx +++ b/ui-desktop/src/renderer/src/client/index.jsx @@ -145,6 +145,7 @@ const createClient = function (createStore) { // Failover getFailoverSetting: utils.forwardToMainProcess('get-failover-setting', 750000), setFailoverSetting: utils.forwardToMainProcess('set-failover-setting', 750000), + checkProviderConnectivity: utils.forwardToMainProcess('check-provider-connectivity', 750000) } const api = { diff --git a/ui-desktop/src/renderer/src/components/chat/Chat.tsx b/ui-desktop/src/renderer/src/components/chat/Chat.tsx index 17736767..b0f73aeb 100644 --- a/ui-desktop/src/renderer/src/components/chat/Chat.tsx +++ b/ui-desktop/src/renderer/src/components/chat/Chat.tsx @@ -45,12 +45,14 @@ const userMessage = { user: 'Me', role: "user", icon: "M", color: "#20dc8e" }; const Chat = (props) => { const chatBlockRef = useRef(null); + const bidsSpinWaitClosed = useRef(false); const [value, setValue] = useState(""); const [isLoading, setIsLoading] = useState(true); const [messages, setMessages] = useState([]); const [isOpen, setIsOpen] = useState(false); const [sessions, setSessions] = useState(); + const [providersAvailability, setProvidersAvailability] = useState([]); const [isSpinning, setIsSpinning] = useState(false); const [meta, setMeta] = useState({ budget: 0, supply: 0 }); @@ -81,14 +83,14 @@ const Chat = (props) => { useEffect(() => { (async () => { - const [meta, chainData, chats, userBalances] = await Promise.all([ - props.getMetaInfo(), + console.time("LOAD") + const [chainData, userSessions, chats] = await Promise.all([ props.getModelsData(), - props.client.getChatHistoryTitles() as Promise, - props.getBalances()]); + props.getSessionsByUser(props.address), + props.client.getChatHistoryTitles() as Promise]); - setBalances(userBalances) - setMeta(meta); + setBalances(chainData.userBalances) + setMeta(chainData.meta); setChainData(chainData) const mappedChatData = chats.reduce((res, item) => { @@ -106,7 +108,15 @@ const Chat = (props) => { }, [] as ChatData[]) setChatsData(mappedChatData); - const sessions = await refreshSessions(chainData?.models); + const sessions = userSessions.reduce((res, item) => { + const sessionModel = chainData.models.find(x => x.Id == item.ModelAgentId); + if (sessionModel) { + item.ModelName = sessionModel.Name; + res.push(item); + } + return res; + }, []); + setSessions(sessions); const openSessions = sessions.filter(s => !isClosed(s)); @@ -120,6 +130,7 @@ const Chat = (props) => { if (!openSessions.length) { useLocalModelChat(); + console.timeEnd("LOAD") return; } @@ -128,10 +139,11 @@ const Chat = (props) => { if (!latestSessionModel) { useLocalModelChat(); + console.timeEnd("LOAD") return; } - const openBid = latestSessionModel?.bids?.find(b => b.Id == latestSession.BidID); + const openBid = await props.getBidInfo(latestSession.BidID) if (!openBid) { useLocalModelChat(); @@ -141,12 +153,66 @@ const Chat = (props) => { setSelectedBid(openBid); setActiveSession(latestSession); setChat({ id: generateHashId(), createdAt: new Date(), modelId: latestSessionModel.ModelAgentId }); - })().then(() => { + console.timeEnd("LOAD") + })() + .then(() => { setIsLoading(false); }) }, []) + useEffect(() => { + if(!chainData) + return; + + (async () => { + const providersMap = chainData.providers.reduce((a, b) => ({ ...a, [b.Address.toLowerCase()]: b }), {}); + const modelsWithBids= (await Promise.all( + chainData.models.map(async m => { + const id = m.Id; + if(m.isLocal){ + return { id } + } + const bids = (await props.getBidsByModelId(id)) + .map(b => ({ ...b, ProviderData: providersMap[b.Provider.toLowerCase()], Model: m })) + .filter(b => b.ProviderData); + + if(!bids.length){ + return null; + } + + return { id, bids } + }) + )).reduce((acc, next) => { + if(!next) { + return acc; + } + const model = chainData.models.find(m => m.Id == next.id); + return [...acc, { ...model, bids: next.bids}] + }, []); + + setChainData({...chainData, models: modelsWithBids}) + bidsSpinWaitClosed.current = true; + })(); + + (async () => { + const availabilityResults = await props.getProvidersAvailability(chainData.providers); + setProvidersAvailability(availabilityResults); + })(); + + }, chainData) + + const spinWaitForBids = async () => { + if(bidsSpinWaitClosed.current) + return; + setIsLoading(true); + while(!bidsSpinWaitClosed.current) { + await new Promise(resolve => setTimeout(resolve, 300)); + } + setIsLoading(false); + } + const toggleDrawer = async () => { + spinWaitForBids(); setIsOpen((prevState) => !prevState) } @@ -228,9 +294,9 @@ const Chat = (props) => { } } - const refreshSessions = async (models = null) => { + const refreshSessions = async () => { const sessions = (await props.getSessionsByUser(props.address)).reduce((res, item) => { - const sessionModel = (models || chainData.models).find(x => x.Id == item.ModelAgentId); + const sessionModel = chainData.models.find(x => x.Id == item.ModelAgentId); if (sessionModel) { item.ModelName = sessionModel.Name; res.push(item); @@ -272,8 +338,6 @@ const Chat = (props) => { setSelectedModel(selectedModel); setIsReadonly(false); - // toggleDrawer(); - setChat({ ...chatData }) if (chatData.isLocal) { @@ -301,6 +365,7 @@ const Chat = (props) => { } const handleReopen = async () => { + spinWaitForBids(); setIsLoading(true); const newSessionId = await onOpenSession(true); setIsReadonly(false); @@ -428,11 +493,9 @@ const Chat = (props) => { const otherMessages = memoState.filter(m => m.id != part.id); if (imageContent) { result = [...otherMessages, { id: part.job, user: modelName, role: "assistant", text: imageContent, isImageContent: true, ...iconProps }]; - } - if (videoRawContent) { + } else if (videoRawContent) { result = [...otherMessages, { id: part.job, user: modelName, role: "assistant", text: videoRawContent, isVideoRawContent: true, ...iconProps }]; - } - else { + } else { const text = `${message?.text || ''}${part?.choices[0]?.delta?.content || ''}`.replace("<|im_start|>", "").replace("<|im_end|>", ""); result = [...otherMessages, { id: part.id, user: modelName, role: "assistant", text: text, ...iconProps }]; } @@ -521,7 +584,10 @@ const Chat = (props) => { setIsReadonly(false); setChat({ id: generateHashId(), createdAt: new Date(), modelId, isLocal }); - const selectedModel = isLocal ? chainData.models.find((m: any) => m.Id == modelId) : chainData.models.find((m: any) => m.Id == modelId && m.bids); + const selectedModel = isLocal + ? chainData.models.find((m: any) => m.Id == modelId) + : chainData.models.find((m: any) => m.Id == modelId && m.bids); + setSelectedModel(selectedModel); if (isLocal) { @@ -535,9 +601,7 @@ const Chat = (props) => { if (openModelSession) { const selectedBid = selectedModel.bids.find(b => b.Id == openModelSession.BidID && b.bids); - if (selectedBid) { - setSelectedBid(selectedBid); - } + setSelectedBid(selectedBid); setActiveSession(openModelSession) return; } @@ -616,7 +680,10 @@ const Chat = (props) => { setOpenChangeModal(true)}> + onClick={async () => { + await spinWaitForBids(); + setOpenChangeModal(true); + } }> New chat @@ -717,6 +784,7 @@ const Chat = (props) => { models={(chainData as any)?.models} isActive={openChangeModal} symbol={props.symbol} + providersAvailability={providersAvailability} onChangeModel={(eventData) => { onCreateNewChat(eventData); }} diff --git a/ui-desktop/src/renderer/src/components/chat/modals/ModelRow.tsx b/ui-desktop/src/renderer/src/components/chat/modals/ModelRow.tsx index 2d2b1581..ea0e0bb3 100644 --- a/ui-desktop/src/renderer/src/components/chat/modals/ModelRow.tsx +++ b/ui-desktop/src/renderer/src/components/chat/modals/ModelRow.tsx @@ -99,12 +99,7 @@ function ModelRow(props) { const bids = props?.model?.bids || []; const modelId = props?.model?.Id || ''; const isLocal = props?.model?.isLocal; - const lastAvailabilityCheck: Date = (() => { - if(!bids?.length) { - return new Date(); - } - return bids.map(b => new Date(b.ProviderData?.availabilityUpdatedAt ?? new Date()))[0]; - })(); + const lastAvailabilityCheck: Date = new Date(props?.model?.lastCheck ?? new Date()); const [selected, changeSelected] = useState(); const [useSelect, setUseSelect] = useState(); diff --git a/ui-desktop/src/renderer/src/components/chat/modals/ModelSelectionModal.tsx b/ui-desktop/src/renderer/src/components/chat/modals/ModelSelectionModal.tsx index 23e7bc5c..56b0593e 100644 --- a/ui-desktop/src/renderer/src/components/chat/modals/ModelSelectionModal.tsx +++ b/ui-desktop/src/renderer/src/components/chat/modals/ModelSelectionModal.tsx @@ -36,7 +36,7 @@ const RVContainer = styled(RVList)` overflow: visible !important; }` -const ModelSelectionModal = ({ isActive, handleClose, models, onChangeModel, symbol }) => { +const ModelSelectionModal = ({ isActive, handleClose, models, onChangeModel, symbol, providersAvailability }) => { const [search, setSearch] = useState(); if (!isActive) { @@ -49,7 +49,30 @@ const ModelSelectionModal = ({ isActive, handleClose, models, onChangeModel, sym } const sortedModels = models - .map(m => ({ ...m, isOnline: m.isLocal || m.bids.some(b => b.ProviderData?.availabilityStatus != "disconnected") })) + .map(m => { + if(m.isLocal || !providersAvailability){ + return ({...m, isOnline: true }) + } + + const info = m.bids.reduce((acc, next) => { + const entry = providersAvailability.find(pa => pa.id == next.Provider); + if(!entry) { + return acc; + } + + if(entry.isOnline) { + return acc; + } + + const isOnline = entry.status != "disconnected"; + + return { + isOnline, + lastCheck: !isOnline ? entry.time : undefined + } + }, {}); + return ({ ...m, ...info }) + } ) .sort((a, b) => b.isOnline - a.isOnline); const filterdModels = search ? sortedModels.filter(m => m.Name.toLowerCase().includes(search.toLowerCase())) : sortedModels; diff --git a/ui-desktop/src/renderer/src/components/dashboard/Dashboard.jsx b/ui-desktop/src/renderer/src/components/dashboard/Dashboard.jsx index 5d5d4ef6..e74d9606 100644 --- a/ui-desktop/src/renderer/src/components/dashboard/Dashboard.jsx +++ b/ui-desktop/src/renderer/src/components/dashboard/Dashboard.jsx @@ -56,6 +56,7 @@ const Dashboard = ({ ethCoinPrice, loadTransactions, getStakedFunds, + explorerUrl, ...props }) => { const [activeModal, setActiveModal] = useState(null) @@ -174,7 +175,7 @@ const Dashboard = ({ window.openLink(`https://sepolia.arbiscan.io/address/${address}`)} + onClick={() => window.openLink(explorerUrl)} block > Transaction Explorer diff --git a/ui-desktop/src/renderer/src/store/hocs/withChatState.jsx b/ui-desktop/src/renderer/src/store/hocs/withChatState.jsx index f85b1c0f..e42afde0 100644 --- a/ui-desktop/src/renderer/src/store/hocs/withChatState.jsx +++ b/ui-desktop/src/renderer/src/store/hocs/withChatState.jsx @@ -4,7 +4,7 @@ import React from 'react'; import { ToastsContext } from '../../components/toasts'; import selectors from '../selectors'; import axios from 'axios'; -import { getSessionsByUser, getBidsByModelId } from '../utils/apiCallsHelper'; +import { getSessionsByUser, getBidsByModelId, getBidInfoById } from '../utils/apiCallsHelper'; const AvailabilityStatus = { available: "available", @@ -93,56 +93,27 @@ const withChatState = WrappedComponent => { } getModelsData = async () => { - const [localModels, modelsResp, providersResp] = await Promise.all([ + const [localModels, modelsResp, providersResp, meta, userBalances] = await Promise.all([ this.getLocalModels(), this.getAllModels(), - this.getProviders()]); + this.getProviders(), + this.getMetaInfo(), + this.getBalances()]); const models = modelsResp.filter(m => !m.IsDeleted); const providers = providersResp.filter(m => !m.IsDeleted); - const availabilityResults = await this.getProvidersAvailability(providers); - availabilityResults.forEach(ar => { - const provider = providers.find(p => p.Address == ar.id); - if(!provider) - return; - - provider.availabilityStatus = ar.status; - provider.availabilityUpdatedAt = ar.time; - }); - - const providersMap = providers.reduce((a, b) => ({ ...a, [b.Address.toLowerCase()]: b }), {}); - - const responses = (await Promise.all( - models.map(async m => { - const id = m.Id; - const bids = (await getBidsByModelId(this.props.config.chain.localProxyRouterUrl, id)) - .filter(b => +b.DeletedAt === 0) - .map(b => ({ ...b, ProviderData: providersMap[b.Provider.toLowerCase()], Model: m })) - .filter(b => b.ProviderData) - .filter(b => b.Provider != this.props.address); + const result = [...localModels.map(m => ({...m, isLocal: true })), ...models]; - return { id, bids } - }) - )).reduce((a,b) => ({...a, [b.id]: b.bids}), {}); - - const result = [...localModels.map(m => ({...m, isLocal: true }))]; - - for (const model of models) { - const id = model.Id; - const bids = responses[id]; - - if(!bids.length) { - continue; - } - - result.push({ ...model, bids }) - } - - return { models: result, providers } + return { models: result, providers, meta, userBalances } } getProvidersAvailability = async (providers) => { + const isValidUrl = (url) => { + const urlRegex = /^(https?:\/\/)?(([a-zA-Z0-9.-]+\.[a-zA-Z]{2,}|localhost)|(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))(:\d{1,5})?(\/\S*)?$/; + return urlRegex.test(url); + } + const availabilityResults = await Promise.all(providers.map(async p => { try { const storedRecord = JSON.parse(localStorage.getItem(p.Address)); @@ -156,14 +127,12 @@ const withChatState = WrappedComponent => { } } - const endpoint = p.Endpoint; - const [domain, port] = endpoint.split(":"); - const { data } = await axios.post("https://portchecker.io/api/v1/query", { - host: domain, - ports: [port], - }); - - const isValid = !!data.check?.find((c) => c.port == port && c.status == true); + if(!isValidUrl(p.Endpoint)) { + return ({ id: p.Address, status: AvailabilityStatus.disconnected, time: new Date() }); + } + + const isValid = await this.props.client.checkProviderConnectivity({ endpoint: p.Endpoint, address: p.Address }) + const record = ({id: p.Address, status: isValid ? AvailabilityStatus.available : AvailabilityStatus.disconnected, time: new Date() }); localStorage.setItem(record.id, JSON.stringify({ status: record.status, time: record.time })); return record; @@ -190,6 +159,23 @@ const withChatState = WrappedComponent => { return await getSessionsByUser(this.props.config.chain.localProxyRouterUrl, user); } + getBidInfo = async (id) => { + if(!id){ + return; + } + + return await getBidInfoById(this.props.config.chain.localProxyRouterUrl, id) + } + + getBidsByModelId = async(modelId) => { + if(!modelId) { + return; + } + + const bids = await getBidsByModelId(this.props.config.chain.localProxyRouterUrl, modelId); + return bids.filter(b => +b.DeletedAt === 0).filter(b => b.Provider != this.props.address); + } + onOpenSession = async ({ modelId, duration }) => { this.context.toast('info', 'Processing...'); try { @@ -229,8 +215,10 @@ const withChatState = WrappedComponent => { return ( { page: selectors.getTransactionPage(state), pageSize: selectors.getTransactionPageSize(state), hasNextPage: selectors.getHasNextPage(state), + explorerUrl: selectors.getContractExplorerUrl(state, { + hash: selectors.getWalletAddress(state) + }) }); const mapDispatchToProps = dispatch => ({ diff --git a/ui-desktop/src/renderer/src/store/utils/apiCallsHelper.tsx b/ui-desktop/src/renderer/src/store/utils/apiCallsHelper.tsx index 13354489..40bbcd4e 100644 --- a/ui-desktop/src/renderer/src/store/utils/apiCallsHelper.tsx +++ b/ui-desktop/src/renderer/src/store/utils/apiCallsHelper.tsx @@ -16,7 +16,7 @@ export const getSessionsByUser = async (url, user) => { } } - const limit = 20; + const limit = 50; let offset = 0; let sessions: any[] = []; let all = false; @@ -30,7 +30,7 @@ export const getSessionsByUser = async (url, user) => { all = true; } else { - offset++; + offset += limit; } } @@ -55,7 +55,7 @@ export const getBidsByModelId = async (url, modelId) => { } } - const limit = 20; + const limit = 50; let offset = 0; let bids: any[] = []; let all = false; @@ -69,9 +69,22 @@ export const getBidsByModelId = async (url, modelId) => { all = true; } else { - offset++; + offset += limit; } } return bids; +} + +export const getBidInfoById = async (url, id) => { + try { + const path = `${url}/blockchain/bids/${id}` + const response = await fetch(path); + const data = await response.json(); + return data.bid; + } + catch (e) { + console.log("Error", e) + return undefined; + } } \ No newline at end of file