Skip to content

Commit

Permalink
feat: dojima chain side handler changes
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Mar 22, 2024
1 parent fe4894f commit 28e2b75
Show file tree
Hide file tree
Showing 63 changed files with 3,976 additions and 590 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ specify exactly the dependency you want to update.
## Protobuf

We use [Protocol Buffers](https://developers.google.com/protocol-buffers) along
with [`gogoproto`](https://github.com/cosmos/gogoproto) to generate code for use
with [`gogoproto`](https://google.golang.org/protobuf) to generate code for use
across CometBFT.

To generate proto stubs, lint, and check protos for breaking changes, you will
Expand Down
5 changes: 5 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Client interface {
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)

BeginSideBlockAsync(types.RequestBeginSideBlock) *ReqRes
BeginSideBlockSync(types.RequestBeginSideBlock) (*types.ResponseBeginSideBlock, error)
DeliverSideTxAsync(types.RequestDeliverSideTx) *ReqRes
DeliverSideTxSync(types.RequestDeliverSideTx) (*types.ResponseDeliverSideTx, error)
}

//----------------------------------------
Expand Down
32 changes: 32 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,35 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
reqres := cli.ApplySnapshotChunkAsync(params)
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
}

//
// Side channel
//

func (cli *grpcClient) DeliverSideTxAsync(params types.RequestDeliverSideTx) *ReqRes {
req := types.ToRequestDeliverSideTx(params)
res, err := cli.client.DeliverSideTx(context.Background(), req.GetDeliverSideTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_DeliverSideTx{DeliverSideTx: res}})
}

func (cli *grpcClient) BeginSideBlockAsync(params types.RequestBeginSideBlock) *ReqRes {
req := types.ToRequestBeginSideBlock(params)
res, err := cli.client.BeginSideBlock(context.Background(), req.GetBeginSideBlock(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginSideBlock{BeginSideBlock: res}})
}

func (cli *grpcClient) BeginSideBlockSync(params types.RequestBeginSideBlock) (*types.ResponseBeginSideBlock, error) {
reqres := cli.BeginSideBlockAsync(params)
return reqres.Response.GetBeginSideBlock(), cli.Error()
}

func (cli *grpcClient) DeliverSideTxSync(params types.RequestDeliverSideTx) (*types.ResponseDeliverSideTx, error) {
reqres := cli.DeliverSideTxAsync(params)
return reqres.Response.GetDeliverSideTx(), cli.Error()
}
42 changes: 42 additions & 0 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,48 @@ func (app *localClient) ApplySnapshotChunkSync(
return &res, nil
}

//
// Side channel
//

func (app *localClient) DeliverSideTxAsync(params types.RequestDeliverSideTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.DeliverSideTx(params)
return app.callback(
types.ToRequestDeliverSideTx(params),
types.ToResponseDeliverSideTx(res),
)
}

func (app *localClient) BeginSideBlockAsync(req types.RequestBeginSideBlock) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.BeginSideBlock(req)
return app.callback(
types.ToRequestBeginSideBlock(req),
types.ToResponseBeginSideBlock(res),
)
}

func (app *localClient) DeliverSideTxSync(req types.RequestDeliverSideTx) (*types.ResponseDeliverSideTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.DeliverSideTx(req)
return &res, nil
}

func (app *localClient) BeginSideBlockSync(req types.RequestBeginSideBlock) (*types.ResponseBeginSideBlock, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.BeginSideBlock(req)
return &res, nil
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
Expand Down
24 changes: 24 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,30 @@ func (cli *socketClient) ApplySnapshotChunkSync(
return reqres.Response.GetApplySnapshotChunk(), cli.Error()
}

//
// Side channel
//

func (cli *socketClient) BeginSideBlockAsync(req types.RequestBeginSideBlock) *ReqRes {
return cli.queueRequest(types.ToRequestBeginSideBlock(req))
}

func (cli *socketClient) BeginSideBlockSync(req types.RequestBeginSideBlock) (*types.ResponseBeginSideBlock, error) {
reqres := cli.queueRequest(types.ToRequestBeginSideBlock(req))
cli.FlushSync()
return reqres.Response.GetBeginSideBlock(), cli.Error()
}

func (cli *socketClient) DeliverSideTxAsync(req types.RequestDeliverSideTx) *ReqRes {
return cli.queueRequest(types.ToRequestDeliverSideTx(req))
}

func (cli *socketClient) DeliverSideTxSync(req types.RequestDeliverSideTx) (*types.ResponseDeliverSideTx, error) {
reqres := cli.queueRequest(types.ToRequestDeliverSideTx(req))
cli.FlushSync()
return reqres.Response.GetDeliverSideTx(), cli.Error()
}

//----------------------------------------

func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
Expand Down
2 changes: 1 addition & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func prefixKey(key []byte) []byte {
return append(kvPairPrefixKey, key...)
}

//---------------------------------------------------
// ---------------------------------------------------

var _ types.Application = (*Application)(nil)

Expand Down
16 changes: 14 additions & 2 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
ValidatorSetChangePrefix string = "val:"
)

