ngrok是一个内网穿透工具,主要用途是让用户能够通过一台ngrok的中转服务器访问在内网中的一台机器。

用途有点类似与端口映射,要把一台内网中的机器的端口映射到具有公网ip的另一台机器的端口。

假如被映射的机器不在内网,那么可以直接通过映射的机器向被映射的机器创建链接转发来达到目的,但是如果被映射的机器在一个内网的话,就要复杂很多了。

首先需要内网的机器和外网的机器维护一条链接(因为外网的机器不能够主动连接内网的机器),为了避免链接的中断,需要通过心跳等途径维持链接。这条链接保证了外网机器能够随时和内网通信。

当然,单单一条链接是不够的,一般我们访问一个网页都会打开5-10个tcp链接,如果这些链接都阻塞在同一条链接上的话,性能会受很大的影响。
为了保证性能,外网机器和内网的机器可以制定一种协议,外网的机器可以通过协议请求内网的机器创建多条联通外网机器的链接以供传输。这样就可以保证请求可以被并行地响应,以保证性能。ngrok就是这么做的。

下面来看一下ngrok的执行流程(主要讲一下tcp的映射,省略了http,https和认证过程)。

ngrok主要分为ngrok和ngrokd,ngrok是客户端,ngrokd是服务端。

服务端

ngrokd启动的时候会执行一个tunnelListener函数,用来监听客户端的链接。

//server main.go
tunnelListener(opts.tunnelAddr, tlsConfig)



func tunnelListener(addr string, tlsConfig *tls.Config) {
    // listen for incoming connections
    listener, err := conn.Listen(addr, "tun", tlsConfig)
    //...
    log.Info("Listening for control and proxy connections on %s", listener.Addr.String())
    
    for c := range listener.Conns {
        go func(tunnelConn conn.Conn) {
            
            var rawMsg msg.Message
            if rawMsg, err = msg.ReadMsg(tunnelConn); err != nil {
                tunnelConn.Warn("Failed to read message: %v", err)
                tunnelConn.Close()
                return
            }

            switch m := rawMsg.(type) {
            case *msg.Auth:
                NewControl(tunnelConn, m)

            case *msg.RegProxy:
                NewProxy(tunnelConn, m)

            default:
                tunnelConn.Close()
            }
        }(c)
    }
}

中间有部分不需要的东西被删了。tunnelListener用来监听客户端。当有客户端连接的时候,它会先接收一个rawMsg(既定的协议),如果rawMsg是Auth类型的,那么就代表有新的客户端要连接了,那么就通过NewControl创建了一Control。

这个Control是一条进行控制的链接,是需要一直维护的。所有的控制信息都是通过这条链接来传递的。Control结构如下:

type Control struct {
    // auth message
    auth *msg.Auth

    // actual connection
    conn conn.Conn

    // put a message in this channel to send it over
    // conn to the client
    out chan (msg.Message)

    // read from this channel to get the next message sent
    // to us over conn by the client
    in chan (msg.Message)

    // the last time we received a ping from the client - for heartbeats
    lastPing time.Time

    // all of the tunnels this control connection handles
    tunnels []*Tunnel

    // proxy connections
    proxies chan conn.Conn

    // identifier
    id string

    // synchronizer for controlled shutdown of writer()
    writerShutdown *util.Shutdown

    // synchronizer for controlled shutdown of reader()
    readerShutdown *util.Shutdown

    // synchronizer for controlled shutdown of manager()
    managerShutdown *util.Shutdown

    // synchronizer for controller shutdown of entire Control
    shutdown *util.Shutdown
}

其中,auth是认证信息,ctlConn指的就是控制链接本身。out和in控制着数据的读入和读出,所有加入out的msg都会被发送到对应的客户端,所有ctlConn接收到的msg都会被放入in。分别由reader和writer这两个goroutine实现。

func (c *Control) writer() {
    // write messages to the control channel
    for m := range c.out {
        c.conn.SetWriteDeadline(time.Now().Add(controlWriteTimeout))
        if err := msg.WriteMsg(c.conn, m); err != nil {
            panic(err)
        }
    }
}

func (c *Control) reader() {
    // read messages from the control channel
    for {
        if msg, err := msg.ReadMsg(c.conn); err != nil {
            if err == io.EOF {
                c.conn.Info("EOF")
                return
            } else {
                panic(err)
            }
        } else {
            // this can also panic during shutdown
            c.in <- msg
        }
    }
}

还有两个重要的元素是proxies和tunnel。tunnel存放的是外网机器监听外部连接的链接,proxies存放的是外网机器访问内网的链接。

Control建立之后会随机生成一个id,这个id代表着对应的client的id。生成id之后Control会把这个id相关的信息发送给客户端,之后这个id就代表着客户端了。

