Skip to content

Commit

Permalink
x/ref/runtime/internal/rpc: ensure that a proxied server expands to u…
Browse files Browse the repository at this point in the history
…se all available proxies (#186)

Prior to this PR, a server using the 'all' proxy policy would expand the set of proxy instances that it used only when it encountered an error with any of the existing proxy instances. This PR changes the behavior so that servers will monitor the set of available proxy instances and expand to use all available ones ahead of any errors being encountered.

Also, include a fix for Big Sur, where the error message for connection refused appears to have changed.
  • Loading branch information
cosnicolaou authored Feb 11, 2021
1 parent 399567d commit ecb939b
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 23 deletions.
42 changes: 38 additions & 4 deletions x/ref/runtime/internal/rpc/proxymgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func newProxyManager(s serverProxyAPI, proxyName string, policy rpc.ProxyPolicy,
}

func (pm *proxyManager) selectRandomSubsetLocked(needed, available int) map[int]bool {
if needed == 0 {
return nil
}
selected := map[int]bool{}
for {
candidate := pm.rand.Intn(available)
Expand Down Expand Up @@ -160,6 +163,7 @@ func (pm *proxyManager) updateAvailableProxies(ctx *context.T) {
pm.Lock()
defer pm.Unlock()
pm.proxies = updated

}

func (pm *proxyManager) markActive(ep naming.Endpoint) {
Expand All @@ -175,8 +179,6 @@ func (pm *proxyManager) markInActive(ep naming.Endpoint) {
}

func (pm *proxyManager) connectToSingleProxy(ctx *context.T, name string, ep naming.Endpoint) {
pm.markActive(ep)
defer pm.markInActive(ep)
for delay := pm.reconnectDelay; ; delay = nextDelay(delay) {
if !pm.isAvailable(ep) {
ctx.Infof("connectToSingleProxy(%q): proxy is no longer available\n", ep)
Expand Down Expand Up @@ -212,14 +214,26 @@ func (pm *proxyManager) tryConnections(ctx *context.T, notifyCh chan struct{}) b
return false
}
for _, ep := range idle {
if !pm.canGrow() {
continue
}
pm.markActive(ep)
go func(ep naming.Endpoint) {
pm.connectToSingleProxy(ctx, pm.proxyName, ep)
notifyCh <- struct{}{}
pm.markInActive(ep)
sendNotify(notifyCh)
}(ep)
}
return true
}

// sendNotify posts a single wake-up signal on notifyCh without ever
// blocking the caller. If the channel cannot accept the send right now
// (its buffer is full, or it is unbuffered with no receiver ready) the
// signal is dropped; coalescing multiple notifications into one pending
// signal is sufficient for the waiters here.
func sendNotify(notifyCh chan struct{}) {
	select {
	case notifyCh <- struct{}{}:
		// Signal delivered.
	default:
		// A signal is already pending (or nobody is listening); drop it.
	}
}

func drainNotifyChan(ch chan struct{}) {
for {
select {
Expand All @@ -230,9 +244,29 @@ func drainNotifyChan(ch chan struct{}) {
}
}

// watchForChanges periodically refreshes the set of available proxy
// instances and, whenever growing the set of active proxy connections is
// both desired by the policy (shouldGrow) and currently possible
// (canGrow), nudges the connection-management loop via ch. It runs until
// ctx is canceled.
func (pm *proxyManager) watchForChanges(ctx *context.T, ch chan struct{}) {
	// Use a single ticker rather than calling time.After on every
	// iteration, which would allocate a new timer per loop.
	ticker := time.NewTicker(pm.resolveDelay)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pm.updateAvailableProxies(ctx)
			// Only wake the manager when an expansion can actually
			// happen; otherwise the notification would be wasted work.
			if pm.shouldGrow() && pm.canGrow() {
				sendNotify(ch)
			}
		}
	}
}

func (pm *proxyManager) manageProxyConnections(ctx *context.T) {
notifyCh := make(chan struct{}, 10)
pm.updateAvailableProxies(ctx)
// Watch for changes in the set of available proxies so that for the
// 'all' policy, the server will connect to new proxies as they appear.
// For other policies reconnection may be a little faster since the
// new set of proxies is already available.
go pm.watchForChanges(ctx, notifyCh)

for {
select {
case <-ctx.Done():
Expand All @@ -253,7 +287,7 @@ func (pm *proxyManager) manageProxyConnections(ctx *context.T) {
drainNotifyChan(notifyCh)
}
// Wait for a change in the set of available proxies.
if pm.shouldGrow() {
if pm.shouldGrow() && !pm.canGrow() {
for {
select {
case <-ctx.Done():
Expand Down
84 changes: 71 additions & 13 deletions x/ref/runtime/internal/rpc/proxymgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,19 @@ func TestSingleProxyConnections(t *testing.T) {
}
}

// waitForExpected polls the mock server until exactly `expected`
// listeners are registered for proxyName. NOTE(review): there is no
// timeout here — if the count is never reached the caller blocks
// forever; callers rely on the surrounding test's deadline.
func waitForExpected(sm *mockServerAPI, proxyName string, expected int) {
	for {
		time.Sleep(100 * time.Millisecond)
		// Hold the lock only for the read so the code under test is
		// not blocked while we sleep.
		sm.Lock()
		n := len(sm.listening[proxyName])
		sm.Unlock()
		if n == expected {
			return
		}
	}
}

func TestMultipleProxyConnections(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
Expand Down Expand Up @@ -363,13 +376,7 @@ func TestMultipleProxyConnections(t *testing.T) {
time.Sleep(time.Millisecond * 100)
sm.setEndpoints(proxyName, eps...)

// Wait for the expected number of connections.
for {
time.Sleep(100 * time.Millisecond)
if len(sm.listening[proxyName]) == tc.expected {
break
}
}
waitForExpected(sm, proxyName, tc.expected)

// Remove the endpoints and finish the current listeners.
sm.setEndpoints(proxyName)
Expand All @@ -394,14 +401,65 @@ func TestMultipleProxyConnections(t *testing.T) {
pm.updateAvailableProxies(ctx)

// Wait for the expected number of connections.
for {
time.Sleep(100 * time.Millisecond)
if len(sm.listening[proxyName]) == tc.expected {
break
}
}
waitForExpected(sm, proxyName, tc.expected)

cancel()
// Should immediately return if the context is already canceled.
pm.manageProxyConnections(cctx)

wg.Wait()
}

}

func TestMultipleProxyConnectionExpansion(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
sm := newMockServer()
ep1 := newEndpoint("5000")
ep2 := newEndpoint("5001")
ep3 := newEndpoint("5002")

fpm := newProxyManager(sm, "proxy0", rpc.UseFirstProxy, 1)
rpm := newProxyManager(sm, "proxy1", rpc.UseRandomProxy, 1)
apm := newProxyManager(sm, "proxy2", rpc.UseAllProxies, 3)

for i, tc := range []struct {
pm *proxyManager
initial int
final int
}{
{fpm, 1, 1},
{rpm, 1, 1},
{apm, 1, 3},
} {
cctx, cancel := context.WithCancel(ctx)
pm := tc.pm
// tune down the delays
pm.resolveDelay = time.Millisecond
pm.reconnectDelay = time.Millisecond
proxyName := fmt.Sprintf("proxy%v", i)
ch := make(chan struct{})
sm.setChan(proxyName, ch)

var wg sync.WaitGroup
wg.Add(1)
go func(pm *proxyManager) {
pm.manageProxyConnections(cctx)
wg.Done()
}(pm)

sm.setEndpoints(proxyName, ep1)

waitForExpected(sm, proxyName, tc.initial)
sm.setEndpoints(proxyName, ep1, ep2, ep3)
time.Sleep(100 * time.Millisecond)

// Wait for the expected number of connections.
waitForExpected(sm, proxyName, tc.final)

cancel()

// Should immediately return if the context is already canceled.
pm.manageProxyConnections(cctx)

Expand Down
11 changes: 9 additions & 2 deletions x/ref/runtime/internal/rpc/test/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,15 @@ func TestStartCallErrors(t *testing.T) { //nolint:gocyclo
if !errors.Is(err, verror.ErrNoServers) {
t.Errorf("wrong error: %s", err)
}
if want := "connection refused"; !strings.Contains(verror.DebugString(err), want) {
t.Errorf("wrong error: %s - doesn't contain %q", err, want)
found := false
allowed := []string{"connection reset by peer", "connection refused"}
for _, want := range allowed {
if strings.Contains(verror.DebugString(err), want) {
found = true
}
}
if !found {
t.Errorf("wrong error: %s - doesn't contain one of %q", err, allowed)
}

// This will fail with NoServers, but because there really is no
Expand Down
83 changes: 79 additions & 4 deletions x/ref/services/xproxy/xproxyd/proxyd_v23_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
responseVar = "RESPONSE" // Name of the variable used by client program to output the first response
responseVar1 = "RESPONSE1" // Name of the variable used by client program to output the second response
downloadSize = 64 * 1024 * 1024
using1OfMProxies = 1 // Use 1 out of the total available set of proxies when using all proxies.
using2OfMProxies = 2 // Use 2 out of the total available set of proxies when using all proxies.
)

Expand Down Expand Up @@ -106,7 +107,7 @@ func TestV23MultipleProxyd(t *testing.T) {
firstProxyAddress, firstProxyLog, _, err := startServer(t, sh, serverName, 1, runServer, serverCreds)
assert("first proxy policy server", firstProxyLog)

allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds)
allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds)
assert("all proxies policy server", allProxiesLog)

// Run all of the clients.
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestV23MultiProxyResilience(t *testing.T) {
assert("first two proxies", logsForProxies(first2)...)

// Start the server.
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds)
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds)
assert("server", serverLog)

// Run the client.
Expand Down Expand Up @@ -343,6 +344,74 @@ func TestV23MultiProxyResilience(t *testing.T) {

}

// TestV23MultiProxyExpansion verifies that a server configured with the
// 'all' proxy policy expands to use newly started proxies as they become
// available, rather than only after encountering an error on an
// existing proxy connection.
func TestV23MultiProxyExpansion(t *testing.T) {
	v23test.SkipUnlessRunningIntegrationTests(t)
	sh := v23test.NewShell(t, nil)
	defer sh.Cleanup()
	sh.StartRootMountTable()

	var (
		serverCreds = sh.ForkCredentials("server")
		clientCreds = sh.ForkCredentials("client")
		err         error
	)

	// assert captures err from the enclosing scope; each step below
	// assigns err and then calls assert with any relevant logs.
	assert := func(msg string, logs ...*bytes.Buffer) {
		assertWithLog(t, err, msg, logs)
	}

	ns := v23.GetNamespace(sh.Ctx)
	ns.CacheCtl(naming.DisableCache(true))

	// Start a single proxy.
	first, _, firstStatsAddrs := startInitialSetOfProxies(t, sh, 1)
	assert("first proxy", logsForProxies(first)...)

	// Start the server with the 'all' policy and no limit on the number
	// of proxies it may use.
	serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using1OfMProxies, runServerAllProxiesNoLimit, serverCreds)
	assert("server", serverLog)

	// Run the client.
	err = runSingleClient(sh, runClientAllProxiesServer, clientCreds)
	assert("client")

	// Gather stats and make sure the server is using the first proxy.
	ctx, err := v23.WithPrincipal(sh.Ctx, serverCreds.Principal)
	assert("withPrincipal")

	requests, _, _, err := gatherStats(ctx, firstStatsAddrs, serverAddress)
	assert("gatherStats")

	used := proxiesUsedForServer(requests, serverAddress)
	if got, want := used, []int{0}; !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}

	// Start two more proxies and wait for the server to notice them.
	second := startProxy(t, sh)
	third := startProxy(t, sh)

	_, err = waitForNMountedServers(t, sh.Ctx, ns, serverNameAll, 3)
	assert("second and third proxies and server log", second.log, third.log, serverLog)

	_, proxyStatsAddresses, err := waitForNProxies(t, sh.Ctx, ns, 3)
	assert("wait for all three proxies to be in the mounttable")

	// Run the client again now that all three proxies are available.
	err = runSingleClient(sh, runClientAllProxiesServer, clientCreds)
	assert("client with three proxies")

	requests, _, _, err = gatherStats(ctx, proxyStatsAddresses, serverAddress)
	assert("gatherStats")

	// The server should have expanded to use all three proxies.
	used = proxiesUsedForServer(requests, serverAddress)
	if got, want := len(used), 3; got != want {
		t.Fatalf("got %v, want %v", got, want)
	}
}

func TestV23SingleProxyResilience(t *testing.T) {
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
Expand Down Expand Up @@ -701,11 +770,17 @@ var runServerRandomProxy = gosh.RegisterFunc(
"runServerRandomProxy",
createProxiedServer(serverNameRandom, proxyName, rpc.UseRandomProxy, 0),
)
var runServerAllProxies = gosh.RegisterFunc(
"runServerAllProxies",

// runServerAllProxiesLimit2 registers a subprocess that runs a proxied
// server using the rpc.UseAllProxies policy, capped at two proxy
// instances (using2OfMProxies).
var runServerAllProxiesLimit2 = gosh.RegisterFunc(
	"runServerAllProxiesLimit2",
	createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, using2OfMProxies),
)

// runServerAllProxiesNoLimit registers a subprocess that runs a proxied
// server using the rpc.UseAllProxies policy with a limit of 0 —
// presumably meaning "no limit"; confirm against createProxiedServer.
var runServerAllProxiesNoLimit = gosh.RegisterFunc(
	"runServerAllProxiesNoLimit",
	createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, 0),
)

func createClient(serverName string, iterations int) func() error {
return func() error {
ctx, shutdown := test.V23Init()
Expand Down

0 comments on commit ecb939b

Please sign in to comment.