Fix incorrect unread count in user cache.

Messages marked as read by sender must not be counted as unread.
Also, exposed a bit more debug info.
This commit is contained in:
aforge
2023-02-10 16:09:17 -08:00
parent 112d100c2b
commit 9f9d8de7ed
6 changed files with 61 additions and 29 deletions
+21 -5
View File
@@ -402,13 +402,21 @@ type debugTopic struct {
Sessions []string `json:"sessions,omitempty"`
}
// debugCachedUser is a user cache entry debug info.
type debugCachedUser struct {
Uid string `json:"uid,omitempty"`
Unread int `json:"unread,omitempty"`
Topics int `json:"topics,omitempty"`
}
// debugDump is server internal state dump for debugging.
type debugDump struct {
Version string `json:"server_version,omitempty"`
Build string `json:"build_id,omitempty"`
Timestamp time.Time `json:"ts,omitempty"`
Sessions []debugSession `json:"sessions,omitempty"`
Topics []debugTopic `json:"topics,omitempty"`
Version string `json:"server_version,omitempty"`
Build string `json:"build_id,omitempty"`
Timestamp time.Time `json:"ts,omitempty"`
Sessions []debugSession `json:"sessions,omitempty"`
Topics []debugTopic `json:"topics,omitempty"`
UserCache []debugCachedUser `json:"user_cache,omitempty"`
}
func serveStatus(wrt http.ResponseWriter, req *http.Request) {
@@ -420,6 +428,7 @@ func serveStatus(wrt http.ResponseWriter, req *http.Request) {
Timestamp: types.TimeNow(),
Sessions: make([]debugSession, 0, len(globals.sessionStore.sessCache)),
Topics: make([]debugTopic, 0, 10),
UserCache: make([]debugCachedUser, 0, 10),
}
// Sessions.
globals.sessionStore.Range(func(sid string, s *Session) bool {
@@ -467,6 +476,13 @@ func serveStatus(wrt http.ResponseWriter, req *http.Request) {
})
return true
})
for k, v := range usersCache {
result.UserCache = append(result.UserCache, debugCachedUser{
Uid: k.UserId(),
Unread: v.unread,
Topics: v.topics,
})
}
json.NewEncoder(wrt).Encode(result)
}
+4 -1
View File
@@ -24,7 +24,7 @@ func (t *Topic) channelSubUnsub(uid types.Uid, sub bool) {
}
// Prepares a payload to be delivered to a mobile device as a push notification in response to a {data} message.
func (t *Topic) pushForData(fromUid types.Uid, data *MsgServerData) *push.Receipt {
func (t *Topic) pushForData(fromUid types.Uid, data *MsgServerData, msgMarkedAsReadBySender bool) *push.Receipt {
// Passing `Topic` as `t.name` for group topics and P2P topics. The p2p topic name is later rewritten for
// each recipient then the payload is created: p2p recipient sees the topic as the ID of the other user.
@@ -71,6 +71,9 @@ func (t *Topic) pushForData(fromUid types.Uid, data *MsgServerData) *push.Receip
// Number of attached sessions the data message will be delivered to.
// Push notifications sent to users with non-zero online sessions will be marked silent.
Delivered: online,
// Unread counts are incremented for all recipients,
// and for sender only if the message wasnt't marked 'read' by the sender
ShouldIncrementUnreadCountInCache: uid != fromUid || !msgMarkedAsReadBySender,
}
}
}
+2
View File
@@ -30,6 +30,8 @@ type Recipient struct {
Devices []string `json:"devices,omitempty"`
// Unread count to include in the push
Unread int `json:"unread"`
// Indicates whether unread counter in the cache should be incremented before sending the push.
ShouldIncrementUnreadCountInCache bool `json:"-"`
}
// Receipt is the push payload with a list of recipients.
+13 -8
View File
@@ -650,7 +650,7 @@ func (subsMapper) Delete(topic string, user types.Uid) error {
// MessagesPersistenceInterface is an interface which defines methods for persistent storage of messages.
type MessagesPersistenceInterface interface {
Save(msg *types.Message, attachmentURLs []string, readBySender bool) error
Save(msg *types.Message, attachmentURLs []string, readBySender bool) (error, bool)
DeleteList(topic string, delID int, forUser types.Uid, ranges []types.Range) error
GetAll(topic string, forUser types.Uid, opt *types.QueryOpt) ([]types.Message, error)
GetDeleted(topic string, forUser types.Uid, opt *types.QueryOpt) ([]types.Range, int, error)
@@ -663,30 +663,35 @@ type messagesMapper struct{}
var Messages MessagesPersistenceInterface
// Save message
func (messagesMapper) Save(msg *types.Message, attachmentURLs []string, readBySender bool) error {
func (messagesMapper) Save(msg *types.Message, attachmentURLs []string, readBySender bool) (error, bool) {
msg.InitTimes()
msg.SetUid(Store.GetUid())
// Increment topic's or user's SeqId
err := adp.TopicUpdateOnMessage(msg.Topic, msg)
if err != nil {
return err
return err, false
}
err = adp.MessageSave(msg)
if err != nil {
return err
return err, false
}
markedReadBySender := false
// Mark message as read by the sender.
if readBySender {
// Make sure From is valid, otherwise we will reset values for all subscribers.
fromUid := types.ParseUid(msg.From)
if !fromUid.IsZero() {
// Ignore the error here. It's not a big deal if it fails.
adp.SubsUpdate(msg.Topic, fromUid,
if subErr := adp.SubsUpdate(msg.Topic, fromUid,
map[string]interface{}{
"RecvSeqId": msg.SeqId,
"ReadSeqId": msg.SeqId})
"ReadSeqId": msg.SeqId}); subErr != nil {
logs.Warn.Printf("topic[%s]: failed to mark message (seq: %d) read by sender - err: %+v", msg.Topic, msg.SeqId, subErr)
} else {
markedReadBySender = true
}
}
}
@@ -699,11 +704,11 @@ func (messagesMapper) Save(msg *types.Message, attachmentURLs []string, readBySe
}
}
if len(attachments) > 0 {
return adp.FileLinkAttachments("", types.ZeroUid, msg.Uid(), attachments)
return adp.FileLinkAttachments("", types.ZeroUid, msg.Uid(), attachments), markedReadBySender
}
}
return nil
return nil, markedReadBySender
}
// DeleteList deletes multiple messages defined by a list of ranges.
+5 -2
View File
@@ -964,7 +964,8 @@ func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid,
}
}
if err := store.Messages.Save(
markedReadBySender := false
if err, unreadUpdated := store.Messages.Save(
&types.Message{
ObjHeader: types.ObjHeader{CreatedAt: msg.Timestamp},
SeqId: t.lastID + 1,
@@ -977,6 +978,8 @@ func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid,
msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp))
return err
} else {
markedReadBySender = unreadUpdated
}
t.lastID++
@@ -1023,7 +1026,7 @@ func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid,
t.broadcastToSessions(data)
// sendPush will update unread message count and send push notification.
if pushRcpt := t.pushForData(asUid, data.Data); pushRcpt != nil {
if pushRcpt := t.pushForData(asUid, data.Data, markedReadBySender); pushRcpt != nil {
sendPush(pushRcpt)
}
return nil
+16 -13
View File
@@ -909,10 +909,12 @@ func usersRequestFromCluster(req *UserCacheReq) {
}
}
var usersCache map[types.Uid]userCacheEntry
// The go routine for processing updates to users cache.
func userUpdater() {
// Caches unread counters and numbers of topics the user's subscribed to.
usersCache := make(map[types.Uid]userCacheEntry)
usersCache = make(map[types.Uid]userCacheEntry)
// Unread counter updates blocked by IO on per user basis. We flush them when the IO completes.
perUserBuffers := make(map[types.Uid][]bufferedUpdate)
@@ -927,10 +929,10 @@ func userUpdater() {
// IO callback queue.
ioDone := make(chan *ioResult, 1024)
unreadUpdater := func(uids []types.Uid, val int, inc bool) map[types.Uid]int {
unreadUpdater := func(uids []types.Uid, vals []int, inc bool) map[types.Uid]int {
var dbPending []types.Uid
counts := make(map[types.Uid]int, len(uids))
for _, uid := range uids {
for i, uid := range uids {
counts[uid] = 0
uce, ok := usersCache[uid]
if !ok {
@@ -939,6 +941,7 @@ func userUpdater() {
continue
}
val := vals[i]
if uce.unread < 0 {
// Unread counter not initialized yet. Maybe start a DB read?
if updateBuf, ioInProgress := perUserBuffers[uid]; ioInProgress {
@@ -1057,19 +1060,19 @@ func userUpdater() {
if upd.PushRcpt != nil {
// List of uids for which the unread count is being read from the DB.
pendingUsers := []types.Uid{}
allUids := make([]types.Uid, 0, len(upd.PushRcpt.To))
for uid := range upd.PushRcpt.To {
allDeltas := make([]int, 0, len(upd.PushRcpt.To))
for uid, r := range upd.PushRcpt.To {
allUids = append(allUids, uid)
delta := 0
if r.ShouldIncrementUnreadCountInCache {
delta = 1
}
allDeltas = append(allDeltas, delta)
}
var delta int
// Increment unread counter only on msg event.
if upd.PushRcpt.Payload.What == "msg" {
delta = 1
} else {
delta = 0
}
allUnread := unreadUpdater(allUids, delta, true)
allUnread := unreadUpdater(allUids, allDeltas, true)
for uid, unread := range allUnread {
rcptTo := upd.PushRcpt.To[uid]
// Handle update
@@ -1138,7 +1141,7 @@ func userUpdater() {
}
// Request to update unread count for one user.
unreadUpdater([]types.Uid{upd.UserId}, upd.Unread, upd.Inc)
unreadUpdater([]types.Uid{upd.UserId}, []int{upd.Unread}, upd.Inc)
}
}