Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protocol version negotiation #7

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/espdy.hrl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-define(LOG(S,A), io:format("~p\t" ++ S ++"\n",[self()|A])).
-define(LOG(S,A), io:format(case whereis(spdy_logging) of undefined -> standard_io; LoggingPid -> LoggingPid end, "~p\t" ++ S ++"\n",[self()|A])).

%% DATA FRAMES:
-record(spdy_data, {
Expand Down
67 changes: 45 additions & 22 deletions src/espdy_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ init([Socket, Transport, CBMod, Opts]) ->
ok = zlib:inflateInit(Zinf),
Zdef = zlib:open(),
ok = zlib:deflateInit(Zdef),
%%ok = zlib:deflateInit(Z, best_compression,deflated, 15, 9, default),
case SpdyVersion of
2 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT);
3 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT_V3)
end,
State = #state{ socket=Socket,
cbmod=CBMod,
transport=Transport,
Expand All @@ -70,9 +65,18 @@ init([Socket, Transport, CBMod, Opts]) ->
spdy_version = SpdyVersion,
spdy_opts=Opts
},
?LOG("SPDY_VERSION init v~B ~p ~p",[SpdyVersion, self(), State]),
init_deflate(State),
?LOG("SPDY_VERSION init v:~p ~p ~p",[SpdyVersion, self(), State]),
{ok, State}.

init_deflate(State) ->
Zdef = State#state.z_context_def,
case State#state.spdy_version of
2 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT);
3 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT_V3);
negotiate -> deferred
end.

