Skip to content

Commit

Permalink
dashboard 备注 客户端管理优化 多客户端支持 流量显示支持 热更新支持 404错误页支持
Browse files Browse the repository at this point in the history
  • Loading branch information
刘河 committed Jan 25, 2019
1 parent c533436 commit c34e5e1
Show file tree
Hide file tree
Showing 37 changed files with 5,410 additions and 727 deletions.
87 changes: 43 additions & 44 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/cnlh/easyProxy/utils"
"log"
"net"
"strconv"
"sync"
"time"
)
Expand All @@ -31,20 +32,20 @@ func newList() *list {
}

type Tunnel struct {
TunnelPort int //通信隧道端口
listener *net.TCPListener //server端监听
SignalList map[string]*list //通信
TunnelList map[string]*list //隧道
RunList map[string]interface{} //运行中的任务
TunnelPort int //通信隧道端口
listener *net.TCPListener //server端监听
SignalList map[int]*list //通信
TunnelList map[int]*list //隧道
RunList map[int]interface{} //运行中的任务
lock sync.Mutex
tunnelLock sync.Mutex
}

func NewTunnel(tunnelPort int, runList map[string]interface{}) *Tunnel {
func NewTunnel(tunnelPort int, runList map[int]interface{}) *Tunnel {
t := new(Tunnel)
t.TunnelPort = tunnelPort
t.SignalList = make(map[string]*list)
t.TunnelList = make(map[string]*list)
t.SignalList = make(map[int]*list)
t.TunnelList = make(map[int]*list)
t.RunList = runList
return t
}
Expand Down Expand Up @@ -87,7 +88,8 @@ func (s *Tunnel) cliProcess(c *utils.Conn) error {
c.Conn.Close()
return err
}
if !s.verify(string(vval)) {
id, err := utils.GetCsvDb().GetIdByVerifyKey(string(vval),c.Conn.RemoteAddr().String())
if err != nil {
log.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
s.verifyError(c)
return errors.New("验证错误")
Expand All @@ -97,18 +99,18 @@ func (s *Tunnel) cliProcess(c *utils.Conn) error {
if flag, err := c.ReadFlag(); err != nil {
return err
} else {
return s.typeDeal(flag, c, string(vval))
return s.typeDeal(flag, c, id)
}
}

//tcp连接类型区分
func (s *Tunnel) typeDeal(typeVal string, c *utils.Conn, cFlag string) error {
func (s *Tunnel) typeDeal(typeVal string, c *utils.Conn, id int) error {
switch typeVal {
case utils.WORK_MAIN:
log.Println("客户端连接成功", c.Conn.RemoteAddr())
s.addList(s.SignalList, c, cFlag)
s.addList(s.SignalList, c, id)
case utils.WORK_CHAN:
s.addList(s.TunnelList, c, cFlag)
s.addList(s.TunnelList, c, id)
default:
return errors.New("无法识别")
}
Expand All @@ -117,41 +119,38 @@ func (s *Tunnel) typeDeal(typeVal string, c *utils.Conn, cFlag string) error {
}

//加到对应的list中
func (s *Tunnel) addList(m map[string]*list, c *utils.Conn, cFlag string) {
func (s *Tunnel) addList(m map[int]*list, c *utils.Conn, id int) {
s.lock.Lock()
if v, ok := m[cFlag]; ok {
if v, ok := m[id]; ok {
v.Add(c)
} else {
l := newList()
l.Add(c)
m[cFlag] = l
m[id] = l
}
s.lock.Unlock()
}

//新建隧道
func (s *Tunnel) newChan(cFlag string) error {
func (s *Tunnel) newChan(id int) error {
var connPass *utils.Conn
var err error
retry:
if connPass, err = s.waitAndPop(s.SignalList, cFlag); err != nil {
if connPass, err = s.waitAndPop(s.SignalList, id); err != nil {
return err
}
if _, err = connPass.Conn.Write([]byte("chan")); err != nil {
goto retry
}
s.SignalList[cFlag].Add(connPass)
s.SignalList[id].Add(connPass)
return nil
}

//得到一个tcp隧道
//TODO 超时问题 锁机制问题 对单个客户端加锁
func (s *Tunnel) GetTunnel(cFlag string, en, de int, crypt, mux bool) (c *utils.Conn, err error) {
if v, ok := s.TunnelList[cFlag]; !ok || v.Len() < 3 { //新建通道
go s.newChan(cFlag)
}
func (s *Tunnel) GetTunnel(id int, en, de int, crypt, mux bool) (c *utils.Conn, err error) {
retry:
if c, err = s.waitAndPop(s.TunnelList, cFlag); err != nil {
if c, err = s.waitAndPop(s.TunnelList, id); err != nil {
return
}
if _, err = c.WriteTest(); err != nil {
Expand All @@ -163,78 +162,78 @@ retry:
}

//得到一个通信通道
func (s *Tunnel) GetSignal(cFlag string) (err error, conn *utils.Conn) {
if v, ok := s.SignalList[cFlag]; !ok || v.Len() == 0 {
func (s *Tunnel) GetSignal(id int) (err error, conn *utils.Conn) {
if v, ok := s.SignalList[id]; !ok || v.Len() == 0 {
err = errors.New("客户端未连接")
return
}
conn = s.SignalList[cFlag].Pop()
conn = s.SignalList[id].Pop()
return
}

//重回slice 复用
func (s *Tunnel) ReturnSignal(conn *utils.Conn, cFlag string) {
if v, ok := s.SignalList[cFlag]; ok {
func (s *Tunnel) ReturnSignal(conn *utils.Conn, id int) {
if v, ok := s.SignalList[id]; ok {
v.Add(conn)
}
}

//重回slice 复用
func (s *Tunnel) ReturnTunnel(conn *utils.Conn, cFlag string) {
if v, ok := s.TunnelList[cFlag]; ok {
func (s *Tunnel) ReturnTunnel(conn *utils.Conn, id int) {
if v, ok := s.TunnelList[id]; ok {
utils.FlushConn(conn.Conn)
v.Add(conn)
}
}

//删除通信通道
func (s *Tunnel) DelClientSignal(cFlag string) {
s.delClient(cFlag, s.SignalList)
func (s *Tunnel) DelClientSignal(id int) {
s.delClient(id, s.SignalList)
}

//删除隧道
func (s *Tunnel) DelClientTunnel(cFlag string) {
s.delClient(cFlag, s.TunnelList)
func (s *Tunnel) DelClientTunnel(id int) {
s.delClient(id, s.TunnelList)
}

func (s *Tunnel) delClient(cFlag string, l map[string]*list) {
if t := l[utils.Getverifyval(cFlag)]; t != nil {
func (s *Tunnel) delClient(id int, l map[int]*list) {
if t := l[id]; t != nil {
for {
if t.Len() <= 0 {
break
}
t.Pop().Close()
}
delete(l, utils.Getverifyval(cFlag))
delete(l, id)
}
}

//等待
func (s *Tunnel) waitAndPop(m map[string]*list, cFlag string) (c *utils.Conn, err error) {
func (s *Tunnel) waitAndPop(m map[int]*list, id int) (c *utils.Conn, err error) {
ticker := time.NewTicker(time.Millisecond * 100)
stop := time.After(time.Second * 10)
stop := time.After(time.Second * 3)
for {
select {
case <-ticker.C:
s.lock.Lock()
if v, ok := m[cFlag]; ok && v.Len() > 0 {
if v, ok := m[id]; ok && v.Len() > 0 {
c = v.Pop()
ticker.Stop()
s.lock.Unlock()
return
}
s.lock.Unlock()
case <-stop:
err = errors.New("client key: " + cFlag + ",err: get client conn timeout")
err = errors.New("client id: " + strconv.Itoa(id) + ",err: get client conn timeout")
return
}
}
return
}

func (s *Tunnel) verify(vKeyMd5 string) bool {
func (s *Tunnel) verify(id int) bool {
for k := range s.RunList {
if utils.Getverifyval(k) == vKeyMd5 {
if k == id {
return true
}
}
Expand Down
43 changes: 33 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import (
"log"
"net"
"sync"
"sync/atomic"
"time"
)

type TRPClient struct {
svrAddr string
tcpNum int
svrAddr string
tcpNum int
tunnelNum int64
tunnel chan bool
serverStatus bool
sync.Mutex
vKey string
}
Expand All @@ -21,6 +25,7 @@ func NewRPClient(svraddr string, tcpNum int, vKey string) *TRPClient {
c.svrAddr = svraddr
c.tcpNum = tcpNum
c.vKey = vKey
c.tunnel = make(chan bool)
return c
}

Expand All @@ -29,12 +34,17 @@ func (s *TRPClient) Start() error {
for i := 0; i < s.tcpNum; i++ {
go s.NewConn()
}
for i := 0; i < 5; i++ {
go s.dealChan()
}
go s.session()
return nil
}

//新建
func (s *TRPClient) NewConn() error {
s.Lock()
s.serverStatus = false
conn, err := net.Dial("tcp", s.svrAddr)
if err != nil {
log.Println("连接服务端失败,五秒后将重连")
Expand All @@ -44,11 +54,12 @@ func (s *TRPClient) NewConn() error {
return err
}
s.Unlock()
return s.process(utils.NewConn(conn))
return s.processor(utils.NewConn(conn))
}

//处理
func (s *TRPClient) process(c *utils.Conn) error {
func (s *TRPClient) processor(c *utils.Conn) error {
s.serverStatus = true
c.SetAlive()
if _, err := c.Write([]byte(utils.Getverifyval(s.vKey))); err != nil {
return err
Expand All @@ -58,17 +69,13 @@ func (s *TRPClient) process(c *utils.Conn) error {
flags, err := c.ReadFlag()
if err != nil {
log.Println("服务端断开,五秒后将重连", err)
time.Sleep(5 * time.Second)
go s.NewConn()
break
}
switch flags {
case utils.VERIFY_EER:
log.Fatalln("vkey:", s.vKey, "不正确,服务端拒绝连接,请检查")
case utils.WORK_CHAN: //隧道模式,每次开启10个,加快连接速度
for i := 0; i < 5; i++ {
go s.dealChan()
}
case utils.RES_MSG:
log.Println("服务端返回错误。")
default:
Expand Down Expand Up @@ -98,13 +105,16 @@ func (s *TRPClient) dealChan() {
//写标志
c.WriteChan()
re:
atomic.AddInt64(&s.tunnelNum, 1)
//获取连接的host type(tcp or udp)
typeStr, host, en, de, crypt, mux, err := c.GetHostFromConn()
s.tunnel <- true
atomic.AddInt64(&s.tunnelNum, -1)
if err != nil {
c.Close()
return
}
Process(c, typeStr, host, en, de, crypt, mux)
s.ConnectAndCopy(c, typeStr, host, en, de, crypt, mux)
if mux {
utils.FlushConn(conn)
goto re
Expand All @@ -113,7 +123,20 @@ re:
}
}

func Process(c *utils.Conn, typeStr, host string, en, de int, crypt, mux bool) {
func (s *TRPClient) session() {
t := time.NewTicker(time.Millisecond * 1000)
for {
select {
case <-s.tunnel:
case <-t.C:
}
if s.serverStatus && s.tunnelNum < 5 {
go s.dealChan()
}
}
}

func (s *TRPClient) ConnectAndCopy(c *utils.Conn, typeStr, host string, en, de int, crypt, mux bool) {
//与目标建立连接,超时时间为3
server, err := net.DialTimeout(typeStr, host, time.Second*3)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/proxy_server/proxy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
func main() {
flag.Parse()
server.VerifyKey = *VerifyKey
cnf := server.ServerConfig{
cnf := &utils.ServerConfig{
TcpPort: *httpPort,
Mode: *rpMode,
Target: *tunnelTarget,
Expand All @@ -51,5 +51,5 @@ func main() {
}
log.Println("服务端启动,监听tcp服务端端口:", *TcpPort)
cnf.CompressDecode, cnf.CompressEncode = utils.GetCompressType(cnf.Compress)
server.StartNewServer(*TcpPort, &cnf)
server.StartNewServer(*TcpPort, cnf)
}
2 changes: 2 additions & 0 deletions conf/clients.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
2,zl4p3da659qa9rh3,127.0.0.1:58000,测试2,true,,,0,0,
1,rfd0tl1anega0d0g,127.0.0.1:53603,测试,true,1,1,1,1,snappy
4 changes: 2 additions & 2 deletions conf/hosts.csv
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
a.o.com,127.0.0.1:8082,7hiixust68kbz33a,,www.baidu.com
b.o.com,,7hiixust68kbz33a,,ab
a.o.com,127.0.0.1:8080,1,,,测试2
b.o.com,127.0.0.1:8082,2,,,测试
9 changes: 4 additions & 5 deletions conf/tasks.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
8001,tunnelServer,127.0.0.1:88,jq5i7n0sjs1h0jje,aaa,bbb,,1,0,0,0,1
0,hostServer,,7n7bxc2bm1fyjfab,ab,b,,1,1,1,0,1
0,hostServer,,ts08z6vk5nc72fs8,aaa,bbb,snappy,1,0,1,2,3
8002,tunnelServer,127.0.0.1:88,2nxo93wvotb9g75s,,,,1,0,0,0,1
8025,socks5Server,,2p3qs71oym3zx52w,,,,1,0,0,0,1
9001,tunnelServer,127.0.0.1:8080,,,,1,0,0,0,0,1,1,true,test
53,udpServer,114.114.114.114:53,,,,1,0,0,0,0,2,2,true,udp测试
8024,socks5Server,,,,,1,0,0,0,0,3,2,true,socks5测试
8025,httpProxyServer,,,,,1,0,0,0,0,4,2,true,http测试
Loading

0 comments on commit c34e5e1

Please sign in to comment.