support for subscriber count, bug fixes in cleaning up deleted channels

This commit is contained in:
or-else
2025-10-06 16:12:26 +03:00
parent bfe265658b
commit a0c11d56be
12 changed files with 1121 additions and 742 deletions
+9 -1
View File
@@ -448,6 +448,7 @@ type MsgTopicDesc struct {
RecvSeqId int `json:"recv,omitempty"`
// Id of the last delete operation as seen by the requesting user
DelId int `json:"clear,omitempty"`
SubCnt int `json:"subcnt,omitempty"`
Public any `json:"public,omitempty"`
Trusted any `json:"trusted,omitempty"`
// Per-subscription private data
@@ -475,6 +476,9 @@ func (src *MsgTopicDesc) describe() string {
if src.DelId != 0 {
s += " clear=" + strconv.Itoa(src.DelId)
}
if src.SubCnt != 0 {
s += " subcnt=" + strconv.Itoa(src.SubCnt)
}
if src.Public != nil {
s += " pub='...'"
}
@@ -529,7 +533,8 @@ type MsgTopicSub struct {
SeqId int `json:"seq,omitempty"`
// Id of the latest Delete operation
DelId int `json:"clear,omitempty"`
// Number of subscribers, group topics only.
SubCnt int `json:"subcnt,omitempty"`
// P2P topics in 'me' {get subs} response:
// Other user's last online timestamp & user agent
@@ -550,6 +555,9 @@ func (src *MsgTopicSub) describe() string {
}
if src.DelId != 0 {
s += " clear=" + strconv.Itoa(src.DelId)
}
if src.SubCnt != 0 {
s += " subcnt=" + strconv.Itoa(src.SubCnt)
}
if src.Public != nil {
s += " pub='...'"
+2
View File
@@ -121,6 +121,8 @@ type Adapter interface {
TopicDelete(topic string, isChan, hard bool) error
// TopicUpdateOnMessage increments Topic's or User's SeqId value and updates TouchedAt timestamp.
TopicUpdateOnMessage(topic string, msg *t.Message) error
// TopicUpdateSubCnt refreshes denormalized topic subscriber count.
TopicUpdateSubCnt(topic string) error
// TopicUpdate updates topic record.
TopicUpdate(topic string, update map[string]any) error
// TopicOwnerChange updates topic's owner
+330 -239
View File
@@ -40,12 +40,12 @@ type adapter struct {
}
const (
defaultHost = "localhost:27017"
defaultDatabase = "tinode"
adpVersion = 116
adapterName = "mongodb"
defaultHost = "localhost:27017"
defaultDatabase = "tinode"
defaultMaxResults = 1024
// This is capped by the Session's send queue limit (128).
defaultMaxMessageResults = 100
@@ -79,6 +79,20 @@ type configType struct {
APIVersion mdbopts.ServerAPIVersion `json:"api_version,omitempty"`
}
func (a *adapter) maybeStartTransaction(sess mdb.Session) error {
if a.useTransactions {
return sess.StartTransaction()
}
return nil
}
func (a *adapter) maybeCommitTransaction(ctx context.Context, sess mdb.Session) error {
if a.useTransactions {
return sess.CommitTransaction(ctx)
}
return nil
}
// Open initializes mongodb session
func (a *adapter) Open(jsonconfig json.RawMessage) error {
if a.conn != nil {
@@ -237,6 +251,15 @@ func (a *adapter) GetDbVersion() (int, error) {
return result.Value, nil
}
func (a *adapter) updateDbVersion(v int) error {
a.version = -1
_, err := a.db.Collection("kvmeta").UpdateOne(a.ctx,
b.M{"_id": "version"},
b.M{"$set": b.M{"value": v}},
)
return err
}
// CheckDbVersion checks if the actual database version matches adapter version.
func (a *adapter) CheckDbVersion() error {
version, err := a.GetDbVersion()
@@ -549,15 +572,6 @@ func (a *adapter) UpgradeDb() error {
return nil
}
func (a *adapter) updateDbVersion(v int) error {
a.version = -1
_, err := a.db.Collection("kvmeta").UpdateOne(a.ctx,
b.M{"_id": "version"},
b.M{"$set": b.M{"value": v}},
)
return err
}
// Create system topic 'sys'.
func createSystemTopic(a *adapter) error {
now := t.TimeNow()
@@ -623,34 +637,23 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) {
}
user.Public = unmarshalBsonD(user.Public)
user.Trusted = unmarshalBsonD(user.Trusted)
users = append(users, user)
}
return users, nil
}
func (a *adapter) maybeStartTransaction(sess mdb.Session) error {
if a.useTransactions {
return sess.StartTransaction()
}
return nil
}
func (a *adapter) maybeCommitTransaction(ctx context.Context, sess mdb.Session) error {
if a.useTransactions {
return sess.CommitTransaction(ctx)
}
return nil
}
// UserDelete deletes user record.
// UserDelete deletes specified user: wipes completely (hard-delete) or marks as deleted.
func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
forUser := uid.String()
// Select topics where the user is the owner.
topicIds, err := a.db.Collection("topics").Distinct(a.ctx, "_id", b.M{"owner": forUser})
ownTopics, err := a.topicNamesForUser("topics",
b.M{"owner": forUser, "state": b.M{"$ne": t.StateDeleted}}, "_id", true)
if err != nil {
return err
}
topicFilter := b.M{"topic": b.M{"$in": topicIds}}
ownTopicsFilter := b.M{"topic": b.M{"$in": ownTopics}}
var sess mdb.Session
if sess, err = a.conn.StartSession(); err != nil {
@@ -665,19 +668,32 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
if err = mdb.WithSession(a.ctx, sess, func(sc mdb.SessionContext) error {
if hard {
// No need to delete user's devices: devices are stored in user's record and will be deleted with it.
// Delete user's subscriptions in all topics and decrement subcnt in topic.
if err = a.subsDelete(sc, b.M{"user": forUser}, true); err != nil {
return err
}
// Delete user's dellog entries in all topics.
err = a.clearUserDellog(sc, forUser)
if err != nil {
return err
}
// Can't delete user's messages in all topics because we cannot notify topics of such deletion.
// Or we have to delete these messages one by one.
// For now, just leave the messages there marked as sent by "not found" user.
// Just leave the messages there marked as sent by "not found" user.
// Delete topics where the user is the owner:
if len(topicIds) > 0 {
if len(ownTopics) > 0 {
// 1. Delete dellog
// 2. Decrement fileuploads.
// 3. Delete all messages.
// 4. Delete subscriptions.
// Delete dellog entries.
_, err = a.db.Collection("dellog").DeleteMany(sc, topicFilter)
// Delete dellog for topics owned by the user.
_, err = a.db.Collection("dellog").DeleteMany(sc, ownTopicsFilter)
if err != nil {
return err
}
@@ -685,61 +701,37 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
// Decrement fileuploads UseCounter
// First get array of attachments IDs that were used in messages of topics from topicIds
// Then decrement the usecount field of these file records
err = a.decFileUseCounter(sc, "messages", b.M{"topic": b.M{"$in": topicIds}})
err = a.decFileUseCounter(sc, "messages", ownTopicsFilter)
if err != nil {
return err
}
// Decrement use counter for topic avatars
err = a.decFileUseCounter(sc, "topics", b.M{"_id": b.M{"$in": topicIds}})
// Decrement use counter for topic avatars.
err = a.decFileUseCounter(sc, "topics", b.M{"_id": b.M{"$in": ownTopics}})
if err != nil {
return err
}
// Delete messages
_, err = a.db.Collection("messages").DeleteMany(sc, topicFilter)
_, err = a.db.Collection("messages").DeleteMany(sc, ownTopicsFilter)
if err != nil {
return err
}
// Delete subscriptions
_, err = a.db.Collection("subscriptions").DeleteMany(sc, topicFilter)
// Delete subscriptions for all users where the user is the owner of the topic.
_, err = a.db.Collection("subscriptions").DeleteMany(sc, ownTopicsFilter)
if err != nil {
return err
}
// No need to delete topic tags: they are stored in topic record and will be deleted with it.
// And finally delete the topics.
if _, err = a.db.Collection("topics").DeleteMany(sc, b.M{"owner": forUser}); err != nil {
return err
}
}
// Select all other topics where the user is a subscriber.
topicIds, err = a.db.Collection("subscriptions").Distinct(sc, "topic", b.M{"user": forUser})
if err != nil {
return err
}
if len(topicIds) > 0 {
// Delete user's dellog entries.
if _, err = a.db.Collection("dellog").DeleteMany(sc,
b.M{"topic": b.M{"$in": topicIds}, "deletedfor": forUser}); err != nil {
return err
}
// Delete user's markings of soft-deleted messages
filter := b.M{"topic": b.M{"$in": topicIds}, "deletedfor.user": forUser}
if _, err = a.db.Collection("messages").
UpdateMany(sc, filter, b.M{"$pull": b.M{"deletedfor": b.M{"user": forUser}}}); err != nil {
return err
}
// Delete user's subscriptions in all topics.
if err = a.subsDelete(sc, b.M{"user": forUser}, true); err != nil {
return err
}
}
// Delete user's authentication records.
if _, err = a.authDelAllRecords(sc, uid); err != nil {
return err
@@ -755,6 +747,8 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
return err
}
// No need to delete user's tags: they are stored in user's record and will be deleted with it.
// And finally delete the user.
if _, err = a.db.Collection("users").DeleteOne(sc, b.M{"_id": forUser}); err != nil {
return err
@@ -769,18 +763,37 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
disable := b.M{"$set": b.M{"updatedat": now, "state": t.StateDeleted, "stateat": now}}
// Disable subscriptions for topics where the user is the owner.
if _, err = a.db.Collection("subscriptions").UpdateMany(sc, topicFilter, disable); err != nil {
if _, err = a.db.Collection("subscriptions").UpdateMany(sc, ownTopicsFilter, disable); err != nil {
return err
}
// Disable topics where the user is the owner.
if _, err = a.db.Collection("topics").UpdateMany(sc, b.M{"_id": b.M{"$in": topicIds}},
// Disable group topics where the user is the owner.
if _, err = a.db.Collection("topics").UpdateMany(sc, b.M{"_id": b.M{"$in": ownTopics}},
b.M{"$set": b.M{
"updatedat": now, "touchedat": now, "state": t.StateDeleted, "stateat": now,
}}); err != nil {
return err
}
// FIXME: disable p2p topics with the user.
// Disable p2p topics with the user.
p2pTopics, err := a.p2pTopicsForUser(uid)
if err != nil {
return err
}
if len(p2pTopics) > 0 {
if _, err = a.db.Collection("topics").UpdateMany(sc, b.M{"_id": b.M{"$in": p2pTopics}},
b.M{"$set": b.M{
"updatedat": now, "touchedat": now, "state": t.StateDeleted, "stateat": now,
}}); err != nil {
return err
}
// Disable subscription to user's disabled p2p topics.
if _, err = a.db.Collection("subscriptions").UpdateMany(sc,
b.M{"topic": b.M{"$in": p2pTopics}}, disable); err != nil {
return err
}
}
// Finally disable the user.
if _, err = a.db.Collection("users").UpdateMany(sc, b.M{"_id": forUser}, disable); err != nil {
@@ -797,7 +810,8 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
return err
}
// topicStateForUser is called by UserUpdate when the update contains state change
// topicStateForUser is called by UserUpdate when the update contains state change.
// Soft-deleted topics remain soft-deleted.
func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error {
state, ok := update.(t.ObjState)
if !ok {
@@ -816,16 +830,19 @@ func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error
}
// Change state of p2p topics with the user (p2p topic's owner is blank)
topicIds, err := a.db.Collection("subscriptions").Distinct(a.ctx, "topic", b.M{"user": uid.String()})
// Get list of p2p topics with the user.
p2pTopics, err := a.p2pTopicsForUser(uid)
if err != nil {
return err
}
if _, err := a.db.Collection("topics").UpdateMany(a.ctx,
b.M{"_id": b.M{"$in": topicIds}, "owner": "", "state": b.M{"$ne": t.StateDeleted}},
b.M{"$set": b.M{"state": state, "stateat": now}}); err != nil {
return err
if len(p2pTopics) > 0 {
if _, err := a.db.Collection("topics").UpdateMany(a.ctx,
b.M{"_id": b.M{"$in": p2pTopics}, "state": b.M{"$ne": t.StateDeleted}},
b.M{"$set": b.M{"state": state, "stateat": now}}); err != nil {
return err
}
}
// Subscriptions don't need to be updated:
// subscriptions of a disabled user are not disabled and still can be manipulated.
return nil
@@ -833,7 +850,7 @@ func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error
// UserUpdate updates user record
func (a *adapter) UserUpdate(uid t.Uid, update map[string]any) error {
// to get round the hardcoded "UpdatedAt" key in store.Users.Update()
// Convert field names from CamelCase to lowercase.
update = normalizeUpdateMap(update)
_, err := a.db.Collection("users").UpdateOne(a.ctx, b.M{"_id": uid.String()}, b.M{"$set": update})
@@ -845,46 +862,37 @@ func (a *adapter) UserUpdate(uid t.Uid, update map[string]any) error {
now, _ := update["stateat"].(time.Time)
err = a.topicStateForUser(uid, now, state)
}
// Tags are stored in the same record, no need to update them separately.
return err
}
// UserUpdateTags adds, removes, or resets user's tags
// UserUpdateTags adds, removes, or resets user's tags.
func (a *adapter) UserUpdateTags(uid t.Uid, add, remove, reset []string) ([]string, error) {
var newTags t.StringSlice
// Compare to nil vs checking for zero length: zero length reset is valid.
if reset != nil {
// Replace Tags with the new value
return reset, a.UserUpdate(uid, map[string]any{"tags": reset})
// Replace tags with the new value
newTags = reset
} else {
var user t.User
err := a.db.Collection("users").FindOne(a.ctx, b.M{"_id": uid.String()}).Decode(&user)
if err != nil {
return nil, err
}
// Mutate the tag list.
newTags := user.Tags
if len(add) > 0 {
newTags = union(newTags, add)
}
if len(remove) > 0 {
newTags = diff(newTags, remove)
}
}
var user t.User
err := a.db.Collection("users").FindOne(a.ctx, b.M{"_id": uid.String()}).Decode(&user)
if err != nil {
return nil, err
}
// Mutate the tag list.
newTags := user.Tags
if len(add) > 0 {
newTags = union(newTags, add)
}
if len(remove) > 0 {
newTags = diff(newTags, remove)
}
update := map[string]any{"tags": newTags}
if err := a.UserUpdate(uid, update); err != nil {
return nil, err
}
// Get the new tags
var tags map[string][]string
findOpts := mdbopts.FindOne().SetProjection(b.M{"tags": 1, "_id": 0})
err = a.db.Collection("users").FindOne(a.ctx, b.M{"_id": uid.String()}, findOpts).Decode(&tags)
if err != nil {
return nil, err
}
return tags["tags"], nil
return newTags, a.UserUpdate(uid, map[string]any{"tags": newTags})
}
// UserGetByCred returns user ID for the given validated credential.
@@ -939,7 +947,8 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
pipeline := b.A{
b.M{"$match": b.M{"user": b.M{"$in": uids}}},
// Join documents from two collection
// Join documents from two collection.
// FIXME: this does not work for channels as localField[topic] is not the same as foreignField[_id].
b.M{"$lookup": b.M{
"from": "topics",
"localField": "topic",
@@ -1422,7 +1431,7 @@ func (a *adapter) TopicCreate(topic *t.Topic) error {
return err
}
// TopicCreateP2P creates a p2p topic
// TopicCreateP2P creates a p2p topic.
func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error {
initiator.Id = initiator.Topic + ":" + initiator.User
// Don't care if the initiator changes own subscription
@@ -1452,7 +1461,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error {
topic := &t.Topic{
ObjHeader: t.ObjHeader{Id: initiator.Topic},
TouchedAt: initiator.GetTouchedAt(),
SubCnt: 2,
}
topic.ObjHeader.MergeTimes(&initiator.ObjHeader)
return a.TopicCreate(topic)
@@ -1460,34 +1468,41 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error {
// TopicGet loads a single topic by name, if it exists. If the topic does not exist the call returns (nil, nil)
func (a *adapter) TopicGet(topic string) (*t.Topic, error) {
var tpc = new(t.Topic)
if err := a.db.Collection("topics").FindOne(a.ctx, b.M{"_id": topic}).Decode(tpc); err != nil {
var tt = new(t.Topic)
if err := a.db.Collection("topics").FindOne(a.ctx, b.M{"_id": topic}).Decode(tt); err != nil {
if err == mdb.ErrNoDocuments {
return nil, nil
}
return nil, err
}
// Topic found, get subsription count. Try both topic and channel names.
if err := a.db.Collection("subscriptions").FindOne(a.ctx, b.A{
b.M{"$match": b.M{
"topic": b.M{"$in": b.A{topic, t.GrpToChn(topic)}},
"deletedat": b.M{"$exists": false},
}},
b.M{"$count": "subCnt"},
}).Decode(&tpc.SubCnt); err != nil {
return nil, err
if t.GetTopicCat(topic) == t.TopicCatGrp {
// Topic found, get subsription count.
subCnt, err := a.subscriptionCount(topic)
if err != nil {
return nil, err
}
if int(subCnt) != tt.SubCnt {
// Update the topic with the correct subscription count.
tt.SubCnt = int(subCnt)
err = a.topicUpdate(topic, b.M{"subcnt": tt.SubCnt})
if err != nil {
return nil, err
}
}
}
tpc.Public = unmarshalBsonD(tpc.Public)
tpc.Trusted = unmarshalBsonD(tpc.Trusted)
// tpc.Aux = unmarshalBsonD(tpc.Aux)
return tpc, nil
tt.Public = unmarshalBsonD(tt.Public)
tt.Trusted = unmarshalBsonD(tt.Trusted)
return tt, nil
}
// TopicsForUser loads user's contact list: p2p and grp topics, except for 'me' & 'fnd' subscriptions.
// Reads and denormalizes Public & Trusted values.
func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
// Fetch user's subscriptions
// Fetch all user's subscriptions.
filter := b.M{"user": uid.String()}
if !keepDeleted {
// Filter out rows with defined deletedat
@@ -1525,6 +1540,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
if err != nil {
return nil, err
}
// Must close the cursor manually as we will be reusing it.
// Fetch subscriptions. Two queries are needed: users table (me & p2p) and topics table (p2p and grp).
// Prepare a list of Separate subscriptions to users vs topics
@@ -1554,14 +1570,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
sub.SetWith(uid1.UserId())
}
topq = append(topq, tname)
} else {
// Group or sys subscription.
if tcat == t.TopicCatGrp {
// Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
}
topq = append(topq, tname)
} else if tcat == t.TopicCatGrp {
// Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
}
// No special handling needed for 'slf', 'sys' subscriptions.
topq = append(topq, tname)
sub.Private = unmarshalBsonD(sub.Private)
join[tname] = sub
}
@@ -1578,9 +1593,11 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
if len(topq) > 0 {
// Fetch grp & p2p topics
filter = b.M{"_id": b.M{"$in": topq}}
if !keepDeleted {
filter["state"] = b.M{"$ne": t.StateDeleted}
}
if !ims.IsZero() {
// Use cache timestamp if provided: get newer entries only.
filter["touchedat"] = b.M{"$gt": ims}
@@ -1591,6 +1608,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
findOpts = mdbopts.Find().SetSort(b.D{{"touchedat", 1}}).SetLimit(int64(limit))
}
}
cur, err = a.db.Collection("topics").Find(a.ctx, filter, findOpts)
if err != nil {
return nil, err
@@ -1602,11 +1620,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
break
}
sub := join[top.Id]
// Check if sub.UpdatedAt needs to be adjusted to earlier or later time.
sub.UpdatedAt = common.SelectLatestTime(sub.UpdatedAt, top.UpdatedAt)
sub.SetState(top.State)
sub.SetTouchedAt(top.TouchedAt)
sub.SetSeqId(top.SeqId)
if t.GetTopicCat(sub.Topic) == t.TopicCatGrp {
sub.SetSubCnt(top.SubCnt)
sub.SetPublic(unmarshalBsonD(top.Public))
sub.SetTrusted(unmarshalBsonD(top.Trusted))
}
@@ -1614,6 +1634,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
join[top.Id] = sub
}
cur.Close(a.ctx)
if err != nil {
return nil, err
}
@@ -1651,6 +1672,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
}
}
cur.Close(a.ctx)
if err != nil {
return nil, err
}
@@ -1664,12 +1686,12 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
return common.SelectEarliestUpdatedSubs(subs, opts, a.maxResults), nil
}
// UsersForTopic loads users' subscriptions for a given topic. Public & Trusted are loaded.
// UsersForTopic loads users' subscriptions for a given topic (not channel readers).
// Public & Trusted are loaded.
func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
tcat := t.GetTopicCat(topic)
// Fetch topic subscribers
// Fetch all subscribed users. The number of users is not large
// Fetch all subscribed users. The number of users is not large.
filter := b.M{"topic": topic}
if !keepDeleted && tcat != t.TopicCatP2P {
// Filter out rows with DeletedAt being not null.
@@ -1700,7 +1722,7 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
return nil, err
}
// Fetch subscriptions
// Fetch subscriptions.
var subs []t.Subscription
join := make(map[string]t.Subscription)
usrq := make([]any, 0, 16)
@@ -1717,10 +1739,9 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
return nil, err
}
// Fetch users by a list of subscriptions.
if len(usrq) > 0 {
subs = make([]t.Subscription, 0, len(usrq))
// Fetch users by a list of subscriptions
cur, err = a.db.Collection("users").Find(a.ctx, b.M{
"_id": b.M{"$in": usrq},
"state": b.M{"$ne": t.StateDeleted}})
@@ -1739,7 +1760,6 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
sub.SetPublic(unmarshalBsonD(usr2.Public))
sub.SetTrusted(unmarshalBsonD(usr2.Trusted))
sub.SetLastSeenAndUA(usr2.LastSeen, usr2.UserAgent)
subs = append(subs, sub)
}
}
@@ -1787,14 +1807,15 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
return subs, nil
}
// OwnTopics loads a slice of topic names where the user is the owner.
func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) {
filter := b.M{"owner": uid.String(), "state": b.M{"$ne": t.StateDeleted}}
findOpts := mdbopts.Find().SetProjection(b.M{"_id": 1})
cur, err := a.db.Collection("topics").Find(a.ctx, filter, findOpts)
// topicNamesForUser reads topic names from the 'field' of 'collection' using 'filter'.
// If includeChan is true, for group topics also add the corresponding channel name.
func (a *adapter) topicNamesForUser(collection string, filter b.M, field string, includeChan bool) ([]string, error) {
cur, err := a.db.Collection(collection).Find(a.ctx, filter,
mdbopts.Find().SetProjection(b.M{field: 1}))
if err != nil {
return nil, err
}
defer cur.Close(a.ctx)
var names []string
for cur.Next(a.ctx) {
@@ -1802,38 +1823,44 @@ func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) {
if err = cur.Decode(&res); err != nil {
break
}
names = append(names, res["_id"])
names = append(names, res[field])
// If the name is a group topic, also add the channel name if requested.
if includeChan {
if channel := t.GrpToChn(res[field]); channel != "" {
names = append(names, channel)
}
}
}
cur.Close(a.ctx)
return names, err
}
func (a *adapter) p2pTopicsForUser(uid t.Uid) ([]string, error) {
return a.topicNamesForUser("subscriptions",
b.M{
"user": uid.String(),
"deletedat": b.M{"$exists": false},
"topic": b.M{"$regex": primitive.Regex{Pattern: "^p2p"}}},
"topic", false)
}
// OwnTopics loads a slice of topic names where the user is the owner.
func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) {
return a.topicNamesForUser("topics",
b.M{"owner": uid.String(), "state": b.M{"$ne": t.StateDeleted}},
"_id", false)
}
// ChannelsForUser loads a slice of topic names where the user is a channel reader and notifications (P) are enabled.
func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) {
filter := b.M{
"user": uid.String(),
"deletedat": b.M{"$exists": false},
"topic": b.M{"$regex": primitive.Regex{Pattern: "^chn"}},
"modewant": b.M{"$bitsAllSet": b.A{t.ModePres}},
"modegiven": b.M{"$bitsAllSet": b.A{t.ModePres}}}
findOpts := mdbopts.Find().SetProjection(b.M{"topic": 1})
cur, err := a.db.Collection("subscriptions").Find(a.ctx, filter, findOpts)
if err != nil {
return nil, err
}
var names []string
for cur.Next(a.ctx) {
var res map[string]string
if err = cur.Decode(&res); err != nil {
break
}
names = append(names, res["topic"])
}
cur.Close(a.ctx)
return names, err
return a.topicNamesForUser("subscriptions",
b.M{
"user": uid.String(),
"deletedat": b.M{"$exists": false},
"topic": b.M{"$regex": primitive.Regex{Pattern: "^chn"}},
"modewant": b.M{"$bitsAllSet": b.A{t.ModePres}},
"modegiven": b.M{"$bitsAllSet": b.A{t.ModePres}}},
"topic", false)
}
// TopicShare creates topic subscriptions.
@@ -1859,11 +1886,13 @@ func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error {
}
}
// Update topic's subscription count.
// The error is ignored because the subvscriptions have been created already.
a.db.Collection("topics").UpdateOne(a.ctx,
b.M{"_id": topic},
b.M{"$inc": map[string]any{"subcnt": len(shares)}})
if topic != "" {
// Update topic's subscription count.
// The error is ignored because the subscriptions have been created already.
a.db.Collection("topics").UpdateOne(a.ctx,
b.M{"_id": topic},
b.M{"$inc": b.M{"subcnt": len(shares)}})
}
return nil
}
@@ -1887,10 +1916,10 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error {
filter = b.M{"_id": topic}
if hard {
if err = a.MessageDeleteList(topic, nil); err != nil {
if err = a.decFileUseCounter(a.ctx, "topics", filter); err != nil {
return err
}
if err = a.decFileUseCounter(a.ctx, "topics", filter); err != nil {
if err = a.MessageDeleteList(topic, nil); err != nil {
return err
}
_, err = a.db.Collection("topics").DeleteOne(a.ctx, filter)
@@ -1906,7 +1935,26 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error {
// TopicUpdateOnMessage increments Topic's or User's SeqId value and updates TouchedAt timestamp.
func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error {
return a.topicUpdate(topic, map[string]any{"seqid": msg.SeqId, "touchedat": msg.CreatedAt})
return a.topicUpdate(topic, b.M{"seqid": msg.SeqId, "touchedat": msg.CreatedAt})
}
func (a *adapter) subscriptionCount(topic string) (int64, error) {
// Get count of non-deleted subscriptions to the topic.
return a.db.Collection("subscriptions").CountDocuments(a.ctx, b.M{
"topic": b.M{"$in": b.A{topic, t.GrpToChn(topic)}},
"deletedat": b.M{"$exists": false},
})
}
// TopicUpdateSubCnt updates subscriber count denormalized in topic.
func (a *adapter) TopicUpdateSubCnt(topic string) error {
// Get count of non-deleted subscriptions to the topic.
// UPDATE ... SET=(SELECT ...) is not supported in MongoDB, so we have to do it in two queries.
count, err := a.subscriptionCount(topic)
if err != nil {
return err
}
return a.topicUpdate(topic, b.M{"subcnt": count})
}
// TopicUpdate updates topic record.
@@ -1974,7 +2022,9 @@ func (a *adapter) SubsForUser(user t.Uid) ([]t.Subscription, error) {
return subs, cur.Err()
}
// SubsForTopic gets a list of subscriptions to a given topic. Does NOT load Public & Trusted values.
// SubsForTopic fetches all subsciptions for a topic. Does NOT load Public value and does not load channel readers.
// The difference between UsersForTopic vs SubsForTopic is that the former loads user.public+trusted,
// the latter does not.
func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
filter := b.M{"topic": topic}
if !keepDeleted {
@@ -2016,7 +2066,7 @@ func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt)
// SubsUpdate updates part of a subscription object. Pass nil for fields which don't need to be updated
func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) error {
// to get round the hardcoded pass of "Private" key
// Convert CamelCase field names to lowercase.
update = normalizeUpdateMap(update)
filter := b.M{}
@@ -2031,7 +2081,7 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er
return err
}
// SubsDelete deletes a single subscription
// SubsDelete marks at most one subscription as deleted (soft-deleting).
func (a *adapter) SubsDelete(topic string, user t.Uid) error {
var sess mdb.Session
var err error
@@ -2052,6 +2102,7 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error {
return err
}
// Channel readers cannot delete messages.
if !t.IsChannel(topic) {
// Delete user's dellog entries.
@@ -2066,19 +2117,96 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error {
return err
}
}
if t.GetTopicCat(topic) == t.TopicCatGrp {
// Decrement topic subscription count (only one subscription is deleted).
if err := a.topicUpdate(topic, b.M{"subcnt": -1}); err != nil {
return err
}
}
// Commit changes.
return a.maybeCommitTransaction(sc, sess)
})
}
// Delete/mark deleted subscriptions.
// clearUserDellog deletes all dellog entries and deletedfor markings of a given user.
func (a *adapter) clearUserDellog(sc mdb.SessionContext, forUser string) error {
topics, err := a.db.Collection("subscriptions").Distinct(sc, "topic",
b.M{"user": forUser, "deletedat": b.M{"$exists": false}})
if err != nil {
return err
}
// No need to convert channel names to group names:
// channel readers cannot delete messages.
if len(topics) > 0 {
// Delete user's dellog entries.
if _, err = a.db.Collection("dellog").DeleteMany(sc,
b.M{"topic": b.M{"$in": topics}, "deletedfor": forUser}); err != nil {
return err
}
// Delete user's markings of soft-deleted messages
filter := b.M{"topic": b.M{"$in": topics}, "deletedfor.user": forUser}
if _, err = a.db.Collection("messages").
UpdateMany(sc, filter, b.M{"$pull": b.M{"deletedfor": b.M{"user": forUser}}}); err != nil {
return err
}
}
return nil
}
// Delete/mark deleted subscriptions and decrement subcnt in topic.
func (a *adapter) subsDelete(ctx context.Context, filter b.M, hard bool) error {
var err error
// First, decrement subscription count in all affected topics.
// Doing it in two steps because MongoDB does not support an equivalent of
// 'UPDATE .. LEFT JOIN ...'.
filterWithDeletedAt := copyBsonMap(filter)
filterWithDeletedAt["deletedat"] = b.M{"$exists": false}
cur, err := a.db.Collection("subscriptions").Find(ctx, filterWithDeletedAt,
mdbopts.Find().SetProjection(b.D{{"topic", 1}, {"_id", 0}}))
if err != nil {
return err
}
defer cur.Close(ctx)
var topics []string
for cur.Next(ctx) {
var result struct {
Topic string `bson:"topic"`
}
if err = cur.Decode(&result); err != nil {
return err
}
if t.IsChannel(result.Topic) {
// Convert channel name to group name.
topics = append(topics, t.ChnToGrp(result.Topic))
}
topics = append(topics, result.Topic)
}
if err = cur.Err(); err != nil {
return err
}
if len(topics) == 0 {
// Nothing to update.
return nil
}
// Decrement subscription count in affected topics.
a.db.Collection("topics").UpdateMany(ctx,
b.M{"_id": b.M{"$in": topics}},
b.M{"$inc": b.M{"subcnt": -1}})
// Now delete or mark deleted the subscriptions.
if hard {
_, err = a.db.Collection("subscriptions").DeleteMany(ctx, filter)
} else {
now := t.TimeNow()
_, err = a.db.Collection("subscriptions").UpdateMany(ctx, filter,
_, err = a.db.Collection("subscriptions").UpdateMany(ctx, filterWithDeletedAt,
b.M{"$set": b.M{"updatedat": now, "deletedat": now}})
}
return err
@@ -2158,48 +2286,10 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string,
index[tag] = struct{}{}
}
/*
matchOn := b.M{"tags": b.M{"$in": allTags}}
if activeOnly {
matchOn["state"] = b.M{"$eq": t.StateOK}
}
commonPipe := b.A{
b.M{"$match": matchOn},
b.M{"$project": b.M{"_id": 1, "createdat": 1, "updatedat": 1, "usebt": 1, "access": 1, "public": 1, "trusted": 1, "tags": 1}},
b.M{"$unwind": "$tags"},
b.M{"$match": b.M{"tags": b.M{"$in": allTags}}},
b.M{"$group": b.M{
"_id": "$_id",
"createdat": b.M{"$first": "$createdat"},
"updatedat": b.M{"$first": "$updatedat"},
"usebt": b.M{"$first": "$usebt"},
"access": b.M{"$first": "$access"},
"public": b.M{"$first": "$public"},
"trusted": b.M{"$first": "$trusted"},
"tags": b.M{"$addToSet": "$tags"},
"matchedTagsCount": b.M{"$sum": 1},
}},
}
for _, reqDisjunction := range req {
if len(reqDisjunction) == 0 {
continue
}
var reqTags []any
for _, tag := range reqDisjunction {
reqTags = append(reqTags, tag)
}
// Filter out documents where 'tags' intersection with 'reqTags' is an empty array
commonPipe = append(commonPipe,
b.M{"$match": b.M{"$expr": b.M{"$ne": b.A{b.M{"$size": b.M{"$setIntersection": b.A{"$tags", reqTags}}}, 0}}}})
}
// Must create a copy of commonPipe so the original commonPipe can be used unmodified in $unionWith.
pipeline := append(slices.Clone(commonPipe),
b.M{"$unionWith": b.M{"coll": "topics", "pipeline": commonPipe}},
b.M{"$sort": b.M{"matchedTagsCount": -1}},
b.M{"$limit": a.maxResults})
*/
matchOn := b.M{"tags": b.M{"$in": allTags}}
if activeOnly {
matchOn["state"] = b.M{"$eq": t.StateOK}
}
projectFields := b.M{"_id": 1, "createdat": 1, "updatedat": 1, "usebt": 1,
"access": 1, "subcnt": 1, "public": 1, "trusted": 1, "tags": 1}
@@ -2209,14 +2299,14 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string,
b.M{
"$facet": b.D{
{"users", b.A{
b.M{"$match": b.M{"tags": b.M{"$in": allTags}}},
b.M{"$match": matchOn},
b.M{"$project": projectFields},
}},
{"topics", b.A{
b.M{"$lookup": b.D{
{"from", "topics"},
{"pipeline", b.A{
b.M{"$match": b.M{"tags": b.M{"$in": allTags}}},
b.M{"$match": matchOn},
b.M{"$project": projectFields},
}},
{"as", "topicDocs"},
@@ -2268,7 +2358,7 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string,
pipeline = append(pipeline,
// Stage 9: $sort
b.M{"$sort": b.M{"matchedCount": -1}},
b.M{"$sort": b.D{{"matchedCount", -1}, {"subcnt", -1}}},
// Stage 10: $limit
b.M{"$limit": a.maxResults},
)
@@ -2288,6 +2378,8 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string,
}
if topic.UseBt {
// This is a channel, convert grp to chn name: all channel-capable
// topics should appear as channels in search results.
sub.Topic = t.GrpToChn(topic.Id)
} else {
if uid := t.ParseUid(topic.Id); !uid.IsZero() {
@@ -2312,7 +2404,6 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string,
sub.Private = common.FilterFoundTags(topic.Tags, index)
subs = append(subs, sub)
}
if err == nil {
err = cur.Err()
}
@@ -2320,7 +2411,7 @@ func (a *adapter) Find(caller, prefPrefix string, req [][]string, opt []string,
return subs, err
}
// FindOne returns topic or user which matches the given tag.
// FindOne returns the first topic or user which matches the given tag.
func (a *adapter) FindOne(tag string) (string, error) {
// Part of the pipeline identical for users and topics collections.
commonPipe := b.A{b.M{"$match": b.M{"tags": tag}}, b.M{"$project": b.M{"_id": 1}}}
@@ -2362,7 +2453,7 @@ func (a *adapter) MessageSave(msg *t.Message) error {
return err
}
// MessageGetAll returns messages matching the query
// MessageGetAll returns messages matching the query.
func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ([]t.Message, error) {
var limit = a.maxMessageResults
var lower, upper int
@@ -2420,11 +2511,11 @@ func (a *adapter) messagesHardDelete(topic string) error {
return err
}
if _, err = a.db.Collection("messages").DeleteMany(a.ctx, filter); err != nil {
if err = a.decFileUseCounter(a.ctx, "messages", filter); err != nil {
return err
}
if err = a.decFileUseCounter(a.ctx, "messages", filter); err != nil {
if _, err = a.db.Collection("messages").DeleteMany(a.ctx, filter); err != nil {
return err
}
@@ -2652,7 +2743,7 @@ func (a *adapter) deviceInsert(userId string, dev *t.DeviceDef) error {
return err
}
// DeviceGetAll returns all devices for a given set of users
// DeviceGetAll returns all devices for a given set of users.
func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, error) {
ids := make([]any, len(uids))
for i, id := range uids {
@@ -2678,7 +2769,7 @@ func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, err
if err = cur.Decode(&row); err != nil {
return nil, 0, err
}
if row.Devices != nil && len(row.Devices) > 0 {
if len(row.Devices) > 0 {
if err := uid.UnmarshalText([]byte(row.Id)); err != nil {
continue
}
+266 -184
View File
File diff suppressed because it is too large Load Diff
+149 -146
View File
@@ -214,8 +214,7 @@ func (a *adapter) GetDbVersion() (int, error) {
defer cancel()
}
var vers string
err := a.db.QueryRow(ctx, "SELECT value FROM kvmeta WHERE key = $1", "version").Scan(&vers)
err := a.db.QueryRow(ctx, "SELECT value FROM kvmeta WHERE key='version'").Scan(&vers)
if err != nil {
if isMissingDb(err) || isMissingTable(err) || err == pgx.ErrNoRows {
err = errors.New("Database not initialized")
@@ -234,7 +233,7 @@ func (a *adapter) updateDbVersion(v int) error {
defer cancel()
}
a.version = -1
if _, err := a.db.Exec(ctx, "UPDATE kvmeta SET value = $1 WHERE key = $2", strconv.Itoa(v), "version"); err != nil {
if _, err := a.db.Exec(ctx, `UPDATE kvmeta SET "value"=$1 WHERE "key"='version'`, strconv.Itoa(v)); err != nil {
return err
}
return nil
@@ -430,7 +429,8 @@ func (a *adapter) CreateDb(reset bool) error {
);
CREATE UNIQUE INDEX topics_name ON topics(name);
CREATE INDEX topics_owner ON topics(owner);
CREATE INDEX topics_state_stateat ON topics(state, stateat);`); err != nil {
CREATE INDEX topics_state_stateat ON topics(state, stateat);
CREATE INDEX topics_name_state_seqid ON topics(name, state, seqid);`); err != nil {
return err
}
@@ -473,7 +473,8 @@ func (a *adapter) CreateDb(reset bool) error {
);
CREATE UNIQUE INDEX subscriptions_topic_userid ON subscriptions(topic, userid);
CREATE INDEX subscriptions_topic ON subscriptions(topic);
CREATE INDEX subscriptions_deletedat ON subscriptions(deletedat);`); err != nil {
CREATE INDEX subscriptions_deletedat ON subscriptions(deletedat);
CREATE INDEX subscriptions_userid_topic_deletedat ON subscriptions(userid, topic, deletedat);`); err != nil {
return err
}
@@ -587,16 +588,6 @@ func (a *adapter) CreateDb(reset bool) error {
return err
}
// Find relevant subscriptions for given users efficiently, and use the join key too.
if _, err = tx.Exec(ctx, "CREATE INDEX idx_subs_user_topic_del ON subscriptions(userid, topic, deletedat)"); err != nil {
return err
}
// Optimizes join; state filters; seqid supports the SUM operation.
if _, err = tx.Exec(ctx, "CREATE INDEX idx_topics_name_state_seqid ON topics(name, state, seqid)"); err != nil {
return err
}
return tx.Commit(ctx)
}
@@ -944,7 +935,6 @@ func (a *adapter) UserGet(uid t.Uid) (*t.User, error) {
var user t.User
var id int64
row, err := a.db.Query(ctx, "SELECT * FROM users WHERE id=$1 AND state!=$2", store.DecodeUid(uid), t.StateDeleted)
if err != nil {
return nil, err
@@ -990,12 +980,8 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) {
users = nil
break
}
if user.State == t.StateDeleted {
continue
}
user.SetUid(store.EncodeUid(id))
users = append(users, user)
}
if err == nil {
@@ -1008,6 +994,13 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) {
// UserDelete deletes specified user: wipes completely (hard-delete) or marks as deleted.
// TODO: report when the user is not found.
func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
// Get a list of topic names owned by the user (as 'grp' and 'chn').
ownTopics, err := a.topicNamesForUser("SELECT name FROM topics WHERE owner=$1 AND state!=$2",
true, store.DecodeUid(uid), t.StateDeleted)
if err != nil {
return err
}
ctx, cancel := a.getContextForTx()
if cancel != nil {
defer cancel()
@@ -1053,14 +1046,16 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
decoded_uid); err != nil {
return err
}
// Deletion of messages will cascade to filemsglinks and so to fileuploads.
if _, err = tx.Exec(ctx, "DELETE FROM messages USING topics WHERE topics.name=messages.topic AND topics.owner=$1",
decoded_uid); err != nil {
return err
}
// Delete all subscriptions.
if _, err = tx.Exec(ctx, "DELETE FROM subscriptions USING topics WHERE topics.name=subscriptions.topic AND topics.owner=$1",
decoded_uid); err != nil {
// Delete subscriptions for all users where the user is the owner of the topic.
sql, args, _ := sqlx.In("DELETE FROM subscriptions AS s WHERE topic IN (?)", ownTopics)
if _, err = tx.Exec(ctx, sqlx.Rebind(sqlx.DOLLAR, sql), args...); err != nil {
return err
}
@@ -1099,18 +1094,17 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
}
// Disable all subscriptions to topics where the user is the owner.
if _, err = tx.Exec(ctx, "UPDATE subscriptions SET updatedat=$1, deletedat=$2 "+
"FROM topics WHERE subscriptions.topic=topics.name AND topics.owner=$3",
now, now, decoded_uid); err != nil {
sql, args, _ := sqlx.In("UPDATE subscriptions SET s.updatedat=?,s.deletedat=? WHERE topic IN (?)", now, now, ownTopics)
if _, err = tx.Exec(ctx, sqlx.Rebind(sqlx.DOLLAR, sql), args...); err != nil {
return err
}
// Disable group topics where the user is the owner.
if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1, touchedat=$2, state=$3, stateat=$4 WHERE owner=$5",
if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1, touchedat=$2,state=$3,stateat=$4 WHERE owner=$5",
now, now, t.StateDeleted, now, decoded_uid); err != nil {
return err
}
// Disable p2p topics with the user (p2p topic's owner is 0).
if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1, touchedat=$2, state=$3, stateat=$4 "+
if _, err = tx.Exec(ctx, "UPDATE topics SET updatedat=$1,touchedat=$2,state=$3,stateat=$4 "+
"FROM subscriptions WHERE topics.name=subscriptions.topic "+
"AND topics.owner=0 AND subscriptions.userid=$5",
now, now, t.StateDeleted, now, decoded_uid); err != nil {
@@ -1118,7 +1112,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
}
// Disable the other user's subscription to a disabled p2p topic.
if _, err = tx.Exec(ctx, "UPDATE subscriptions AS s_one SET updatedat=$1, deletedat=$2 "+
if _, err = tx.Exec(ctx, "UPDATE subscriptions AS s_one SET updatedat=$1,deletedat=$2 "+
"FROM subscriptions AS s_two WHERE s_one.topic=s_two.topic "+
"AND s_two.userid=$3 AND s_two.topic LIKE 'p2p%'",
now, now, decoded_uid); err != nil {
@@ -1126,7 +1120,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
}
// Disable user.
if _, err = tx.Exec(ctx, "UPDATE users SET updatedat=$1, state=$2, stateat=$3 WHERE id=$4",
if _, err = tx.Exec(ctx, "UPDATE users SET updatedat=$1,state=$2,stateat=$3 WHERE id=$4",
now, t.StateDeleted, now, decoded_uid); err != nil {
return err
}
@@ -1136,6 +1130,7 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
}
// topicStateForUser is called by UserUpdate when the update contains state change.
// Soft-deleted topics remain soft-deleted.
func (a *adapter) topicStateForUser(ctx context.Context, tx pgx.Tx, decoded_uid int64, now time.Time, update any) error {
var err error
@@ -1265,13 +1260,13 @@ func (a *adapter) UserUpdateTags(uid t.Uid, add, remove, reset []string) ([]stri
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var tag string
rows.Scan(&tag)
allTags = append(allTags, tag)
}
rows.Close()
_, err = tx.Exec(ctx, "UPDATE users SET tags=$1 WHERE id=$2", t.StringSlice(allTags), decoded_uid)
if err != nil {
@@ -1303,6 +1298,7 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) {
// UserUnreadCount returns the total number of unread messages in all topics with
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
// UserUnreadCount does not count unread messages in channels although it should.
func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
uids := make([]any, len(ids))
counts := make(map[t.Uid]int, len(ids))
@@ -1312,15 +1308,15 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
counts[id] = 0
}
query, uids := expandQuery("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+
"WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+
"POSITION('R' IN s.modewant)>0 AND POSITION('R' IN s.modegiven)>0 GROUP BY s.userid", uids, t.StateDeleted)
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
// FIXME: support channels.
query, uids := expandQuery("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+
"WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+
"POSITION('R' IN s.modewant)>0 AND POSITION('R' IN s.modegiven)>0 GROUP BY s.userid", uids, t.StateDeleted)
rows, err := a.db.Query(ctx, query, uids...)
if err != nil {
return counts, err
@@ -1489,7 +1485,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error {
topic := &t.Topic{ObjHeader: t.ObjHeader{Id: initiator.Topic}}
topic.ObjHeader.MergeTimes(&initiator.ObjHeader)
topic.TouchedAt = initiator.GetTouchedAt()
topic.SubCnt = 2
err = a.topicCreate(ctx, tx, topic)
if err != nil {
return err
@@ -1504,20 +1499,11 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) {
if cancel != nil {
defer cancel()
}
tx, err := a.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return nil, err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
}
}()
// Fetch topic by name
var tt = new(t.Topic)
var owner int64
err = tx.QueryRow(ctx,
err := a.db.QueryRow(ctx,
"SELECT createdat,updatedat,state,stateat,touchedat,name AS id,usebt,access,owner,seqid,delid,subcnt,public,trusted,tags,aux "+
"FROM topics WHERE name=$1",
topic).Scan(&tt.CreatedAt, &tt.UpdatedAt, &tt.State, &tt.StateAt, &tt.TouchedAt, &tt.Id,
@@ -1526,31 +1512,30 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) {
if err == pgx.ErrNoRows {
// Nothing found - clear the error
err = nil
// Force close transaction.
tx.Rollback(ctx)
}
return nil, err
}
// Topic found, get subsription count. Try both topic and channel names.
channel := t.GrpToChn(topic)
var subCnt int
if err = tx.QueryRow(ctx,
"SELECT COUNT(*) FROM subscriptions WHERE topic IN ($1,$2) AND deletedat IS NULL", topic, channel).Scan(&subCnt); err != nil {
return nil, err
}
if subCnt != tt.SubCnt {
// Update the topic with the correct subscription count.
tt.SubCnt = subCnt
if _, err = tx.Exec(ctx, "UPDATE topics SET subcnt=$1 WHERE name=$2", subCnt, topic); err != nil {
if t.GetTopicCat(topic) == t.TopicCatGrp {
// Topic found, get subsription count. Try both topic and channel names.
var subCnt int
if err = a.db.QueryRow(ctx,
"SELECT COUNT(*) FROM subscriptions WHERE topic IN ($1,$2) AND deletedat IS NULL", topic, t.GrpToChn(topic)).
Scan(&subCnt); err != nil {
return nil, err
}
if subCnt != tt.SubCnt {
// Update the topic with the correct subscription count.
tt.SubCnt = subCnt
if _, err = a.db.Exec(ctx, "UPDATE topics SET subcnt=$1 WHERE name=$2", subCnt, topic); err != nil {
return nil, err
}
}
}
tt.Owner = store.EncodeUid(owner).String()
err = tx.Commit(ctx)
return tt, err
}
@@ -1566,8 +1551,9 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
// Filter out deleted rows.
q += " AND deletedat IS NULL"
}
limit := 0
ipg := time.Time{}
ims := time.Time{}
if opts != nil {
if opts.Topic != "" {
q += " AND topic=?"
@@ -1583,7 +1569,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
limit = a.maxResults
}
} else {
ipg = *opts.IfModifiedSince
ims = *opts.IfModifiedSince
}
} else {
limit = a.maxResults
@@ -1604,6 +1590,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
if err != nil {
return nil, err
}
// Must close rows manually as we will be reusing it.
// Fetch subscriptions. Two queries are needed: users table (p2p) and topics table (grp).
// Prepare a list of separate subscriptions to users vs topics
@@ -1624,7 +1611,8 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
tcat := t.GetTopicCat(tname)
if tcat == t.TopicCatMe || tcat == t.TopicCatFnd {
// One of 'me', 'fnd' subscriptions, skip. Don't skip 'sys' subscription.
// One of 'me', 'fnd' subscriptions, skip.
// Don't skip 'sys' subscription.
continue
} else if tcat == t.TopicCatP2P {
// P2P subscription, find the other user to get user.Public and user.Trusted.
@@ -1636,15 +1624,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
usrq = append(usrq, store.DecodeUid(uid1))
sub.SetWith(uid1.UserId())
}
topq = append(topq, tname)
} else {
// Group or 'sys' subscription.
if tcat == t.TopicCatGrp {
// Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
}
topq = append(topq, tname)
} else if tcat == t.TopicCatGrp {
// Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
}
// No special handling needed for 'slf', 'sys' subscriptions.
topq = append(topq, tname)
sub.Private = common.FromJSON(sub.Private)
join[tname] = sub
}
@@ -1664,7 +1650,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
// Fetch grp topics and join to subscriptions.
if len(topq) > 0 {
q = "SELECT updatedat,state,touchedat,name AS id,usebt,access,seqid,delid,public,trusted " +
q = "SELECT updatedat,state,touchedat,name AS id,usebt,access,seqid,delid,subcnt,public,trusted " +
"FROM topics WHERE name IN (?)"
newargs := []any{topq}
@@ -1674,10 +1660,10 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
newargs = append(newargs, t.StateDeleted)
}
if !ipg.IsZero() {
if !ims.IsZero() {
// Use cache timestamp if provided: get newer entries only.
q += " AND touchedat>?"
newargs = append(newargs, ipg)
newargs = append(newargs, ims)
if limit > 0 && limit < len(topq) {
// No point in fetching more than the requested limit.
@@ -1699,7 +1685,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
var top t.Topic
for rows.Next() {
if err = rows.Scan(&top.UpdatedAt, &top.State, &top.TouchedAt, &top.Id, &top.UseBt,
&top.Access, &top.SeqId, &top.DelId, &top.Public, &top.Trusted); err != nil {
&top.Access, &top.SeqId, &top.DelId, &top.SubCnt, &top.Public, &top.Trusted); err != nil {
break
}
@@ -1710,6 +1696,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
sub.SetTouchedAt(top.TouchedAt)
sub.SetSeqId(top.SeqId)
if t.GetTopicCat(sub.Topic) == t.TopicCatGrp {
sub.SetSubCnt(top.SubCnt)
sub.SetPublic(top.Public)
sub.SetTrusted(top.Trusted)
}
@@ -1731,7 +1718,6 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
q = "SELECT id,updatedat,state,access,lastseen,useragent,public,trusted " +
"FROM users WHERE id IN (?)"
newargs := []any{usrq}
if !keepDeleted {
// Optionally skip deleted users.
q += " AND state!=?"
@@ -1746,12 +1732,10 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
if cancel3 != nil {
defer cancel3()
}
rows, err = a.db.Query(ctx3, q, newargs...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var usr2 t.User
@@ -1776,6 +1760,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
if err == nil {
err = rows.Err()
}
rows.Close()
if err != nil {
return nil, err
@@ -1915,24 +1900,30 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
}
// topicNamesForUser reads a slice of strings using provided query.
func (a *adapter) topicNamesForUser(uid t.Uid, sqlQuery string) ([]string, error) {
func (a *adapter) topicNamesForUser(sqlQuery string, includeChan bool, args ...any) ([]string, error) {
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
rows, err := a.db.Query(ctx, sqlQuery, store.DecodeUid(uid))
rows, err := a.db.Query(ctx, sqlQuery, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var names []string
var name string
for rows.Next() {
var name string
if err = rows.Scan(&name); err != nil {
break
}
names = append(names, name)
// If the name is a group topic, also add the channel name if requested.
if includeChan {
if channel := t.GrpToChn(name); channel != "" {
names = append(names, channel)
}
}
}
if err == nil {
err = rows.Err()
@@ -1943,17 +1934,18 @@ func (a *adapter) topicNamesForUser(uid t.Uid, sqlQuery string) ([]string, error
// OwnTopics loads a slice of topic names where the user is the owner.
func (a *adapter) OwnTopics(uid t.Uid) ([]string, error) {
return a.topicNamesForUser(uid, "SELECT name FROM topics WHERE owner=$1")
return a.topicNamesForUser("SELECT name FROM topics WHERE owner=$1 AND state!=$2",
false, store.DecodeUid(uid), t.StateDeleted)
}
// ChannelsForUser loads a slice of topic names where the user is a channel reader and notifications (P) are enabled.
func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) {
return a.topicNamesForUser(uid,
"SELECT topic FROM subscriptions WHERE userid=$1 AND topic LIKE 'chn%' "+
"AND POSITION('P' IN modewant)>0 AND POSITION('P' IN modegiven)>0 AND deletedat IS NULL")
return a.topicNamesForUser("SELECT topic FROM subscriptions WHERE userid=$1 AND topic LIKE 'chn%' "+
"AND POSITION('P' IN modewant)>0 AND POSITION('P' IN modegiven)>0 AND deletedat IS NULL",
false, store.DecodeUid(uid))
}
// TopicShare creates topic subscriptions.
// TopicShare creates topic subscriptions and increments the topic's subcnt.
func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error {
ctx, cancel := a.getContextForTx()
if cancel != nil {
@@ -2011,7 +2003,6 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error {
if hard {
// Delete subscriptions. If this is a channel, delete both group subscriptions and channel subscriptions.
q, args := expandQuery("DELETE FROM subscriptions WHERE topic IN (?)", args)
if _, err = tx.Exec(ctx, q, args...); err != nil {
return err
}
@@ -2029,8 +2020,8 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error {
}
} else {
now := t.TimeNow()
q, args := expandQuery("UPDATE subscriptions SET updatedat=?,deletedat=? WHERE topic IN (?)", now, now, args)
q, args := expandQuery("UPDATE subscriptions SET updatedat=?,deletedat=? WHERE topic IN (?)", now, now, args)
if _, err = tx.Exec(ctx, q, args); err != nil {
return err
}
@@ -2053,6 +2044,18 @@ func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error {
return err
}
// TopicUpdateSubCnt updates subscriber count denormalized in topic.
func (a *adapter) TopicUpdateSubCnt(topic string) error {
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
_, err := a.db.Exec(ctx,
"UPDATE topics SET subcnt=(SELECT COUNT(*) FROM subscriptions WHERE topic IN ($1,$2) AND deletedat IS NULL) WHERE name=$1",
topic, t.GrpToChn(topic))
return err
}
func (a *adapter) TopicUpdate(topic string, update map[string]any) error {
ctx, cancel := a.getContextForTx()
if cancel != nil {
@@ -2111,12 +2114,15 @@ func (a *adapter) SubscriptionGet(topic string, user t.Uid, keepDeleted bool) (*
if cancel != nil {
defer cancel()
}
query := `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid,
readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=$1 AND userid=$2`
if !keepDeleted {
query += " AND deletedat IS NULL"
}
var sub t.Subscription
var userId int64
var modeWant, modeGiven []byte
err := a.db.QueryRow(ctx, `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid,
readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=$1 AND userid=$2`,
topic, store.DecodeUid(user)).Scan(&sub.CreatedAt, &sub.UpdatedAt, &sub.DeletedAt, &userId,
err := a.db.QueryRow(ctx, query, topic, store.DecodeUid(user)).Scan(&sub.CreatedAt, &sub.UpdatedAt, &sub.DeletedAt, &userId,
&sub.Topic, &sub.DelId, &sub.RecvSeqId, &sub.ReadSeqId, &modeWant, &modeGiven, &sub.Private)
if err != nil {
@@ -2127,10 +2133,6 @@ func (a *adapter) SubscriptionGet(topic string, user t.Uid, keepDeleted bool) (*
return nil, err
}
if !keepDeleted && sub.DeletedAt != nil {
return nil, nil
}
sub.User = store.EncodeUid(userId).String()
sub.ModeWant.Scan(modeWant)
sub.ModeGiven.Scan(modeGiven)
@@ -2185,7 +2187,6 @@ func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt)
readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=?`
args := []any{topic}
if !keepDeleted {
// Filter out deleted rows.
q += " AND deletedat IS NULL"
@@ -2258,12 +2259,12 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er
}()
cols, args := common.UpdateByMap(update)
args = append(args, topic)
q := "UPDATE subscriptions SET " + strings.Join(cols, ",") + " WHERE topic=?"
args = append(args, topic)
if !user.IsZero() {
// Update just one topic subscription
args = append(args, store.DecodeUid(user))
q += " AND userid=?"
args = append(args, store.DecodeUid(user))
}
q, args = expandQuery(q, args...)
@@ -2274,7 +2275,7 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er
return tx.Commit(ctx)
}
// SubsDelete marks subscription as deleted.
// SubsDelete marks at most one subscription as deleted.
func (a *adapter) SubsDelete(topic string, user t.Uid) error {
ctx, cancel := a.getContext()
if cancel != nil {
@@ -2308,16 +2309,21 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error {
return err
}
// Remove records of messages soft-deleted by this user.
_, err = tx.Exec(ctx, "DELETE FROM dellog WHERE topic=$1 AND deletedfor=$2", topic, decoded_id)
if err != nil {
return err
// Channel readers cannot delete messages.
if !t.IsChannel(topic) {
// Remove records of messages soft-deleted by this user.
_, err = tx.Exec(ctx, "DELETE FROM dellog WHERE topic=$1 AND deletedfor=$2", topic, decoded_id)
if err != nil {
return err
}
}
// Decrement topic subscription count.
_, err = tx.Exec(ctx, "UPDATE topics SET subcnt=subcnt-1 WHERE name=$1", topic)
if err != nil {
return err
if t.GetTopicCat(topic) == t.TopicCatGrp {
// Decrement topic subscription count (only one subscription is deleted).
_, err = tx.Exec(ctx, "UPDATE topics SET subcnt=subcnt-1 WHERE name=$1", topic)
if err != nil {
return err
}
}
return tx.Commit(ctx)
@@ -2325,8 +2331,15 @@ func (a *adapter) SubsDelete(topic string, user t.Uid) error {
// subsDelForUser marks user's subscriptions as deleted.
func subsDelForUser(ctx context.Context, tx pgx.Tx, user t.Uid, hard bool) error {
var err error
// TODO: update subscriber count in topics.
decoded_uid := store.DecodeUid(user)
// Decrement subscription count for all topics the user is subscribed to.
_, err := tx.Exec(ctx, "UPDATE topics AS t LEFT JOIN subscriptions AS s ON t.name=s.topic "+
"SET t.subcnt=t.subcnt-1 WHERE s.userid=$1 AND s.deletedat IS NULL", decoded_uid)
if err != nil {
return err
}
if hard {
// Hard delete: remove all subscriptions for the user.
_, err = tx.Exec(ctx, "DELETE FROM subscriptions WHERE userid=$1;", store.DecodeUid(user))
@@ -2374,11 +2387,10 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
stateConstraint = "u.state=? AND "
}
allReq := t.FlattenDoubleSlice(req)
allTags := append(allReq, opt...)
for _, tag := range allTags {
for _, tag := range append(allReq, opt...) {
args = append(args, tag)
index[tag] = struct{}{}
}
args = append(args, allTags)
var matcher string
if promoPrefix != "" {
@@ -2388,7 +2400,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
matcher = "COUNT(*)"
}
query := "SELECT CAST(u.id AS VARCHAR) AS topic,u.createdat,u.updatedat,FALSE,u.access::jsonb,0,u.public::jsonb,u.trusted::jsonb,u.tags::jsonb," +
query := "SELECT CAST(u.id AS VARCHAR) AS topic,u.createdat,u.updatedat,FALSE,u.access::jsonb,0 AS subcnt,u.public::jsonb,u.trusted::jsonb,u.tags::jsonb," +
matcher + " AS matches " +
"FROM users AS u LEFT JOIN usertags AS tg ON tg.userid=u.id " +
"WHERE " + stateConstraint + "tg.tag IN (?) " +
@@ -2396,7 +2408,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
if len(allReq) > 0 {
q, a := common.DisjunctionSql(req, "tg.tag")
query += q
args = append(args, a)
args = append(args, a...)
}
query += "UNION ALL "
@@ -2407,7 +2419,9 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
} else {
stateConstraint = ""
}
args = append(args, allTags)
for _, tag := range append(allReq, opt...) {
args = append(args, tag)
}
query += "SELECT t.name AS topic,t.createdat,t.updatedat,t.usebt,t.access::jsonb,t.subcnt,t.public::jsonb,t.trusted::jsonb,t.tags::jsonb," +
matcher + " AS matches " +
@@ -2417,15 +2431,15 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
if len(allReq) > 0 {
q, a := common.DisjunctionSql(req, "tg.tag")
query += q
args = append(args, a)
args = append(args, a...)
}
query, args = expandQuery(query+"ORDER BY matches DESC LIMIT ?", args, a.maxResults)
query, args = expandQuery(query+"ORDER BY matches DESC, subcnt DESC LIMIT ?", args, a.maxResults)
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
// Get users matched by tags, sort by number of matches from high to low.
rows, err := a.db.Query(ctx, query, args...)
if err != nil {
@@ -2433,6 +2447,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
}
defer rows.Close()
// Fetch subscriptions
var public, trusted any
var access t.DefaultAccess
var subcnt int
@@ -2442,8 +2457,8 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
var sub t.Subscription
var subs []t.Subscription
for rows.Next() {
if err = rows.Scan(&sub.Topic, &sub.CreatedAt, &sub.UpdatedAt, &isChan, &access,
&subcnt, &public, &trusted, &setTags, &ignored); err != nil {
if err = rows.Scan(&sub.Topic, &sub.CreatedAt, &sub.UpdatedAt, &isChan, &access, &subcnt,
&public, &trusted, &setTags, &ignored); err != nil {
subs = nil
break
}
@@ -2457,6 +2472,7 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
}
if isChan {
// This is a channel, convert grp to chn name.
sub.Topic = t.GrpToChn(sub.Topic)
}
@@ -2519,7 +2535,6 @@ func (a *adapter) FindOne(tag string) (string, error) {
found = store.EncodeUid(id).UserId()
}
}
if err == nil {
err = rows.Err()
}
@@ -2868,18 +2883,17 @@ func (a *adapter) DeviceUpsert(uid t.Uid, def *t.DeviceDef) error {
}
func (a *adapter) DeviceGetAll(uids ...t.Uid) (map[t.Uid][]t.DeviceDef, int, error) {
var unupg []any
var unums []any
for _, uid := range uids {
unupg = append(unupg, store.DecodeUid(uid))
unums = append(unums, store.DecodeUid(uid))
}
query, unupg := expandQuery("SELECT userid,deviceid,platform,lastseen,lang FROM devices WHERE userid IN (?)", unupg)
query, unums := expandQuery("SELECT userid,deviceid,platform,lastseen,lang FROM devices WHERE userid IN (?)", unums)
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
rows, err := a.db.Query(ctx, query, unupg...)
rows, err := a.db.Query(ctx, query, unums...)
if err != nil {
return nil, 0, err
}
@@ -3079,7 +3093,7 @@ func credDel(ctx context.Context, tx pgx.Tx, uid t.Uid, method, value string) er
}
// Case 2.1
res, err = tx.Exec(ctx, "DELETE FROM credentials"+where+" AND (done=true OR retries=0)", args...)
res, err = tx.Exec(ctx, "DELETE FROM credentials"+where+" AND (done=TRUE OR retries=0)", args...)
if err != nil {
return err
}
@@ -3133,7 +3147,7 @@ func (a *adapter) CredConfirm(uid t.Uid, method string) error {
}
res, err := a.db.Exec(
ctx,
"UPDATE credentials SET updatedat=$1,done=true,synthetic=CONCAT(method,':',value) "+
"UPDATE credentials SET updatedat=$1,done=TRUE,synthetic=CONCAT(method,':',value) "+
"WHERE userid=$2 AND method=$3 AND deletedat IS NULL AND done=FALSE",
t.TimeNow(), store.DecodeUid(uid), method)
if err != nil {
@@ -3170,14 +3184,12 @@ func (a *adapter) CredGetActive(uid t.Uid, method string) (*t.Credential, error)
err := a.db.QueryRow(ctx, "SELECT createdat,updatedat,method,value,resp,done,retries "+
"FROM credentials WHERE userid=$1 AND deletedat IS NULL AND method=$2 AND done=FALSE",
store.DecodeUid(uid), method).Scan(&cred.CreatedAt, &cred.UpdatedAt, &cred.Method, &cred.Value, &cred.Resp, &cred.Done, &cred.Retries)
if err != nil {
if err == pgx.ErrNoRows {
err = nil
}
return nil, err
}
cred.User = uid.String()
return &cred, nil
@@ -3309,11 +3321,10 @@ func (a *adapter) FileGet(fid string) (*t.FileDef, error) {
return nil, err
}
fd.SetUid(store.EncodeUid(ID))
fd.Id = common.EncodeUidString(fd.Id).String()
fd.User = store.EncodeUid(userId).String()
return &fd, nil
}
// FileDeleteUnused deletes file upload records.
@@ -3336,7 +3347,6 @@ func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, er
query := "SELECT fu.id,fu.location FROM fileuploads AS fu LEFT JOIN filemsglinks AS fml ON fml.fileid=fu.id " +
"WHERE fml.id IS NULL"
var args []any
if !olderThan.IsZero() {
query += " AND fu.updatedat<?"
args = append(args, olderThan)
@@ -3351,6 +3361,7 @@ func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, er
if err != nil {
return nil, err
}
defer rows.Close()
var locations []string
var ids []any
@@ -3368,7 +3379,6 @@ func (a *adapter) FileDeleteUnused(olderThan time.Time, limit int) ([]string, er
if err == nil {
err = rows.Err()
}
rows.Close()
if err != nil {
return nil, err
@@ -3450,7 +3460,6 @@ func (a *adapter) FileLinkAttachments(topic string, userId, msgId t.Uid, fids []
query, args := expandQuery("INSERT INTO filemsglinks(createdat,fileid,"+linkBy+") VALUES (?,?,?)"+
strings.Repeat(",(?,?,?)", len(dids)-1), args...)
_, err = tx.Exec(ctx, query, args...)
if err != nil {
return err
@@ -3590,16 +3599,8 @@ func expandQuery(query string, args ...any) (string, []any) {
if len(args) != strings.Count(query, "?") {
args = flatMap(args)
}
expandedQuery, expandedArgs, _ = sqlx.In(query, args...)
placeholders := make([]string, len(expandedArgs))
for i := range expandedArgs {
placeholders[i] = "$" + strconv.Itoa(i+1)
expandedQuery = strings.Replace(expandedQuery, "?", placeholders[i], 1)
}
return expandedQuery, expandedArgs
return sqlx.Rebind(sqlx.DOLLAR, expandedQuery), expandedArgs
}
// Convert a slice of slices into a flat slice.
@@ -3623,6 +3624,8 @@ func init() {
store.RegisterAdapter(&adapter{})
}
/*
func (a *adapter) GetTopicsLastMsgWriter(subs []t.Subscription) []t.Subscription {
return subs
}
*/
+298 -163
View File
@@ -32,13 +32,12 @@ type adapter struct {
}
const (
adpVersion = 116
adapterName = "rethinkdb"
defaultHost = "localhost:28015"
defaultDatabase = "tinode"
adpVersion = 116
adapterName = "rethinkdb"
defaultMaxResults = 1024
// This is capped by the Session's send queue limit (128).
defaultMaxMessageResults = 100
@@ -774,88 +773,103 @@ func (a *adapter) UserGetAll(ids ...t.Uid) ([]t.User, error) {
}
users := []t.User{}
if cursor, err := rdb.DB(a.dbName).Table("users").GetAll(uids...).
Filter(rdb.Row.Field("State").Eq(t.StateDeleted).Not()).Run(a.conn); err == nil {
defer cursor.Close()
var user t.User
for cursor.Next(&user) {
users = append(users, user)
}
if err = cursor.Err(); err != nil {
return nil, err
}
} else {
cursor, err := rdb.DB(a.dbName).Table("users").GetAll(uids...).
Filter(rdb.Row.Field("State").Eq(t.StateDeleted).Not()).Run(a.conn)
if err != nil {
return nil, err
}
return users, nil
defer cursor.Close()
var user t.User
for cursor.Next(&user) {
users = append(users, user)
}
return users, cursor.Err()
}
// UserDelete deletes user record.
func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
var err error
// Get a list of topic names owned by the user (as 'grp' and 'chn').
ownTopics, err := a.topicNamesForUser(rdb.DB(a.dbName).Table("topics").
GetAllByIndex("Owner", uid.String()).Filter(rdb.Row.Field("State").Eq(t.StateDeleted).Not()).
Field("Id"), true)
if err != nil {
return err
}
if hard {
// User's devices are store in user record, no separate table.
// Delete user's subscriptions in all topics.
if err = a.subsDelForUser(uid, true); err != nil {
return err
}
// Can't delete user's messages in all topics because we cannot notify topics of such deletion.
// Or we have to delete these messages one by one.
// For now, just leave the messages marked as sent by "not found" user.
// Delete topics where the user is the owner:
// 1. Delete dellog
// 2. Decrement use counter of fileuploads: topic itself and messages.
// 3. Delete all messages.
// 4. Delete subscriptions.
if _, err = rdb.DB(a.dbName).Table("topics").GetAllByIndex("Owner", uid.String()).ForEach(
func(topic rdb.Term) rdb.Term {
return rdb.Expr([]any{
// Delete dellog
rdb.DB(a.dbName).Table("dellog").Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_DelId"}).Delete(),
// Decrement topic attachment UseCounter
rdb.DB(a.dbName).Table("fileuploads").GetAll(topic.Field("Attachments")).
Update(func(fu rdb.Term) any {
return map[string]any{"UseCount": fu.Field("UseCount").Default(1).Sub(1)}
}),
// Decrement message attachments UseCounter
rdb.DB(a.dbName).Table("fileuploads").GetAll(
rdb.Args(
rdb.DB(a.dbName).Table("messages").Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_SeqId"}).
// Fetch messages with attachments only
Filter(func(msg rdb.Term) rdb.Term {
return msg.HasFields("Attachments")
}).
// Flatten arrays
ConcatMap(func(row rdb.Term) any { return row.Field("Attachments") }).
CoerceTo("array"))).
Update(func(fu rdb.Term) any {
return map[string]any{"UseCount": fu.Field("UseCount").Default(1).Sub(1)}
}),
// Delete messages
rdb.DB(a.dbName).Table("messages").Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_SeqId"}).Delete(),
// Delete subscriptions
rdb.DB(a.dbName).Table("subscriptions").GetAllByIndex("Topic", topic.Field("Id")).Delete(),
})
}).RunWrite(a.conn); err != nil {
// Delete records of messages soft-deleted for the user in all topics
// and dellog entries.
if err = a.clearUserDellog(uid, nil); err != nil {
return err
}
// And finally delete the topics.
if _, err = rdb.DB(a.dbName).Table("topics").GetAllByIndex("Owner", uid.String()).
Delete().RunWrite(a.conn); err != nil {
return err
// Can't delete user's messages in all topics because we cannot notify topics of such deletion.
// Just leave the messages marked as sent by "not found" user.
// Delete topics where the user is the owner:
if len(ownTopics) > 0 {
// 1. Delete dellog
// 2. Decrement use counter of fileuploads: topic itself and messages.
// 3. Delete all messages.
// 4. Delete subscriptions.
if _, err = rdb.DB(a.dbName).Table("topics").GetAll(ownTopics...).ForEach(
func(topic rdb.Term) rdb.Term {
return rdb.Expr([]any{
// Delete dellog
rdb.DB(a.dbName).Table("dellog").Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_DelId"}).Delete(),
// Decrement topic attachment UseCounter
rdb.DB(a.dbName).Table("fileuploads").GetAll(topic.Field("Attachments")).
Update(func(fu rdb.Term) any {
return map[string]any{"UseCount": fu.Field("UseCount").Default(1).Sub(1)}
}),
// Decrement message attachments UseCounter
rdb.DB(a.dbName).Table("fileuploads").GetAll(
rdb.Args(
rdb.DB(a.dbName).Table("messages").Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_SeqId"}).
// Fetch messages with attachments only
Filter(func(msg rdb.Term) rdb.Term {
return msg.HasFields("Attachments")
}).
// Flatten arrays
ConcatMap(func(row rdb.Term) any { return row.Field("Attachments") }).
CoerceTo("array"))).
Update(func(fu rdb.Term) any {
return map[string]any{"UseCount": fu.Field("UseCount").Default(1).Sub(1)}
}),
// Delete messages
rdb.DB(a.dbName).Table("messages").Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_SeqId"}).Delete(),
// Delete subscriptions
rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("Topic", topic.Field("Id")).Delete(),
})
}).RunWrite(a.conn); err != nil {
return err
}
// And finally delete the topics.
if _, err = rdb.DB(a.dbName).Table("topics").GetAllByIndex("Owner", uid.String()).
Delete().RunWrite(a.conn); err != nil {
return err
}
}
// Delete user's authentication records.
@@ -886,33 +900,150 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
now := t.TimeNow()
disable := map[string]any{
"UpdatedAt": now, "State": t.StateDeleted, "StateAt": now,
"UpdatedAt": now,
"State": t.StateDeleted,
"StateAt": now,
}
if _, err = rdb.DB(a.dbName).Table("topics").GetAllByIndex("Owner", uid.String()).ForEach(
func(topic rdb.Term) rdb.Term {
return rdb.Expr([]any{
// Disable subscriptions for topics where the user is the owner.
rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("Topic", topic.Field("Id")).
Update(disable),
// Disable topics where the user is the owner.
rdb.DB(a.dbName).Table("topics").
Get(topic.Field("Id")).
Update(map[string]any{
"UpdatedAt": now, "TouchedAt": now, "State": t.StateDeleted, "StateAt": now,
}),
})
}).RunWrite(a.conn); err != nil {
disableSub := map[string]any{
"UpdatedAt": now,
"DeletedAt": now,
}
if len(ownTopics) > 0 {
// Disable all subscriptions in topics where the user is the owner.
if _, err = rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("Topic", ownTopics...).
Update(disableSub).
RunWrite(a.conn); err != nil {
return err
}
// Disable topics where the user is the owner.
if _, err = rdb.DB(a.dbName).Table("topics").
GetAll(ownTopics...).
Update(disable).
RunWrite(a.conn); err != nil {
return err
}
}
// Disable p2p topics with the user.
p2pTopics, err := a.p2pTopicsForUser(uid)
if err != nil {
return err
}
if len(p2pTopics) > 0 {
// Disable all subscriptions in p2p topics with the user.
if _, err = rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("Topic", p2pTopics...).
Update(disableSub).
RunWrite(a.conn); err != nil {
return err
}
// Disable p2p topics with the user.
if _, err = rdb.DB(a.dbName).Table("topics").
GetAll(p2pTopics...).
Update(disable).
RunWrite(a.conn); err != nil {
return err
}
}
// FIXME: disable p2p topics with the user.
_, err = rdb.DB(a.dbName).Table("users").Get(uid.String()).Update(disable).RunWrite(a.conn)
// Disable the user (same fields as topic).
_, err = rdb.DB(a.dbName).Table("users").Get(uid.String()).
Update(disable).RunWrite(a.conn)
}
return err
}
// Delete records of messages soft-deleted for the user in all topics.
func (a *adapter) clearUserDellog(uid t.Uid, topics []any) error {
var err error
forUser := uid.String()
if topics == nil {
// Get a list of all topics where the user has subscriptions.
topics, err = a.topicNamesForUser(rdb.DB(a.dbName).
Table("subscriptions").
GetAllByIndex("User", forUser).
Field("Topic"), false)
if err != nil {
return err
}
}
// No need to convert channel names to group names:
// channel readers cannot delete messages.
// Remove current user from the messages' soft-deletion lists
// in all topics where the user has subscriptions.
_, err = rdb.DB(a.dbName).Table("topics").GetAll(topics...).
ForEach(func(topic rdb.Term) rdb.Term {
return rdb.DB(a.dbName).Table("messages").Between(
[]any{topic.Field("Id"), forUser, rdb.MinVal},
[]any{topic.Field("Id"), forUser, rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_DeletedFor"}).
Update(map[string]any{
// Take the DeletedFor array, subtract all values which contain current user ID in 'User' field.
"DeletedFor": rdb.Row.Field("DeletedFor").
SetDifference(
rdb.Row.Field("DeletedFor").
Filter(map[string]any{"User": forUser}))})
}).RunWrite(a.conn)
if err != nil {
return err
}
// Delete entries in dellog for this user in all topics where the user
// has subscriptions.
_, err = rdb.DB(a.dbName).Table("topics").GetAll(topics...).
ForEach(func(topic rdb.Term) rdb.Term {
return rdb.DB(a.dbName).Table("dellog").
// Select all log entries for the given table.
Between(
[]any{topic.Field("Id"), rdb.MinVal},
[]any{topic.Field("Id"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_DelId"}).
// Keep entries soft-deleted for the current user only.
Filter(rdb.Row.Field("DeletedFor").Eq(forUser)).
// Delete them.
Delete()
}).RunWrite(a.conn)
return err
}
// topicNamesForUser returns a list of topic names by query.
func (a *adapter) topicNamesForUser(query rdb.Term, includeChan bool) ([]any, error) {
cursor, err := query.Run(a.conn)
if err != nil {
return nil, err
}
defer cursor.Close()
var result []string
if err = cursor.All(&result); err != nil {
return nil, err
}
var args []any
for _, name := range result {
args = append(args, name)
if includeChan {
// Append 'chn' topic names for each 'grp' name.
if channel := t.GrpToChn(name); channel != "" {
args = append(args, channel)
}
}
}
return args, nil
}
func (a *adapter) p2pTopicsForUser(uid t.Uid) ([]any, error) {
return a.topicNamesForUser(rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("User", uid.String()).
Field("Topic").
Filter(rdb.Row.Field("Topic").Match("^p2p")), false)
}
// topicStateForUser is called by UserUpdate when the update contains state change.
func (a *adapter) topicStateForUser(uid t.Uid, now time.Time, update any) error {
state, ok := update.(t.ObjState)
@@ -1039,6 +1170,7 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) {
// UserUnreadCount returns the total number of unread messages in all topics with
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
// UserUnreadCount does not count unread messages in channels although it should.
func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
// The call expects user IDs to be plain strings like "356zaYaumiU".
uids := make([]any, len(ids))
@@ -1147,8 +1279,6 @@ func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time, limit int) ([]
return uids, err
}
// *****************************
// TopicCreate creates a topic from template
func (a *adapter) TopicCreate(topic *t.Topic) error {
_, err := rdb.DB(a.dbName).Table("topics").Insert(&topic).RunWrite(a.conn)
@@ -1193,7 +1323,6 @@ func (a *adapter) TopicCreateP2P(initiator, invited *t.Subscription) error {
topic := &t.Topic{ObjHeader: t.ObjHeader{Id: initiator.Topic}}
topic.ObjHeader.MergeTimes(&initiator.ObjHeader)
topic.TouchedAt = initiator.GetTouchedAt()
topic.SubCnt = 2
return a.TopicCreate(topic)
}
@@ -1215,27 +1344,30 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) {
// The cursor is automatically closed by executing cursor.One.
// Topic found, get subsription count. Try both topic and channel names.
if cursor, err = rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("Topic", topic, t.GrpToChn(topic)).
Filter(rdb.Row.HasFields("DeletedAt").Not()).
Count().Run(a.conn); err != nil {
return nil, err
}
subCnt := 0
if err = cursor.One(&subCnt); err != nil {
return nil, err
}
// No need to close the cursor.
if subCnt != tt.SubCnt {
// Update the topic with the correct subscription count.
tt.SubCnt = subCnt
if _, err = rdb.DB(a.dbName).Table("topics").Get(topic).
Update(map[string]any{"SubCnt": subCnt}).RunWrite(a.conn); err != nil {
if t.GetTopicCat(topic) == t.TopicCatGrp {
// Topic found, get subsription count. Try both topic and channel names.
if cursor, err = rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("Topic", topic, t.GrpToChn(topic)).
Filter(rdb.Row.HasFields("DeletedAt").Not()).
Count().Run(a.conn); err != nil {
return nil, err
}
subCnt := 0
if err = cursor.One(&subCnt); err != nil {
return nil, err
}
// No need to close the cursor.
if subCnt != tt.SubCnt {
// Update the topic with the correct subscription count.
tt.SubCnt = subCnt
if _, err = rdb.DB(a.dbName).Table("topics").Get(topic).
Update(map[string]any{"SubCnt": subCnt}).RunWrite(a.conn); err != nil {
return nil, err
}
}
}
return tt, nil
}
@@ -1305,15 +1437,13 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
usrq = append(usrq, uid1.String())
sub.SetWith(uid1.UserId())
}
topq = append(topq, tname)
} else {
// Group or sys subscription.
if tcat == t.TopicCatGrp {
// Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
}
topq = append(topq, tname)
} else if tcat == t.TopicCatGrp {
// Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
}
// No special handling needed for 'slf', 'sys' subscriptions.
topq = append(topq, tname)
join[tname] = sub
}
err = cursor.Err()
@@ -1360,6 +1490,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
sub.SetTouchedAt(top.TouchedAt)
sub.SetSeqId(top.SeqId)
if t.GetTopicCat(sub.Topic) == t.TopicCatGrp {
sub.SetSubCnt(top.SubCnt)
sub.SetPublic(top.Public)
sub.SetTrusted(top.Trusted)
}
@@ -1418,7 +1549,9 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
return common.SelectEarliestUpdatedSubs(subs, opts, a.maxResults), nil
}
// UsersForTopic loads users subscribed to the given topic
// UsersForTopic loads users subscribed to the given topic (not channel readers).
// The difference between UsersForTopic vs SubsForTopic is that the former loads user.Public,
// the latter does not.
func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
tcat := t.GetTopicCat(topic)
@@ -1564,7 +1697,7 @@ func (a *adapter) ChannelsForUser(uid t.Uid) ([]string, error) {
return names, nil
}
// TopicShare creates topic subscriptions.
// TopicShare adds subscriptions to a topic and increments the topic's subcnt.
func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error {
// Assign Ids.
for _, sub := range shares {
@@ -1589,7 +1722,7 @@ func (a *adapter) TopicShare(topic string, shares []*t.Subscription) error {
if err == nil && topic != "" {
_, err = rdb.DB(a.dbName).Table("topics").
Get(topic).
Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").Default(0).Sub(len(shares))}).
Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").Default(0).Add(len(shares))}).
RunWrite(a.conn)
}
return err
@@ -1628,7 +1761,6 @@ func (a *adapter) TopicDelete(topic string, isChan, hard bool) error {
// TopicUpdateOnMessage deserializes message-related values into topic.
func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error {
update := struct {
SeqId int
TouchedAt time.Time
@@ -1640,6 +1772,19 @@ func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error {
return err
}
// TopicUpdateSubCnt updates subscriber count denormalized in topic.
func (a *adapter) TopicUpdateSubCnt(topic string) error {
_, err := rdb.DB(a.dbName).Table("topics").
Get(topic).
Update(map[string]any{
"SubCnt": rdb.Table("subscriptions").
GetAllByIndex("Topic", topic, t.GrpToChn(topic)).
Filter(rdb.Row.HasFields("DeletedAt").Not()).
Count(),
}).RunWrite(a.conn)
return err
}
// TopicUpdate performs a generic topic update.
func (a *adapter) TopicUpdate(topic string, update map[string]any) error {
if t, u := update["TouchedAt"], update["UpdatedAt"]; t == nil && u != nil {
@@ -1757,18 +1902,30 @@ func (a *adapter) SubsUpdate(topic string, user t.Uid, update map[string]any) er
return err
}
// SubsDelete marks subscription as deleted.
// SubsDelete marks at most one subscription as deleted.
func (a *adapter) SubsDelete(topic string, user t.Uid) error {
now := t.TimeNow()
forUser := user.String()
// Mark subscription as deleted.
_, err := rdb.DB(a.dbName).Table("subscriptions").
res, err := rdb.DB(a.dbName).Table("subscriptions").
Get(topic + ":" + forUser).Update(map[string]any{
"UpdatedAt": now,
"DeletedAt": now,
}).RunWrite(a.conn)
if err != nil {
return err
}
if res.Replaced == 0 {
// Nothing was updated, nothing more to do.
return t.ErrNotFound
}
// Decrement topic's SubCnt.
_, err = rdb.DB(a.dbName).Table("topics").Get(topic).
Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").Default(1).Sub(1)}).
RunWrite(a.conn)
if err != nil {
return err
}
@@ -1840,49 +1997,28 @@ func (a *adapter) subsDelForTopic(topic string, isChan, hard bool) error {
return err
}
// subsDelForUser deletes or marks all subscriptions of a given user as deleted
// subsDelForUser marks all subscriptions of a given user as deleted.
func (a *adapter) subsDelForUser(user t.Uid, hard bool) error {
var err error
forUser := user.String()
// Iterate over user's subscriptions.
_, err = rdb.DB(a.dbName).Table("subscriptions").GetAllByIndex("User", forUser).ForEach(
func(sub rdb.Term) rdb.Term {
return rdb.Expr([]any{
// Delete dellog
rdb.DB(a.dbName).Table("dellog").
// Select all log entries for the given table.
Between(
[]any{sub.Field("Topic"), rdb.MinVal},
[]any{sub.Field("Topic"), rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_DelId"}).
// Keep entries soft-deleted for the current user only.
Filter(func(dellog rdb.Term) rdb.Term {
return dellog.Field("DeletedFor").Eq(forUser)
}).
// Delete them.
Delete(),
// Remove user from the messages' soft-deletion lists.
rdb.DB(a.dbName).Table("messages").
// Select user's soft-deleted messages in the current table.
Between(
[]any{sub.Field("Topic"), forUser, rdb.MinVal},
[]any{sub.Field("Topic"), forUser, rdb.MaxVal},
rdb.BetweenOpts{Index: "Topic_DeletedFor"}).
// Update the field DeletedFor:
Update(func(msg rdb.Term) any {
return map[string]any{
// Take the array, subtract all values with the current user ID.
"DeletedFor": msg.Field("DeletedFor").
SetDifference(
msg.Field("DeletedFor").
Filter(map[string]any{"User": forUser})),
}
}),
})
}).RunWrite(a.conn)
// Get all topics the user is subscribed to. Channels are left as channels.
topics, err := a.topicNamesForUser(rdb.DB(a.dbName).Table("subscriptions").
GetAllByIndex("User", forUser), false)
if err != nil {
return err
}
// 1. Decrement SubCnt in topic.
if _, err = rdb.DB(a.dbName).Table("topics").Get(topics...).
Update(map[string]any{"SubCnt": rdb.Row.Field("SubCnt").
Default(1).Sub(1)}).
RunWrite(a.conn); err != nil {
return err
}
err = a.clearUserDellog(user, topics)
if err != nil {
return err
}
@@ -2014,7 +2150,6 @@ func (a *adapter) Find(caller, promoPrefix string, req [][]string, opt []string,
}
return subs, cursor.Err()
}
// FindOne returns topic or user which matches the given tag.
+1
View File
@@ -625,6 +625,7 @@ func replyOfflineTopicGetDesc(sess *Session, msg *ClientComMessage) {
desc.Public = stopic.Public
desc.Trusted = stopic.Trusted
desc.IsChan = stopic.UseBt
desc.SubCnt = stopic.SubCnt
if stopic.Owner == msg.AsUser {
desc.DefaultAcs = &MsgDefaultAcsMode{
Auth: stopic.Access.Auth.String(),
+3
View File
@@ -615,6 +615,8 @@ func initTopicNewGrp(t *Topic, sreg *ClientComMessage, isChan bool) error {
}
t.xoriginal = t.name // keeping 'new' or 'nch' as original has no value to the client
t.subCnt = 1 // One subscription, the owner.
pktsub.Created = true
pktsub.Newsub = true
@@ -659,6 +661,7 @@ func initTopicGrp(t *Topic) error {
}
t.lastID = stopic.SeqId
t.delID = stopic.DelId
t.subCnt = stopic.SubCnt
// Initialize channel for receiving session online updates.
t.supd = make(chan *sessionUpdate, 32)
+14
View File
@@ -922,6 +922,20 @@ func (mr *MockTopicsPersistenceInterfaceMockRecorder) Update(topic, update inter
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockTopicsPersistenceInterface)(nil).Update), topic, update)
}
// UpdateSubCnt mocks base method.
func (m *MockTopicsPersistenceInterface) UpdateSubCnt(topic string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateSubCnt", topic)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateSubCnt indicates an expected call of UpdateSubCnt.
func (mr *MockTopicsPersistenceInterfaceMockRecorder) UpdateSubCnt(topic interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSubCnt", reflect.TypeOf((*MockTopicsPersistenceInterface)(nil).UpdateSubCnt), topic)
}
// MockSubsPersistenceInterface is a mock of SubsPersistenceInterface interface.
type MockSubsPersistenceInterface struct {
ctrl *gomock.Controller
+8 -1
View File
@@ -511,6 +511,7 @@ type TopicsPersistenceInterface interface {
GetSubs(topic string, opts *types.QueryOpt) ([]types.Subscription, error)
GetSubsAny(topic string, opts *types.QueryOpt) ([]types.Subscription, error)
Update(topic string, update map[string]any) error
UpdateSubCnt(topic string) error
OwnerChange(topic string, newOwner types.Uid) error
Delete(topic string, isChan, hard bool) error
}
@@ -585,6 +586,11 @@ func (topicsMapper) GetSubsAny(topic string, opts *types.QueryOpt) ([]types.Subs
return adp.SubsForTopic(topic, true, opts)
}
// UpdateSubCnt refreshes subscriber count value denormalized in topic.
func (topicsMapper) UpdateSubCnt(topic string) error {
return adp.TopicUpdateSubCnt(topic)
}
// Update is a generic topic update.
func (topicsMapper) Update(topic string, update map[string]any) error {
if _, ok := update["UpdatedAt"]; !ok {
@@ -652,7 +658,8 @@ func (subsMapper) Update(topic string, user types.Uid, update map[string]any) er
return adp.SubsUpdate(topic, user, update)
}
// Delete deletes a subscription
// Delete deletes a subscription.
// To delete channel subscription the channel name must be explicitly specified.
func (subsMapper) Delete(topic string, user types.Uid) error {
return adp.SubsDelete(topic, user)
}
+6 -3
View File
@@ -207,6 +207,8 @@ func ParseUserId(s string) Uid {
}
// GrpToChn converts group topic name to corresponding channel name.
// If it's a non-group channel topic, the name is returned unchanged.
// If it's neither, an empty string is returned.
func GrpToChn(grp string) string {
if strings.HasPrefix(grp, "grp") {
return strings.Replace(grp, "grp", "chn", 1)
@@ -227,6 +229,7 @@ func IsChannel(name string) bool {
// ChnToGrp gets group topic name from channel name.
// If it's a non-channel group topic, the name is returned unchanged.
// If it's neither, an empty string is returned.
func ChnToGrp(chn string) string {
if strings.HasPrefix(chn, "chn") {
return strings.Replace(chn, "chn", "grp", 1)
@@ -925,7 +928,7 @@ type Subscription struct {
// Timestamp & user agent of when the user was last online.
lastSeenUA *LastSeenUA
// Group topics only: count of subscribers.
// Count of subscribers.
subCnt int
// P2P only. ID of the other user
@@ -1126,7 +1129,7 @@ type Topic struct {
DelId int
// Count of topic subscribers.
SubCnt int `json:"SubCnt,omitempty" bson:",omitempty"`
SubCnt int
Public any
Trusted any
@@ -1388,7 +1391,7 @@ func GetTopicCat(name string) TopicCat {
}
// IsEphemeralTopic checks if the topic is ephemeral, i.e. it's a reference to the user,
// it's not stored in the 'topics' table.
// it's not stored in the 'topics' table like 'me' or 'fnd' topics.
func IsEphemeralTopic(topic string) bool {
cat := GetTopicCat(topic)
return cat == TopicCatMe || cat == TopicCatFnd
+35 -5
View File
@@ -47,6 +47,10 @@ type Topic struct {
// ID of the deletion operation. Not an ID of the message.
delID int
// Total count of subscribers (excluding deleted).
// This is different from subsCount() for channels.
subCnt int
// Last published userAgent ('me' topic only)
userAgent string
@@ -546,6 +550,13 @@ func (t *Topic) handleTopicTermination(sd *shutDown) {
s.detachSession(t.name)
}
if t.cat == types.TopicCatGrp {
// Update topic subscriber count.
if err := store.Topics.UpdateSubCnt(t.name); err != nil {
logs.Warn.Println("topic update sub cnt:", err)
}
}
usersRegisterTopic(t, false)
// Report completion back to sender, if 'done' is not nil.
@@ -807,7 +818,7 @@ func (t *Topic) handleLeaveRequest(msg *ClientComMessage, sess *Session) {
if !meUid.IsZero() {
// Update user's last online timestamp & user agent. Only one user can be subscribed to 'me' topic.
if err := store.Users.UpdateLastSeen(meUid, mrs.userAgent, now); err != nil {
logs.Warn.Println(err)
logs.Warn.Println("user update last seen:", err)
}
}
case types.TopicCatFnd:
@@ -1377,8 +1388,8 @@ func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error {
asUid := types.ParseUserId(msg.AsUser)
if !msgsub.Newsub && (t.cat == types.TopicCatP2P || t.cat == types.TopicCatGrp || t.cat == types.TopicCatSys) {
// Check if this is a new subscription.
if !msgsub.Newsub && (t.cat == types.TopicCatP2P || t.cat == types.TopicCatGrp) {
// Check if this is a new subscription (P2P & GRP only. SLF, SYS are excluded here).
pud, found := t.perUser[asUid]
msgsub.Newsub = !found || pud.deleted
}
@@ -1429,6 +1440,11 @@ func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error {
userData.online++
t.perUser[asUid] = userData
}
if t.cat == types.TopicCatGrp && msgsub.Newsub {
// Increment subscriber count for new group subscriptions only.
t.subCnt++
}
}
params := map[string]any{}
@@ -2087,8 +2103,11 @@ func (t *Topic) replyGetDesc(sess *Session, asUid types.Uid, _ bool, opts *MsgGe
pud, full := t.perUser[asUid]
full = full || t.cat == types.TopicCatMe
if t.cat == types.TopicCatGrp {
desc.IsChan = t.isChan
desc.SubCnt = t.subCnt
logs.Info.Println("replyGetDesc: grp topic", t.name, "subs", t.subCnt)
}
if ifUpdated {
@@ -2601,6 +2620,8 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level
UserAgent: sub.GetUserAgent(),
}
}
mts.SubCnt = sub.GetSubCnt()
}
} else {
// Mark subscriptions that the user does not care about.
@@ -2631,6 +2652,7 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level
if !sub.UpdatedAt.IsZero() {
mts.UpdatedAt = &sub.UpdatedAt
}
if isReader && !banned {
mts.ReadSeqId = sub.ReadSeqId
mts.RecvSeqId = sub.RecvSeqId
@@ -2662,6 +2684,7 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level
mts.Acs.Mode = defacs.Auth.String()
}
}
mts.SubCnt = sub.GetSubCnt()
}
// Returning public and private only if they have changed since ifModified
@@ -3393,7 +3416,7 @@ func (t *Topic) replyDelSub(sess *Session, asUid types.Uid, msg *ClientComMessag
return nil
}
// replyLeaveUnsub is request to unsubscribe user and detach all user's sessions from topic.
// replyLeaveUnsub is a request to unsubscribe user and detach all user's sessions from topic.
func (t *Topic) replyLeaveUnsub(sess *Session, msg *ClientComMessage, asUid types.Uid) error {
now := types.TimeNow()
@@ -3467,6 +3490,11 @@ func (t *Topic) replyLeaveUnsub(sess *Session, msg *ClientComMessage, asUid type
// Notify plugins.
pluginSubscription(&types.Subscription{Topic: t.name, User: asUid.String()}, plgActDel)
if t.cat == types.TopicCatGrp {
// Decrement group's cached member count.
t.subCnt--
}
// If all P2P users were deleted, suspend the topic to let it shut down.
if t.cat == types.TopicCatP2P && t.subsCount() == 0 {
t.markPaused(true)
@@ -3824,7 +3852,9 @@ func (t *Topic) accessFor(authLvl auth.Level) types.AccessMode {
return selectAccessMode(authLvl, t.accessAnon, t.accessAuth, getDefaultAccess(t.cat, true, false))
}
// subsCount returns the number of topic subscribers
// subsCount returns the number of topic subscribers. This method is different from subCnt with respect to channels:
// * subsCount counts subscribers + attached channel users.
// * subCnt counts all subscribers (including all channel users).
func (t *Topic) subsCount() int {
if t.cat == types.TopicCatP2P {
count := 0