c.out <- &msg.AuthResp{
        Version:   version.Proto,
        MmVersion: version.MajorMinor(),
        ClientId:  c.id,
    }

客户端接受到这个id信息之后,会在以后的消息中带上这个id,方便服务端确认是哪个客户端。
为了方便服务端通过id找到对应的Control,服务端会把id和对应Control放在一个map里面,这个map就是controlRegistry。

controlRegistry.Add(c.id, c);

之后,Control会监听从客户端发来的请求(这个时候服务端还没建立端口映射,需要客户端发相应的请求:我要吧自己的什么端口映射到服务端的什么端口上)

func (c *Control) manager() {
    for {
        select {
        case mRaw, ok := <-c.in:
            // c.in closes to indicate shutdown
            if !ok {
                return
            }

            switch m := mRaw.(type) {
            case *msg.ReqTunnel:
                c.registerTunnel(m)

            case *msg.Ping:
                c.lastPing = time.Now()
                c.out <- &msg.Pong{}
            }
        }
    }
}

其中,msg.Ping是心跳信息,msg.ReqTunnel是客户端请求映射的信息。服务端接收到客户端请求,会创建一个新的tunnel

// Register a new tunnel on this control connection
func (c *Control) registerTunnel(rawTunnelReq *msg.ReqTunnel) {
    for _, proto := range strings.Split(rawTunnelReq.Protocol, "+") {
        tunnelReq := *rawTunnelReq
        tunnelReq.Protocol = proto

        c.conn.Debug("Registering new tunnel")
        t, err := NewTunnel(&tunnelReq, c)

        // add it to the list of tunnels
        c.tunnels = append(c.tunnels, t)

        // acknowledge success
        c.out <- &msg.NewTunnel{
            Url:      t.url,
            Protocol: proto,
            ReqId:    rawTunnelReq.ReqId,
        }

        rawTunnelReq.Hostname = strings.Replace(t.url, proto+"://", "", 1)
    }
}


//创建tunnel
func NewTunnel(m *msg.ReqTunnel, ctl *Control) (t *Tunnel, err error) {
    t = &Tunnel{
        req:    m,
        start:  time.Now(),
        ctl:    ctl,
        Logger: log.NewPrefixLogger(),
    }

    proto := t.req.Protocol
    switch proto {
    case "tcp":
        bindTcp := func(port int) error {
            if t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: port}); err != nil {
                err = t.ctl.conn.Error("Error binding TCP listener: %v", err)
                return err
            }

            // create the url
            addr := t.listener.Addr().(*net.TCPAddr)
            t.url = fmt.Sprintf("tcp://%s:%d", opts.domain, addr.Port)

            // register it
            if err = tunnelRegistry.RegisterAndCache(t.url, t); err != nil {
                // This should never be possible because the OS will
                // only assign available ports to us.
                t.listener.Close()
                err = fmt.Errorf("TCP listener bound, but failed to register %s", t.url)
                return err
            }

            go t.listenTcp(t.listener)
            return nil
        }
        
        
        
// Listens for new public tcp connections from the internet.
func (t *Tunnel) listenTcp(listener *net.TCPListener) {
        // accept public connections
        tcpConn, err := listener.AcceptTCP()

        conn := conn.Wrap(tcpConn, "pub")
        conn.AddLogPrefix(t.Id())
        conn.Info("New connection from %v", conn.RemoteAddr())

        go t.HandlePublicConnection(conn)
    }
}

Control建立一个tunnel之后会发送一个msg.NewTunnel信息给客户端,代表tunnel已经建立。这个tunnel会被放在Control的tunnels结构中去。新建Tunnel的过程中,会把url和tunnel的信息注册到一个map中,方便通过url获取到对应的tunnle,这个map是tunnelRegistry。

tunnel建立之后,基本的工作都已经做完了,只需要在tunnel里面监听外网的链接就行了。下面来看一下对外网链接的处理。

func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {

    var proxyConn conn.Conn
    var err error
    for i := 0; i < (2 * proxyMaxPoolSize); i++ {
        // get a proxy connection
        if proxyConn, err = t.ctl.GetProxy(); err != nil {
            t.Warn("Failed to get proxy connection: %v", err)
            return
        }
        // tell the client we're going to start using this proxy connection
        startPxyMsg := &msg.StartProxy{
            Url:        t.url,
            ClientAddr: publicConn.RemoteAddr().String(),
        }

        if err = msg.WriteMsg(proxyConn, startPxyMsg); err != nil {
            proxyConn.Warn("Failed to write StartProxyMessage: %v, attempt %d", err, i)
            proxyConn.Close()
        } else {
            // success
            break
        }
    }

    // join the public and proxy connections
    bytesIn, bytesOut := conn.Join(publicConn, proxyConn)

}