handle_call(none_implemented, _From, State) ->
Reply = ok,
{reply, Reply, State}.
Expand Down Expand Up @@ -201,6 +205,7 @@ handle_frame(#spdy_syn_stream{ version=_Version,
self(),
Headers,
State#state.cbmod,
lookup_setting(?SETTINGS_INITIAL_WINDOW_SIZE, State),
State#state.spdy_opts),
%% TODO pass fin into startlink?
hasflag(Flags,?DATA_FLAG_FIN) andalso espdy_stream:received_fin(Pid),
Expand All @@ -222,9 +227,7 @@ handle_frame(#spdy_syn_stream{version=FrameVersion,
streamid=StreamID},
State=#state{spdy_version=SessionVersion}) ->
?LOG("SYN_STREAM mismatched version, ~p -> ~p, sending stream_error", [FrameVersion, SessionVersion]),
% UNSUPPORTED_VERSION is reserved for stream recipients (i.e. the client),
% so send a generic PROTOCOL_ERROR.
stream_error(protocol_error, #stream{id=StreamID}, State),
stream_error(unsupported_version, #stream{id=StreamID}, State),
State;

handle_frame(#spdy_syn_reply{version=_Version,
Expand Down Expand Up @@ -253,8 +256,7 @@ handle_frame(#spdy_syn_reply{version=_Version,
handle_frame(#spdy_syn_reply{version=FrameVersion,
streamid=StreamID},
State=#state{spdy_version=SessionVersion}) ->
?LOG("SYN_REPLY mismatched version, ~p -> ~p, sending stream_error", [FrameVersion, SessionVersion]),
stream_error(unsupported_version, #stream{id=StreamID}, State),
?LOG("SYN_REPLY mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]),
State;

handle_frame(#spdy_rst_stream{version=_Version,
Expand All @@ -275,10 +277,15 @@ handle_frame(#spdy_rst_stream{version=_Version,

handle_frame(#spdy_rst_stream{version=FrameVersion},
State=#state{spdy_version=SessionVersion}) ->
?LOG("RST_STREAM mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]),
session_error(protocol_error, State),
?LOG("RST_STREAM mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]),
State;

handle_frame(Settings = #spdy_settings{version=Version}, State=#state{spdy_version=negotiate, spdy_opts=Opts}) ->
?LOG("SETTINGS mismatched version, ~p, switching versions", [Version]),
NewOpts = [{spdy_version, Version} | proplists:delete(spdy_version, Opts)],
NewState = State#state{spdy_version=Version, spdy_opts=NewOpts},
init_deflate(NewState),
handle_frame(Settings, NewState);
handle_frame(#spdy_settings{version=_Version,
flags=_Flags,
settings=Settings
Expand All @@ -288,11 +295,9 @@ handle_frame(#spdy_settings{version=_Version,

handle_frame(#spdy_settings{version=FrameVersion},
State=#state{spdy_version=SessionVersion}) ->
?LOG("SETTINGS mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]),
session_error(protocol_error, State),
?LOG("SETTINGS mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]),
State;


handle_frame(#spdy_noop{version=2}, State) ->
State;

Expand All @@ -302,8 +307,7 @@ handle_frame(F=#spdy_ping{version=_Version}, State=#state{spdy_version=_Version}

handle_frame(#spdy_ping{version=FrameVersion},
State=#state{spdy_version=SessionVersion}) ->
?LOG("PING mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]),
session_error(protocol_error, State),
?LOG("PING mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]),
State;

handle_frame(#spdy_goaway{version=_Version,
Expand All @@ -317,8 +321,7 @@ handle_frame(#spdy_goaway{version=_Version,

handle_frame(#spdy_goaway{version=FrameVersion},
State=#state{spdy_version=SessionVersion}) ->
?LOG("GOAWAY mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]),
session_error(protocol_error, State),
?LOG("GOAWAY mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]),
State;

handle_frame(#spdy_headers{version=_Version,
Expand All @@ -342,10 +345,23 @@ handle_frame(#spdy_headers{version=_Version,
handle_frame(#spdy_headers{version=FrameVersion,
streamid=StreamID},
State=#state{spdy_version=SessionVersion}) ->
?LOG("HEADERS mismatched version, ~p -> ~p, sending stream_error", [FrameVersion, SessionVersion]),
stream_error(unsupported_version, #stream{id=StreamID}, State),
?LOG("HEADERS mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]),
State;

handle_frame(#spdy_window_update{ streamid=StreamID,
delta_size=DeltaSize}, State) ->
case lookup_stream(StreamID, State) of
undefined ->
F = #spdy_rst_stream{version=State#state.spdy_version,
streamid=StreamID,
statuscode=?INVALID_STREAM},
socket_write(F, State),
State;
S = #stream{} -> %% TODO check stream is known to be active still?
espdy_stream:window_updated(S#stream.pid, DeltaSize),
State
end;

%% DATA FRAME:
handle_frame(#spdy_data{ streamid=StreamID,
flags=Flags,
Expand Down Expand Up @@ -395,6 +411,13 @@ apply_settings(Settings, State = #state{settings=OldSettings}) ->
end, OldSettings, Settings),
?LOG("SETTINGS FOR THIS SESSION: ~p",[NewSettings]),
State#state{settings=NewSettings}.

lookup_setting(Id, #state{settings=Settings}) ->
lookup_setting(Id, Settings);
lookup_setting(_Id, []) -> undefined;
lookup_setting(Id, [#spdy_setting_pair{id=Id, value=Value} | _]) -> Value;
lookup_setting(Id, [_ | Settings]) -> lookup_setting(Id, Settings).

%% STATUS CODES used by rst-stream, goaway, etc


152 changes: 93 additions & 59 deletions src/espdy_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
-compile(export_all).

%% API
-export([start_link/5, send_data_fin/1, send_data/2, closed/2, received_data/2,
send_response/3, received_fin/1, send_frame/2, headers_updated/3
-export([start_link/6, send_data_fin/1, send_data/2, closed/2, received_data/2,
send_response/3, received_fin/1, send_frame/2, headers_updated/3,
window_updated/2
]).

%% gen_server callbacks
Expand All @@ -27,12 +28,13 @@
mod,
mod_state,
headers,
window_size,
spdy_version,
spdy_opts}).

%% API
start_link(StreamID, Pid, Headers, Mod, Opts) ->
gen_server:start(?MODULE, [StreamID, Pid, Headers, Mod, Opts], []).
start_link(StreamID, Pid, Headers, Mod, WindowSize, Opts) ->
gen_server:start(?MODULE, [StreamID, Pid, Headers, Mod, WindowSize, Opts], []).

send_data(Pid, Data) when is_pid(Pid), is_binary(Data) ->
gen_server:cast(Pid, {data, Data}).
Expand All @@ -58,29 +60,92 @@ send_frame(Pid, F) ->
headers_updated(Pid, Delta, NewMergedHeaders) ->
gen_server:cast(Pid, {headers_updated, Delta, NewMergedHeaders}).

window_updated(Pid, Delta) ->
gen_server:cast(Pid, {window_updated, Delta}).

%% gen_server callbacks

init([StreamID, Pid, Headers, Mod, Opts]) ->
self() ! init_callback,
%% Z = zlib:open(),
%% ok = zlib:deflateInit(Z),
%%ok = zlib:deflateInit(Z, best_compression,deflated, 15, 9, default),
%% zlib:deflateSetDictionary(Z, ?HEADERS_ZLIB_DICT),
init([StreamID, Pid, Headers, Mod, WindowSize, Opts]) ->
gen_server:cast(self(), init_callback),
SpdyVersion = proplists:get_value(spdy_version, Opts),
{ok, #state{streamid=StreamID,
pid=Pid,
mod=Mod,
headers=Headers,
window_size=WindowSize,
spdy_version=SpdyVersion,
spdy_opts=Opts}}.

handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.

%% Called when we got a syn_stream for this stream.
%% cb module is supposed to make and send the reply.
handle_cast(init_callback, State) ->
case (State#state.mod):init(self(), State#state.headers, State#state.spdy_opts) of
%% In this case, the callback module provides the full response
%% with no need for this process to persist for streaming the body
%% so we can terminate this process after replying.
{ok, Headers, Body} when is_list(Headers), is_binary(Body) ->
%% se re-send this as a message to ourselves, because the callback
%% module may have dispatched other frames (eg, settings) before
%% returning us this response:
send_response(self(), Headers, Body),
{noreply, State};
%% The callback module will call msg us the send_http_response
%% (typically from within the guts of cowboy_http_req, so that
%% we can reuse the existing http API)
{ok, noreply} ->
%% TODO track state, set timeout on getting the response from CB
{noreply, State};
{ok, noreply, ModState} ->
{noreply, State#state{mod_state=ModState}};
%% CB module is going to stream us the body data, so we keep this process
%% alive until we get the fin packet as part of the stream.
%%%% {ok, Headers, stream, ModState} when is_list(Headers) ->
%%%% NVPairsData = encode_name_value_pairs(Headers, State#state.z_context),
%%%% StreamID = State#state.streamid,
%%%% F = #cframe{type=?SYN_REPLY,
%%%% flags=0,
%%%% length = 6 + byte_size(NVPairsData),
%%%% data= <<0:1,
%%%% StreamID:31/big-unsigned-integer,
%%%% 0:16/big-unsigned-integer, %% UNUSED
%%%% NVPairsData/binary
%%%% >>},
%%%% espdy_session:snd(State#state.pid, StreamID, F),
%%%% {noreply, State#state{mod_state=ModState}};
%% CB module can't respond, because request is invalid
{error, not_http} ->
StreamID = State#state.streamid,
F = #spdy_rst_stream{ streamid=StreamID, statuscode=?PROTOCOL_ERROR },
espdy_session:snd(State#state.pid, StreamID, F),
{stop, normal, State};
{error, ErrorStatusCode} when is_number(ErrorStatusCode) ->
StreamID = State#state.streamid,
F = #spdy_rst_stream{ streamid=StreamID, statuscode=ErrorStatusCode },
espdy_session:snd(State#state.pid, StreamID, F),
{stop, normal, State}
end;

handle_cast({send_frame, F}, State) ->
espdy_session:snd(State#state.pid, State#state.streamid, F),
{noreply, State};
StreamId = State#state.streamid,
FrameType = element(1, F),
F2 = case FrameType of
spdy_data ->
setelement(2, F, StreamId);
spdy_window_update ->
setelement(3, F, StreamId);
_ ->
setelement(4, F, StreamId)
end,
FullF = case FrameType of
spdy_data -> F;
_ -> setelement(2, F2, State#state.spdy_version)
end,
espdy_session:snd(State#state.pid, StreamId, FullF),
{noreply, decrement_window(F, State)};

handle_cast(received_fin, State = #state{clientclosed=true}) ->
?LOG("Got FIN but client has already closed?", []),
Expand All @@ -103,24 +168,27 @@ handle_cast({headers_updated, Delta, NewMergedHeaders}, State) ->
{ok, NewModState} = (State#state.mod):headers_updated(Delta, NewMergedHeaders, State#state.mod_state),
{noreply, State#state{mod_state=NewModState}};

handle_cast({window_updated, Delta}, State = #state{window_size=Size}) ->
{noreply, State#state{window_size=Size + Delta}};

handle_cast({closed, Reason}, State) ->
(State#state.mod):closed(Reason, State#state.mod_state),
{stop, normal, State};

%% part of streamed body
handle_cast({data, Bin, false}, State) when is_binary(Bin) ->
handle_cast({data, Bin}, State) when is_binary(Bin) ->
F = #spdy_data{ streamid = State#state.streamid,
data=Bin},
espdy_session:snd(State#state.pid, State#state.streamid, F),
{noreply, State};
{noreply, decrement_window(F, State)};

%% last of streamed body
handle_cast({data, Bin, true}, State) when is_binary(Bin) ->
handle_cast({data, fin}, State) ->
F = #spdy_data{ streamid = State#state.streamid,
flags=?DATA_FLAG_FIN,
data=Bin},
data= <<"">>},
espdy_session:snd(State#state.pid, State#state.streamid, F),
NewState = State#state{serverclosed=true},
NewState = decrement_window(F, State#state{serverclosed=true}),
case both_closed(NewState) of
true -> ?LOG("Both ends closed, stopping stream ~w",[State#state.streamid]),
{stop, normal, NewState};
Expand All @@ -138,48 +206,9 @@ handle_cast({send_response, Headers, Body}, State) ->
{noreply, NewState}
end.


%% Called when we got a syn_stream for this stream.
%% cb module is supposed to make and send the reply.
handle_info(init_callback, State) ->
case (State#state.mod):init(self(), State#state.headers, State#state.spdy_opts) of
%% In this case, the callback module provides the full response
%% with no need for this process to persist for streaming the body
%% so we can terminate this process after replying.
{ok, Headers, Body} when is_list(Headers), is_binary(Body) ->
%% se re-send this as a message to ourselves, because the callback
%% module may have dispatched other frames (eg, settings) before
%% returning us this response:
send_response(self(), Headers, Body),
{noreply, State};
%% The callback module will call msg us the send_http_response
%% (typically from within the guts of cowboy_http_req, so that
%% we can reuse the existing http API)
{ok, noreply} ->
%% TODO track state, set timeout on getting the response from CB
{noreply, State};
%% CB module is going to stream us the body data, so we keep this process
%% alive until we get the fin packet as part of the stream.
%%%% {ok, Headers, stream, ModState} when is_list(Headers) ->
%%%% NVPairsData = encode_name_value_pairs(Headers, State#state.z_context),
%%%% StreamID = State#state.streamid,
%%%% F = #cframe{type=?SYN_REPLY,
%%%% flags=0,
%%%% length = 6 + byte_size(NVPairsData),
%%%% data= <<0:1,
%%%% StreamID:31/big-unsigned-integer,
%%%% 0:16/big-unsigned-integer, %% UNUSED
%%%% NVPairsData/binary
%%%% >>},
%%%% espdy_session:snd(State#state.pid, StreamID, F),
%%%% {noreply, State#state{mod_state=ModState}};
%% CB module can't respond, because request is invalid
{error, not_http} ->
StreamID = State#state.streamid,
F = #spdy_rst_stream{ streamid=StreamID, statuscode=?PROTOCOL_ERROR },
espdy_session:snd(State#state.pid, StreamID, F),
{stop, normal, State}
end.
handle_info(M, State) ->
{noreply, NewModState} = (State#state.mod):handle_info(M, State#state.mod_state),
{noreply, State#state{mod_state=NewModState}}.

terminate(_Reason, _State) ->
ok.
Expand All @@ -189,6 +218,11 @@ code_change(_OldVsn, State, _Extra) ->

%%% Internal functions

decrement_window(#spdy_data{data=Data}, State = #state{window_size=Size}) ->
State#state{window_size=Size - size(Data)};
decrement_window(_, State) ->
State.

send_http_response(Headers, Body, State = #state{}) when is_list(Headers), is_binary(Body) ->
io:format("Respond with: ~p ~p\n",[Headers, Body]),
StreamID = State#state.streamid,
Expand Down