Skip to content

Commit

Permalink
gltesting: Implement a grpc-web-proxy for node access
Browse files Browse the repository at this point in the history
Besides the grpc-web proxy functionality, the node grpc-web-proxy also
needs the capability of locating and starting a node if it hasn't been
started yet. It also needs to be configured with the correct client
certificates. The `NodeHandler` class encapsulates that logic.

We also test it by running a `GetInfo` call through the
grpc-web-client, the node-grpc-web-proxy-, finally hitting the node,
and back again.
  • Loading branch information
cdecker committed Nov 21, 2024
1 parent f12ceef commit 9574d63
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 19 deletions.
15 changes: 14 additions & 1 deletion libs/gl-testing/gltesting/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from gltesting.network import node_factory
from pyln.testing.fixtures import directory as str_directory
from decimal import Decimal
from gltesting.grpcweb import GrpcWebProxy
from gltesting.grpcweb import GrpcWebProxy, NodeHandler
from clnvm import ClnVersionManager


Expand Down Expand Up @@ -220,3 +220,16 @@ def grpc_web_proxy(scheduler, grpc_test_server):
yield p

p.stop()


@pytest.fixture
def node_grpc_web_proxy(scheduler):
"""A grpc-web proxy that knows how to talk to nodes.
"""
p = GrpcWebProxy(scheduler=scheduler, grpc_port=0)
p.handler_cls = NodeHandler
p.start()

yield p

p.stop()
136 changes: 119 additions & 17 deletions libs/gl-testing/gltesting/grpcweb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
import logging
import struct
import httpx
from dataclasses import dataclass
from typing import Dict
import ssl


class GrpcWebProxy(object):
Expand All @@ -26,15 +30,20 @@ def __init__(self, scheduler: Scheduler, grpc_port: int):
self.logger.info(
f"GrpcWebProxy configured to forward requests from web_port={self.web_port} to grpc_port={self.grpc_port}"
)
self.handler_cls = Handler

def start(self):
self._thread = Thread(target=self.run, daemon=True)
self.logger.info(f"Starting grpc-web-proxy on port {self.web_port}")
self.running = True
server_address = ("127.0.0.1", self.web_port)

self.httpd = ThreadingHTTPServer(server_address, Handler)
self.httpd = ThreadingHTTPServer(server_address, self.handler_cls)
self.httpd.grpc_port = self.grpc_port

# Just a simple way to pass the scheduler to the handler
self.httpd.scheduler = self.scheduler

self.logger.debug(f"Server startup complete")
self._thread.start()

Expand All @@ -47,11 +56,49 @@ def stop(self):
self._thread.join()


@dataclass
class Request:
body: bytes
headers: Dict[str, str]
flags: int
length: int


@dataclass
class Response:
body: bytes


class Handler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger("gltesting.grpcweb.Handler")
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

def proxy(self, request) -> Response:
"""Callback called with the request, implementing the proxying."""
url = f"http://localhost:{self.server.grpc_port}{self.path}"
self.logger.debug(f"Forwarding request to '{url}'")
headers = {
"te": "trailers",
"Content-Type": "application/grpc",
"grpc-accept-encoding": "identity",
"user-agent": "gl-testing-grpc-web-proxy",
}
content = struct.pack("!cI", request.flags, request.length) + request.body
req = httpx.Request(
"POST",
url,
headers=headers,
content=content,
)
client = httpx.Client(http1=False, http2=True)
res = client.send(req)
return Response(body=res.content)

def auth(self, request: Request) -> bool:
"""Authenticate the request. True means allow."""
return True

def do_POST(self):
# We don't actually touch the payload, so we do not really
# care about the flags ourselves. The upstream sysmte will
Expand All @@ -69,40 +116,95 @@ def do_POST(self):
# need to decode it, and we can treat it as opaque blob.
body = self.rfile.read(length)

req = Request(body=body, headers=self.headers, flags=flags, length=length)
if not self.auth(req):
self.wfile.write(b"HTTP/1.1 401 Unauthorized\r\n\r\n")
return

response = self.proxy(req)
self.wfile.write(b"HTTP/1.0 200 OK\n\n")
self.wfile.write(response.body)
self.wfile.flush()


class NodeHandler(Handler):
"""A handler that is aware of nodes, their auth and how they schedule."""

def __init__(self, *args, **kwargs):
self.logger = logging.getLogger("gltesting.grpcweb.NodeHandler")
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

