From a0c11d56beedeca5a717acbd922c0e49e8cff9cf Mon Sep 17 00:00:00 2001 From: or-else Date: Mon, 6 Oct 2025 16:12:26 +0300 Subject: [PATCH] support for subscriber count, bug fixes in cleaning up deleted channels --- server/datamodel.go | 10 +- server/db/adapter.go | 2 + server/db/mongodb/adapter.go | 569 +++++++++++++++----------- server/db/mysql/adapter.go | 450 +++++++++++--------- server/db/postgres/adapter.go | 295 ++++++------- server/db/rethinkdb/adapter.go | 461 +++++++++++++-------- server/hub.go | 1 + server/init_topic.go | 3 + server/store/mock_store/mock_store.go | 14 + server/store/store.go | 9 +- server/store/types/types.go | 9 +- server/topic.go | 40 +- 12 files changed, 1121 insertions(+), 742 deletions(-) diff --git a/server/datamodel.go b/server/datamodel.go index 6d98a2de..bec04170 100644 --- a/server/datamodel.go +++ b/server/datamodel.go @@ -448,6 +448,7 @@ type MsgTopicDesc struct { RecvSeqId int `json:"recv,omitempty"` // Id of the last delete operation as seen by the requesting user DelId int `json:"clear,omitempty"` + SubCnt int `json:"subcnt,omitempty"` Public any `json:"public,omitempty"` Trusted any `json:"trusted,omitempty"` // Per-subscription private data @@ -475,6 +476,9 @@ func (src *MsgTopicDesc) describe() string { if src.DelId != 0 { s += " clear=" + strconv.Itoa(src.DelId) } + if src.SubCnt != 0 { + s += " subcnt=" + strconv.Itoa(src.SubCnt) + } if src.Public != nil { s += " pub='...'" } @@ -529,7 +533,8 @@ type MsgTopicSub struct { SeqId int `json:"seq,omitempty"` // Id of the latest Delete operation DelId int `json:"clear,omitempty"` - + // Number of subscribers, group topics only. + SubCnt int `json:"subcnt,omitempty"` // P2P topics in 'me' {get subs} response: // Other user's last online timestamp & user agent @@ -550,6 +555,9 @@ func (src *MsgTopicSub) describe() string { } if src.DelId != 0 { s += " clear=" + strconv.Itoa(src.DelId) + } + if src.SubCnt != 0 { + s += " subcnt=" + strconv.Itoa(src.SubCnt) } if src.Public != nil { s += " pub='...'" diff --git a/server/db/adapter.go b/server/db/adapter.go index 83643157..d92c98f1 100644 --- a/server/db/adapter.go +++ b/server/db/adapter.go @@ -121,6 +121,8 @@ type Adapter interface { TopicDelete(topic string, isChan, hard bool) error // TopicUpdateOnMessage increments Topic's or User's SeqId value and updates TouchedAt timestamp. TopicUpdateOnMessage(topic string, msg *t.Message) error + // TopicUpdateSubCnt refreshes denormalized topic subscriber count. + TopicUpdateSubCnt(topic string) error // TopicUpdate updates topic record. TopicUpdate(topic string, update map[string]any) error // TopicOwnerChange updates topic's owner diff --git a/server/db/mongodb/adapter.go b/server/db/mongodb/adapter.go index d933245f..4ab1f60e 100644 --- a/server/db/mongodb/adapter.go +++ b/server/db/mongodb/adapter.go @@ -40,12 +40,12 @@ type adapter struct { } const ( - defaultHost = "localhost:27017" - defaultDatabase = "tinode" - adpVersion = 116 adapterName = "mongodb" + defaultHost = "localhost:27017" + defaultDatabase = "tinode" + defaultMaxResults = 1024 // This is capped by the Session's send queue limit (128). defaultMaxMessageResults = 100 @@ -79,6 +79,20 @@ type configType struct { APIVersion mdbopts.ServerAPIVersion `json:"api_version,omitempty"` } +func (a *adapter) maybeStartTransaction(sess mdb.Session) error { + if a.useTransactions { + return sess.StartTransaction() + } + return nil +} + +func (a *adapter) maybeCommitTransaction(ctx context.Context, sess mdb.Session) error { + if a.useTransactions { + return sess.CommitTransaction(ctx) + } + return nil +} + // Open initializes mongodb session func (a *adapter) Open(jsonconfig json.RawMessage) error { if a.conn != nil { @@ -237,6 +251,15 @@ func (a *adapter) GetDbVersion() (int, error) { return result.Value, nil } +func (a *adapter) updateDbVersion(v int) error { + a.version = -1 + _, err := a.db.Collection("kvmeta").UpdateOne(a.ctx, + b.M{"_id": "version"}, + b.M{"$set": b.M{"value": v}}, + ) + return err +} + // CheckDbVersion checks if the actual database version matches adapter version. func (a *adapter) CheckDbVersion() error { version, err := a.GetDbVersion() @@ -549,15 +572,6 @@ func (a *adapter) UpgradeDb() error { return nil } -func (a *adapter) updateDbVersion(v int) error { - a.version = -1 - _, err := a.db.Collection("kvmeta").UpdateOne(a.ctx, - b.M{"_id": "version"}, - b.M{"$set": b.M{"value": v}}, - ) - return err -} - // Create system topic 'sys'. func createSystemTopic(a *adapter) error { now := t.TimeNow() @@ -623,34 +637,23 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { } user.Public = unmarshalBsonD(user.Public) user.Trusted = unmarshalBsonD(user.Trusted) + users = append(users, user) } + return users, nil } -func (a *adapter) maybeStartTransaction(sess mdb.Session) error { - if a.useTransactions { - return sess.StartTransaction() - } - return nil -} - -func (a *adapter) maybeCommitTransaction(ctx context.Context, sess mdb.Session) error { - if a.useTransactions { - return sess.CommitTransaction(ctx) - } - return nil -} - -// UserDelete deletes user record. +// UserDelete deletes specified user: wipes completely (hard-delete) or marks as deleted. func (a *adapter) UserDelete(uid t.Uid, hard bool) error { forUser := uid.String() // Select topics where the user is the owner. - topicIds, err := a.db.Collection("topics").Distinct(a.ctx, "_id", b.M{"owner": forUser}) + ownTopics, err := a.topicNamesForUser("topics", + b.M{"owner": forUser, "state": b.M{"$ne": t.StateDeleted}}, "_id", true) if err != nil { return err } - topicFilter := b.M{"topic": b.M{"$in": topicIds}} + ownTopicsFilter := b.M{"topic": b.M{"$in": ownTopics}} var sess mdb.Session if sess, err = a.conn.StartSession(); err != nil { @@ -665,19 +668,32 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { if err = mdb.WithSession(a.ctx, sess, func(sc mdb.SessionContext) error { if hard { + // No need to delete user's devices: devices are stored in user's record and will be deleted with it. + + // Delete user's subscriptions in all topics and decrement subcnt in topic. + if err = a.subsDelete(sc, b.M{"user": forUser}, true); err != nil { + return err + } + + // Delete user's dellog entries in all topics. + err = a.clearUserDellog(sc, forUser) + if err != nil { + return err + } + // Can't delete user's messages in all topics because we cannot notify topics of such deletion. - // Or we have to delete these messages one by one. - // For now, just leave the messages there marked as sent by "not found" user. + // Just leave the messages there marked as sent by "not found" user. // Delete topics where the user is the owner: - if len(topicIds) > 0 { + if len(ownTopics) > 0 { + // 1. Delete dellog // 2. Decrement fileuploads. // 3. Delete all messages. // 4. Delete subscriptions. - // Delete dellog entries. - _, err = a.db.Collection("dellog").DeleteMany(sc, topicFilter) + // Delete dellog for topics owned by the user. + _, err = a.db.Collection("dellog").DeleteMany(sc, ownTopicsFilter) if err != nil { return err } @@ -685,61 +701,37 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { // Decrement fileuploads UseCounter // First get array of attachments IDs that were used in messages of topics from topicIds // Then decrement the usecount field of these file records - err = a.decFileUseCounter(sc, "messages", b.M{"topic": b.M{"$in": topicIds}}) + err = a.decFileUseCounter(sc, "messages", ownTopicsFilter) if err != nil { return err } - // Decrement use counter for topic avatars - err = a.decFileUseCounter(sc, "topics", b.M{"_id": b.M{"$in": topicIds}}) + // Decrement use counter for topic avatars. + err = a.decFileUseCounter(sc, "topics", b.M{"_id": b.M{"$in": ownTopics}}) if err != nil { return err } // Delete messages - _, err = a.db.Collection("messages").DeleteMany(sc, topicFilter) + _, err = a.db.Collection("messages").DeleteMany(sc, ownTopicsFilter) if err != nil { return err } - // Delete subscriptions - _, err = a.db.Collection("subscriptions").DeleteMany(sc, topicFilter) + // Delete subscriptions for all users where the user is the owner of the topic. + _, err = a.db.Collection("subscriptions").DeleteMany(sc, ownTopicsFilter) if err != nil { return err } + // No need to delete topic tags: they are stored in topic record and will be deleted with it. + // And finally delete the topics. if _, err = a.db.Collection("topics").DeleteMany(sc, b.M{"owner": forUser}); err != nil { return err } } - // Select all other topics where the user is a subscriber. - topicIds, err = a.db.Collection("subscriptions").Distinct(sc, "topic", b.M{"user": forUser}) - if err != nil { - return err - } - - if len(topicIds) > 0 { - // Delete user's dellog entries. - if _, err = a.db.Collection("dellog").DeleteMany(sc, - b.M{"topic": b.M{"$in": topicIds}, "deletedfor": forUser}); err != nil { - return err - } - - // Delete user's markings of soft-deleted messages - filter := b.M{"topic": b.M{"$in": topicIds}, "deletedfor.user": forUser} - if _, err = a.db.Collection("messages"). - UpdateMany(sc, filter, b.M{"$pull": b.M{"deletedfor": b.M{"user": forUser}}}); err != nil { - return err - } - - // Delete user's subscriptions in all topics. - if err = a.subsDelete(sc, b.M{"user": forUser}, true); err != nil { - return err - } - } - // Delete user's authentication records. if _, err = a.authDelAllRecords(sc, uid); err != nil { return err @@ -755,6 +747,8 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { return err } + // No need to delete user's tags: they are stored in user's record and will be deleted with it. + // And finally delete the user. if _, err = a.db.Collection("users").DeleteOne(sc, b.M{"_id": forUser}); err != nil { return err @@ -769,18 +763,37 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { disable := b.M{"$set": b.M{"updatedat": now, "state": t.StateDeleted, "stateat": now}} // Disable subscriptions for topics where the user is the owner. - if _, err = a.db.Collection("subscriptions").UpdateMany(sc, topicFilter, disable); err != nil { + if _, err = a.db.Collection("subscriptions").UpdateMany(sc, ownTopicsFilter, disable); err != nil { return err } - // Disable topics where the user is the owner. - if _, err = a.db.Collection("topics").UpdateMany(sc, b.M{"_id": b.M{"$in": topicIds}}, + + // Disable group topics where the user is the owner. + if _, err = a.db.Collection("topics").UpdateMany(sc, b.M{"_id": b.M{"$in": ownTopics}}, b.M{"$set": b.M{ "updatedat": now, "touchedat": now, "state": t.StateDeleted, "stateat": now, }}); err != nil { return err } - // FIXME: disable p2p topics with the user. + // Disable p2p topics with the user. + p2pTopics, err := a.p2pTopicsForUser(uid) + if err != nil { + return err + } + if len(p2pTopics) > 0 { + if _, err = a.db.Collection("topics").UpdateMany(sc, b.M{"_id": b.M{"$in": p2pTopics}}, + b.M{"$set": b.M{ + "updatedat": now, "touchedat": now, "state": t.StateDeleted, "stateat": now, + }}); err != nil { + return err + } + + // Disable subscription to user's disabled p2p topics. + if _, err = a.db.Collection("subscriptions").UpdateMany(sc, + b.M{"topic": b.M{"$in": p2pTopics}}, disable); err != nil { + return err + } + } // Finally disable the user. if _, err = a.db.Collection("users").UpdateMany(sc, b.M{"_id": forUser}, disable); err != nil { @@ -797,7 +810,8 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { return err } -// topicStateForUser is called by UserUpdate when the update contains state change +// topicStateForUser is called by UserUpdate when the update contains state change. +// Soft-deleted topics remain soft-deleted. func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error { state, ok := update.(t.ObjState) if !ok { @@ -816,16 +830,19 @@ func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error } // Change state of p2p topics with the user (p2p topic's owner is blank) - topicIds, err := a.db.Collection("subscriptions").Distinct(a.ctx, "topic", b.M{"user": uid.String()}) + // Get list of p2p topics with the user. + p2pTopics, err := a.p2pTopicsForUser(uid) if err != nil { return err } - - if _, err := a.db.Collection("topics").UpdateMany(a.ctx, - b.M{"_id": b.M{"$in": topicIds}, "owner": "", "state": b.M{"$ne": t.StateDeleted}}, - b.M{"$set": b.M{"state": state, "stateat": now}}); err != nil { - return err + if len(p2pTopics) > 0 { + if _, err := a.db.Collection("topics").UpdateMany(a.ctx, + b.M{"_id": b.M{"$in": p2pTopics}, "state": b.M{"$ne": t.StateDeleted}}, + b.M{"$set": b.M{"state": state, "stateat": now}}); err != nil { + return err + } } + // Subscriptions don't need to be updated: // subscriptions of a disabled user are not disabled and still can be manipulated. return nil @@ -833,7 +850,7 @@ func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error // UserUpdate updates user record func (a *adapter) UserUpdate(uid t.Uid, update map[string]any) error { - // to get round the hardcoded "UpdatedAt" key in store.Users.Update() + // Convert field names from CamelCase to lowercase. update = normalizeUpdateMap(update) _, err := a.db.Collection("users").UpdateOne(a.ctx, b.M{"_id": uid.String()}, b.M{"$set": update}) @@ -845,46 +862,37 @@ func (a *adapter) UserUpdate(uid t.Uid, update map[string]any) error { now, _ := update["stateat"].(time.Time) err = a.topicStateForUser(uid, now, state) } + + // Tags are stored in the same record, no need to update them separately. + return err } -// UserUpdateTags adds, removes, or resets user's tags +// UserUpdateTags adds, removes, or resets user's tags. func (a *adapter) UserUpdateTags(uid t.Uid, add, remove, reset []string) ([]string, error) { + var newTags t.StringSlice // Compare to nil vs checking for zero length: zero length reset is valid. if reset != nil { - // Replace Tags with the new value - return reset, a.UserUpdate(uid, map[string]any{"tags": reset}) + // Replace tags with the new value + newTags = reset + } else { + var user t.User + err := a.db.Collection("users").FindOne(a.ctx, b.M{"_id": uid.String()}).Decode(&user) + if err != nil { + return nil, err + } + + // Mutate the tag list. + newTags := user.Tags + if len(add) > 0 { + newTags = union(newTags, add) + } + if len(remove) > 0 { + newTags = diff(newTags, remove) + } } - var user t.User - err := a.db.Collection("users").FindOne(a.ctx, b.M{"_id": uid.String()}).Decode(&user) - if err != nil { - return nil, err - } - - // Mutate the tag list. - newTags := user.Tags - if len(add) > 0 { - newTags = union(newTags, add) - } - if len(remove) > 0 { - newTags = diff(newTags, remove) - } - - update := map[string]any{"tags": newTags} - if err := a.UserUpdate(uid, update); err != nil { - return nil, err - } - - // Get the new tags - var tags map[string][]string - findOpts := mdbopts.FindOne().SetProjection(b.M{"tags": 1, "_id": 0}) - err = a.db.Collection("users").FindOne(a.ctx, b.M{"_id": uid.String()}, findOpts).Decode(&tags) - if err != nil { - return nil, err - } - - return tags["tags"], nil + return newTags, a.UserUpdate(uid, map[string]any{"tags": newTags}) } // UserGetByCred returns user ID for the given validated credential. @@ -939,7 +947,8 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { pipeline := b.A{ b.M{"$match": b.M{"user": b.M{"$in": uids}}}, - // Join documents from two collection + // Join documents from two collection. + // FIXME: this does not work for channels as localField[topic] is not the same as foreignField[_id]. b.M{"$lookup": b.M{ "from": "topics", "localField": "topic", @@ -1422,7 +1431,7 @@ func (a *adapter) TopicCreate(topic *t.Topic) error { return err } -// TopicCreateP2P creates a p2p topic +// TopicCreateP2P creates a p2p topic. func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { initiator.Id = initiator.Topic + ":" + initiator.User // Don't care if the initiator changes own subscription @@ -1452,7 +1461,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { topic := &t.Topic{ ObjHeader: t.ObjHeader{Id: initiator.Topic}, TouchedAt: initiator.GetTouchedAt(), - SubCnt: 2, } topic.ObjHeader.MergeTimes(&initiator.ObjHeader) return a.TopicCreate(topic) @@ -1460,34 +1468,41 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { // TopicGet loads a single topic by name, if it exists. If the topic does not exist the call returns (nil, nil) func (a *adapter) TopicGet(topic string) (*t.Topic, error) { - var tpc = new(t.Topic) - if err := a.db.Collection("topics").FindOne(a.ctx, b.M{"_id": topic}).Decode(tpc); err != nil { + var tt = new(t.Topic) + if err := a.db.Collection("topics").FindOne(a.ctx, b.M{"_id": topic}).Decode(tt); err != nil { if err == mdb.ErrNoDocuments { return nil, nil } return nil, err } - // Topic found, get subsription count. Try both topic and channel names. - if err := a.db.Collection("subscriptions").FindOne(a.ctx, b.A{ - b.M{"$match": b.M{ - "topic": b.M{"$in": b.A{topic, t.GrpToChn(topic)}}, - "deletedat": b.M{"$exists": false}, - }}, - b.M{"$count": "subCnt"}, - }).Decode(&tpc.SubCnt); err != nil { - return nil, err + + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Topic found, get subsription count. + subCnt, err := a.subscriptionCount(topic) + if err != nil { + return nil, err + } + + if int(subCnt) != tt.SubCnt { + // Update the topic with the correct subscription count. + tt.SubCnt = int(subCnt) + err = a.topicUpdate(topic, b.M{"subcnt": tt.SubCnt}) + if err != nil { + return nil, err + } + } } - tpc.Public = unmarshalBsonD(tpc.Public) - tpc.Trusted = unmarshalBsonD(tpc.Trusted) - // tpc.Aux = unmarshalBsonD(tpc.Aux) - return tpc, nil + tt.Public = unmarshalBsonD(tt.Public) + tt.Trusted = unmarshalBsonD(tt.Trusted) + + return tt, nil } // TopicsForUser loads user's contact list: p2p and grp topics, except for 'me' & 'fnd' subscriptions. // Reads and denormalizes Public & Trusted values. func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { - // Fetch user's subscriptions + // Fetch all user's subscriptions. filter := b.M{"user": uid.String()} if !keepDeleted { // Filter out rows with defined deletedat @@ -1525,6 +1540,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if err != nil { return nil, err } + // Must close the cursor manually as we will be reusing it. // Fetch subscriptions. Two queries are needed: users table (me & p2p) and topics table (p2p and grp). // Prepare a list of Separate subscriptions to users vs topics @@ -1554,14 +1570,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( sub.SetWith(uid1.UserId()) } topq = append(topq, tname) - } else { - // Group or sys subscription. - if tcat == t.TopicCatGrp { - // Maybe convert channel name to topic name. - tname = t.ChnToGrp(tname) - } - topq = append(topq, tname) + } else if tcat == t.TopicCatGrp { + // Maybe convert channel name to topic name. + tname = t.ChnToGrp(tname) } + // No special handling needed for 'slf', 'sys' subscriptions. + + topq = append(topq, tname) sub.Private = unmarshalBsonD(sub.Private) join[tname] = sub } @@ -1578,9 +1593,11 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if len(topq) > 0 { // Fetch grp & p2p topics filter = b.M{"_id": b.M{"$in": topq}} + if !keepDeleted { filter["state"] = b.M{"$ne": t.StateDeleted} } + if !ims.IsZero() { // Use cache timestamp if provided: get newer entries only. filter["touchedat"] = b.M{"$gt": ims} @@ -1591,6 +1608,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( findOpts = mdbopts.Find().SetSort(b.D{{"touchedat", 1}}).SetLimit(int64(limit)) } } + cur, err = a.db.Collection("topics").Find(a.ctx, filter, findOpts) if err != nil { return nil, err @@ -1602,11 +1620,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( break } sub := join[top.Id] + // Check if sub.UpdatedAt needs to be adjusted to earlier or later time. sub.UpdatedAt = common.SelectLatestTime(sub.UpdatedAt, top.UpdatedAt) sub.SetState(top.State) sub.SetTouchedAt(top.TouchedAt) sub.SetSeqId(top.SeqId) if t.GetTopicCat(sub.Topic) == t.TopicCatGrp { + sub.SetSubCnt(top.SubCnt) sub.SetPublic(unmarshalBsonD(top.Public)) sub.SetTrusted(unmarshalBsonD(top.Trusted)) } @@ -1614,6 +1634,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( join[top.Id] = sub } cur.Close(a.ctx) + if err != nil { return nil, err } @@ -1651,6 +1672,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( } } cur.Close(a.ctx) + if err != nil { return nil, err } @@ -1664,12 +1686,12 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( return common.SelectEarliestUpdatedSubs(subs, opts, a.maxResults), nil } -// UsersForTopic loads users' subscriptions for a given topic. Public & Trusted are loaded. +// UsersForTopic loads users' subscriptions for a given topic (not channel readers). +// Public & Trusted are loaded. func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { tcat := t.GetTopicCat(topic) - // Fetch topic subscribers - // Fetch all subscribed users. The number of users is not large + // Fetch all subscribed users. The number of users is not large. filter := b.M{"topic": topic} if !keepDeleted && tcat != t.TopicCatP2P { // Filter out rows with DeletedAt being not null. @@ -1700,7 +1722,7 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt return nil, err } - // Fetch subscriptions + // Fetch subscriptions. var subs []t.Subscription join := make(map[string]t.Subscription) usrq := make([]any, 0, 16) @@ -1717,10 +1739,9 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt return nil, err } + // Fetch users by a list of subscriptions. if len(usrq) > 0 { subs = make([]t.Subscription, 0, len(usrq)) - - // Fetch users by a list of subscriptions cur, err = a.db.Collection("users").Find(a.ctx, b.M{ "_id": b.M{"$in": usrq}, "state": b.M{"$ne": t.StateDeleted}}) @@ -1739,7 +1760,6 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt sub.SetPublic(unmarshalBsonD(usr2.Public)) sub.SetTrusted(unmarshalBsonD(usr2.Trusted)) sub.SetLastSeenAndUA(usr2.LastSeen, usr2.UserAgent) - subs = append(subs, sub) } } @@ -1787,14 +1807,15 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt return subs, nil } -// OwnTopics loads a slice of topic names where the user is the owner. -func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) { - filter := b.M{"owner": uid.String(), "state": b.M{"$ne": t.StateDeleted}} - findOpts := mdbopts.Find().SetProjection(b.M{"_id": 1}) - cur, err := a.db.Collection("topics").Find(a.ctx, filter, findOpts) +// topicNamesForUser reads topic names from the 'field' of 'collection' using 'filter'. +// If includeChan is true, for group topics also add the corresponding channel name. +func (a *adapter) topicNamesForUser(collection string, filter b.M, field string, includeChan bool) ([]string, error) { + cur, err := a.db.Collection(collection).Find(a.ctx, filter, + mdbopts.Find().SetProjection(b.M{field: 1})) if err != nil { return nil, err } + defer cur.Close(a.ctx) var names []string for cur.Next(a.ctx) { @@ -1802,38 +1823,44 @@ func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) { if err = cur.Decode(&res); err != nil { break } - names = append(names, res["_id"]) + names = append(names, res[field]) + // If the name is a group topic, also add the channel name if requested. + if includeChan { + if channel := t.GrpToChn(res[field]); channel != "" { + names = append(names, channel) + } + } } - cur.Close(a.ctx) return names, err } +func (a *adapter) p2pTopicsForUser(uid t.Uid) ([]string, error) { + return a.topicNamesForUser("subscriptions", + b.M{ + "user": uid.String(), + "deletedat": b.M{"$exists": false}, + "topic": b.M{"$regex": primitive.Regex{Pattern: "^p2p"}}}, + "topic", false) +} + +// OwnTopics loads a slice of topic names where the user is the owner. +func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) { + return a.topicNamesForUser("topics", + b.M{"owner": uid.String(), "state": b.M{"$ne": t.StateDeleted}}, + "_id", false) +} + // ChannelsForUser loads a slice of topic names where the user is a channel reader and notifications (P) are enabled. func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) { - filter := b.M{ - "user": uid.String(), - "deletedat": b.M{"$exists": false}, - "topic": b.M{"$regex": primitive.Regex{Pattern: "^chn"}}, - "modewant": b.M{"$bitsAllSet": b.A{t.ModePres}}, - "modegiven": b.M{"$bitsAllSet": b.A{t.ModePres}}} - findOpts := mdbopts.Find().SetProjection(b.M{"topic": 1}) - cur, err := a.db.Collection("subscriptions").Find(a.ctx, filter, findOpts) - if err != nil { - return nil, err - } - - var names []string - for cur.Next(a.ctx) { - var res map[string]string - if err = cur.Decode(&res); err != nil { - break - } - names = append(names, res["topic"]) - } - cur.Close(a.ctx) - - return names, err + return a.topicNamesForUser("subscriptions", + b.M{ + "user": uid.String(), + "deletedat": b.M{"$exists": false}, + "topic": b.M{"$regex": primitive.Regex{Pattern: "^chn"}}, + "modewant": b.M{"$bitsAllSet": b.A{t.ModePres}}, + "modegiven": b.M{"$bitsAllSet": b.A{t.ModePres}}}, + "topic", false) } // TopicShare creates topic subscriptions. @@ -1859,11 +1886,13 @@ func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error { } } - // Update topic's subscription count. - // The error is ignored because the subvscriptions have been created already. - a.db.Collection("topics").UpdateOne(a.ctx, - b.M{"_id": topic}, - b.M{"$inc": map[string]any{"subcnt": len(shares)}}) + if topic != "" { + // Update topic's subscription count. + // The error is ignored because the subscriptions have been created already. + a.db.Collection("topics").UpdateOne(a.ctx, + b.M{"_id": topic}, + b.M{"$inc": b.M{"subcnt": len(shares)}}) + } return nil } @@ -1887,10 +1916,10 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { filter = b.M{"_id": topic} if hard { - if err = a.MessageDeleteList(topic, nil); err != nil { + if err = a.decFileUseCounter(a.ctx, "topics", filter); err != nil { return err } - if err = a.decFileUseCounter(a.ctx, "topics", filter); err != nil { + if err = a.MessageDeleteList(topic, nil); err != nil { return err } _, err = a.db.Collection("topics").DeleteOne(a.ctx, filter) @@ -1906,7 +1935,26 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { // TopicUpdateOnMessage increments Topic's or User's SeqId value and updates TouchedAt timestamp. func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error { - return a.topicUpdate(topic, map[string]any{"seqid": msg.SeqId, "touchedat": msg.CreatedAt}) + return a.topicUpdate(topic, b.M{"seqid": msg.SeqId, "touchedat": msg.CreatedAt}) +} + +func (a *adapter) subscriptionCount(topic string) (int64, error) { + // Get count of non-deleted subscriptions to the topic. + return a.db.Collection("subscriptions").CountDocuments(a.ctx, b.M{ + "topic": b.M{"$in": b.A{topic, t.GrpToChn(topic)}}, + "deletedat": b.M{"$exists": false}, + }) +} + +// TopicUpdateSubCnt updates subscriber count denormalized in topic. +func (a *adapter) TopicUpdateSubCnt(topic string) error { + // Get count of non-deleted subscriptions to the topic. + // UPDATE ... SET=(SELECT ...) is not supported in MongoDB, so we have to do it in two queries. + count, err := a.subscriptionCount(topic) + if err != nil { + return err + } + return a.topicUpdate(topic, b.M{"subcnt": count}) } // TopicUpdate updates topic record. @@ -1974,7 +2022,9 @@ func (a *adapter) SubsForUser(user t.Uid) ([]t.Subscription, error) { return subs, cur.Err() } -// SubsForTopic gets a list of subscriptions to a given topic. Does NOT load Public & Trusted values. +// SubsForTopic fetches all subsciptions for a topic. Does NOT load Public value and does not load channel readers. +// The difference between UsersForTopic vs SubsForTopic is that the former loads user.public+trusted, +// the latter does not. func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { filter := b.M{"topic": topic} if !keepDeleted { @@ -2016,7 +2066,7 @@ func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) // SubsUpdate updates part of a subscription object. Pass nil for fields which don't need to be updated func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) error { - // to get round the hardcoded pass of "Private" key + // Convert CamelCase field names to lowercase. update = normalizeUpdateMap(update) filter := b.M{} @@ -2031,7 +2081,7 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er return err } -// SubsDelete deletes a single subscription +// SubsDelete marks at most one subscription as deleted (soft-deleting). func (a *adapter) SubsDelete(topic string, user t.Uid) error { var sess mdb.Session var err error @@ -2052,6 +2102,7 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { return err } + // Channel readers cannot delete messages. if !t.IsChannel(topic) { // Delete user's dellog entries. @@ -2066,19 +2117,96 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { return err } } + + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Decrement topic subscription count (only one subscription is deleted). + if err := a.topicUpdate(topic, b.M{"subcnt": -1}); err != nil { + return err + } + } + // Commit changes. return a.maybeCommitTransaction(sc, sess) }) } -// Delete/mark deleted subscriptions. +// clearUserDellog deletes all dellog entries and deletedfor markings of a given user. +func (a *adapter) clearUserDellog(sc mdb.SessionContext, forUser string) error { + topics, err := a.db.Collection("subscriptions").Distinct(sc, "topic", + b.M{"user": forUser, "deletedat": b.M{"$exists": false}}) + if err != nil { + return err + } + + // No need to convert channel names to group names: + // channel readers cannot delete messages. + + if len(topics) > 0 { + // Delete user's dellog entries. + if _, err = a.db.Collection("dellog").DeleteMany(sc, + b.M{"topic": b.M{"$in": topics}, "deletedfor": forUser}); err != nil { + return err + } + + // Delete user's markings of soft-deleted messages + filter := b.M{"topic": b.M{"$in": topics}, "deletedfor.user": forUser} + if _, err = a.db.Collection("messages"). + UpdateMany(sc, filter, b.M{"$pull": b.M{"deletedfor": b.M{"user": forUser}}}); err != nil { + return err + } + } + + return nil +} + +// Delete/mark deleted subscriptions and decrement subcnt in topic. func (a *adapter) subsDelete(ctx context.Context, filter b.M, hard bool) error { - var err error + // First, decrement subscription count in all affected topics. + // Doing it in two steps because MongoDB does not support an equivalent of + // 'UPDATE .. LEFT JOIN ...'. + filterWithDeletedAt := copyBsonMap(filter) + filterWithDeletedAt["deletedat"] = b.M{"$exists": false} + cur, err := a.db.Collection("subscriptions").Find(ctx, filterWithDeletedAt, + mdbopts.Find().SetProjection(b.D{{"topic", 1}, {"_id", 0}})) + if err != nil { + return err + } + defer cur.Close(ctx) + var topics []string + for cur.Next(ctx) { + var result struct { + Topic string `bson:"topic"` + } + if err = cur.Decode(&result); err != nil { + return err + } + if t.IsChannel(result.Topic) { + // Convert channel name to group name. + topics = append(topics, t.ChnToGrp(result.Topic)) + } + topics = append(topics, result.Topic) + } + + if err = cur.Err(); err != nil { + return err + } + + if len(topics) == 0 { + // Nothing to update. + return nil + } + + // Decrement subscription count in affected topics. + a.db.Collection("topics").UpdateMany(ctx, + b.M{"_id": b.M{"$in": topics}}, + b.M{"$inc": b.M{"subcnt": -1}}) + + // Now delete or mark deleted the subscriptions. if hard { _, err = a.db.Collection("subscriptions").DeleteMany(ctx, filter) } else { now := t.TimeNow() - _, err = a.db.Collection("subscriptions").UpdateMany(ctx, filter, + _, err = a.db.Collection("subscriptions").UpdateMany(ctx, filterWithDeletedAt, b.M{"$set": b.M{"updatedat": now, "deletedat": now}}) } return err @@ -2158,48 +2286,10 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string, index[tag] = struct{}{} } - /* - matchOn := b.M{"tags": b.M{"$in": allTags}} - if activeOnly { - matchOn["state"] = b.M{"$eq": t.StateOK} - } - commonPipe := b.A{ - b.M{"$match": matchOn}, - b.M{"$project": b.M{"_id": 1, "createdat": 1, "updatedat": 1, "usebt": 1, "access": 1, "public": 1, "trusted": 1, "tags": 1}}, - b.M{"$unwind": "$tags"}, - b.M{"$match": b.M{"tags": b.M{"$in": allTags}}}, - b.M{"$group": b.M{ - "_id": "$_id", - "createdat": b.M{"$first": "$createdat"}, - "updatedat": b.M{"$first": "$updatedat"}, - "usebt": b.M{"$first": "$usebt"}, - "access": b.M{"$first": "$access"}, - "public": b.M{"$first": "$public"}, - "trusted": b.M{"$first": "$trusted"}, - "tags": b.M{"$addToSet": "$tags"}, - "matchedTagsCount": b.M{"$sum": 1}, - }}, - } - - for _, reqDisjunction := range req { - if len(reqDisjunction) == 0 { - continue - } - var reqTags []any - for _, tag := range reqDisjunction { - reqTags = append(reqTags, tag) - } - // Filter out documents where 'tags' intersection with 'reqTags' is an empty array - commonPipe = append(commonPipe, - b.M{"$match": b.M{"$expr": b.M{"$ne": b.A{b.M{"$size": b.M{"$setIntersection": b.A{"$tags", reqTags}}}, 0}}}}) - } - - // Must create a copy of commonPipe so the original commonPipe can be used unmodified in $unionWith. - pipeline := append(slices.Clone(commonPipe), - b.M{"$unionWith": b.M{"coll": "topics", "pipeline": commonPipe}}, - b.M{"$sort": b.M{"matchedTagsCount": -1}}, - b.M{"$limit": a.maxResults}) - */ + matchOn := b.M{"tags": b.M{"$in": allTags}} + if activeOnly { + matchOn["state"] = b.M{"$eq": t.StateOK} + } projectFields := b.M{"_id": 1, "createdat": 1, "updatedat": 1, "usebt": 1, "access": 1, "subcnt": 1, "public": 1, "trusted": 1, "tags": 1} @@ -2209,14 +2299,14 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string, b.M{ "$facet": b.D{ {"users", b.A{ - b.M{"$match": b.M{"tags": b.M{"$in": allTags}}}, + b.M{"$match": matchOn}, b.M{"$project": projectFields}, }}, {"topics", b.A{ b.M{"$lookup": b.D{ {"from", "topics"}, {"pipeline", b.A{ - b.M{"$match": b.M{"tags": b.M{"$in": allTags}}}, + b.M{"$match": matchOn}, b.M{"$project": projectFields}, }}, {"as", "topicDocs"}, @@ -2268,7 +2358,7 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string, pipeline = append(pipeline, // Stage 9: $sort - b.M{"$sort": b.M{"matchedCount": -1}}, + b.M{"$sort": b.D{{"matchedCount", -1}, {"subcnt", -1}}}, // Stage 10: $limit b.M{"$limit": a.maxResults}, ) @@ -2288,6 +2378,8 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string, } if topic.UseBt { + // This is a channel, convert grp to chn name: all channel-capable + // topics should appear as channels in search results. sub.Topic = t.GrpToChn(topic.Id) } else { if uid := t.ParseUid(topic.Id); !uid.IsZero() { @@ -2312,7 +2404,6 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string, sub.Private = common.FilterFoundTags(topic.Tags, index) subs = append(subs, sub) } - if err == nil { err = cur.Err() } @@ -2320,7 +2411,7 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string, return subs, err } -// FindOne returns topic or user which matches the given tag. +// FindOne returns the first topic or user which matches the given tag. func (a *adapter) FindOne(tag string) (string, error) { // Part of the pipeline identical for users and topics collections. commonPipe := b.A{b.M{"$match": b.M{"tags": tag}}, b.M{"$project": b.M{"_id": 1}}} @@ -2362,7 +2453,7 @@ func (a *adapter) MessageSave(msg *t.Message) error { return err } -// MessageGetAll returns messages matching the query +// MessageGetAll returns messages matching the query. func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ([]t.Message, error) { var limit = a.maxMessageResults var lower, upper int @@ -2420,11 +2511,11 @@ func (a *adapter) messagesHardDelete(topic string) error { return err } - if _, err = a.db.Collection("messages").DeleteMany(a.ctx, filter); err != nil { + if err = a.decFileUseCounter(a.ctx, "messages", filter); err != nil { return err } - if err = a.decFileUseCounter(a.ctx, "messages", filter); err != nil { + if _, err = a.db.Collection("messages").DeleteMany(a.ctx, filter); err != nil { return err } @@ -2652,7 +2743,7 @@ func (a *adapter) deviceInsert(userId string, dev *t.DeviceDef) error { return err } -// DeviceGetAll returns all devices for a given set of users +// DeviceGetAll returns all devices for a given set of users. func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, error) { ids := make([]any, len(uids)) for i, id := range uids { @@ -2678,7 +2769,7 @@ func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, err if err = cur.Decode(&row); err != nil { return nil, 0, err } - if row.Devices != nil && len(row.Devices) > 0 { + if len(row.Devices) > 0 { if err := uid.UnmarshalText([]byte(row.Id)); err != nil { continue } diff --git a/server/db/mysql/adapter.go b/server/db/mysql/adapter.go index b121f6c5..93cd728f 100644 --- a/server/db/mysql/adapter.go +++ b/server/db/mysql/adapter.go @@ -42,13 +42,12 @@ type adapter struct { } const ( + adpVersion = 116 + adapterName = "mysql" + defaultDSN = "root:@tcp(localhost:3306)/tinode?parseTime=true" defaultDatabase = "tinode" - adpVersion = 116 - - adapterName = "mysql" - defaultMaxResults = 1024 // This is capped by the Session's send queue limit (128). defaultMaxMessageResults = 100 @@ -369,7 +368,7 @@ func (a *adapter) CreateDb(reset bool) error { lang VARCHAR(8), PRIMARY KEY(id), FOREIGN KEY(userid) REFERENCES users(id), - UNIQUE INDEX devices_hash (hash) + UNIQUE INDEX devices_hash(hash) )`); err != nil { return err } @@ -415,7 +414,8 @@ func (a *adapter) CreateDb(reset bool) error { PRIMARY KEY(id), UNIQUE INDEX topics_name(name), INDEX topics_owner(owner), - INDEX topics_state_stateat(state, stateat) + INDEX topics_state_stateat(state, stateat), + INDEX topics_name_state_seqid(name, state, seqid) )`); err != nil { return err } @@ -458,7 +458,8 @@ func (a *adapter) CreateDb(reset bool) error { FOREIGN KEY(userid) REFERENCES users(id), UNIQUE INDEX subscriptions_topic_userid(topic, userid), INDEX subscriptions_topic(topic), - INDEX subscriptions_deletedat(deletedat) + INDEX subscriptions_deletedat(deletedat), + INDEX subscriptions_userid_topic_deletedat(userid, topic, deletedat) )`); err != nil { return err } @@ -479,7 +480,7 @@ func (a *adapter) CreateDb(reset bool) error { PRIMARY KEY(id), FOREIGN KEY(topic) REFERENCES topics(name), UNIQUE INDEX messages_topic_seqid(topic, seqid) - );`); err != nil { + )`); err != nil { return err } @@ -497,7 +498,7 @@ func (a *adapter) CreateDb(reset bool) error { INDEX dellog_topic_delid_deletedfor(topic,delid,deletedfor), INDEX dellog_topic_deletedfor_low_hi(topic,deletedfor,low,hi), INDEX dellog_deletedfor(deletedfor) - );`); err != nil { + )`); err != nil { return err } @@ -516,9 +517,9 @@ func (a *adapter) CreateDb(reset bool) error { done TINYINT NOT NULL DEFAULT 0, retries INT NOT NULL DEFAULT 0, PRIMARY KEY(id), - UNIQUE credentials_uniqueness(synthetic), - FOREIGN KEY(userid) REFERENCES users(id) - );`); err != nil { + FOREIGN KEY(userid) REFERENCES users(id), + UNIQUE credentials_uniqueness(synthetic) + )`); err != nil { return err } @@ -573,16 +574,6 @@ func (a *adapter) CreateDb(reset bool) error { return err } - // Find relevant subscriptions for given users efficiently, and use the join key too. - if _, err = tx.Exec("CREATE INDEX idx_subs_user_topic_del ON subscriptions(userid, topic, deletedat)"); err != nil { - return err - } - - // Optimizes join; state filters; seqid supports the SUM operation. - if _, err = tx.Exec("CREATE INDEX idx_topics_name_state_seqid ON topics(name, state, seqid)"); err != nil { - return err - } - return tx.Commit() } @@ -843,6 +834,7 @@ func (a *adapter) UpgradeDb() error { return nil } +// Create system topic 'sys'. func createSystemTopic(tx *sql.Tx) error { now := t.TimeNow() query := `INSERT INTO topics(createdat,updatedat,state,touchedat,name,access,public) @@ -892,8 +884,7 @@ func removeTags(tx *sqlx.Tx, table, keyName string, keyVal any, tags []string) e } query, args, _ := sqlx.In("DELETE FROM "+table+" WHERE "+keyName+"=? AND tag IN (?)", keyVal, args) - query = tx.Rebind(query) - _, err := tx.Exec(query, args...) + _, err := tx.Exec(tx.Rebind(query), args...) return err } @@ -919,9 +910,13 @@ func (a *adapter) UserCreate(user *t.User) error { decoded_uid := store.DecodeUid(user.Uid()) if _, err = tx.Exec("INSERT INTO users(id,createdat,updatedat,state,access,public,trusted,tags) VALUES(?,?,?,?,?,?,?,?)", decoded_uid, - user.CreatedAt, user.UpdatedAt, - user.State, user.Access, - common.ToJSON(user.Public), common.ToJSON(user.Trusted), user.Tags); err != nil { + user.CreatedAt, + user.UpdatedAt, + user.State, + user.Access, + common.ToJSON(user.Public), + common.ToJSON(user.Trusted), + user.Tags); err != nil { return err } @@ -945,9 +940,9 @@ func (a *adapter) AuthAddRecord(uid t.Uid, scheme, unique string, authLvl auth.L if cancel != nil { defer cancel() } - _, err := a.db.ExecContext(ctx, "INSERT INTO auth(uname,userid,scheme,authLvl,secret,expires) VALUES(?,?,?,?,?,?)", - unique, store.DecodeUid(uid), scheme, authLvl, secret, exp) - if err != nil { + + if _, err := a.db.ExecContext(ctx, "INSERT INTO auth(uname,userid,scheme,authLvl,secret,expires) VALUES(?,?,?,?,?,?)", + unique, store.DecodeUid(uid), scheme, authLvl, secret, exp); err != nil { if isDupe(err) { return t.ErrDuplicate } @@ -1110,17 +1105,16 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { } users := []t.User{} - q, uids, _ := sqlx.In("SELECT * FROM users WHERE id IN (?) AND state!=?", uids, t.StateDeleted) - q = a.db.Rebind(q) - ctx, cancel := a.getContext() if cancel != nil { defer cancel() } - rows, err := a.db.QueryxContext(ctx, q, uids...) + q, uids, _ := sqlx.In("SELECT * FROM users WHERE id IN (?) AND state!=?", uids, t.StateDeleted) + rows, err := a.db.QueryxContext(ctx, a.db.Rebind(q), uids...) if err != nil { return nil, err } + defer rows.Close() for rows.Next() { var user t.User @@ -1128,11 +1122,6 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { users = nil break } - - if user.State == t.StateDeleted { - continue - } - user.SetUid(common.EncodeUidString(user.Id)) user.Public = common.FromJSON(user.Public) user.Trusted = common.FromJSON(user.Trusted) @@ -1142,7 +1131,6 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { if err == nil { err = rows.Err() } - rows.Close() return users, err } @@ -1150,6 +1138,13 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { // UserDelete deletes specified user: wipes completely (hard-delete) or marks as deleted. // TODO: report when the user is not found. func (a *adapter) UserDelete(uid t.Uid, hard bool) error { + // Get a list of topic names owned by the user (as 'grp' and 'chn'). + ownTopics, err := a.topicNamesForUser("SELECT name FROM topics WHERE owner=? AND state!=?", + true, store.DecodeUid(uid), t.StateDeleted) + if err != nil { + return err + } + ctx, cancel := a.getContextForTx() if cancel != nil { defer cancel() @@ -1180,7 +1175,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { return err } - // Delete records of messages soft-deleted for the user. + // Delete records of messages soft-deleted for the user in all topics. if _, err = tx.Exec("DELETE FROM dellog WHERE deletedfor=?", decoded_uid); err != nil { return err } @@ -1189,32 +1184,35 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { // Just leave the messages there marked as sent by "not found" user. // Delete topics where the user is the owner. + if len(ownTopics) > 0 { + // First delete all messages in those topics. + if _, err = tx.Exec("DELETE dellog FROM dellog LEFT JOIN topics ON topics.name=dellog.topic WHERE topics.owner=?", + decoded_uid); err != nil { + return err + } - // First delete all messages in those topics. - if _, err = tx.Exec("DELETE dellog FROM dellog LEFT JOIN topics ON topics.name=dellog.topic WHERE topics.owner=?", - decoded_uid); err != nil { - return err - } - if _, err = tx.Exec("DELETE messages FROM messages LEFT JOIN topics ON topics.name=messages.topic WHERE topics.owner=?", - decoded_uid); err != nil { - return err - } + // Deletion of messages will cascade to filemsglinks and so to fileuploads. + if _, err = tx.Exec("DELETE messages FROM messages LEFT JOIN topics ON topics.name=messages.topic WHERE topics.owner=?", + decoded_uid); err != nil { + return err + } - // Delete all subscriptions. - if _, err = tx.Exec("DELETE sub FROM subscriptions AS sub LEFT JOIN topics ON topics.name=sub.topic WHERE topics.owner=?", - decoded_uid); err != nil { - return err - } + // Delete subscriptions for all users where the user is the owner of the topic. + sql, args, _ := sqlx.In("DELETE FROM subscriptions AS s WHERE topic IN (?)", ownTopics) + if _, err = tx.Exec(tx.Rebind(sql), args); err != nil { + return err + } - // Delete topic tags. - if _, err = tx.Exec("DELETE topictags FROM topictags LEFT JOIN topics ON topics.name=topictags.topic WHERE topics.owner=?", - decoded_uid); err != nil { - return err - } + // Delete topic tags. + if _, err = tx.Exec("DELETE tt FROM topictags AS tt LEFT JOIN topics AS t ON t.name=tt.topic WHERE t.owner=?", + decoded_uid); err != nil { + return err + } - // And finally delete the topics. - if _, err = tx.Exec("DELETE FROM topics WHERE owner=?", decoded_uid); err != nil { - return err + // And finally delete the topics. + if _, err = tx.Exec("DELETE FROM topics WHERE owner=?", decoded_uid); err != nil { + return err + } } // Delete user's authentication records. @@ -1241,20 +1239,21 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { } // Disable all subscriptions to topics where the user is the owner. - if _, err = tx.Exec("UPDATE subscriptions LEFT JOIN topics ON subscriptions.topic=topics.name "+ - "SET subscriptions.updatedat=?, subscriptions.deletedat=? WHERE topics.owner=?", - now, now, decoded_uid); err != nil { + sql, args, _ := sqlx.In("UPDATE subscriptions SET s.updatedat=?,s.deletedat=? WHERE topic IN (?)", now, now, ownTopics) + if _, err = tx.Exec(tx.Rebind(sql), args); err != nil { return err } + // Disable group topics where the user is the owner. if _, err = tx.Exec("UPDATE topics SET updatedat=?,touchedat=?,state=?,stateat=? WHERE owner=?", now, now, t.StateDeleted, now, decoded_uid); err != nil { return err } + // Disable p2p topics with the user (p2p topic's owner is 0). - if _, err = tx.Exec("UPDATE topics LEFT JOIN subscriptions ON topics.name=subscriptions.topic "+ - "SET topics.updatedat=?,topics.touchedat=?,topics.state=?,topics.stateat=? "+ - "WHERE topics.owner=0 AND subscriptions.userid=?", + if _, err = tx.Exec("UPDATE topics AS t LEFT JOIN subscriptions AS s ON t.name=s.topic "+ + "SET t.updatedat=?,t.touchedat=?,t.state=?,t.stateat=? "+ + "WHERE t.owner=0 AND s.userid=? AND t.name LIKE 'p2p%'", now, now, t.StateDeleted, now, decoded_uid); err != nil { return err } @@ -1267,8 +1266,8 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { return err } - // Disable user. - if _, err = tx.Exec("UPDATE users SET updatedat=?, state=?, stateat=? WHERE id=?", + // Finally disable user. + if _, err = tx.Exec("UPDATE users SET updatedat=?,state=?,stateat=? WHERE id=?", now, t.StateDeleted, now, decoded_uid); err != nil { return err } @@ -1278,6 +1277,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { } // topicStateForUser is called by UserUpdate when the update contains state change. +// Soft-deleted topics remain soft-deleted. func (a *adapter) topicStateForUser(tx *sqlx.Tx, decoded_uid int64, now time.Time, update any) error { var err error @@ -1305,7 +1305,6 @@ func (a *adapter) topicStateForUser(tx *sqlx.Tx, decoded_uid int64, now time.Tim // Subscriptions don't need to be updated: // subscriptions of a disabled user are not disabled and still can be manipulated. - return nil } @@ -1359,7 +1358,7 @@ func (a *adapter) UserUpdate(uid t.Uid, update map[string]any) error { return tx.Commit() } -// UserUpdateTags adds or resets user's tags +// UserUpdateTags adds, removes, or resets user's tags. func (a *adapter) UserUpdateTags(uid t.Uid, add, remove, reset []string) ([]string, error) { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -1436,6 +1435,7 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) { // UserUnreadCount returns the total number of unread messages in all topics with // the R permission. If read fails, the counts are still returned with the original // user IDs but with the unread count undefined and non-nil error. +// UserUnreadCount does not count unread messages in channels although it should. func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { uids := make([]any, len(ids)) counts := make(map[t.Uid]int, len(ids)) @@ -1445,20 +1445,20 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { counts[id] = 0 } - q, uids, _ := sqlx.In("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+ - "WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+ - "INSTR(s.modewant, 'R')>0 AND INSTR(s.modegiven, 'R')>0 GROUP BY s.userid", uids, t.StateDeleted) - q = a.db.Rebind(q) - ctx, cancel := a.getContext() if cancel != nil { defer cancel() } - rows, err := a.db.QueryxContext(ctx, q, uids...) + // FIXME: support channels (for channels subscriptions.topic != topics.name). + q, args, _ := sqlx.In("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+ + "WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+ + "INSTR(s.modewant, 'R')>0 AND INSTR(s.modegiven, 'R')>0 GROUP BY s.userid", uids, t.StateDeleted) + rows, err := a.db.QueryxContext(ctx, a.db.Rebind(q), args...) if err != nil { return counts, err } + defer rows.Close() var userId int64 var unreadCount int @@ -1471,7 +1471,6 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { if err == nil { err = rows.Err() } - rows.Close() return counts, err } @@ -1493,6 +1492,7 @@ func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time, limit int) ([] if err != nil { return nil, err } + defer rows.Close() for rows.Next() { var userId int64 @@ -1505,13 +1505,10 @@ func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time, limit int) ([] if err == nil { err = rows.Err() } - rows.Close() return uids, err } -// ***************************** - func (a *adapter) topicCreate(tx *sqlx.Tx, topic *t.Topic) error { _, err := tx.Exec("INSERT INTO topics(createdat,updatedat,touchedat,state,name,usebt,owner,access,public,trusted,tags,aux) "+ "VALUES(?,?,?,?,?,?,?,?,?,?,?,?)", @@ -1573,13 +1570,16 @@ func createSubscription(tx *sqlx.Tx, sub *t.Subscription, undelete bool) error { sub.Topic, decoded_uid) } } + if err == nil && isOwner { + // Update topic owner if the subscription is with owner rights. + // Don't increment subscriber count here - it's done in TopicShare in bulk. _, err = tx.Exec("UPDATE topics SET owner=? WHERE name=?", decoded_uid, sub.Topic) } return err } -// TopicCreateP2P given two users creates a p2p topic +// TopicCreateP2P given two users creates a p2p topic. func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -1600,6 +1600,7 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { return err } + // If the second subscription exists, don't overwrite it. Just make sure it's not deleted. err = createSubscription(tx, invited, true) if err != nil { return err @@ -1608,7 +1609,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { topic := &t.Topic{ObjHeader: t.ObjHeader{Id: initiator.Topic}} topic.ObjHeader.MergeTimes(&initiator.ObjHeader) topic.TouchedAt = initiator.GetTouchedAt() - topic.SubCnt = 2 err = a.topicCreate(tx, topic) if err != nil { return err @@ -1623,12 +1623,12 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) { if cancel != nil { defer cancel() } + // Fetch topic by name var tt = new(t.Topic) if err := a.db.GetContext(ctx, tt, "SELECT createdat,updatedat,state,stateat,touchedat,name AS id,usebt,access,owner,seqid,delid,subcnt,public,trusted,tags,aux "+ - "FROM topics WHERE name=?", - topic); err != nil { + "FROM topics WHERE name=?", topic); err != nil { if err == sql.ErrNoRows { // Nothing found - clear the error err = nil @@ -1636,11 +1636,21 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) { return nil, err } - // Topic found, get subsription count. Try both topic and channel names. - channel := t.GrpToChn(topic) - if err := a.db.GetContext(ctx, &tt.SubCnt, - "SELECT COUNT(*) FROM subscriptions WHERE topic IN (?,?) AND deletedat IS NULL", topic, channel); err != nil { - return nil, err + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Topic found, get subsription count (ignoring the value set in topics.subcnt). Try both topic and channel names. + var subCnt int + if err := a.db.GetContext(ctx, &subCnt, + "SELECT COUNT(*) FROM subscriptions WHERE topic IN (?,?) AND deletedat IS NULL", topic, t.GrpToChn(topic)); err != nil { + return nil, err + } + + if subCnt != tt.SubCnt { + // Update the topic with the correct subscription count. + tt.SubCnt = subCnt + if _, err := a.db.ExecContext(ctx, "UPDATE topics SET subcnt=? WHERE name=?", subCnt, topic); err != nil { + return nil, err + } + } } tt.Owner = common.EncodeUidString(tt.Owner).String() @@ -1651,7 +1661,7 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) { } // TopicsForUser loads user's contact list: p2p and grp topics, except for 'me' & 'fnd' subscriptions. -// Reads and denormalizes Public value. +// Reads and denormalizes Public & Trusted values. func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { // Fetch ALL user's subscriptions, even those which has not been modified recently. // We are going to use these subscriptions to fetch topics and users which may have been modified recently. @@ -1699,6 +1709,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if err != nil { return nil, err } + // Must close rows manually as we will be reusing it. // Fetch subscriptions. Two queries are needed: users table (p2p) and topics table (grp). // Prepare a list of separate subscriptions to users vs topics @@ -1715,7 +1726,8 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( tcat := t.GetTopicCat(tname) if tcat == t.TopicCatMe || tcat == t.TopicCatFnd { - // One of 'me', 'fnd' subscriptions, skip. Don't skip 'sys' subscription. + // One of 'me', 'fnd' subscriptions, skip. + // Don't skip 'sys' subscription. continue } else if tcat == t.TopicCatP2P { // P2P subscription, find the other user to get user.Public and user.Trusted. @@ -1727,15 +1739,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( usrq = append(usrq, store.DecodeUid(uid1)) sub.SetWith(uid1.UserId()) } - topq = append(topq, tname) - } else { - // Group or 'sys' subscription. - if tcat == t.TopicCatGrp { - // Maybe convert channel name to topic name. - tname = t.ChnToGrp(tname) - } - topq = append(topq, tname) + } else if tcat == t.TopicCatGrp { + // Maybe convert channel name to group topic name. + tname = t.ChnToGrp(tname) } + // No special handling needed for 'slf', 'sys' subscriptions. + + topq = append(topq, tname) sub.Private = common.FromJSON(sub.Private) join[tname] = sub } @@ -1755,7 +1765,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( // Fetch grp topics and join to subscriptions. if len(topq) > 0 { - q = "SELECT updatedat,state,touchedat,name AS id,usebt,access,seqid,delid,public,trusted " + + q = "SELECT updatedat,state,touchedat,name AS id,usebt,access,seqid,delid,subcnt,public,trusted " + "FROM topics WHERE name IN (?)" q, args, _ = sqlx.In(q, topq) @@ -1776,13 +1786,12 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( args = append(args, limit) } } - q = a.db.Rebind(q) ctx2, cancel2 := a.getContext() if cancel2 != nil { defer cancel2() } - rows, err = a.db.QueryxContext(ctx2, q, args...) + rows, err = a.db.QueryxContext(ctx2, a.db.Rebind(q), args...) if err != nil { return nil, err } @@ -1792,7 +1801,6 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if err = rows.StructScan(&top); err != nil { break } - sub := join[top.Id] // Check if sub.UpdatedAt needs to be adjusted to earlier or later time. sub.UpdatedAt = common.SelectLatestTime(sub.UpdatedAt, top.UpdatedAt) @@ -1800,6 +1808,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( sub.SetTouchedAt(top.TouchedAt) sub.SetSeqId(top.SeqId) if t.GetTopicCat(sub.Topic) == t.TopicCatGrp { + sub.SetSubCnt(top.SubCnt) sub.SetPublic(common.FromJSON(top.Public)) sub.SetTrusted(common.FromJSON(top.Trusted)) } @@ -1829,13 +1838,11 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( // Ignoring ims: we need all users to get LastSeen and UserAgent. - q = a.db.Rebind(q) - ctx3, cancel3 := a.getContext() if cancel3 != nil { defer cancel3() } - rows, err = a.db.QueryxContext(ctx3, q, args...) + rows, err = a.db.QueryxContext(ctx3, a.db.Rebind(q), args...) if err != nil { return nil, err } @@ -1875,13 +1882,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( return common.SelectEarliestUpdatedSubs(subs, opts, a.maxResults), nil } -// UsersForTopic loads users subscribed to the given topic. +// UsersForTopic loads users subscribed to the given topic (not channel readers). // The difference between UsersForTopic vs SubsForTopic is that the former loads user.Public, // the latter does not. func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { tcat := t.GetTopicCat(topic) - // Fetch all subscribed users. The number of users is not large + // Fetch all subscribed users. The number of users is not large. q := `SELECT s.createdat,s.updatedat,s.deletedat,s.userid,s.topic,s.delid,s.recvseqid, s.readseqid,s.modewant,s.modegiven,u.public,u.trusted,u.lastseen,u.useragent,s.private FROM subscriptions AS s JOIN users AS u ON s.userid=u.id @@ -1929,8 +1936,9 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt if err != nil { return nil, err } + defer rows.Close() - // Fetch subscriptions + // Fetch subscriptions. var sub t.Subscription var subs []t.Subscription var lastSeen sql.NullTime @@ -1955,7 +1963,6 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt if err == nil { err = rows.Err() } - rows.Close() if err == nil && tcat == t.TopicCatP2P && len(subs) > 0 { // Swap public & lastSeen values of P2P topics as expected. @@ -1996,44 +2003,53 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt } // topicNamesForUser reads a slice of strings using provided query. -func (a *adapter) topicNamesForUser(uid t.Uid, sqlQuery string) ([]string, error) { +// if includeChan is true, the query is expected to add channel names as well as group topic names. +func (a *adapter) topicNamesForUser(sqlQuery string, includeChan bool, args ...any) ([]string, error) { ctx, cancel := a.getContext() if cancel != nil { defer cancel() } - rows, err := a.db.QueryxContext(ctx, sqlQuery, store.DecodeUid(uid)) + rows, err := a.db.QueryxContext(ctx, sqlQuery, args...) if err != nil { return nil, err } + defer rows.Close() var names []string - var name string for rows.Next() { + var name string if err = rows.Scan(&name); err != nil { break } names = append(names, name) + // If the name is a group topic, also add the channel name if requested. + if includeChan { + if channel := t.GrpToChn(name); channel != "" { + names = append(names, channel) + } + } } if err == nil { err = rows.Err() } - rows.Close() return names, err } // OwnTopics loads a slice of topic names where the user is the owner. func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) { - return a.topicNamesForUser(uid, "SELECT name FROM topics WHERE owner=?") + return a.topicNamesForUser("SELECT name FROM topics WHERE owner=? AND state!=?", + false, store.DecodeUid(uid), t.StateDeleted) } // ChannelsForUser loads a slice of topic names where the user is a channel reader and notifications (P) are enabled. func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) { - return a.topicNamesForUser(uid, - "SELECT topic FROM subscriptions WHERE userid=? AND topic LIKE 'chn%' "+ - "AND INSTR(modewant, 'P')>0 AND INSTR(modegiven, 'P')>0 AND deletedat IS NULL") + return a.topicNamesForUser("SELECT topic FROM subscriptions WHERE userid=? AND topic LIKE 'chn%' "+ + "AND INSTR(modewant,'P')>0 AND INSTR(modegiven,'P')>0 AND deletedat IS NULL", + false, store.DecodeUid(uid)) } +// TopicShare adds subscriptions to a topic and increments the topic's subcnt. func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -2057,6 +2073,7 @@ func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error { } if topic != "" { + // Update topic's subscription count. if _, err = tx.Exec("UPDATE topics SET subcnt=subcnt+? WHERE name=?", len(shares), topic); err != nil { return err } @@ -2091,8 +2108,7 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { if hard { // Delete subscriptions. If this is a channel, delete both group subscriptions and channel subscriptions. q, args, _ := sqlx.In("DELETE FROM subscriptions WHERE topic IN (?)", args) - q = tx.Rebind(q) - if _, err = tx.Exec(q, args...); err != nil { + if _, err = tx.Exec(tx.Rebind(q), args...); err != nil { return err } @@ -2111,8 +2127,7 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { now := t.TimeNow() q, args, _ := sqlx.In("UPDATE subscriptions SET updatedat=?,deletedat=? WHERE topic IN (?)", now, now, args) - q = tx.Rebind(q) - if _, err = tx.Exec(q, args...); err != nil { + if _, err = tx.Exec(tx.Rebind(q), args...); err != nil { return err } @@ -2124,6 +2139,7 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { return tx.Commit() } +// TopicUpdateOnMessage updates topic's seqid and touchedat when a new message is posted. func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error { ctx, cancel := a.getContext() if cancel != nil { @@ -2134,6 +2150,20 @@ func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error { return err } +// TopicUpdateSubCnt updates subscriber count denormalized in topic. +func (a *adapter) TopicUpdateSubCnt(topic string) error { + ctx, cancel := a.getContext() + if cancel != nil { + defer cancel() + } + _, err := a.db.ExecContext(ctx, + "UPDATE topics SET subcnt=(SELECT COUNT(*) FROM subscriptions WHERE topic IN (?,?) AND deletedat IS NULL) WHERE name=?", + topic, t.GrpToChn(topic), topic) + return err +} + +// TopicUpdate updates topic's fields given in the update map. +// If update contains UpdatedAt but not TouchedAt, TouchedAt is set to Updated func (a *adapter) TopicUpdate(topic string, update map[string]any) error { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -2192,11 +2222,13 @@ func (a *adapter) SubscriptionGet(topic string, user t.Uid, keepDeleted bool) (* if cancel != nil { defer cancel() } + query := `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid, + readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=? AND userid=?` + if !keepDeleted { + query += " AND deletedat IS NULL" + } var sub t.Subscription - err := a.db.GetContext(ctx, &sub, `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid, - readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=? AND userid=?`, - topic, store.DecodeUid(user)) - + err := a.db.GetContext(ctx, &sub, query, topic, store.DecodeUid(user)) if err != nil { if err == sql.ErrNoRows { // Nothing found - clear the error @@ -2205,10 +2237,6 @@ func (a *adapter) SubscriptionGet(topic string, user t.Uid, keepDeleted bool) (* return nil, err } - if !keepDeleted && sub.DeletedAt != nil { - return nil, nil - } - sub.Private = common.FromJSON(sub.Private) return &sub, nil @@ -2229,25 +2257,25 @@ func (a *adapter) SubsForUser(forUser t.Uid) ([]t.Subscription, error) { if err != nil { return nil, err } + defer rows.Close() var subs []t.Subscription - var ss t.Subscription + var sub t.Subscription for rows.Next() { - if err = rows.StructScan(&ss); err != nil { + if err = rows.StructScan(&sub); err != nil { break } - ss.User = forUser.String() - subs = append(subs, ss) + sub.User = forUser.String() + subs = append(subs, sub) } if err == nil { err = rows.Err() } - rows.Close() return subs, err } -// SubsForTopic fetches all subsciptions for a topic. Does NOT load Public value. +// SubsForTopic fetches all subsciptions for a topic. Does NOT load Public value and does not load channel readers. // The difference between UsersForTopic vs SubsForTopic is that the former loads user.public+trusted, // the latter does not. func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { @@ -2284,22 +2312,22 @@ func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) if err != nil { return nil, err } + defer rows.Close() var subs []t.Subscription - var ss t.Subscription + var sub t.Subscription for rows.Next() { - if err = rows.StructScan(&ss); err != nil { + if err = rows.StructScan(&sub); err != nil { break } - ss.User = common.EncodeUidString(ss.User).String() - ss.Private = common.FromJSON(ss.Private) - subs = append(subs, ss) + sub.User = common.EncodeUidString(sub.User).String() + sub.Private = common.FromJSON(sub.Private) + subs = append(subs, sub) } if err == nil { err = rows.Err() } - rows.Close() return subs, err } @@ -2337,9 +2365,14 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er return tx.Commit() } -// SubsDelete marks subscription as deleted. +// SubsDelete marks at most one subscription as deleted (soft-deleting). func (a *adapter) SubsDelete(topic string, user t.Uid) error { - tx, err := a.db.Begin() + ctx, cancel := a.getContextForTx() + if cancel != nil { + defer cancel() + } + + tx, err := a.db.BeginTxx(ctx, nil) if err != nil { return err } @@ -2350,13 +2383,10 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { } }() - ctx, cancel := a.getContext() - if cancel != nil { - defer cancel() - } - decoded_id := store.DecodeUid(user) now := t.TimeNow() + + // Mark subscription as deleted. res, err := tx.ExecContext(ctx, "UPDATE subscriptions SET updatedat=?,deletedat=? WHERE topic=? AND userid=? AND deletedat IS NULL", now, now, topic, decoded_id) @@ -2371,10 +2401,21 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { return err } - // Remove records of messages soft-deleted by this user. - _, err = tx.Exec("DELETE FROM dellog WHERE topic=? AND deletedfor=?", topic, decoded_id) - if err != nil { - return err + // Channel readers cannot delete messages. + if !t.IsChannel(topic) { + // Remove records of messages soft-deleted by this user. + _, err = tx.Exec("DELETE FROM dellog WHERE topic=? AND deletedfor=?", topic, decoded_id) + if err != nil { + return err + } + } + + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Decrement topic subscription count (only one subscription is deleted). + _, err = tx.Exec("UPDATE topics SET subcnt=subcnt-1 WHERE name=?", t.ChnToGrp(topic)) + if err != nil { + return err + } } return tx.Commit() @@ -2382,13 +2423,45 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { // subsDelForUser marks user's subscriptions as deleted. func subsDelForUser(tx *sqlx.Tx, user t.Uid, hard bool) error { - var err error + decoded_uid := store.DecodeUid(user) + + // Decrement subscription count for all topics the user is subscribed to. + rows, err := tx.Query("SELECT topic FROM subscriptions WHERE userid=? AND deletedat IS NULL", decoded_uid) + if err != nil { + return err + } + var topics []any + for rows.Next() { + var name string + if err = rows.Scan(&name); err != nil { + break + } + if t.IsChannel(name) { + // Convert channel name to group name. + name = t.ChnToGrp(name) + } + topics = append(topics, name) + } + if err == nil { + err = rows.Err() + } + rows.Close() + if err != nil || len(topics) == 0 { + return err + } + + sql, args, err := sqlx.In("UPDATE topics SET subcnt=subcnt-1 WHERE name IN (?)", topics) + _, err = tx.Exec(tx.Rebind(sql), args...) + if err != nil { + return err + } + if hard { - _, err = tx.Exec("DELETE FROM subscriptions WHERE userid=?", store.DecodeUid(user)) + _, err = tx.Exec("DELETE FROM subscriptions WHERE userid=?", decoded_uid) } else { now := t.TimeNow() _, err = tx.Exec("UPDATE subscriptions SET updatedat=?,deletedat=? WHERE userid=? AND deletedat IS NULL", - now, now, store.DecodeUid(user)) + now, now, decoded_uid) } return err } @@ -2420,18 +2493,19 @@ func (a *adapter) SubsDelForUser(user t.Uid, hard bool) error { // Find returns a list of users or group topics who match given tags, such as "email:jdoe@example.com" or "tel:+18003287448". func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, activeOnly bool) ([]t.Subscription, error) { - index := make(map[string]struct{}) var args []any stateConstraint := "" if activeOnly { args = append(args, t.StateOK) stateConstraint = "u.state=? AND " } + index := make(map[string]struct{}) allReq := t.FlattenDoubleSlice(req) for _, tag := range append(allReq, opt...) { args = append(args, tag) index[tag] = struct{}{} } + var matcher string if promoPrefix != "" { // The max number of tags is 16. Using 20 to make sure one prefix match is greater than all non-prefix matches. @@ -2440,7 +2514,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, matcher = "COUNT(*)" } - query := "SELECT u.id,u.createdat,u.updatedat,0,u.access,0,u.public,u.trusted,u.tags," + matcher + " AS matches " + + query := "SELECT u.id,u.createdat,u.updatedat,0,u.access,0 AS subcnt,u.public,u.trusted,u.tags," + matcher + " AS matches " + "FROM users AS u LEFT JOIN usertags AS tg ON tg.userid=u.id " + "WHERE " + stateConstraint + "tg.tag IN (?" + strings.Repeat(",?", len(allReq)+len(opt)-1) + ") " + "GROUP BY u.id,u.createdat,u.updatedat,u.access,u.public,u.trusted,u.tags " @@ -2471,7 +2545,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, query += q args = append(args, a...) } - query += "ORDER BY matches DESC LIMIT ?" + query += "ORDER BY matches DESC, subcnt DESC LIMIT ?" args = append(args, a.maxResults) ctx, cancel := a.getContext() @@ -2486,7 +2560,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, } defer rows.Close() - // Fetch subscriptions + // Read results as subscriptions. var public, trusted any var access t.DefaultAccess var subcnt int @@ -2505,13 +2579,14 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, if id, err := strconv.ParseInt(sub.Topic, 10, 64); err == nil { sub.Topic = store.EncodeUid(id).UserId() if sub.Topic == caller { - // Skip the callee + // Skip the caller. continue } } if isChan { - // This is a channel, convert grp to chn name. + // This is a channel, convert grp to chn name: all channel-capable + // topics should appear as channels in search results. sub.Topic = t.GrpToChn(sub.Topic) } @@ -2532,7 +2607,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, return subs, err } -// FindOne returns topic or user which matches the given tag. +// FindOne returns the first topic or user which matches the given tag. func (a *adapter) FindOne(tag string) (string, error) { var args []any @@ -2553,10 +2628,12 @@ func (a *adapter) FindOne(tag string) (string, error) { if cancel != nil { defer cancel() } + rows, err := a.db.QueryxContext(ctx, query, args...) if err != nil { return "", err } + defer rows.Close() var found string if rows.Next() { @@ -2564,6 +2641,7 @@ func (a *adapter) FindOne(tag string) (string, error) { return "", err } + // Check if the found value is a topic name or a user ID. // User IDs are returned as decoded decimal strings. if id, err := strconv.ParseInt(found, 10, 64); err == nil { found = store.EncodeUid(id).UserId() @@ -2572,7 +2650,6 @@ func (a *adapter) FindOne(tag string) (string, error) { if err == nil { err = rows.Err() } - rows.Close() return found, err } @@ -2585,8 +2662,7 @@ func (a *adapter) MessageSave(msg *t.Message) error { } // store assignes message ID, but we don't use it. Message IDs are not used anywhere. // Using a sequential ID provided by the database. - res, err := a.db.ExecContext( - ctx, + res, err := a.db.ExecContext(ctx, "INSERT INTO messages(createdAt,updatedAt,seqid,topic,`from`,head,content) VALUES(?,?,?,?,?,?,?)", msg.CreatedAt, msg.UpdatedAt, msg.SeqId, msg.Topic, store.DecodeUid(t.ParseUid(msg.From)), msg.Head, common.ToJSON(msg.Content)) @@ -2598,6 +2674,7 @@ func (a *adapter) MessageSave(msg *t.Message) error { return err } +// MessageGetAll returns messages matching the query. func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ([]t.Message, error) { var limit = a.maxMessageResults @@ -2644,10 +2721,10 @@ func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ( " WHERE m.delid=0 AND m.topic=? "+seqIdConstraint+" AND d.deletedfor IS NULL"+ " ORDER BY m.seqid DESC LIMIT ?", args...) - if err != nil { return nil, err } + defer rows.Close() msgs := make([]t.Message, 0, limit) for rows.Next() { @@ -2662,7 +2739,7 @@ func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ( if err == nil { err = rows.Err() } - rows.Close() + return msgs, err } @@ -2697,6 +2774,7 @@ func (a *adapter) MessageGetDeleted(topic string, forUser t.Uid, opts *t.QueryOp if err != nil { return nil, err } + defer rows.Close() var dellog struct { Topic string @@ -2734,7 +2812,6 @@ func (a *adapter) MessageGetDeleted(topic string, forUser t.Uid, opts *t.QueryOp if err == nil { err = rows.Err() } - rows.Close() if err == nil { if dmsg.DelId > 0 { @@ -2874,7 +2951,9 @@ func deviceHasher(deviceID string) string { return strconv.FormatUint(uint64(hasher.Sum64()), 16) } -// Device management for push notifications +// Device management for push notifications. + +// DeviceUpsert creates or updates a device record. func (a *adapter) DeviceUpsert(uid t.Uid, def *t.DeviceDef) error { hash := deviceHasher(def.DeviceId) @@ -2908,6 +2987,7 @@ func (a *adapter) DeviceUpsert(uid t.Uid, def *t.DeviceDef) error { return tx.Commit() } +// DeviceGetAll returns all devices for a given set of users. func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, error) { var unums []any for _, uid := range uids { @@ -2923,6 +3003,7 @@ func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, err if err != nil { return nil, 0, err } + defer rows.Close() var device struct { Userid int64 @@ -2952,7 +3033,6 @@ func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, err if err == nil { err = rows.Err() } - rows.Close() return result, count, err } @@ -2975,6 +3055,7 @@ func deviceDelete(tx *sqlx.Tx, uid t.Uid, deviceID string) error { return err } +// DeviceDelete deletes a device record (push token). func (a *adapter) DeviceDelete(uid t.Uid, deviceID string) error { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -3055,7 +3136,7 @@ func (a *adapter) CredUpsert(cred *t.Credential) (bool, error) { return false, err } // Assume that the record exists and try to update it: undelete, update timestamp and response value. - res, err := tx.Exec("UPDATE credentials SET updatedat=?,deletedat=NULL,resp=?,done=0 WHERE synthetic=?", + res, err := tx.Exec("UPDATE credentials SET updatedat=?,deletedat=NULL,resp=?,done=FALSE WHERE synthetic=?", cred.UpdatedAt, cred.Resp, synth) if err != nil { return false, err @@ -3118,7 +3199,7 @@ func credDel(tx *sqlx.Tx, uid t.Uid, method, value string) error { } // Case 2.1 - res, err = tx.Exec("DELETE FROM credentials"+constraints+" AND (done=true OR retries=0)", args...) + res, err = tx.Exec("DELETE FROM credentials"+constraints+" AND (done=TRUE OR retries=0)", args...) if err != nil { return err } @@ -3172,8 +3253,8 @@ func (a *adapter) CredConfirm(uid t.Uid, method string) error { } res, err := a.db.ExecContext( ctx, - "UPDATE credentials SET updatedat=?,done=true,synthetic=CONCAT(method,':',value) "+ - "WHERE userid=? AND method=? AND deletedat IS NULL AND done=false", + "UPDATE credentials SET updatedat=?,done=TRUE,synthetic=CONCAT(method,':',value) "+ + "WHERE userid=? AND method=? AND deletedat IS NULL AND done=FALSE", t.TimeNow(), store.DecodeUid(uid), method) if err != nil { if isDupe(err) { @@ -3193,7 +3274,7 @@ func (a *adapter) CredFail(uid t.Uid, method string) error { if cancel != nil { defer cancel() } - _, err := a.db.ExecContext(ctx, "UPDATE credentials SET updatedat=?,retries=retries+1 WHERE userid=? AND method=? AND done=false", + _, err := a.db.ExecContext(ctx, "UPDATE credentials SET updatedat=?,retries=retries+1 WHERE userid=? AND method=? AND done=FALSE", t.TimeNow(), store.DecodeUid(uid), method) return err } @@ -3206,7 +3287,7 @@ func (a *adapter) CredGetActive(uid t.Uid, method string) (*t.Credential, error) } var cred t.Credential err := a.db.GetContext(ctx, &cred, "SELECT createdat,updatedat,method,value,resp,done,retries "+ - "FROM credentials WHERE userid=? AND deletedat IS NULL AND method=? AND done=false", + "FROM credentials WHERE userid=? AND deletedat IS NULL AND method=? AND done=FALSE", store.DecodeUid(uid), method) if err != nil { if err == sql.ErrNoRows { @@ -3228,7 +3309,7 @@ func (a *adapter) CredGetAll(uid t.Uid, method string, validatedOnly bool) ([]t. args = append(args, method) } if validatedOnly { - query += " AND done=true" + query += " AND done=TRUE" } ctx, cancel := a.getContext() @@ -3337,10 +3418,11 @@ func (a *adapter) FileGet(fid string) (*t.FileDef, error) { fd.User = common.EncodeUidString(fd.User).String() return &fd, nil - } -// FileDeleteUnused deletes file upload records. +// FileDeleteUnused deletes records where UseCount is zero. If olderThan is non-zero, deletes +// unused records with UpdatedAt before olderThan. +// Returns array of FileDef.Location of deleted filerecords so actual files can be deleted too. func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, error) { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -3373,6 +3455,7 @@ func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, er if err != nil { return nil, err } + defer rows.Close() var locations []string var ids []any @@ -3390,7 +3473,6 @@ func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, er if err == nil { err = rows.Err() } - rows.Close() if err != nil { return nil, err diff --git a/server/db/postgres/adapter.go b/server/db/postgres/adapter.go index 2985a6e3..5dd4b3b1 100644 --- a/server/db/postgres/adapter.go +++ b/server/db/postgres/adapter.go @@ -214,8 +214,7 @@ func (a *adapter) GetDbVersion() (int, error) { defer cancel() } var vers string - err := a.db.QueryRow(ctx, "SELECT value FROM kvmeta WHERE key = $1", "version").Scan(&vers) - + err := a.db.QueryRow(ctx, "SELECT value FROM kvmeta WHERE key='version'").Scan(&vers) if err != nil { if isMissingDb(err) || isMissingTable(err) || err == pgx.ErrNoRows { err = errors.New("Database not initialized") @@ -234,7 +233,7 @@ func (a *adapter) updateDbVersion(v int) error { defer cancel() } a.version = -1 - if _, err := a.db.Exec(ctx, "UPDATE kvmeta SET value = $1 WHERE key = $2", strconv.Itoa(v), "version"); err != nil { + if _, err := a.db.Exec(ctx, `UPDATE kvmeta SET "value"=$1 WHERE "key"='version'`, strconv.Itoa(v)); err != nil { return err } return nil @@ -430,7 +429,8 @@ func (a *adapter) CreateDb(reset bool) error { ); CREATE UNIQUE INDEX topics_name ON topics(name); CREATE INDEX topics_owner ON topics(owner); - CREATE INDEX topics_state_stateat ON topics(state, stateat);`); err != nil { + CREATE INDEX topics_state_stateat ON topics(state, stateat); + CREATE INDEX topics_name_state_seqid ON topics(name, state, seqid);`); err != nil { return err } @@ -473,7 +473,8 @@ func (a *adapter) CreateDb(reset bool) error { ); CREATE UNIQUE INDEX subscriptions_topic_userid ON subscriptions(topic, userid); CREATE INDEX subscriptions_topic ON subscriptions(topic); - CREATE INDEX subscriptions_deletedat ON subscriptions(deletedat);`); err != nil { + CREATE INDEX subscriptions_deletedat ON subscriptions(deletedat); + CREATE INDEX subscriptions_userid_topic_deletedat ON subscriptions(userid, topic, deletedat);`); err != nil { return err } @@ -587,16 +588,6 @@ func (a *adapter) CreateDb(reset bool) error { return err } - // Find relevant subscriptions for given users efficiently, and use the join key too. - if _, err = tx.Exec(ctx, "CREATE INDEX idx_subs_user_topic_del ON subscriptions(userid, topic, deletedat)"); err != nil { - return err - } - - // Optimizes join; state filters; seqid supports the SUM operation. - if _, err = tx.Exec(ctx, "CREATE INDEX idx_topics_name_state_seqid ON topics(name, state, seqid)"); err != nil { - return err - } - return tx.Commit(ctx) } @@ -944,7 +935,6 @@ func (a *adapter) UserGet(uid t.Uid) (*t.User, error) { var user t.User var id int64 - row, err := a.db.Query(ctx, "SELECT * FROM users WHERE id=$1 AND state!=$2", store.DecodeUid(uid), t.StateDeleted) if err != nil { return nil, err @@ -990,12 +980,8 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { users = nil break } - - if user.State == t.StateDeleted { - continue - } - user.SetUid(store.EncodeUid(id)) + users = append(users, user) } if err == nil { @@ -1008,6 +994,13 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) { // UserDelete deletes specified user: wipes completely (hard-delete) or marks as deleted. // TODO: report when the user is not found. func (a *adapter) UserDelete(uid t.Uid, hard bool) error { + // Get a list of topic names owned by the user (as 'grp' and 'chn'). + ownTopics, err := a.topicNamesForUser("SELECT name FROM topics WHERE owner=$1 AND state!=$2", + true, store.DecodeUid(uid), t.StateDeleted) + if err != nil { + return err + } + ctx, cancel := a.getContextForTx() if cancel != nil { defer cancel() @@ -1053,14 +1046,16 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { decoded_uid); err != nil { return err } + + // Deletion of messages will cascade to filemsglinks and so to fileuploads. if _, err = tx.Exec(ctx, "DELETE FROM messages USING topics WHERE topics.name=messages.topic AND topics.owner=$1", decoded_uid); err != nil { return err } - // Delete all subscriptions. - if _, err = tx.Exec(ctx, "DELETE FROM subscriptions USING topics WHERE topics.name=subscriptions.topic AND topics.owner=$1", - decoded_uid); err != nil { + // Delete subscriptions for all users where the user is the owner of the topic. + sql, args, _ := sqlx.In("DELETE FROM subscriptions AS s WHERE topic IN (?)", ownTopics) + if _, err = tx.Exec(ctx, sqlx.Rebind(sqlx.DOLLAR, sql), args...); err != nil { return err } @@ -1099,18 +1094,17 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { } // Disable all subscriptions to topics where the user is the owner. - if _, err = tx.Exec(ctx, "UPDATE subscriptions SET updatedat=$1, deletedat=$2 "+ - "FROM topics WHERE subscriptions.topic=topics.name AND topics.owner=$3", - now, now, decoded_uid); err != nil { + sql, args, _ := sqlx.In("UPDATE subscriptions SET s.updatedat=?,s.deletedat=? WHERE topic IN (?)", now, now, ownTopics) + if _, err = tx.Exec(ctx, sqlx.Rebind(sqlx.DOLLAR, sql), args...); err != nil { return err } // Disable group topics where the user is the owner. - if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1, touchedat=$2, state=$3, stateat=$4 WHERE owner=$5", + if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1, touchedat=$2,state=$3,stateat=$4 WHERE owner=$5", now, now, t.StateDeleted, now, decoded_uid); err != nil { return err } // Disable p2p topics with the user (p2p topic's owner is 0). - if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1, touchedat=$2, state=$3, stateat=$4 "+ + if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1,touchedat=$2,state=$3,stateat=$4 "+ "FROM subscriptions WHERE topics.name=subscriptions.topic "+ "AND topics.owner=0 AND subscriptions.userid=$5", now, now, t.StateDeleted, now, decoded_uid); err != nil { @@ -1118,7 +1112,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { } // Disable the other user's subscription to a disabled p2p topic. - if _, err = tx.Exec(ctx, "UPDATE subscriptions AS s_one SET updatedat=$1, deletedat=$2 "+ + if _, err = tx.Exec(ctx, "UPDATE subscriptions AS s_one SET updatedat=$1,deletedat=$2 "+ "FROM subscriptions AS s_two WHERE s_one.topic=s_two.topic "+ "AND s_two.userid=$3 AND s_two.topic LIKE 'p2p%'", now, now, decoded_uid); err != nil { @@ -1126,7 +1120,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { } // Disable user. - if _, err = tx.Exec(ctx, "UPDATE users SET updatedat=$1, state=$2, stateat=$3 WHERE id=$4", + if _, err = tx.Exec(ctx, "UPDATE users SET updatedat=$1,state=$2,stateat=$3 WHERE id=$4", now, t.StateDeleted, now, decoded_uid); err != nil { return err } @@ -1136,6 +1130,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { } // topicStateForUser is called by UserUpdate when the update contains state change. +// Soft-deleted topics remain soft-deleted. func (a *adapter) topicStateForUser(ctx context.Context, tx pgx.Tx, decoded_uid int64, now time.Time, update any) error { var err error @@ -1265,13 +1260,13 @@ func (a *adapter) UserUpdateTags(uid t.Uid, add, remove, reset []string) ([]stri if err != nil { return nil, err } + defer rows.Close() for rows.Next() { var tag string rows.Scan(&tag) allTags = append(allTags, tag) } - rows.Close() _, err = tx.Exec(ctx, "UPDATE users SET tags=$1 WHERE id=$2", t.StringSlice(allTags), decoded_uid) if err != nil { @@ -1303,6 +1298,7 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) { // UserUnreadCount returns the total number of unread messages in all topics with // the R permission. If read fails, the counts are still returned with the original // user IDs but with the unread count undefined and non-nil error. +// UserUnreadCount does not count unread messages in channels although it should. func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { uids := make([]any, len(ids)) counts := make(map[t.Uid]int, len(ids)) @@ -1312,15 +1308,15 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { counts[id] = 0 } - query, uids := expandQuery("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+ - "WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+ - "POSITION('R' IN s.modewant)>0 AND POSITION('R' IN s.modegiven)>0 GROUP BY s.userid", uids, t.StateDeleted) - ctx, cancel := a.getContext() if cancel != nil { defer cancel() } + // FIXME: support channels. + query, uids := expandQuery("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+ + "WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+ + "POSITION('R' IN s.modewant)>0 AND POSITION('R' IN s.modegiven)>0 GROUP BY s.userid", uids, t.StateDeleted) rows, err := a.db.Query(ctx, query, uids...) if err != nil { return counts, err @@ -1489,7 +1485,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { topic := &t.Topic{ObjHeader: t.ObjHeader{Id: initiator.Topic}} topic.ObjHeader.MergeTimes(&initiator.ObjHeader) topic.TouchedAt = initiator.GetTouchedAt() - topic.SubCnt = 2 err = a.topicCreate(ctx, tx, topic) if err != nil { return err @@ -1504,20 +1499,11 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) { if cancel != nil { defer cancel() } - tx, err := a.db.BeginTx(ctx, pgx.TxOptions{}) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - tx.Rollback(ctx) - } - }() // Fetch topic by name var tt = new(t.Topic) var owner int64 - err = tx.QueryRow(ctx, + err := a.db.QueryRow(ctx, "SELECT createdat,updatedat,state,stateat,touchedat,name AS id,usebt,access,owner,seqid,delid,subcnt,public,trusted,tags,aux "+ "FROM topics WHERE name=$1", topic).Scan(&tt.CreatedAt, &tt.UpdatedAt, &tt.State, &tt.StateAt, &tt.TouchedAt, &tt.Id, @@ -1526,31 +1512,30 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) { if err == pgx.ErrNoRows { // Nothing found - clear the error err = nil - // Force close transaction. - tx.Rollback(ctx) } return nil, err } - // Topic found, get subsription count. Try both topic and channel names. - channel := t.GrpToChn(topic) - var subCnt int - if err = tx.QueryRow(ctx, - "SELECT COUNT(*) FROM subscriptions WHERE topic IN ($1,$2) AND deletedat IS NULL", topic, channel).Scan(&subCnt); err != nil { - return nil, err - } - - if subCnt != tt.SubCnt { - // Update the topic with the correct subscription count. - tt.SubCnt = subCnt - if _, err = tx.Exec(ctx, "UPDATE topics SET subcnt=$1 WHERE name=$2", subCnt, topic); err != nil { + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Topic found, get subsription count. Try both topic and channel names. + var subCnt int + if err = a.db.QueryRow(ctx, + "SELECT COUNT(*) FROM subscriptions WHERE topic IN ($1,$2) AND deletedat IS NULL", topic, t.GrpToChn(topic)). + Scan(&subCnt); err != nil { return nil, err } + + if subCnt != tt.SubCnt { + // Update the topic with the correct subscription count. + tt.SubCnt = subCnt + if _, err = a.db.Exec(ctx, "UPDATE topics SET subcnt=$1 WHERE name=$2", subCnt, topic); err != nil { + return nil, err + } + } } tt.Owner = store.EncodeUid(owner).String() - err = tx.Commit(ctx) return tt, err } @@ -1566,8 +1551,9 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( // Filter out deleted rows. q += " AND deletedat IS NULL" } + limit := 0 - ipg := time.Time{} + ims := time.Time{} if opts != nil { if opts.Topic != "" { q += " AND topic=?" @@ -1583,7 +1569,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( limit = a.maxResults } } else { - ipg = *opts.IfModifiedSince + ims = *opts.IfModifiedSince } } else { limit = a.maxResults @@ -1604,6 +1590,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if err != nil { return nil, err } + // Must close rows manually as we will be reusing it. // Fetch subscriptions. Two queries are needed: users table (p2p) and topics table (grp). // Prepare a list of separate subscriptions to users vs topics @@ -1624,7 +1611,8 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( tcat := t.GetTopicCat(tname) if tcat == t.TopicCatMe || tcat == t.TopicCatFnd { - // One of 'me', 'fnd' subscriptions, skip. Don't skip 'sys' subscription. + // One of 'me', 'fnd' subscriptions, skip. + // Don't skip 'sys' subscription. continue } else if tcat == t.TopicCatP2P { // P2P subscription, find the other user to get user.Public and user.Trusted. @@ -1636,15 +1624,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( usrq = append(usrq, store.DecodeUid(uid1)) sub.SetWith(uid1.UserId()) } - topq = append(topq, tname) - } else { - // Group or 'sys' subscription. - if tcat == t.TopicCatGrp { - // Maybe convert channel name to topic name. - tname = t.ChnToGrp(tname) - } - topq = append(topq, tname) + } else if tcat == t.TopicCatGrp { + // Maybe convert channel name to topic name. + tname = t.ChnToGrp(tname) } + // No special handling needed for 'slf', 'sys' subscriptions. + + topq = append(topq, tname) sub.Private = common.FromJSON(sub.Private) join[tname] = sub } @@ -1664,7 +1650,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( // Fetch grp topics and join to subscriptions. if len(topq) > 0 { - q = "SELECT updatedat,state,touchedat,name AS id,usebt,access,seqid,delid,public,trusted " + + q = "SELECT updatedat,state,touchedat,name AS id,usebt,access,seqid,delid,subcnt,public,trusted " + "FROM topics WHERE name IN (?)" newargs := []any{topq} @@ -1674,10 +1660,10 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( newargs = append(newargs, t.StateDeleted) } - if !ipg.IsZero() { + if !ims.IsZero() { // Use cache timestamp if provided: get newer entries only. q += " AND touchedat>?" - newargs = append(newargs, ipg) + newargs = append(newargs, ims) if limit > 0 && limit < len(topq) { // No point in fetching more than the requested limit. @@ -1699,7 +1685,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( var top t.Topic for rows.Next() { if err = rows.Scan(&top.UpdatedAt, &top.State, &top.TouchedAt, &top.Id, &top.UseBt, - &top.Access, &top.SeqId, &top.DelId, &top.Public, &top.Trusted); err != nil { + &top.Access, &top.SeqId, &top.DelId, &top.SubCnt, &top.Public, &top.Trusted); err != nil { break } @@ -1710,6 +1696,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( sub.SetTouchedAt(top.TouchedAt) sub.SetSeqId(top.SeqId) if t.GetTopicCat(sub.Topic) == t.TopicCatGrp { + sub.SetSubCnt(top.SubCnt) sub.SetPublic(top.Public) sub.SetTrusted(top.Trusted) } @@ -1731,7 +1718,6 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( q = "SELECT id,updatedat,state,access,lastseen,useragent,public,trusted " + "FROM users WHERE id IN (?)" newargs := []any{usrq} - if !keepDeleted { // Optionally skip deleted users. q += " AND state!=?" @@ -1746,12 +1732,10 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if cancel3 != nil { defer cancel3() } - rows, err = a.db.Query(ctx3, q, newargs...) if err != nil { return nil, err } - defer rows.Close() for rows.Next() { var usr2 t.User @@ -1776,6 +1760,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( if err == nil { err = rows.Err() } + rows.Close() if err != nil { return nil, err @@ -1915,24 +1900,30 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt } // topicNamesForUser reads a slice of strings using provided query. -func (a *adapter) topicNamesForUser(uid t.Uid, sqlQuery string) ([]string, error) { +func (a *adapter) topicNamesForUser(sqlQuery string, includeChan bool, args ...any) ([]string, error) { ctx, cancel := a.getContext() if cancel != nil { defer cancel() } - rows, err := a.db.Query(ctx, sqlQuery, store.DecodeUid(uid)) + rows, err := a.db.Query(ctx, sqlQuery, args...) if err != nil { return nil, err } defer rows.Close() var names []string - var name string for rows.Next() { + var name string if err = rows.Scan(&name); err != nil { break } names = append(names, name) + // If the name is a group topic, also add the channel name if requested. + if includeChan { + if channel := t.GrpToChn(name); channel != "" { + names = append(names, channel) + } + } } if err == nil { err = rows.Err() @@ -1943,17 +1934,18 @@ func (a *adapter) topicNamesForUser(uid t.Uid, sqlQuery string) ([]string, error // OwnTopics loads a slice of topic names where the user is the owner. func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) { - return a.topicNamesForUser(uid, "SELECT name FROM topics WHERE owner=$1") + return a.topicNamesForUser("SELECT name FROM topics WHERE owner=$1 AND state!=$2", + false, store.DecodeUid(uid), t.StateDeleted) } // ChannelsForUser loads a slice of topic names where the user is a channel reader and notifications (P) are enabled. func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) { - return a.topicNamesForUser(uid, - "SELECT topic FROM subscriptions WHERE userid=$1 AND topic LIKE 'chn%' "+ - "AND POSITION('P' IN modewant)>0 AND POSITION('P' IN modegiven)>0 AND deletedat IS NULL") + return a.topicNamesForUser("SELECT topic FROM subscriptions WHERE userid=$1 AND topic LIKE 'chn%' "+ + "AND POSITION('P' IN modewant)>0 AND POSITION('P' IN modegiven)>0 AND deletedat IS NULL", + false, store.DecodeUid(uid)) } -// TopicShare creates topic subscriptions. +// TopicShare creates topic subscriptions and increments the topic's subcnt. func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -2011,7 +2003,6 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { if hard { // Delete subscriptions. If this is a channel, delete both group subscriptions and channel subscriptions. q, args := expandQuery("DELETE FROM subscriptions WHERE topic IN (?)", args) - if _, err = tx.Exec(ctx, q, args...); err != nil { return err } @@ -2029,8 +2020,8 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { } } else { now := t.TimeNow() - q, args := expandQuery("UPDATE subscriptions SET updatedat=?,deletedat=? WHERE topic IN (?)", now, now, args) + q, args := expandQuery("UPDATE subscriptions SET updatedat=?,deletedat=? WHERE topic IN (?)", now, now, args) if _, err = tx.Exec(ctx, q, args); err != nil { return err } @@ -2053,6 +2044,18 @@ func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error { return err } +// TopicUpdateSubCnt updates subscriber count denormalized in topic. +func (a *adapter) TopicUpdateSubCnt(topic string) error { + ctx, cancel := a.getContext() + if cancel != nil { + defer cancel() + } + _, err := a.db.Exec(ctx, + "UPDATE topics SET subcnt=(SELECT COUNT(*) FROM subscriptions WHERE topic IN ($1,$2) AND deletedat IS NULL) WHERE name=$1", + topic, t.GrpToChn(topic)) + return err +} + func (a *adapter) TopicUpdate(topic string, update map[string]any) error { ctx, cancel := a.getContextForTx() if cancel != nil { @@ -2111,12 +2114,15 @@ func (a *adapter) SubscriptionGet(topic string, user t.Uid, keepDeleted bool) (* if cancel != nil { defer cancel() } + query := `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid, + readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=$1 AND userid=$2` + if !keepDeleted { + query += " AND deletedat IS NULL" + } var sub t.Subscription var userId int64 var modeWant, modeGiven []byte - err := a.db.QueryRow(ctx, `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid, - readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=$1 AND userid=$2`, - topic, store.DecodeUid(user)).Scan(&sub.CreatedAt, &sub.UpdatedAt, &sub.DeletedAt, &userId, + err := a.db.QueryRow(ctx, query, topic, store.DecodeUid(user)).Scan(&sub.CreatedAt, &sub.UpdatedAt, &sub.DeletedAt, &userId, &sub.Topic, &sub.DelId, &sub.RecvSeqId, &sub.ReadSeqId, &modeWant, &modeGiven, &sub.Private) if err != nil { @@ -2127,10 +2133,6 @@ func (a *adapter) SubscriptionGet(topic string, user t.Uid, keepDeleted bool) (* return nil, err } - if !keepDeleted && sub.DeletedAt != nil { - return nil, nil - } - sub.User = store.EncodeUid(userId).String() sub.ModeWant.Scan(modeWant) sub.ModeGiven.Scan(modeGiven) @@ -2185,7 +2187,6 @@ func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=?` args := []any{topic} - if !keepDeleted { // Filter out deleted rows. q += " AND deletedat IS NULL" @@ -2258,12 +2259,12 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er }() cols, args := common.UpdateByMap(update) - args = append(args, topic) q := "UPDATE subscriptions SET " + strings.Join(cols, ",") + " WHERE topic=?" + args = append(args, topic) if !user.IsZero() { // Update just one topic subscription - args = append(args, store.DecodeUid(user)) q += " AND userid=?" + args = append(args, store.DecodeUid(user)) } q, args = expandQuery(q, args...) @@ -2274,7 +2275,7 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er return tx.Commit(ctx) } -// SubsDelete marks subscription as deleted. +// SubsDelete marks at most one subscription as deleted. func (a *adapter) SubsDelete(topic string, user t.Uid) error { ctx, cancel := a.getContext() if cancel != nil { @@ -2308,16 +2309,21 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { return err } - // Remove records of messages soft-deleted by this user. - _, err = tx.Exec(ctx, "DELETE FROM dellog WHERE topic=$1 AND deletedfor=$2", topic, decoded_id) - if err != nil { - return err + // Channel readers cannot delete messages. + if !t.IsChannel(topic) { + // Remove records of messages soft-deleted by this user. + _, err = tx.Exec(ctx, "DELETE FROM dellog WHERE topic=$1 AND deletedfor=$2", topic, decoded_id) + if err != nil { + return err + } } - // Decrement topic subscription count. - _, err = tx.Exec(ctx, "UPDATE topics SET subcnt=subcnt-1 WHERE name=$1", topic) - if err != nil { - return err + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Decrement topic subscription count (only one subscription is deleted). + _, err = tx.Exec(ctx, "UPDATE topics SET subcnt=subcnt-1 WHERE name=$1", topic) + if err != nil { + return err + } } return tx.Commit(ctx) @@ -2325,8 +2331,15 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error { // subsDelForUser marks user's subscriptions as deleted. func subsDelForUser(ctx context.Context, tx pgx.Tx, user t.Uid, hard bool) error { - var err error - // TODO: update subscriber count in topics. + decoded_uid := store.DecodeUid(user) + + // Decrement subscription count for all topics the user is subscribed to. + _, err := tx.Exec(ctx, "UPDATE topics AS t LEFT JOIN subscriptions AS s ON t.name=s.topic "+ + "SET t.subcnt=t.subcnt-1 WHERE s.userid=$1 AND s.deletedat IS NULL", decoded_uid) + if err != nil { + return err + } + if hard { // Hard delete: remove all subscriptions for the user. _, err = tx.Exec(ctx, "DELETE FROM subscriptions WHERE userid=$1;", store.DecodeUid(user)) @@ -2374,11 +2387,10 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, stateConstraint = "u.state=? AND " } allReq := t.FlattenDoubleSlice(req) - allTags := append(allReq, opt...) - for _, tag := range allTags { + for _, tag := range append(allReq, opt...) { + args = append(args, tag) index[tag] = struct{}{} } - args = append(args, allTags) var matcher string if promoPrefix != "" { @@ -2388,7 +2400,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, matcher = "COUNT(*)" } - query := "SELECT CAST(u.id AS VARCHAR) AS topic,u.createdat,u.updatedat,FALSE,u.access::jsonb,0,u.public::jsonb,u.trusted::jsonb,u.tags::jsonb," + + query := "SELECT CAST(u.id AS VARCHAR) AS topic,u.createdat,u.updatedat,FALSE,u.access::jsonb,0 AS subcnt,u.public::jsonb,u.trusted::jsonb,u.tags::jsonb," + matcher + " AS matches " + "FROM users AS u LEFT JOIN usertags AS tg ON tg.userid=u.id " + "WHERE " + stateConstraint + "tg.tag IN (?) " + @@ -2396,7 +2408,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, if len(allReq) > 0 { q, a := common.DisjunctionSql(req, "tg.tag") query += q - args = append(args, a) + args = append(args, a...) } query += "UNION ALL " @@ -2407,7 +2419,9 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, } else { stateConstraint = "" } - args = append(args, allTags) + for _, tag := range append(allReq, opt...) { + args = append(args, tag) + } query += "SELECT t.name AS topic,t.createdat,t.updatedat,t.usebt,t.access::jsonb,t.subcnt,t.public::jsonb,t.trusted::jsonb,t.tags::jsonb," + matcher + " AS matches " + @@ -2417,15 +2431,15 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, if len(allReq) > 0 { q, a := common.DisjunctionSql(req, "tg.tag") query += q - args = append(args, a) + args = append(args, a...) } - - query, args = expandQuery(query+"ORDER BY matches DESC LIMIT ?", args, a.maxResults) + query, args = expandQuery(query+"ORDER BY matches DESC, subcnt DESC LIMIT ?", args, a.maxResults) ctx, cancel := a.getContext() if cancel != nil { defer cancel() } + // Get users matched by tags, sort by number of matches from high to low. rows, err := a.db.Query(ctx, query, args...) if err != nil { @@ -2433,6 +2447,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, } defer rows.Close() + // Fetch subscriptions var public, trusted any var access t.DefaultAccess var subcnt int @@ -2442,8 +2457,8 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, var sub t.Subscription var subs []t.Subscription for rows.Next() { - if err = rows.Scan(&sub.Topic, &sub.CreatedAt, &sub.UpdatedAt, &isChan, &access, - &subcnt, &public, &trusted, &setTags, &ignored); err != nil { + if err = rows.Scan(&sub.Topic, &sub.CreatedAt, &sub.UpdatedAt, &isChan, &access, &subcnt, + &public, &trusted, &setTags, &ignored); err != nil { subs = nil break } @@ -2457,6 +2472,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, } if isChan { + // This is a channel, convert grp to chn name. sub.Topic = t.GrpToChn(sub.Topic) } @@ -2519,7 +2535,6 @@ func (a *adapter) FindOne(tag string) (string, error) { found = store.EncodeUid(id).UserId() } } - if err == nil { err = rows.Err() } @@ -2868,18 +2883,17 @@ func (a *adapter) DeviceUpsert(uid t.Uid, def *t.DeviceDef) error { } func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, error) { - var unupg []any + var unums []any for _, uid := range uids { - unupg = append(unupg, store.DecodeUid(uid)) + unums = append(unums, store.DecodeUid(uid)) } - query, unupg := expandQuery("SELECT userid,deviceid,platform,lastseen,lang FROM devices WHERE userid IN (?)", unupg) - + query, unums := expandQuery("SELECT userid,deviceid,platform,lastseen,lang FROM devices WHERE userid IN (?)", unums) ctx, cancel := a.getContext() if cancel != nil { defer cancel() } - rows, err := a.db.Query(ctx, query, unupg...) + rows, err := a.db.Query(ctx, query, unums...) if err != nil { return nil, 0, err } @@ -3079,7 +3093,7 @@ func credDel(ctx context.Context, tx pgx.Tx, uid t.Uid, method, value string) er } // Case 2.1 - res, err = tx.Exec(ctx, "DELETE FROM credentials"+where+" AND (done=true OR retries=0)", args...) + res, err = tx.Exec(ctx, "DELETE FROM credentials"+where+" AND (done=TRUE OR retries=0)", args...) if err != nil { return err } @@ -3133,7 +3147,7 @@ func (a *adapter) CredConfirm(uid t.Uid, method string) error { } res, err := a.db.Exec( ctx, - "UPDATE credentials SET updatedat=$1,done=true,synthetic=CONCAT(method,':',value) "+ + "UPDATE credentials SET updatedat=$1,done=TRUE,synthetic=CONCAT(method,':',value) "+ "WHERE userid=$2 AND method=$3 AND deletedat IS NULL AND done=FALSE", t.TimeNow(), store.DecodeUid(uid), method) if err != nil { @@ -3170,14 +3184,12 @@ func (a *adapter) CredGetActive(uid t.Uid, method string) (*t.Credential, error) err := a.db.QueryRow(ctx, "SELECT createdat,updatedat,method,value,resp,done,retries "+ "FROM credentials WHERE userid=$1 AND deletedat IS NULL AND method=$2 AND done=FALSE", store.DecodeUid(uid), method).Scan(&cred.CreatedAt, &cred.UpdatedAt, &cred.Method, &cred.Value, &cred.Resp, &cred.Done, &cred.Retries) - if err != nil { if err == pgx.ErrNoRows { err = nil } return nil, err } - cred.User = uid.String() return &cred, nil @@ -3309,11 +3321,10 @@ func (a *adapter) FileGet(fid string) (*t.FileDef, error) { return nil, err } - fd.SetUid(store.EncodeUid(ID)) + fd.Id = common.EncodeUidString(fd.Id).String() fd.User = store.EncodeUid(userId).String() return &fd, nil - } // FileDeleteUnused deletes file upload records. @@ -3336,7 +3347,6 @@ func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, er query := "SELECT fu.id,fu.location FROM fileuploads AS fu LEFT JOIN filemsglinks AS fml ON fml.fileid=fu.id " + "WHERE fml.id IS NULL" var args []any - if !olderThan.IsZero() { query += " AND fu.updatedat 0 { + // 1. Delete dellog + // 2. Decrement use counter of fileuploads: topic itself and messages. + // 3. Delete all messages. + // 4. Delete subscriptions. + if _, err = rdb.DB(a.dbName).Table("topics").GetAll(ownTopics...).ForEach( + func(topic rdb.Term) rdb.Term { + return rdb.Expr([]any{ + // Delete dellog + rdb.DB(a.dbName).Table("dellog").Between( + []any{topic.Field("Id"), rdb.MinVal}, + []any{topic.Field("Id"), rdb.MaxVal}, + rdb.BetweenOpts{Index: "Topic_DelId"}).Delete(), + // Decrement topic attachment UseCounter + rdb.DB(a.dbName).Table("fileuploads").GetAll(topic.Field("Attachments")). + Update(func(fu rdb.Term) any { + return map[string]any{"UseCount": fu.Field("UseCount").Default(1).Sub(1)} + }), + // Decrement message attachments UseCounter + rdb.DB(a.dbName).Table("fileuploads").GetAll( + rdb.Args( + rdb.DB(a.dbName).Table("messages").Between( + []any{topic.Field("Id"), rdb.MinVal}, + []any{topic.Field("Id"), rdb.MaxVal}, + rdb.BetweenOpts{Index: "Topic_SeqId"}). + // Fetch messages with attachments only + Filter(func(msg rdb.Term) rdb.Term { + return msg.HasFields("Attachments") + }). + // Flatten arrays + ConcatMap(func(row rdb.Term) any { return row.Field("Attachments") }). + CoerceTo("array"))). + Update(func(fu rdb.Term) any { + return map[string]any{"UseCount": fu.Field("UseCount").Default(1).Sub(1)} + }), + // Delete messages + rdb.DB(a.dbName).Table("messages").Between( + []any{topic.Field("Id"), rdb.MinVal}, + []any{topic.Field("Id"), rdb.MaxVal}, + rdb.BetweenOpts{Index: "Topic_SeqId"}).Delete(), + // Delete subscriptions + rdb.DB(a.dbName).Table("subscriptions"). + GetAllByIndex("Topic", topic.Field("Id")).Delete(), + }) + }).RunWrite(a.conn); err != nil { + return err + } + + // And finally delete the topics. + if _, err = rdb.DB(a.dbName).Table("topics").GetAllByIndex("Owner", uid.String()). + Delete().RunWrite(a.conn); err != nil { + return err + } } // Delete user's authentication records. @@ -886,33 +900,150 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error { now := t.TimeNow() disable := map[string]any{ - "UpdatedAt": now, "State": t.StateDeleted, "StateAt": now, + "UpdatedAt": now, + "State": t.StateDeleted, + "StateAt": now, } - if _, err = rdb.DB(a.dbName).Table("topics").GetAllByIndex("Owner", uid.String()).ForEach( - func(topic rdb.Term) rdb.Term { - return rdb.Expr([]any{ - // Disable subscriptions for topics where the user is the owner. - rdb.DB(a.dbName).Table("subscriptions"). - GetAllByIndex("Topic", topic.Field("Id")). - Update(disable), - // Disable topics where the user is the owner. - rdb.DB(a.dbName).Table("topics"). - Get(topic.Field("Id")). - Update(map[string]any{ - "UpdatedAt": now, "TouchedAt": now, "State": t.StateDeleted, "StateAt": now, - }), - }) - }).RunWrite(a.conn); err != nil { + disableSub := map[string]any{ + "UpdatedAt": now, + "DeletedAt": now, + } + if len(ownTopics) > 0 { + // Disable all subscriptions in topics where the user is the owner. + if _, err = rdb.DB(a.dbName).Table("subscriptions"). + GetAllByIndex("Topic", ownTopics...). + Update(disableSub). + RunWrite(a.conn); err != nil { + return err + } + + // Disable topics where the user is the owner. + if _, err = rdb.DB(a.dbName).Table("topics"). + GetAll(ownTopics...). + Update(disable). + RunWrite(a.conn); err != nil { + return err + } + } + + // Disable p2p topics with the user. + p2pTopics, err := a.p2pTopicsForUser(uid) + if err != nil { return err } + if len(p2pTopics) > 0 { + // Disable all subscriptions in p2p topics with the user. + if _, err = rdb.DB(a.dbName).Table("subscriptions"). + GetAllByIndex("Topic", p2pTopics...). + Update(disableSub). + RunWrite(a.conn); err != nil { + return err + } + // Disable p2p topics with the user. + if _, err = rdb.DB(a.dbName).Table("topics"). + GetAll(p2pTopics...). + Update(disable). + RunWrite(a.conn); err != nil { + return err + } + } - // FIXME: disable p2p topics with the user. - - _, err = rdb.DB(a.dbName).Table("users").Get(uid.String()).Update(disable).RunWrite(a.conn) + // Disable the user (same fields as topic). + _, err = rdb.DB(a.dbName).Table("users").Get(uid.String()). + Update(disable).RunWrite(a.conn) } return err } +// Delete records of messages soft-deleted for the user in all topics. +func (a *adapter) clearUserDellog(uid t.Uid, topics []any) error { + var err error + forUser := uid.String() + if topics == nil { + // Get a list of all topics where the user has subscriptions. + topics, err = a.topicNamesForUser(rdb.DB(a.dbName). + Table("subscriptions"). + GetAllByIndex("User", forUser). + Field("Topic"), false) + if err != nil { + return err + } + } + + // No need to convert channel names to group names: + // channel readers cannot delete messages. + + // Remove current user from the messages' soft-deletion lists + // in all topics where the user has subscriptions. + _, err = rdb.DB(a.dbName).Table("topics").GetAll(topics...). + ForEach(func(topic rdb.Term) rdb.Term { + return rdb.DB(a.dbName).Table("messages").Between( + []any{topic.Field("Id"), forUser, rdb.MinVal}, + []any{topic.Field("Id"), forUser, rdb.MaxVal}, + rdb.BetweenOpts{Index: "Topic_DeletedFor"}). + Update(map[string]any{ + // Take the DeletedFor array, subtract all values which contain current user ID in 'User' field. + "DeletedFor": rdb.Row.Field("DeletedFor"). + SetDifference( + rdb.Row.Field("DeletedFor"). + Filter(map[string]any{"User": forUser}))}) + }).RunWrite(a.conn) + if err != nil { + return err + } + + // Delete entries in dellog for this user in all topics where the user + // has subscriptions. + _, err = rdb.DB(a.dbName).Table("topics").GetAll(topics...). + ForEach(func(topic rdb.Term) rdb.Term { + return rdb.DB(a.dbName).Table("dellog"). + // Select all log entries for the given table. + Between( + []any{topic.Field("Id"), rdb.MinVal}, + []any{topic.Field("Id"), rdb.MaxVal}, + rdb.BetweenOpts{Index: "Topic_DelId"}). + // Keep entries soft-deleted for the current user only. + Filter(rdb.Row.Field("DeletedFor").Eq(forUser)). + // Delete them. + Delete() + }).RunWrite(a.conn) + + return err +} + +// topicNamesForUser returns a list of topic names by query. +func (a *adapter) topicNamesForUser(query rdb.Term, includeChan bool) ([]any, error) { + cursor, err := query.Run(a.conn) + if err != nil { + return nil, err + } + defer cursor.Close() + + var result []string + if err = cursor.All(&result); err != nil { + return nil, err + } + + var args []any + for _, name := range result { + args = append(args, name) + if includeChan { + // Append 'chn' topic names for each 'grp' name. + if channel := t.GrpToChn(name); channel != "" { + args = append(args, channel) + } + } + } + return args, nil +} + +func (a *adapter) p2pTopicsForUser(uid t.Uid) ([]any, error) { + return a.topicNamesForUser(rdb.DB(a.dbName).Table("subscriptions"). + GetAllByIndex("User", uid.String()). + Field("Topic"). + Filter(rdb.Row.Field("Topic").Match("^p2p")), false) +} + // topicStateForUser is called by UserUpdate when the update contains state change. func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error { state, ok := update.(t.ObjState) @@ -1039,6 +1170,7 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) { // UserUnreadCount returns the total number of unread messages in all topics with // the R permission. If read fails, the counts are still returned with the original // user IDs but with the unread count undefined and non-nil error. +// UserUnreadCount does not count unread messages in channels although it should. func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) { // The call expects user IDs to be plain strings like "356zaYaumiU". uids := make([]any, len(ids)) @@ -1147,8 +1279,6 @@ func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time, limit int) ([] return uids, err } -// ***************************** - // TopicCreate creates a topic from template func (a *adapter) TopicCreate(topic *t.Topic) error { _, err := rdb.DB(a.dbName).Table("topics").Insert(&topic).RunWrite(a.conn) @@ -1193,7 +1323,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error { topic := &t.Topic{ObjHeader: t.ObjHeader{Id: initiator.Topic}} topic.ObjHeader.MergeTimes(&initiator.ObjHeader) topic.TouchedAt = initiator.GetTouchedAt() - topic.SubCnt = 2 return a.TopicCreate(topic) } @@ -1215,27 +1344,30 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) { // The cursor is automatically closed by executing cursor.One. - // Topic found, get subsription count. Try both topic and channel names. - if cursor, err = rdb.DB(a.dbName).Table("subscriptions"). - GetAllByIndex("Topic", topic, t.GrpToChn(topic)). - Filter(rdb.Row.HasFields("DeletedAt").Not()). - Count().Run(a.conn); err != nil { - return nil, err - } - subCnt := 0 - if err = cursor.One(&subCnt); err != nil { - return nil, err - } - // No need to close the cursor. - - if subCnt != tt.SubCnt { - // Update the topic with the correct subscription count. - tt.SubCnt = subCnt - if _, err = rdb.DB(a.dbName).Table("topics").Get(topic). - Update(map[string]any{"SubCnt": subCnt}).RunWrite(a.conn); err != nil { + if t.GetTopicCat(topic) == t.TopicCatGrp { + // Topic found, get subsription count. Try both topic and channel names. + if cursor, err = rdb.DB(a.dbName).Table("subscriptions"). + GetAllByIndex("Topic", topic, t.GrpToChn(topic)). + Filter(rdb.Row.HasFields("DeletedAt").Not()). + Count().Run(a.conn); err != nil { return nil, err } + subCnt := 0 + if err = cursor.One(&subCnt); err != nil { + return nil, err + } + // No need to close the cursor. + + if subCnt != tt.SubCnt { + // Update the topic with the correct subscription count. + tt.SubCnt = subCnt + if _, err = rdb.DB(a.dbName).Table("topics").Get(topic). + Update(map[string]any{"SubCnt": subCnt}).RunWrite(a.conn); err != nil { + return nil, err + } + } } + return tt, nil } @@ -1305,15 +1437,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( usrq = append(usrq, uid1.String()) sub.SetWith(uid1.UserId()) } - topq = append(topq, tname) - } else { - // Group or sys subscription. - if tcat == t.TopicCatGrp { - // Maybe convert channel name to topic name. - tname = t.ChnToGrp(tname) - } - topq = append(topq, tname) + } else if tcat == t.TopicCatGrp { + // Maybe convert channel name to topic name. + tname = t.ChnToGrp(tname) } + // No special handling needed for 'slf', 'sys' subscriptions. + + topq = append(topq, tname) join[tname] = sub } err = cursor.Err() @@ -1360,6 +1490,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( sub.SetTouchedAt(top.TouchedAt) sub.SetSeqId(top.SeqId) if t.GetTopicCat(sub.Topic) == t.TopicCatGrp { + sub.SetSubCnt(top.SubCnt) sub.SetPublic(top.Public) sub.SetTrusted(top.Trusted) } @@ -1418,7 +1549,9 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ( return common.SelectEarliestUpdatedSubs(subs, opts, a.maxResults), nil } -// UsersForTopic loads users subscribed to the given topic +// UsersForTopic loads users subscribed to the given topic (not channel readers). +// The difference between UsersForTopic vs SubsForTopic is that the former loads user.Public, +// the latter does not. func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) { tcat := t.GetTopicCat(topic) @@ -1564,7 +1697,7 @@ func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) { return names, nil } -// TopicShare creates topic subscriptions. +// TopicShare adds subscriptions to a topic and increments the topic's subcnt. func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error { // Assign Ids. for _, sub := range shares { @@ -1589,7 +1722,7 @@ func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error { if err == nil && topic != "" { _, err = rdb.DB(a.dbName).Table("topics"). Get(topic). - Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").Default(0).Sub(len(shares))}). + Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").Default(0).Add(len(shares))}). RunWrite(a.conn) } return err @@ -1628,7 +1761,6 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error { // TopicUpdateOnMessage deserializes message-related values into topic. func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error { - update := struct { SeqId int TouchedAt time.Time @@ -1640,6 +1772,19 @@ func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error { return err } +// TopicUpdateSubCnt updates subscriber count denormalized in topic. +func (a *adapter) TopicUpdateSubCnt(topic string) error { + _, err := rdb.DB(a.dbName).Table("topics"). + Get(topic). + Update(map[string]any{ + "SubCnt": rdb.Table("subscriptions"). + GetAllByIndex("Topic", topic, t.GrpToChn(topic)). + Filter(rdb.Row.HasFields("DeletedAt").Not()). + Count(), + }).RunWrite(a.conn) + return err +} + // TopicUpdate performs a generic topic update. func (a *adapter) TopicUpdate(topic string, update map[string]any) error { if t, u := update["TouchedAt"], update["UpdatedAt"]; t == nil && u != nil { @@ -1757,18 +1902,30 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er return err } -// SubsDelete marks subscription as deleted. +// SubsDelete marks at most one subscription as deleted. func (a *adapter) SubsDelete(topic string, user t.Uid) error { now := t.TimeNow() forUser := user.String() // Mark subscription as deleted. - _, err := rdb.DB(a.dbName).Table("subscriptions"). + res, err := rdb.DB(a.dbName).Table("subscriptions"). Get(topic + ":" + forUser).Update(map[string]any{ "UpdatedAt": now, "DeletedAt": now, }).RunWrite(a.conn) + if err != nil { + return err + } + if res.Replaced == 0 { + // Nothing was updated, nothing more to do. + return t.ErrNotFound + } + + // Decrement topic's SubCnt. + _, err = rdb.DB(a.dbName).Table("topics").Get(topic). + Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").Default(1).Sub(1)}). + RunWrite(a.conn) if err != nil { return err } @@ -1840,49 +1997,28 @@ func (a *adapter) subsDelForTopic(topic string, isChan, hard bool) error { return err } -// subsDelForUser deletes or marks all subscriptions of a given user as deleted +// subsDelForUser marks all subscriptions of a given user as deleted. func (a *adapter) subsDelForUser(user t.Uid, hard bool) error { var err error forUser := user.String() - // Iterate over user's subscriptions. - _, err = rdb.DB(a.dbName).Table("subscriptions").GetAllByIndex("User", forUser).ForEach( - func(sub rdb.Term) rdb.Term { - return rdb.Expr([]any{ - // Delete dellog - rdb.DB(a.dbName).Table("dellog"). - // Select all log entries for the given table. - Between( - []any{sub.Field("Topic"), rdb.MinVal}, - []any{sub.Field("Topic"), rdb.MaxVal}, - rdb.BetweenOpts{Index: "Topic_DelId"}). - // Keep entries soft-deleted for the current user only. - Filter(func(dellog rdb.Term) rdb.Term { - return dellog.Field("DeletedFor").Eq(forUser) - }). - // Delete them. - Delete(), - // Remove user from the messages' soft-deletion lists. - rdb.DB(a.dbName).Table("messages"). - // Select user's soft-deleted messages in the current table. - Between( - []any{sub.Field("Topic"), forUser, rdb.MinVal}, - []any{sub.Field("Topic"), forUser, rdb.MaxVal}, - rdb.BetweenOpts{Index: "Topic_DeletedFor"}). - // Update the field DeletedFor: - Update(func(msg rdb.Term) any { - return map[string]any{ - // Take the array, subtract all values with the current user ID. - "DeletedFor": msg.Field("DeletedFor"). - SetDifference( - msg.Field("DeletedFor"). - Filter(map[string]any{"User": forUser})), - } - }), - }) - }).RunWrite(a.conn) + // Get all topics the user is subscribed to. Channels are left as channels. + topics, err := a.topicNamesForUser(rdb.DB(a.dbName).Table("subscriptions"). + GetAllByIndex("User", forUser), false) + if err != nil { + return err + } + // 1. Decrement SubCnt in topic. + if _, err = rdb.DB(a.dbName).Table("topics").Get(topics...). + Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt"). + Default(1).Sub(1)}). + RunWrite(a.conn); err != nil { + return err + } + + err = a.clearUserDellog(user, topics) if err != nil { return err } @@ -2014,7 +2150,6 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string, } return subs, cursor.Err() - } // FindOne returns topic or user which matches the given tag. diff --git a/server/hub.go b/server/hub.go index f77ee04e..4ecf2197 100644 --- a/server/hub.go +++ b/server/hub.go @@ -625,6 +625,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) { desc.Public = stopic.Public desc.Trusted = stopic.Trusted desc.IsChan = stopic.UseBt + desc.SubCnt = stopic.SubCnt if stopic.Owner == msg.AsUser { desc.DefaultAcs = &MsgDefaultAcsMode{ Auth: stopic.Access.Auth.String(), diff --git a/server/init_topic.go b/server/init_topic.go index 98ff6b94..3c3cf1db 100644 --- a/server/init_topic.go +++ b/server/init_topic.go @@ -615,6 +615,8 @@ func initTopicNewGrp(t *Topic, sreg *ClientComMessage, isChan bool) error { } t.xoriginal = t.name // keeping 'new' or 'nch' as original has no value to the client + t.subCnt = 1 // One subscription, the owner. + pktsub.Created = true pktsub.Newsub = true @@ -659,6 +661,7 @@ func initTopicGrp(t *Topic) error { } t.lastID = stopic.SeqId t.delID = stopic.DelId + t.subCnt = stopic.SubCnt // Initialize channel for receiving session online updates. t.supd = make(chan *sessionUpdate, 32) diff --git a/server/store/mock_store/mock_store.go b/server/store/mock_store/mock_store.go index 03547808..1315b7d4 100644 --- a/server/store/mock_store/mock_store.go +++ b/server/store/mock_store/mock_store.go @@ -922,6 +922,20 @@ func (mr *MockTopicsPersistenceInterfaceMockRecorder) Update(topic, update inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockTopicsPersistenceInterface)(nil).Update), topic, update) } +// UpdateSubCnt mocks base method. +func (m *MockTopicsPersistenceInterface) UpdateSubCnt(topic string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateSubCnt", topic) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateSubCnt indicates an expected call of UpdateSubCnt. +func (mr *MockTopicsPersistenceInterfaceMockRecorder) UpdateSubCnt(topic interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSubCnt", reflect.TypeOf((*MockTopicsPersistenceInterface)(nil).UpdateSubCnt), topic) +} + // MockSubsPersistenceInterface is a mock of SubsPersistenceInterface interface. type MockSubsPersistenceInterface struct { ctrl *gomock.Controller diff --git a/server/store/store.go b/server/store/store.go index 40377c20..e43fe541 100644 --- a/server/store/store.go +++ b/server/store/store.go @@ -511,6 +511,7 @@ type TopicsPersistenceInterface interface { GetSubs(topic string, opts *types.QueryOpt) ([]types.Subscription, error) GetSubsAny(topic string, opts *types.QueryOpt) ([]types.Subscription, error) Update(topic string, update map[string]any) error + UpdateSubCnt(topic string) error OwnerChange(topic string, newOwner types.Uid) error Delete(topic string, isChan, hard bool) error } @@ -585,6 +586,11 @@ func (topicsMapper) GetSubsAny(topic string, opts *types.QueryOpt) ([]types.Subs return adp.SubsForTopic(topic, true, opts) } +// UpdateSubCnt refreshes subscriber count value denormalized in topic. +func (topicsMapper) UpdateSubCnt(topic string) error { + return adp.TopicUpdateSubCnt(topic) +} + // Update is a generic topic update. func (topicsMapper) Update(topic string, update map[string]any) error { if _, ok := update["UpdatedAt"]; !ok { @@ -652,7 +658,8 @@ func (subsMapper) Update(topic string, user types.Uid, update map[string]any) er return adp.SubsUpdate(topic, user, update) } -// Delete deletes a subscription +// Delete deletes a subscription. +// To delete channel subscription the channel name must be explicitly specified. func (subsMapper) Delete(topic string, user types.Uid) error { return adp.SubsDelete(topic, user) } diff --git a/server/store/types/types.go b/server/store/types/types.go index 39727659..8552ae88 100644 --- a/server/store/types/types.go +++ b/server/store/types/types.go @@ -207,6 +207,8 @@ func ParseUserId(s string) Uid { } // GrpToChn converts group topic name to corresponding channel name. +// If it's a non-group channel topic, the name is returned unchanged. +// If it's neither, an empty string is returned. func GrpToChn(grp string) string { if strings.HasPrefix(grp, "grp") { return strings.Replace(grp, "grp", "chn", 1) @@ -227,6 +229,7 @@ func IsChannel(name string) bool { // ChnToGrp gets group topic name from channel name. // If it's a non-channel group topic, the name is returned unchanged. +// If it's neither, an empty string is returned. func ChnToGrp(chn string) string { if strings.HasPrefix(chn, "chn") { return strings.Replace(chn, "chn", "grp", 1) @@ -925,7 +928,7 @@ type Subscription struct { // Timestamp & user agent of when the user was last online. lastSeenUA *LastSeenUA - // Group topics only: count of subscribers. + // Count of subscribers. subCnt int // P2P only. ID of the other user @@ -1126,7 +1129,7 @@ type Topic struct { DelId int // Count of topic subscribers. - SubCnt int `json:"SubCnt,omitempty" bson:",omitempty"` + SubCnt int Public any Trusted any @@ -1388,7 +1391,7 @@ func GetTopicCat(name string) TopicCat { } // IsEphemeralTopic checks if the topic is ephemeral, i.e. it's a reference to the user, -// it's not stored in the 'topics' table. +// it's not stored in the 'topics' table like 'me' or 'fnd' topics. func IsEphemeralTopic(topic string) bool { cat := GetTopicCat(topic) return cat == TopicCatMe || cat == TopicCatFnd diff --git a/server/topic.go b/server/topic.go index ed9a309d..b4f7473c 100644 --- a/server/topic.go +++ b/server/topic.go @@ -47,6 +47,10 @@ type Topic struct { // ID of the deletion operation. Not an ID of the message. delID int + // Total count of subscribers (excluding deleted). + // This is different from subsCount() for channels. + subCnt int + // Last published userAgent ('me' topic only) userAgent string @@ -546,6 +550,13 @@ func (t *Topic) handleTopicTermination(sd *shutDown) { s.detachSession(t.name) } + if t.cat == types.TopicCatGrp { + // Update topic subscriber count. + if err := store.Topics.UpdateSubCnt(t.name); err != nil { + logs.Warn.Println("topic update sub cnt:", err) + } + } + usersRegisterTopic(t, false) // Report completion back to sender, if 'done' is not nil. @@ -807,7 +818,7 @@ func (t *Topic) handleLeaveRequest(msg *ClientComMessage, sess *Session) { if !meUid.IsZero() { // Update user's last online timestamp & user agent. Only one user can be subscribed to 'me' topic. if err := store.Users.UpdateLastSeen(meUid, mrs.userAgent, now); err != nil { - logs.Warn.Println(err) + logs.Warn.Println("user update last seen:", err) } } case types.TopicCatFnd: @@ -1377,8 +1388,8 @@ func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error { asUid := types.ParseUserId(msg.AsUser) - if !msgsub.Newsub && (t.cat == types.TopicCatP2P || t.cat == types.TopicCatGrp || t.cat == types.TopicCatSys) { - // Check if this is a new subscription. + if !msgsub.Newsub && (t.cat == types.TopicCatP2P || t.cat == types.TopicCatGrp) { + // Check if this is a new subscription (P2P & GRP only. SLF, SYS are excluded here). pud, found := t.perUser[asUid] msgsub.Newsub = !found || pud.deleted } @@ -1429,6 +1440,11 @@ func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error { userData.online++ t.perUser[asUid] = userData } + + if t.cat == types.TopicCatGrp && msgsub.Newsub { + // Increment subscriber count for new group subscriptions only. + t.subCnt++ + } } params := map[string]any{} @@ -2087,8 +2103,11 @@ func (t *Topic) replyGetDesc(sess *Session, asUid types.Uid, _ bool, opts *MsgGe pud, full := t.perUser[asUid] full = full || t.cat == types.TopicCatMe + if t.cat == types.TopicCatGrp { desc.IsChan = t.isChan + desc.SubCnt = t.subCnt + logs.Info.Println("replyGetDesc: grp topic", t.name, "subs", t.subCnt) } if ifUpdated { @@ -2601,6 +2620,8 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level UserAgent: sub.GetUserAgent(), } } + + mts.SubCnt = sub.GetSubCnt() } } else { // Mark subscriptions that the user does not care about. @@ -2631,6 +2652,7 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level if !sub.UpdatedAt.IsZero() { mts.UpdatedAt = &sub.UpdatedAt } + if isReader && !banned { mts.ReadSeqId = sub.ReadSeqId mts.RecvSeqId = sub.RecvSeqId @@ -2662,6 +2684,7 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level mts.Acs.Mode = defacs.Auth.String() } } + mts.SubCnt = sub.GetSubCnt() } // Returning public and private only if they have changed since ifModified @@ -3393,7 +3416,7 @@ func (t *Topic) replyDelSub(sess *Session, asUid types.Uid, msg *ClientComMessag return nil } -// replyLeaveUnsub is request to unsubscribe user and detach all user's sessions from topic. +// replyLeaveUnsub is a request to unsubscribe user and detach all user's sessions from topic. func (t *Topic) replyLeaveUnsub(sess *Session, msg *ClientComMessage, asUid types.Uid) error { now := types.TimeNow() @@ -3467,6 +3490,11 @@ func (t *Topic) replyLeaveUnsub(sess *Session, msg *ClientComMessage, asUid type // Notify plugins. pluginSubscription(&types.Subscription{Topic: t.name, User: asUid.String()}, plgActDel) + if t.cat == types.TopicCatGrp { + // Decrement group's cached member count. + t.subCnt-- + } + // If all P2P users were deleted, suspend the topic to let it shut down. if t.cat == types.TopicCatP2P && t.subsCount() == 0 { t.markPaused(true) @@ -3824,7 +3852,9 @@ func (t *Topic) accessFor(authLvl auth.Level) types.AccessMode { return selectAccessMode(authLvl, t.accessAnon, t.accessAuth, getDefaultAccess(t.cat, true, false)) } -// subsCount returns the number of topic subscribers +// subsCount returns the number of topic subscribers. This method is different from subCnt with respect to channels: +// * subsCount counts subscribers + attached channel users. +// * subCnt counts all subscribers (including all channel users). func (t *Topic) subsCount() int { if t.cat == types.TopicCatP2P { count := 0