Skip to content

Commit

Permalink
Try fix iOS push message bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Dec 21, 2018
1 parent f349470 commit 229a445
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 70 deletions.
135 changes: 69 additions & 66 deletions access/session/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@
package server

import (
"github.com/nebula-chat/chatengine/mtproto/rpc"
"github.com/nebula-chat/chatengine/mtproto"
"container/list"
"time"
"github.com/golang/glog"
"reflect"
"github.com/nebula-chat/chatengine/pkg/logger"
"math/rand"
"fmt"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/nebula-chat/chatengine/mtproto"
"github.com/nebula-chat/chatengine/mtproto/rpc"
"github.com/nebula-chat/chatengine/pkg/grpc_util"
"github.com/nebula-chat/chatengine/pkg/logger"
"math/rand"
"reflect"
"time"
)

const (
kSessionUnknown = 0
kSessionGeneric = 1
kSessionDownload = 2
kSessionUpload = 3
kSessionPush = 4
kSessionTemp = 5
kSessionProxy = 6
kSessionUnknown = 0
kSessionGeneric = 1
kSessionDownload = 2
kSessionUpload = 3
kSessionPush = 4
kSessionTemp = 5
kSessionProxy = 6
kSessionGenericMedia = 7
// kSessionMaxSize = 8
)
Expand All @@ -51,9 +51,9 @@ const (
)

const (
kDefaultPingTimeout = 30
kPingAddTimeout = 15
kCacheSessionTimeout = 3 * 60
kDefaultPingTimeout = 30
kPingAddTimeout = 15
kCacheSessionTimeout = 3 * 60
)