因为现在服务端和客户端只有一条链接(Control),这条链接主要是传送相关的控制消息,为了传输数据,请求客户端创建proxy链接。这个操作在 t.ctl.GetProxy()中完成。

func (c *Control) GetProxy() (proxyConn conn.Conn, err error) {
    var ok bool

    // get a proxy connection from the pool
    select {
    case proxyConn, ok = <-c.proxies:
        if !ok {
            err = fmt.Errorf("No proxy connections available, control is closing")
            return
        }
    default:
        // no proxy available in the pool, ask for one over the control channel
        c.conn.Debug("No proxy in pool, requesting proxy from control . . .")
        if err = util.PanicToError(func() { c.out <- &msg.ReqProxy{} }); err != nil {
            return
        }

        select {
        case proxyConn, ok = <-c.proxies:
            if !ok {
                err = fmt.Errorf("No proxy connections available, control is closing")
                return
            }

        case <-time.After(pingTimeoutInterval):
            err = fmt.Errorf("Timeout trying to get proxy connection")
            return
        }
    }
    return
}

getproxy函数会首先在Control的proxies中找有没有已存在的链接,如果有的话直接拿出来用,如果没有的话请求客户端创建proxy链接,并阻塞在select。客户端会连接服务端,并发送注册proxy的消息msg.RegProxy。这个消息中会带有客户端的id。
服务端通过这个id把这条链接存放在c.proxies中,这样的话GetProxy继续运行,并返回可用的proxy链接。

//接收新的proxy链接消息
    switch m := rawMsg.(type) {
            case *msg.Auth:
                NewControl(tunnelConn, m)

            case *msg.RegProxy:
                NewProxy(tunnelConn, m)

            default:
                tunnelConn.Close()
            }
            
//根据id找到control,把链接放入proxies
func NewProxy(pxyConn conn.Conn, regPxy *msg.RegProxy) {
    ctl := controlRegistry.Get(regPxy.ClientId)

    if ctl == nil {
        panic("No client found for identifier: " + regPxy.ClientId)
    }

    ctl.RegisterProxy(pxyConn)
}

获取proxy链接之后只要把tunnel和proxy链接相连就可以实现数据传输了

// join the public and proxy connections
bytesIn, bytesOut := conn.Join(publicConn, proxyConn)

func Join(c Conn, c2 Conn) (int64, int64) {
    var wait sync.WaitGroup

    pipe := func(to Conn, from Conn, bytesCopied *int64) {
        defer to.Close()
        defer from.Close()
        defer wait.Done()

        var err error
        *bytesCopied, err = io.Copy(to, from)
        if err != nil {
            from.Warn("Copied %d bytes to %s before failing with error %v", *bytesCopied, to.Id(), err)
        } else {
            from.Debug("Copied %d bytes to %s", *bytesCopied, to.Id())
        }
    }

    wait.Add(2)
    var fromBytes, toBytes int64
    go pipe(c, c2, &fromBytes)
    go pipe(c2, c, &toBytes)
    c.Info("Joined with connection %s", c2.Id())
    wait.Wait()
    return fromBytes, toBytes
}

客户端

客户端的部分行为在上文中已经提及了,这里简单讲一下。

读取配置文件之后,客户端会创建一个Controller运行。

NewController().Run(config)

Controller有一个web端,主要显示一些连接的信息(只有http和https会显示,tcp协议不会),这个部分不说了,主要说一下和服务端通信的部分。
Controller中会执行model.run运行clientmodel。

ctl.Go(ctl.model.Run)

clientmodel会执行control函数

func (c *ClientModel) Run() {
    // how long we should wait before we reconnect
    maxWait := 30 * time.Second
    wait := 1 * time.Second

    for {
        // run the control channel
        c.control()
    }
}

在control函数中,client会根据配置文件连接到相应的服务端进行认证,成功之后发送要进行映射的tunnel信息。发送成功之后就等着听取服务端的命令就好了。(同时要自己维护心跳)