def auth(self, request: Request) -> bool:
# TODO extract the `glauthpubkey` and the `glauthsig`, then
# verify them. Fail the call if the verification fails,
# forward otherwise.
# This is just a test server, and we don't make use of the
# multiplexing support in `h2`, which simplifies this proxy
# quite a bit. The production server maintains a cache of
# connections and multiplexes correctly.
pk = request.headers.get("glauthpubkey", None)
sig = request.headers.get("glauthsig", None)
ts = request.headers.get("glts", None)

import httpx
if not pk:
self.logger.warn(f"Missing public key header")
return False

url = f"http://localhost:{self.server.grpc_port}{self.path}"
self.logger.debug(f"Forwarding request to '{url}'")
if not sig:
self.logger.warn(f"Missing signature header")
return False

if not ts:
self.logger.warn(f"Missing timestamp header")
return False

# TODO Check the signature.
return True

def proxy(self, request: Request):
# Fetch current location of the node

pk = request.headers.get("glauthpubkey")
from base64 import b64decode

pk = b64decode(pk)

node = self.server.scheduler.get_node(pk)
self.logger.debug(f"Found node for node_id={pk.hex()}")

# TODO Schedule node if not scheduled

client_cert = node.identity.private_key
ca_path = node.identity.caroot_path

# Load TLS client cert info client
ctx = httpx.create_ssl_context(
verify=ca_path,
http2=True,
cert=(
node.identity.cert_chain_path,
node.identity.private_key_path,
),
)
client = httpx.Client(http1=False, http2=True, verify=ctx)

url = f"{node.process.grpc_uri}{self.path}"
headers = {
"te": "trailers",
"Content-Type": "application/grpc",
"grpc-accept-encoding": "idenity",
"user-agent": "My bloody hacked up script",
"grpc-accept-encoding": "identity",
"user-agent": "gl-testing-grpc-web-proxy",
}
content = struct.pack("!cI", flags, length) + body
content = struct.pack("!cI", request.flags, request.length) + request.body

# Forward request
req = httpx.Request(
"POST",
url,
headers=headers,
content=content,
)
client = httpx.Client(http1=False, http2=True)

res = client.send(req)
res = client.send(req)

canned = b"\n\rheklllo world"
l = struct.pack("!I", len(canned))
self.wfile.write(b"HTTP/1.0 200 OK\n\n")
self.wfile.write(b"\x00")
self.wfile.write(l)
self.wfile.write(canned)
self.wfile.flush()
# Return response
return Response(body=res.content)
48 changes: 48 additions & 0 deletions libs/gl-testing/tests/test_grpc_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@
from gltesting.test_pb2_grpc import GreeterStub
from gltesting.test_pb2 import HelloRequest
import sonora.client
from pyln import grpc as clnpb
from base64 import b64encode
from time import time
import struct
from typing import Any


class GrpcWebClient:
"""A simple grpc-web client that implements the calling convention."""

def __init__(self, node_grpc_web_proxy_uri, node_id: bytes):
self.node_id = node_id
self.node_grpc_web_proxy_uri = node_grpc_web_proxy_uri
self.channel = sonora.client.insecure_web_channel(node_grpc_web_proxy_uri)
self.stub = clnpb.NodeStub(self.channel)

def call(self, method_name: str, req: Any) -> Any:
ts = struct.pack("!Q", int(time() * 1000))
metadata = [
("glauthpubkey", b64encode(self.node_id).decode("ASCII")),
("glauthsig", b64encode(b"\x00" * 64).decode("ASCII")),
("glts", b64encode(ts).decode("ASCII")),
]
func = self.stub.__dict__.get(method_name)
return func(req, metadata=metadata)


def test_start(grpc_web_proxy):
with sonora.client.insecure_web_channel(
Expand All @@ -14,3 +40,25 @@ def test_start(grpc_web_proxy):
req = HelloRequest(name="greenlight")
print(stub.SayHello(req))


def test_node_grpc_web(scheduler, node_grpc_web_proxy, clients):
"""Ensure that the"""
# Start by creating a node
c = clients.new()
c.register(configure=True)
n = c.node()
info = n.get_info()

# Now extract the TLS certificates, so we can sign the payload.
# TODO Configure the web client to sign its requests too
node_id = info.id
key_path = c.directory / "device-key.pem"
ca_path = c.directory / "ca.pem"

proxy_uri = f"http://localhost:{node_grpc_web_proxy.web_port}"
web_client = GrpcWebClient(proxy_uri, node_id)

# Issue a request to the node through the proxy.
req = clnpb.GetinfoRequest()
info = web_client.call("Getinfo", req)
print(info)
2 changes: 1 addition & 1 deletion libs/gl-testing/tests/util/grpcserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ def target():
self.thread.start()

def stop(self):
self.inner.aclose
pass

0 comments on commit 9574d63

Please sign in to comment.