const (
Expand All @@ -75,17 +75,17 @@ const (
)

const (
kMsgIdTooLow = int32(16)
kMsgIdTooHigh = int32(17)
kMsgIdMod4 = int32(18)
kMsgIdTooLow = int32(16)
kMsgIdTooHigh = int32(17)
kMsgIdMod4 = int32(18)
kMsgIdCollision = int32(19)

kMsgIdTooOld = int32(20)

kSeqNoTooLow = int32(3)
kSeqNoTooLow = int32(3)
kSeqNoTooHigh = int32(33)
kSeqNoNotEven = int32(34)
kSeqNoNotOdd = int32(35)
kSeqNoNotOdd = int32(35)

kInvalidContainer = int32(64)
)
Expand Down Expand Up @@ -130,37 +130,38 @@ type sessionBase interface {
func newSession(sessionId int64, sessType int, cb sessionCallback) sessionBase {
var sess sessionBase
sb := &session{
sessionId: sessionId,
sessionType: sessType,
connId: ClientConnID{},
apiMessages: list.New(),
pendingMessages: []*pendingMessage{},
msgIds: list.New(),
cb: cb,
sessionState: kSessionStateNew,
closeDate: time.Now().Unix() + kDefaultPingTimeout + kPingAddTimeout,
sessionId: sessionId,
sessionType: sessType,
connId: ClientConnID{},
apiMessages: list.New(),
pendingMessages: []*pendingMessage{},
msgIds: list.New(),
cb: cb,
sessionState: kSessionStateNew,
closeDate: time.Now().Unix() + kDefaultPingTimeout + kPingAddTimeout,
// closeSessionDate: 0,
// lastTime: time.Now().Unix(),
connState: kStateNew,
connState: kStateNew,
lastReceiveTime: time.Now().UnixNano(),
}

switch sessType {
case kSessionGeneric:
rpcClient, _ := getBizRPCClient()
sess = &genericSession{
session: sb,
session: sb,
RPCClient: rpcClient,
}
case kSessionDownload:
rpcClient, _ := getNbfsRPCClient()
sess = &downloadSession{
session: sb,
session: sb,
RPCClient: rpcClient,
}
case kSessionUpload:
rpcClient, _ := getNbfsRPCClient()
sess = &uploadSession{
session: sb,
session: sb,
RPCClient: rpcClient,
}
case kSessionPush:
Expand Down Expand Up @@ -199,22 +200,23 @@ type sessionCallback interface {
}

type session struct {
sessionType int
sessionId int64
sessionState int
connId ClientConnID
nextSeqNo uint32
apiMessages *list.List
firstMsgId int64
pendingMessages []*pendingMessage
rpcMessages []*networkApiMessage
msgIds *list.List
syncMessages []*pendingMessage
connState int
sessionType int
sessionId int64
sessionState int
connId ClientConnID
nextSeqNo uint32
apiMessages *list.List
firstMsgId int64
pendingMessages []*pendingMessage
rpcMessages []*networkApiMessage
msgIds *list.List
syncMessages []*pendingMessage
connState int
// lastTime int64
closeDate int64
closeDate int64
// closeSessionDate int64
cb sessionCallback
cb sessionCallback
lastReceiveTime int64
}

func (c *session) String() string {
Expand Down Expand Up @@ -295,6 +297,7 @@ func (c *session) changeConnState(state int) {

func (c *session) processMessageData(id ClientConnID, cntl *zrpc.ZRpcController, salt int64, msg *mtproto.TLMessage2, cb2 func(msg *mtproto.TLMessage2)) {
// c.lastTime = time.Now().Unix()
c.lastReceiveTime = time.Now().UnixNano()
sessionStateNew := c.connState != kStateOnline

c.changeConnState(kStateOnline)
Expand Down Expand Up @@ -340,7 +343,7 @@ func (c *session) processMessageData(id ClientConnID, cntl *zrpc.ZRpcController,
}

if sessionStateNew {
// if c.sessionState == kSessionStateNew {
// if c.sessionState == kSessionStateNew {
c.onNewSessionCreated(id, cntl, msgs[0].MsgId)
c.firstMsgId = msgs[0].MsgId
// c.sessionState = kSessionStateCreated
Expand Down Expand Up @@ -439,7 +442,7 @@ func (c *session) onTimer() bool {
c.changeConnState(kStateOffline)
}
} else if c.connState == kStateOffline || c.connState == kStateNew {
if date >= c.closeDate + kCacheSessionTimeout {
if date >= c.closeDate+kCacheSessionTimeout {
c.changeConnState(kStateClose)
}
}
Expand Down Expand Up @@ -533,9 +536,9 @@ func (c *session) sendPendingMessagesToClient(connID ClientConnID, cntl *zrpc.ZR
for _, m := range pendingMessages {
message2 := mtproto.TLMessage2{
//MsgId: msgId,
MsgId: mtproto.GenerateMessageId(),
Seqno: c.generateMessageSeqNo(m.confirm),
Bytes: int32(len(m.tl.EncodeToLayer(int(c.cb.getLayer())))),
MsgId: mtproto.GenerateMessageId(),
Seqno: c.generateMessageSeqNo(m.confirm),
Bytes: int32(len(m.tl.EncodeToLayer(int(c.cb.getLayer())))),
// Bytes: int32(len(m.tl.Encode())),
Object: m.tl,
}
Expand All @@ -549,7 +552,7 @@ func (c *session) sendPendingMessagesToClient(connID ClientConnID, cntl *zrpc.ZR
func (c *session) addMsgId(msgId int64) {
inserted := false
for e := c.msgIds.Front(); e != nil; e = e.Next() {
if e.Value.(int64) > msgId {
if e.Value.(int64) > msgId {
c.msgIds.InsertBefore(msgId, e)
inserted = true
break
Expand All @@ -570,7 +573,7 @@ func (c *session) getMinMessageId() int64 {

func (c *session) checkExistMessageId(msgId int64) bool {
for e := c.msgIds.Front(); e != nil; e = e.Next() {
if e.Value.(int64) == msgId {
if e.Value.(int64) == msgId {
return true
}
}
Expand All @@ -595,7 +598,7 @@ func (c *session) checkBadServerSalt(connID ClientConnID, cntl *zrpc.ZRpcControl
if c.cb.getCacheSalt() != nil {
if salt == c.cb.getCacheSalt().GetSalt() {
date := int32(time.Now().Unix())
if c.cb.getCacheSalt().GetValidUntil() + 300 >= date {
if c.cb.getCacheSalt().GetValidUntil()+300 >= date {
valid = true
}
}
Expand Down Expand Up @@ -734,11 +737,11 @@ func (c *session) checkBadMsgNotification(connID ClientConnID, cntl *zrpc.ZRpcCo
date := time.Now().Unix()
glog.Info("date: ", date, ", timeMessage: ", timeMessage)

if timeMessage < date - 300 {
if timeMessage < date-300 {
errorCode = kMsgIdTooLow
break
}
if timeMessage > date + 30 {
if timeMessage > date+30 {
errorCode = kMsgIdTooHigh
break
}
Expand All @@ -765,7 +768,7 @@ func (c *session) checkBadMsgNotification(connID ClientConnID, cntl *zrpc.ZRpcCo
// by the client must not be empty and must present a fractional
// part of the time point when the message was created.
//
if msg.MsgId % 4 != 0 {
if msg.MsgId%4 != 0 {
errorCode = kMsgIdMod4
break
}
Expand All @@ -780,7 +783,7 @@ func (c *session) checkBadMsgNotification(connID ClientConnID, cntl *zrpc.ZRpcCo
switch msg.Object.(type) {
case *mtproto.TLMsgContainer:
// odd
if msg.Seqno % 2 != 0 {
if msg.Seqno%2 != 0 {
errorCode = kSeqNoNotEven
break
}
Expand All @@ -791,7 +794,7 @@ func (c *session) checkBadMsgNotification(connID ClientConnID, cntl *zrpc.ZRpcCo
}
case *mtproto.TLMsgsAck, *mtproto.TLHttpWait, *mtproto.TLMsgsStateInfo, *mtproto.TLMsgsAllInfo:
// even
if msg.Seqno % 2 != 0 {
if msg.Seqno%2 != 0 {
errorCode = kSeqNoNotEven
break
}
Expand All @@ -802,7 +805,7 @@ func (c *session) checkBadMsgNotification(connID ClientConnID, cntl *zrpc.ZRpcCo
// ignore
default:
//
if msg.Seqno % 2 == 0 {
if msg.Seqno%2 == 0 {
errorCode = kSeqNoNotOdd
break
}
Expand Down Expand Up @@ -1230,7 +1233,7 @@ func (c *session) onContestSaveDeveloperInfo(connID ClientConnID, cntl *zrpc.ZRp
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func invokeRpcRequest(authUserId int32, authKeyId int64, layer int32, requests *rpcApiMessages, invoke func()(*grpc_util.RPCClient)) []*networkApiMessage {
func invokeRpcRequest(authUserId int32, authKeyId int64, layer int32, requests *rpcApiMessages, invoke func() *grpc_util.RPCClient) []*networkApiMessage {
glog.Infof("invokeRpcRequest - receive data: {session_id: %d, conn_id: %d, md: %s, data: {%v}}",
requests.sessionId, requests.connID, requests.cntl, requests.rpcMessages)

Expand All @@ -1243,8 +1246,8 @@ func invokeRpcRequest(authUserId int32, authKeyId int64, layer int32, requests *

for i := 0; i < len(requests.rpcMessages); i++ {
var (
err error
rpcResult mtproto.TLObject
err error
rpcResult mtproto.TLObject
)

// 初始化metadata
Expand Down
12 changes: 8 additions & 4 deletions access/session/server/updates_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ type updatesManager struct {
}

func (m *updatesManager) getOnlineGenericSession() *genericSession {
for e := m.genericSessions.Front(); e != nil; e = e.Next() {
if e.Value.(*genericSession).sessionOnline() {
return e.Value.(*genericSession)
var lastReceiveTime int64 = 0
var lastRecentSess *genericSession
for e := m.genericSessions.Back(); e != nil; e = e.Next() {
sess := e.Value.(*genericSession)
if sess.sessionOnline() && sess.lastReceiveTime > lastReceiveTime {
lastRecentSess = sess
lastReceiveTime = sess.lastReceiveTime
}
}
return nil
return lastRecentSess
}

func (m *updatesManager) onGenericSessionNew(s sessionBase) {
Expand Down

0 comments on commit 229a445

Please sign in to comment.