// Establishes and manages a tunnel control connection with the server
func (c *ClientModel) control() {

    // establish control channel
    var (
        ctlConn conn.Conn
        err     error
    )
    if c.proxyUrl == "" {
        // simple non-proxied case, just connect to the server
        ctlConn, err = conn.Dial(c.serverAddr, "ctl", c.tlsConfig)
    } else {
        ctlConn, err = conn.DialHttpProxy(c.proxyUrl, c.serverAddr, "ctl", c.tlsConfig)
    }

    // authenticate with the server
    auth := &msg.Auth{
        ClientId:  c.id,
        OS:        runtime.GOOS,
        Arch:      runtime.GOARCH,
        Version:   version.Proto,
        MmVersion: version.MajorMinor(),
        User:      c.authToken,
    }

    if err = msg.WriteMsg(ctlConn, auth); err != nil {
        panic(err)
    }

    // wait for the server to authenticate us
    var authResp msg.AuthResp
    if err = msg.ReadMsgInto(ctlConn, &authResp); err != nil {
        panic(err)
    }


    c.id = authResp.ClientId
    c.serverVersion = authResp.MmVersion
    c.Info("Authenticated with server, client id: %v", c.id)
    c.update()
    if err = SaveAuthToken(c.configPath, c.authToken); err != nil {
        c.Error("Failed to save auth token: %v", err)
    }

    // request tunnels
    reqIdToTunnelConfig := make(map[string]*TunnelConfiguration)
    for _, config := range c.tunnelConfig {
        // create the protocol list to ask for
        var protocols []string
        for proto, _ := range config.Protocols {
            protocols = append(protocols, proto)
        }

        reqTunnel := &msg.ReqTunnel{
            ReqId:      util.RandId(8),
            Protocol:   strings.Join(protocols, "+"),
            Hostname:   config.Hostname,
            Subdomain:  config.Subdomain,
            HttpAuth:   config.HttpAuth,
            RemotePort: config.RemotePort,
        }

        // send the tunnel request
        if err = msg.WriteMsg(ctlConn, reqTunnel); err != nil {
            panic(err)
        }

        // save request id association so we know which local address
        // to proxy to later
        reqIdToTunnelConfig[reqTunnel.ReqId] = config
    }

    // start the heartbeat
    lastPong := time.Now().UnixNano()
    c.ctl.Go(func() { c.heartbeat(&lastPong, ctlConn) })

    // main control loop
    for {
        var rawMsg msg.Message
        if rawMsg, err = msg.ReadMsg(ctlConn); err != nil {
            panic(err)
        }

        switch m := rawMsg.(type) {
        case *msg.ReqProxy:
            c.ctl.Go(c.proxy)

        case *msg.Pong:
            atomic.StoreInt64(&lastPong, time.Now().UnixNano())

        case *msg.NewTunnel:
            if m.Error != "" {
                emsg := fmt.Sprintf("Server failed to allocate tunnel: %s", m.Error)
                c.Error(emsg)
                c.ctl.Shutdown(emsg)
                continue
            }

            tunnel := mvc.Tunnel{
                PublicUrl: m.Url,
                LocalAddr: reqIdToTunnelConfig[m.ReqId].Protocols[m.Protocol],
                Protocol:  c.protoMap[m.Protocol],
            }

            c.tunnels[tunnel.PublicUrl] = tunnel
            c.connStatus = mvc.ConnOnline
            c.Info("Tunnel established at %v", tunnel.PublicUrl)
            c.update()

        default:
            ctlConn.Warn("Ignoring unknown control message %v ", m)
        }
    }
}

这里最重要的信息是msg.ReqProxy,接受到这个信息之后服务端会主动创建客户端到服务端的proxy链接

// Establishes and manages a tunnel proxy connection with the server
func (c *ClientModel) proxy() {
    var (
        remoteConn conn.Conn
        err        error
    )

    if c.proxyUrl == "" {
        remoteConn, err = conn.Dial(c.serverAddr, "pxy", c.tlsConfig)
    } else {
        remoteConn, err = conn.DialHttpProxy(c.proxyUrl, c.serverAddr, "pxy", c.tlsConfig)
    }

    err = msg.WriteMsg(remoteConn, &msg.RegProxy{ClientId: c.id})

    // wait for the server to ack our register
    var startPxy msg.StartProxy
    if err = msg.ReadMsgInto(remoteConn, &startPxy); err != nil {
        remoteConn.Error("Server failed to write StartProxy: %v", err)
        return
    }

    tunnel, ok := c.tunnels[startPxy.Url]
    if !ok {
        remoteConn.Error("Couldn't find tunnel for proxy: %s", startPxy.Url)
        return
    }

    // start up the private connection
    start := time.Now()
    localConn, err := conn.Dial(tunnel.LocalAddr, "prv", nil)

    m := c.metrics
    m.proxySetupTimer.Update(time.Since(start))
    m.connMeter.Mark(1)
    c.update()
    m.connTimer.Time(func() {
        localConn := tunnel.Protocol.WrapConn(localConn, mvc.ConnectionContext{Tunnel: tunnel, ClientAddr: startPxy.ClientAddr})
        bytesIn, bytesOut := conn.Join(localConn, remoteConn)
        m.bytesIn.Update(bytesIn)
        m.bytesOut.Update(bytesOut)
        m.bytesInCount.Inc(bytesIn)
        m.bytesOutCount.Inc(bytesOut)
    })
    c.update()
}

以上就是ngrok服务端和客户端的主要逻辑了。

写的比较匆忙,如果看不明白可以找相关的文章对比着看看。比如这篇:ngrok原理浅析

如果想要搭建ngrok服务端,可以看这篇:搭建 ngrok 服务实现内网穿透