Skip to content

Commit

Permalink
重构会话共享 (jumpserver#473)
Browse files Browse the repository at this point in the history
* 重构会话共享
1. 每新建的会话,创建一个Room
2. 当加入非本地koko创建的会话时,redis发送共享请求, 成功返回结果,创建redis连接,交换数据。(Room维持的用户连接为0, 或接受到会话退出事件时,则关闭连接)
3. Koko收到共享请求,并检测是本地创建的会话,则创建redis连接,接受远端数据(Join则增加1,Leave则减1,当加入会话的计数为0时,关闭redis连接)
4. 当共享会话结束时,Room接受stop信号,关闭所有加入的用户连接,并等待连接退出,最终关闭Room


Co-authored-by: Eric <[email protected]>
  • Loading branch information
fit2bot and LeeEirc authored Nov 9, 2020
1 parent d63454b commit 7c54918
Show file tree
Hide file tree
Showing 21 changed files with 998 additions and 604 deletions.
19 changes: 0 additions & 19 deletions pkg/exchange/exchange.go

This file was deleted.

22 changes: 14 additions & 8 deletions pkg/exchange/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,40 @@ import (
"github.com/jumpserver/koko/pkg/logger"
)

var ex Exchanger
var manager roomManager

func Initial(ctx context.Context) {
conf := config.GetConf()
var err error
var (
err error
)

switch strings.ToLower(conf.ShareRoomType) {
case "redis":
ex, err = NewRedisExchange(Config{
manager, err = newRedisManager(Config{
Addr: net.JoinHostPort(conf.RedisHost, conf.RedisPort),
Password: conf.RedisPassword,
Clusters: conf.RedisClusters,
DBIndex: conf.RedisDBIndex,
})

default:
ex, err = NewLocalExchange()
manager = newLocalManager()
}
logger.Infof("Exchange share room type: %s", conf.ShareRoomType)
if err != nil {
logger.Fatal(err)
}
}

func GetExchange() Exchanger {
return ex
func Register(r *Room) {
manager.Add(r)
}

func StopExchange() {
ex.Close()
func UnRegister(r *Room) {
manager.Delete(r)
}

func GetRoom(roomId string) *Room {
return manager.Get(roomId)
}
162 changes: 40 additions & 122 deletions pkg/exchange/local.go
Original file line number Diff line number Diff line change
@@ -1,145 +1,63 @@
package exchange

import (
"context"
"fmt"
"sync"

"github.com/jumpserver/koko/pkg/model"
)

func NewLocalExchange() (*LocalExchange, error) {
return &LocalExchange{
createdRooms: make(map[string]*localRoom),
joinedRooms: make(map[string]map[*localRoom]context.CancelFunc),
}, nil

func newLocalManager() *localRoomManager {
m := localRoomManager{newLocalCache()}
return &m
}

type LocalExchange struct {
createdRooms map[string]*localRoom
joinedRooms map[string]map[*localRoom]context.CancelFunc
mu sync.Mutex
type localRoomManager struct {
*localCache
}

func (exc *LocalExchange) JoinRoom(receiveChan chan<- model.RoomMessage, roomId string) (Room, error) {
exc.mu.Lock()
defer exc.mu.Unlock()
if createdRoom, ok := exc.createdRooms[roomId]; ok {
ctx, cancelFunc := context.WithCancel(context.Background())
r := &localRoom{
roomID: roomId,
writeChan: createdRoom.readChan,
readChan: receiveChan,
ctx: ctx,
cancelFunc: cancelFunc,
}
if joinRoomsMap, ok := exc.joinedRooms[roomId]; ok {
joinRoomsMap[r] = cancelFunc
} else {
exc.joinedRooms[roomId] = map[*localRoom]context.CancelFunc{
r: cancelFunc,
}
}
return r, nil
func newLocalCache() *localCache {
l := &localCache{
caches: make(map[string]*Room),
storeChan: make(chan *Room),
leaveChan: make(chan *Room),
checkChan: make(chan string),
resultChan: make(chan *Room),
}
go l.run()
return l

return nil, fmt.Errorf("room %s not found", roomId)
}

func (exc *LocalExchange) LeaveRoom(exRoom Room, roomId string) {
sub, ok := exRoom.(*localRoom)
if !ok {
return
}
exc.mu.Lock()
defer exc.mu.Unlock()
if joinRoomsMap, ok := exc.joinedRooms[roomId]; ok {
if contextFunc, ok2 := joinRoomsMap[sub]; ok2 {
delete(joinRoomsMap, sub)
contextFunc()
}
}
close(sub.readChan)
}
type localCache struct {
caches map[string]*Room

func (exc *LocalExchange) CreateRoom(receiveChan chan<- model.RoomMessage, roomId string) Room {
exc.mu.Lock()
defer exc.mu.Unlock()
ctx, contextFunc := context.WithCancel(context.Background())
readChan := make(chan model.RoomMessage)
r := &localRoom{
roomID: roomId,
writeChan: readChan,
readChan: receiveChan,
ctx: ctx,
cancelFunc: contextFunc,
}
exc.createdRooms[roomId] = r
go func() {
for {
roomMgs, ok := <-readChan
if !ok {
return
}
exc.mu.Lock()
joinedRooms := make([]*localRoom, 0, len(exc.joinedRooms[roomId]))
for joinRoom := range exc.joinedRooms[roomId] {
joinedRooms = append(joinedRooms, joinRoom)
}
exc.mu.Unlock()
for i := range joinedRooms {
select {
case <-joinedRooms[i].ctx.Done():
continue
case joinedRooms[i].readChan <- roomMgs:
default:
storeChan chan *Room

}
}
}
}()
return r
}
leaveChan chan *Room

func (exc *LocalExchange) DestroyRoom(exRoom Room) {
sub, ok := exRoom.(*localRoom)
if !ok {
return
}
exc.mu.Lock()
defer exc.mu.Unlock()
delete(exc.createdRooms, sub.roomID)
sub.cancelFunc()
close(sub.readChan)
checkChan chan string

resultChan chan *Room
}

func (exc *LocalExchange) Close() {
exc.mu.Lock()
defer exc.mu.Unlock()
for roomID, createdRoom := range exc.createdRooms {
if joinRoomMap, ok := exc.joinedRooms[roomID]; ok {
delete(exc.joinedRooms, roomID)
for joinedRoom := range joinRoomMap {
close(joinedRoom.readChan)
}
func (l *localCache) run() {
for {
select {
case s := <-l.storeChan:
l.caches[s.Id] = s
go s.run()
case s := <-l.leaveChan:
delete(l.caches, s.Id)
s.stop()
case sid := <-l.checkChan:
l.resultChan <- l.caches[sid]
}
close(createdRoom.readChan)
}
}

func (l localCache) Add(s *Room) {
l.storeChan <- s
}

type localRoom struct {
roomID string
writeChan chan<- model.RoomMessage
readChan chan<- model.RoomMessage
ctx context.Context
cancelFunc context.CancelFunc
func (l localCache) Delete(s *Room) {
l.leaveChan <- s
}

func (r *localRoom) Publish(msg model.RoomMessage) {
select {
case <-r.ctx.Done():
case r.writeChan <- msg:
}
func (l localCache) Get(sid string) *Room {
l.checkChan <- sid
return <-l.resultChan
}
Loading

0 comments on commit 7c54918

Please sign in to comment.