Skip to content

Commit

Permalink
Add handshake to determine wire encoding and protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmorgan committed Sep 12, 2024
1 parent 5c53968 commit 9cd9887
Show file tree
Hide file tree
Showing 18 changed files with 295 additions and 117 deletions.
3 changes: 1 addition & 2 deletions pkgs/_analyzer_macros/lib/macro_implementation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class AnalyzerMacroImplementation implements injected.MacroImplementation {
AnalyzerMacroImplementation._(
packageConfig,
await MacroHost.serve(
// TODO(davidmorgan): this should be negotiated per client, not
// set here.
// TODO(davidmorgan): support serving multiple protocols.
protocol: protocol,
packageConfig: packageConfig,
queryService: AnalyzerQueryService()));
Expand Down
3 changes: 1 addition & 2 deletions pkgs/_cfe_macros/lib/macro_implementation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class CfeMacroImplementation implements injected.MacroImplementation {
/// [packageConfig] is the package config of the workspace of the code being
/// edited.
static Future<CfeMacroImplementation> start({
// TODO(davidmorgan): this should be negotiated per client, not
// set here.
// TODO(davidmorgan): support serving multiple protocols.
required Protocol protocol,
required Uri packageConfig,
}) async =>
Expand Down
5 changes: 1 addition & 4 deletions pkgs/_macro_builder/lib/src/bootstrap.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ import 'package:macro_service/macro_service.dart' as macro_service;
void main(List<String> arguments) {
macro_client.MacroClient.run(
// TODO(davidmorgan): this should be negotiated, not passed here.
protocol: macro_service.Protocol.fromJson(
convert.json.decode(arguments[0])),
endpoint: macro_service.HostEndpoint.fromJson(
convert.json.decode(arguments[1])),
convert.json.decode(arguments[0])),
macros: [''');
for (var i = 0; i != macros.length; ++i) {
final macro = macros[i];
Expand Down
5 changes: 1 addition & 4 deletions pkgs/_macro_builder/test/macro_builder_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ import 'package:macro_service/macro_service.dart' as macro_service;
void main(List<String> arguments) {
macro_client.MacroClient.run(
// TODO(davidmorgan): this should be negotiated, not passed here.
protocol: macro_service.Protocol.fromJson(
convert.json.decode(arguments[0])),
endpoint: macro_service.HostEndpoint.fromJson(
convert.json.decode(arguments[1])),
convert.json.decode(arguments[0])),
macros: [m0.DeclareX(), m1.DeclareY(), m2.OtherMacroImplementation()]);
}
''');
Expand Down
52 changes: 41 additions & 11 deletions pkgs/_macro_client/lib/macro_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:dart_model/dart_model.dart';
Expand All @@ -16,38 +17,67 @@ import 'package:macro_service/macro_service.dart';
/// TODO(davidmorgan): split to multpile implementations depending on
/// transport used to connect to host.
class MacroClient {
final Protocol protocol;
/// The protocol, once the handshake is complete.
late final Protocol protocol;
final Iterable<Macro> macros;
final Socket socket;
late final RemoteMacroHost _host;

/// A completer for each pending request to the host, by request ID.
final Map<int, Completer<Response>> _responseCompleters = {};

MacroClient._(this.protocol, this.macros, this.socket) {
MacroClient._(this.macros, this.socket) {
_host = RemoteMacroHost(this);
_start();
}

// TODO(davidmorgan): negotiation about protocol version goes here.
/// Does the protocol handshake then sends [MacroStartedRequest] for each
/// macro.
void _start() async {
// The incoming data starts as JSON strings, `handshakeProtocol`, then
// switches to the agreed-upon protocol. So use a broadcast stream to
// allow processing the stream in two different ways.
final broadcastStream = socket.asBroadcastStream();
// Prepare to receive the handshake response, but it won't be sent until
// after `HandshakeRequest` is sent below.
final firstResponse = handshakeProtocol.decode(broadcastStream).first;
// Send `HandshakeRequest` telling the host what protocols this macro
// bundle supports.
Protocol.handshakeProtocol.send(
socket.add,
HandshakeRequest(protocols: [
Protocol(
encoding: ProtocolEncoding.json,
version: ProtocolVersion.macros1),
Protocol(
encoding: ProtocolEncoding.binary,
version: ProtocolVersion.macros1),
]).node);
// Read `HandshakeResponse`, get from it the protocol to use for the rest of
// the stream, and decode+handle using that protocol.
final handshakeResponse = HandshakeResponse.fromJson(await firstResponse);
protocol = handshakeResponse.protocol!;
protocol.decode(broadcastStream).listen(_handleRequest);

// Note that reading `HandshakeResponse` then switching protocol relies on
// no other messages arriving in the same chunk as `HandshakeResponse`. This
// is guaranteed because the host won't send anything else until it receives
// a `MacroStartedRequest` that is sent next.

// Tell the host which macros are in this bundle.
for (final macro in macros) {
_sendRequest(MacroRequest.macroStartedRequest(
unawaited(_sendRequest(MacroRequest.macroStartedRequest(
MacroStartedRequest(macroDescription: macro.description),
id: nextRequestId));
id: nextRequestId)));
}

protocol.decode(socket).listen(_handleRequest);
}

/// Runs [macros] for the host at [endpoint].
static Future<MacroClient> run({
// TODO(davidmorgan): this should be negotiated, not just passed in.
required Protocol protocol,
required HostEndpoint endpoint,
required Iterable<Macro> macros,
}) async {
final socket = await Socket.connect('localhost', endpoint.port);
return MacroClient._(protocol, macros, socket);
return MacroClient._(macros, socket);
}

Future<Response> _sendRequest(MacroRequest request) async {
Expand Down
48 changes: 34 additions & 14 deletions pkgs/_macro_client/test/macro_client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,55 @@ import 'package:test/test.dart';

void main() {
for (final protocol in [
Protocol(encoding: ProtocolEncoding.json),
Protocol(encoding: ProtocolEncoding.binary)
Protocol(encoding: ProtocolEncoding.json, version: ProtocolVersion.macros1),
Protocol(
encoding: ProtocolEncoding.binary, version: ProtocolVersion.macros1)
]) {
group('MacroClient using ${protocol.encoding}', () {
/// Waits for [HandshakRequest] on [socket], sends response choosing
/// [protocol], returns next responses decoded using [protocol].
Future<StreamQueue<Map<String, Object?>>> handshake(Socket socket) async {
final broadcastStream = socket.asBroadcastStream();
final handshakeRequest =
await handshakeProtocol.decode(broadcastStream).first;
final result = StreamQueue(protocol.decode(broadcastStream));

expect(handshakeRequest, {
'protocols': [
{'encoding': 'json', 'version': 'macros1'},
{'encoding': 'binary', 'version': 'macros1'}
]
});
Protocol.handshakeProtocol
.send(socket.add, HandshakeResponse(protocol: protocol).node);

return result;
}

test('connects to service', () async {
final serverSocket = await ServerSocket.bind('localhost', 0);
addTearDown(serverSocket.close);

unawaited(MacroClient.run(
protocol: protocol,
endpoint: HostEndpoint(port: serverSocket.port),
macros: [DeclareXImplementation()]));

await (await serverSocket.first.timeout(const Duration(seconds: 10)))
.close();
final socket = await serverSocket.first;
await handshake(socket);
});

test('error response if no such macro', () async {
final serverSocket = await ServerSocket.bind('localhost', 0);

unawaited(MacroClient.run(
protocol: protocol,
endpoint: HostEndpoint(port: serverSocket.port),
macros: []));
macros: [QueryClassImplementation()]));

final socket = await serverSocket.first;
final responses = StreamQueue(protocol.decode(socket));
final responses = await handshake(socket);

// MacroStartedRequest, ignore.
await responses.next;

final requestId = nextRequestId;
protocol.send(
Expand All @@ -53,6 +75,7 @@ void main() {
name: 'DeclareX'),
AugmentRequest(phase: 2))
.node);

final augmentResponse = await responses.next;
expect(augmentResponse, {
'requestId': requestId,
Expand All @@ -68,13 +91,12 @@ void main() {
final serverSocket = await ServerSocket.bind('localhost', 0);

unawaited(MacroClient.run(
protocol: protocol,
endpoint: HostEndpoint(port: serverSocket.port),
macros: [DeclareXImplementation()]));

final socket = await serverSocket.first;
final responses = await handshake(socket);

final responses = StreamQueue(protocol.decode(socket));
final descriptionResponse = await responses.next;
expect(descriptionResponse, {
'id': descriptionResponse['id'],
Expand Down Expand Up @@ -116,13 +138,12 @@ void main() {
final serverSocket = await ServerSocket.bind('localhost', 0);

unawaited(MacroClient.run(
protocol: protocol,
endpoint: HostEndpoint(port: serverSocket.port),
macros: [QueryClassImplementation()]));

final socket = await serverSocket.first;
final responses = await handshake(socket);

final responses = StreamQueue(protocol.decode(socket));
final descriptionResponse = await responses.next;
expect(descriptionResponse, {
'id': descriptionResponse['id'],
Expand Down Expand Up @@ -196,13 +217,12 @@ void main() {
final serverSocket = await ServerSocket.bind('localhost', 0);

unawaited(MacroClient.run(
protocol: protocol,
endpoint: HostEndpoint(port: serverSocket.port),
macros: [QueryClassImplementation()]));

final socket = await serverSocket.first;
final responses = await handshake(socket);

final responses = StreamQueue(protocol.decode(socket));
// MacroStartedRequest, ignore.
await responses.next;

Expand Down
7 changes: 2 additions & 5 deletions pkgs/_macro_host/lib/macro_host.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class MacroHost {

/// Starts a macro host with introspection queries handled by [queryService].
static Future<MacroHost> serve({
// TODO(davidmorgan): this should be negotiated per client, not set here.
// TODO(davidmorgan): support serving multiple protocols.
required Protocol protocol,
required Uri packageConfig,
required QueryService queryService,
Expand Down Expand Up @@ -86,10 +86,7 @@ class MacroHost {
_hostService._macroState[annotation.asString] = _MacroState();
final macroBundle = await macroBuilder.build(
macroPackageConfig.uri, [lookupMacroImplementation(annotation)!]);
macroRunner.start(
macroBundle: macroBundle,
protocol: macroServer.protocol,
endpoint: macroServer.endpoint);
macroRunner.start(macroBundle: macroBundle, endpoint: macroServer.endpoint);
}
}

Expand Down
9 changes: 3 additions & 6 deletions pkgs/_macro_runner/lib/macro_runner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ import 'package:macro_service/macro_service.dart';
class MacroRunner {
/// Starts [macroBundle] connected to [endpoint].
void start(
{required BuiltMacroBundle macroBundle,
// TODO(davidmorgan): this should be negotiated, not passed here.
required Protocol protocol,
required HostEndpoint endpoint}) {
Process.run(macroBundle.executablePath,
[json.encode(protocol), json.encode(endpoint)]).then((result) {
{required BuiltMacroBundle macroBundle, required HostEndpoint endpoint}) {
Process.run(macroBundle.executablePath, [json.encode(endpoint)])
.then((result) {
if (result.exitCode != 0) {
print('Macro process exited with error: ${result.stderr}');
}
Expand Down
1 change: 0 additions & 1 deletion pkgs/_macro_runner/test/macro_runner_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ void main() {
final runner = MacroRunner();
runner.start(
macroBundle: bundle,
protocol: protocol,
endpoint: HostEndpoint(port: serverSocket.port));

expect(
Expand Down
27 changes: 22 additions & 5 deletions pkgs/_macro_server/lib/macro_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class MacroServer {
///
/// TODO(davidmorgan): other transports besides TCP/IP.
static Future<MacroServer> serve({
// TODO(davidmorgan): this should be negotiated per client, not set for
// each server.
// TODO(davidmorgan): support serving multiple protocols.
required Protocol protocol,
required HostService service,
}) async {
Expand Down Expand Up @@ -65,10 +64,28 @@ class MacroServer {
return connection.responses.where((r) => r.requestId == request.id).first;
}

void _handleConnection(Socket socket) {
void _handleConnection(Socket socket) async {
final connection = _Connection(socket);
_connections.add(connection);
protocol.decode(socket).forEach((jsonData) {

final broadcastStream = socket.asBroadcastStream();
final firstRequest =
Protocol.handshakeProtocol.decode(broadcastStream).first;
final handshakeRequest = HandshakeRequest.fromJson(await firstRequest);
// TODO(davidmorgan): compute intersection of requested and supported
// protocols.
if (!handshakeRequest.protocols.any((p) => p.equals(protocol))) {
throw StateError('No requested protocol is supported.');
}
// The macro bundle relies on this message arriving fully before any other
// message arrives. This is guaranteed because `sendToMacro` waits for a
// matching `MacroStartedRequest` from the bundle before sending to it,
// and `MacroStartedRequest` is sent after this message is received and
// the protocol set.
Protocol.handshakeProtocol
.send(socket.add, HandshakeResponse(protocol: protocol).node);

protocol.decode(broadcastStream).forEach((jsonData) {
final request = MacroRequest.fromJson(jsonData);
if (request.type.isKnown) {
if (request.type == MacroRequestType.macroStartedRequest) {
Expand All @@ -85,7 +102,7 @@ class MacroServer {
if (response.type.isKnown) {
connection._responsesController.add(response);
}
});
}).ignore();
}
}

Expand Down
Loading

0 comments on commit 9cd9887

Please sign in to comment.