Merge pull request #11023 from yousong/bugfix/yousong-cloudproxy

Bugfix/yousong cloudproxy
This commit is contained in:
Yousong Zhou
2021-05-11 09:52:58 +08:00
committed by GitHub
3 changed files with 46 additions and 11 deletions

View File

@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync"
"time"
@@ -150,9 +151,31 @@ func (c *Client) Start(ctx context.Context) {
sshClientC := make(chan *ssh.Client)
var sshClient *ssh.Client
go c.runClientState(ctx, sshClientC)
for {
select {
case sshClient = <-sshClientC:
case sshc := <-sshClientC:
conn := sshc.Conn
localAddr := conn.LocalAddr()
localAddrStr := localAddr.String()
addr, portStr, err := net.SplitHostPort(localAddrStr)
if err != nil {
log.Errorf("split host port of ssh client local addr: %v", err)
sshc.Close()
break
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
log.Errorf("parse ssh client local port: %v", err)
sshc.Close()
break
}
if v := c.localForwards.get(int(port), addr); v != nil {
log.Errorf("ssh client local port %d collides with local forward: %#v", port, v)
sshc.Close()
break
}
sshClient = sshc
case req := <-c.lfc:
if sshClient != nil {
c.localForward(ctx, sshClient, req)
@@ -178,7 +201,7 @@ func (c *Client) Start(ctx context.Context) {
}
}
func (c *Client) runClientState(ctx context.Context, sshClientC chan *ssh.Client) {
func (c *Client) runClientState(ctx context.Context, sshClientC chan<- *ssh.Client) {
for {
select {
case <-ctx.Done():
@@ -203,12 +226,6 @@ func (c *Client) runClientState(ctx context.Context, sshClientC chan *ssh.Client
func() {
defer sshc.Conn.Close()
select {
case sshClientC <- sshc:
case <-ctx.Done():
return
}
closeC := make(chan struct{})
go func() {
defer close(closeC)
@@ -219,6 +236,13 @@ func (c *Client) runClientState(ctx context.Context, sshClientC chan *ssh.Client
}
}()
select {
case sshClientC <- sshc:
case <-closeC:
case <-ctx.Done():
return
}
select {
case <-closeC:
case <-ctx.Done():

View File

@@ -180,14 +180,24 @@ func (w *Worker) Start(ctx context.Context) {
}
go w.apih.Start(ctx)
var mss *agentmodels.ModelSets
const tickDur = 11 * time.Second
var (
mss *agentmodels.ModelSets
tick = time.NewTicker(tickDur)
)
for {
select {
case imss := <-w.apih.ModelSets():
log.Infof("agent: got new data from api helper")
mss = imss.(*agentmodels.ModelSets)
if err := w.run(ctx, mss); err != nil {
log.Errorf("agent: %v", err)
log.Errorf("agent run: %v", err)
}
case <-tick.C:
if mss != nil {
if err := w.run(ctx, mss); err != nil {
log.Errorf("agent refresh run: %v", err)
}
}
case <-ctx.Done():
return

View File

@@ -213,10 +213,11 @@ func (guest *SGuest) sshableTryEach(
tmo := time.NewTimer(13 * time.Second)
tick := time.NewTicker(3 * time.Second)
out:
for {
select {
case <-tmo.C:
break
break out
case <-tick.C:
if ok := guest.sshableTryForward(ctx, tryData, &fwd); ok {
return nil