diff --git a/server/http.go b/server/http.go index 3075fca1..6b1a2394 100644 --- a/server/http.go +++ b/server/http.go @@ -402,13 +402,21 @@ type debugTopic struct { Sessions []string `json:"sessions,omitempty"` } +// debugCachedUser is a user cache entry debug info. +type debugCachedUser struct { + Uid string `json:"uid,omitempty"` + Unread int `json:"unread,omitempty"` + Topics int `json:"topics,omitempty"` +} + // debugDump is server internal state dump for debugging. type debugDump struct { - Version string `json:"server_version,omitempty"` - Build string `json:"build_id,omitempty"` - Timestamp time.Time `json:"ts,omitempty"` - Sessions []debugSession `json:"sessions,omitempty"` - Topics []debugTopic `json:"topics,omitempty"` + Version string `json:"server_version,omitempty"` + Build string `json:"build_id,omitempty"` + Timestamp time.Time `json:"ts,omitempty"` + Sessions []debugSession `json:"sessions,omitempty"` + Topics []debugTopic `json:"topics,omitempty"` + UserCache []debugCachedUser `json:"user_cache,omitempty"` } func serveStatus(wrt http.ResponseWriter, req *http.Request) { @@ -420,6 +428,7 @@ func serveStatus(wrt http.ResponseWriter, req *http.Request) { Timestamp: types.TimeNow(), Sessions: make([]debugSession, 0, len(globals.sessionStore.sessCache)), Topics: make([]debugTopic, 0, 10), + UserCache: make([]debugCachedUser, 0, 10), } // Sessions. globals.sessionStore.Range(func(sid string, s *Session) bool { @@ -467,6 +476,13 @@ func serveStatus(wrt http.ResponseWriter, req *http.Request) { }) return true }) + for k, v := range usersCache { + result.UserCache = append(result.UserCache, debugCachedUser{ + Uid: k.UserId(), + Unread: v.unread, + Topics: v.topics, + }) + } json.NewEncoder(wrt).Encode(result) } diff --git a/server/push.go b/server/push.go index 89c3703c..c465eb6a 100644 --- a/server/push.go +++ b/server/push.go @@ -24,7 +24,7 @@ func (t *Topic) channelSubUnsub(uid types.Uid, sub bool) { } // Prepares a payload to be delivered to a mobile device as a push notification in response to a {data} message. -func (t *Topic) pushForData(fromUid types.Uid, data *MsgServerData) *push.Receipt { +func (t *Topic) pushForData(fromUid types.Uid, data *MsgServerData, msgMarkedAsReadBySender bool) *push.Receipt { // Passing `Topic` as `t.name` for group topics and P2P topics. The p2p topic name is later rewritten for // each recipient then the payload is created: p2p recipient sees the topic as the ID of the other user. @@ -71,6 +71,9 @@ func (t *Topic) pushForData(fromUid types.Uid, data *MsgServerData) *push.Receip // Number of attached sessions the data message will be delivered to. // Push notifications sent to users with non-zero online sessions will be marked silent. Delivered: online, + // Unread counts are incremented for all recipients, + // and for sender only if the message wasnt't marked 'read' by the sender + ShouldIncrementUnreadCountInCache: uid != fromUid || !msgMarkedAsReadBySender, } } } diff --git a/server/push/push.go b/server/push/push.go index 5324aa89..27b1a577 100644 --- a/server/push/push.go +++ b/server/push/push.go @@ -30,6 +30,8 @@ type Recipient struct { Devices []string `json:"devices,omitempty"` // Unread count to include in the push Unread int `json:"unread"` + // Indicates whether unread counter in the cache should be incremented before sending the push. + ShouldIncrementUnreadCountInCache bool `json:"-"` } // Receipt is the push payload with a list of recipients. diff --git a/server/store/store.go b/server/store/store.go index 3d2e6e7d..e7fb9e7c 100644 --- a/server/store/store.go +++ b/server/store/store.go @@ -650,7 +650,7 @@ func (subsMapper) Delete(topic string, user types.Uid) error { // MessagesPersistenceInterface is an interface which defines methods for persistent storage of messages. type MessagesPersistenceInterface interface { - Save(msg *types.Message, attachmentURLs []string, readBySender bool) error + Save(msg *types.Message, attachmentURLs []string, readBySender bool) (error, bool) DeleteList(topic string, delID int, forUser types.Uid, ranges []types.Range) error GetAll(topic string, forUser types.Uid, opt *types.QueryOpt) ([]types.Message, error) GetDeleted(topic string, forUser types.Uid, opt *types.QueryOpt) ([]types.Range, int, error) @@ -663,30 +663,35 @@ type messagesMapper struct{} var Messages MessagesPersistenceInterface // Save message -func (messagesMapper) Save(msg *types.Message, attachmentURLs []string, readBySender bool) error { +func (messagesMapper) Save(msg *types.Message, attachmentURLs []string, readBySender bool) (error, bool) { msg.InitTimes() msg.SetUid(Store.GetUid()) // Increment topic's or user's SeqId err := adp.TopicUpdateOnMessage(msg.Topic, msg) if err != nil { - return err + return err, false } err = adp.MessageSave(msg) if err != nil { - return err + return err, false } + markedReadBySender := false // Mark message as read by the sender. if readBySender { // Make sure From is valid, otherwise we will reset values for all subscribers. fromUid := types.ParseUid(msg.From) if !fromUid.IsZero() { // Ignore the error here. It's not a big deal if it fails. - adp.SubsUpdate(msg.Topic, fromUid, + if subErr := adp.SubsUpdate(msg.Topic, fromUid, map[string]interface{}{ "RecvSeqId": msg.SeqId, - "ReadSeqId": msg.SeqId}) + "ReadSeqId": msg.SeqId}); subErr != nil { + logs.Warn.Printf("topic[%s]: failed to mark message (seq: %d) read by sender - err: %+v", msg.Topic, msg.SeqId, subErr) + } else { + markedReadBySender = true + } } } @@ -699,11 +704,11 @@ func (messagesMapper) Save(msg *types.Message, attachmentURLs []string, readBySe } } if len(attachments) > 0 { - return adp.FileLinkAttachments("", types.ZeroUid, msg.Uid(), attachments) + return adp.FileLinkAttachments("", types.ZeroUid, msg.Uid(), attachments), markedReadBySender } } - return nil + return nil, markedReadBySender } // DeleteList deletes multiple messages defined by a list of ranges. diff --git a/server/topic.go b/server/topic.go index 3a2df02b..8020e098 100644 --- a/server/topic.go +++ b/server/topic.go @@ -964,7 +964,8 @@ func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid, } } - if err := store.Messages.Save( + markedReadBySender := false + if err, unreadUpdated := store.Messages.Save( &types.Message{ ObjHeader: types.ObjHeader{CreatedAt: msg.Timestamp}, SeqId: t.lastID + 1, @@ -977,6 +978,8 @@ func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid, msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp)) return err + } else { + markedReadBySender = unreadUpdated } t.lastID++ @@ -1023,7 +1026,7 @@ func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid, t.broadcastToSessions(data) // sendPush will update unread message count and send push notification. - if pushRcpt := t.pushForData(asUid, data.Data); pushRcpt != nil { + if pushRcpt := t.pushForData(asUid, data.Data, markedReadBySender); pushRcpt != nil { sendPush(pushRcpt) } return nil diff --git a/server/user.go b/server/user.go index c85ce368..83449db0 100644 --- a/server/user.go +++ b/server/user.go @@ -909,10 +909,12 @@ func usersRequestFromCluster(req *UserCacheReq) { } } +var usersCache map[types.Uid]userCacheEntry + // The go routine for processing updates to users cache. func userUpdater() { // Caches unread counters and numbers of topics the user's subscribed to. - usersCache := make(map[types.Uid]userCacheEntry) + usersCache = make(map[types.Uid]userCacheEntry) // Unread counter updates blocked by IO on per user basis. We flush them when the IO completes. perUserBuffers := make(map[types.Uid][]bufferedUpdate) @@ -927,10 +929,10 @@ func userUpdater() { // IO callback queue. ioDone := make(chan *ioResult, 1024) - unreadUpdater := func(uids []types.Uid, val int, inc bool) map[types.Uid]int { + unreadUpdater := func(uids []types.Uid, vals []int, inc bool) map[types.Uid]int { var dbPending []types.Uid counts := make(map[types.Uid]int, len(uids)) - for _, uid := range uids { + for i, uid := range uids { counts[uid] = 0 uce, ok := usersCache[uid] if !ok { @@ -939,6 +941,7 @@ func userUpdater() { continue } + val := vals[i] if uce.unread < 0 { // Unread counter not initialized yet. Maybe start a DB read? if updateBuf, ioInProgress := perUserBuffers[uid]; ioInProgress { @@ -1057,19 +1060,19 @@ func userUpdater() { if upd.PushRcpt != nil { // List of uids for which the unread count is being read from the DB. pendingUsers := []types.Uid{} + allUids := make([]types.Uid, 0, len(upd.PushRcpt.To)) - for uid := range upd.PushRcpt.To { + allDeltas := make([]int, 0, len(upd.PushRcpt.To)) + for uid, r := range upd.PushRcpt.To { allUids = append(allUids, uid) + delta := 0 + if r.ShouldIncrementUnreadCountInCache { + delta = 1 + } + allDeltas = append(allDeltas, delta) } - var delta int - // Increment unread counter only on msg event. - if upd.PushRcpt.Payload.What == "msg" { - delta = 1 - } else { - delta = 0 - } - allUnread := unreadUpdater(allUids, delta, true) + allUnread := unreadUpdater(allUids, allDeltas, true) for uid, unread := range allUnread { rcptTo := upd.PushRcpt.To[uid] // Handle update @@ -1138,7 +1141,7 @@ func userUpdater() { } // Request to update unread count for one user. - unreadUpdater([]types.Uid{upd.UserId}, upd.Unread, upd.Inc) + unreadUpdater([]types.Uid{upd.UserId}, []int{upd.Unread}, upd.Inc) } }