//-----------------------------------------
// -----------------------------------------

var _ types.Application = (*PersistentKVStoreApplication)(nil)

Expand Down Expand Up @@ -173,7 +173,19 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

//---------------------------------------------
//
// Side channel functions
//

func (app *PersistentKVStoreApplication) BeginSideBlock(req types.RequestBeginSideBlock) types.ResponseBeginSideBlock {
return app.app.BeginSideBlock(req)
}

func (app *PersistentKVStoreApplication) DeliverSideTx(req types.RequestDeliverSideTx) types.ResponseDeliverSideTx {
return app.app.DeliverSideTx(req)
}

// ---------------------------------------------
// update validators

func (app *PersistentKVStoreApplication) Validators() (validators []types.ValidatorUpdate) {
Expand Down
22 changes: 22 additions & 0 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Application interface {
OfferSnapshot(RequestOfferSnapshot) ResponseOfferSnapshot // Offer a snapshot to the application
LoadSnapshotChunk(RequestLoadSnapshotChunk) ResponseLoadSnapshotChunk // Load a snapshot chunk
ApplySnapshotChunk(RequestApplySnapshotChunk) ResponseApplySnapshotChunk // Apply a shapshot chunk

// Consensus side connection
BeginSideBlock(RequestBeginSideBlock) ResponseBeginSideBlock // Signals the beginning of a block with side txs
DeliverSideTx(RequestDeliverSideTx) ResponseDeliverSideTx // Deliver a tx for full state-less processing
}

//-------------------------------------------------------
Expand Down Expand Up @@ -95,6 +99,14 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons
return ResponseApplySnapshotChunk{}
}

func (BaseApplication) BeginSideBlock(req RequestBeginSideBlock) ResponseBeginSideBlock {
return ResponseBeginSideBlock{}
}

func (BaseApplication) DeliverSideTx(req RequestDeliverSideTx) ResponseDeliverSideTx {
return ResponseDeliverSideTx{}
}

//-------------------------------------------------------

// GRPCApplication is a GRPC wrapper for Application
Expand Down Expand Up @@ -182,3 +194,13 @@ func (app *GRPCApplication) ApplySnapshotChunk(
res := app.app.ApplySnapshotChunk(*req)
return &res, nil
}

func (app *GRPCApplication) BeginSideBlock(ctx context.Context, req *RequestBeginSideBlock) (*ResponseBeginSideBlock, error) {
res := app.app.BeginSideBlock(*req)
return &res, nil
}

func (app *GRPCApplication) DeliverSideTx(ctx context.Context, req *RequestDeliverSideTx) (*ResponseDeliverSideTx, error) {
res := app.app.DeliverSideTx(*req)
return &res, nil
}
32 changes: 32 additions & 0 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,22 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request {
}
}

//
// side channel
//

func ToRequestBeginSideBlock(req RequestBeginSideBlock) *Request {
return &Request{
Value: &Request_BeginSideBlock{&req},
}
}

func ToRequestDeliverSideTx(req RequestDeliverSideTx) *Request {
return &Request{
Value: &Request_DeliverSideTx{&req},
}
}

//----------------------------------------

func ToResponseException(errStr string) *Response {
Expand Down Expand Up @@ -256,3 +272,19 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response {
Value: &Response_ApplySnapshotChunk{&res},
}
}

//
// side channel
//

func ToResponseBeginSideBlock(req ResponseBeginSideBlock) *Response {
return &Response{
Value: &Response_BeginSideBlock{&req},
}
}

func ToResponseDeliverSideTx(req ResponseDeliverSideTx) *Response {
return &Response{
Value: &Response_DeliverSideTx{&req},
}
}
Loading

0 comments on commit 28e2b75

Please sign in to comment.