/****************************************************************************** * * Description : * An isolated communication channel (chat room, 1:1 conversation) for * usually multiple users. There is no communication across topics. * *****************************************************************************/ package main import ( "errors" "sort" "strings" "sync/atomic" "time" "github.com/tinode/chat/server/auth" "github.com/tinode/chat/server/logs" "github.com/tinode/chat/server/store" "github.com/tinode/chat/server/store/types" ) // Topic is an isolated communication channel type Topic struct { // Đ•xpanded/unique name of the topic. name string // For single-user topics session-specific topic name, such as 'me', // otherwise the same as 'name'. xoriginal string // Topic category cat types.TopicCat // Name of the master node for this topic if isProxy is true. masterNode string // Time when the topic was first created. created time.Time // Time when the topic was last updated. updated time.Time // Time of the last outgoing message. touched time.Time // Server-side ID of the last data message lastID int // ID of the deletion operation. Not an ID of the message. delID int // Total count of subscribers (excluding deleted). // This is different from subsCount() for channels. subCnt int // Last published userAgent ('me' topic only) userAgent string // User ID of the topic owner/creator. Could be zero. owner types.Uid // Default access mode accessAuth types.AccessMode accessAnon types.AccessMode // Topic discovery tags tags []string // Auxiliary set of key-value pairs aux map[string]any // Topic's public data public any // Topic's trusted data trusted any // Topic's per-subscriber data perUser map[types.Uid]perUserData // Union of permissions across all users (used by proxy sessions with uid = 0). // These are used by master topics only (in the proxy-master topic context) // as a coarse-grained attempt to perform acs checks since proxy sessions "impersonate" // multiple normal sessions (uids) which may have different uids. modeWantUnion types.AccessMode modeGivenUnion types.AccessMode // User's contact list (not nil for 'me' topic only). // The map keys are UserIds for P2P topics and grpXXX for group topics. perSubs map[string]perSubsData // Sessions attached to this topic. The UID kept here may not match Session.uid if session is // subscribed on behalf of another user. sessions map[*Session]perSessionData // Present video call data. Null when there's no call in progress or being established. // Only available for p2p topics. currentCall *videoCall // Channel for receiving client messages from sessions or other topics, buffered = 256. clientMsg chan *ClientComMessage // Channel for receiving server messages generated on the server or received from other cluster nodes, buffered = 64. serverMsg chan *ServerComMessage // Channel for receiving {get}/{set}/{del} requests, buffered = 64 meta chan *ClientComMessage // Subscribe requests from sessions, buffered = 256 reg chan *ClientComMessage // Unsubscribe requests from sessions, buffered = 256 unreg chan *ClientComMessage // Session updates: background sessions coming online, User Agent changes. Buffered = 32 supd chan *sessionUpdate // Channel to terminate topic -- either the topic is deleted or system is being shut down. Buffered = 1. exit chan *shutDown // Channel to receive topic master responses (used only by proxy topics). proxy chan *ClusterResp // Channel to receive topic proxy service requests, e.g. sending deferred notifications. master chan *ClusterSessUpdate // Flag which tells topic lifecycle status: new, ready, paused, marked for deletion. status int32 // Channel functionality is enabled for the group topic. isChan bool // If isProxy == true, the actual topic is hosted by another cluster member. // The topic should: // 1. forward all messages to master // 2. route replies from the master to sessions. // 3. disconnect sessions at master's request. // 4. shut down the topic at master's request. // 5. aggregate access permissions on behalf of attached sessions. isProxy bool // Countdown timer for destroying the topic when there are no more attached sessions to it. killTimer *time.Timer // Countdown timer for terminating iniatated (but not established) calls. callEstablishmentTimer *time.Timer } // perUserData holds topic's cache of per-subscriber data type perUserData struct { // Count of subscription online and announced (presence not deferred). online int // Last t.lastId reported by user through {pres} as received or read recvID int readID int // ID of the latest Delete operation delID int private any modeWant types.AccessMode modeGiven types.AccessMode // P2P only: public any trusted any lastSeen *time.Time lastUA string topicName string deleted bool // The user is a channel subscriber. isChan bool } // perSubsData holds user's (on 'me' topic) cache of subscription data type perSubsData struct { // The other user's/topic's online status as seen by this user. online bool // True if we care about the updates from the other user/topic: (want&given).IsPresencer(). // Does not affect sending notifications from this user to other users. enabled bool } // Data related to a subscription of a session to a topic. type perSessionData struct { // ID of the subscribed user (asUid); not necessarily the session owner. // Could be zero for multiplexed sessions in cluster. uid types.Uid // This is a channel subscription isChanSub bool // IDs of subscribed users in a multiplexing session. muids []types.Uid } // Reasons why topic is being shut down. const ( // StopNone no reason given/default. StopNone = iota // StopShutdown terminated due to system shutdown. StopShutdown // StopDeleted terminated due to being deleted. StopDeleted // StopRehashing terminated due to cluster rehashing (moved to a different node). StopRehashing ) // Topic shutdown type shutDown struct { // Channel to report back completion of topic shutdown. Could be nil done chan<- bool // Topic is being deleted as opposite to total system shutdown reason int } // Session update: user agent change or background session becoming normal. // If sess is nil then user agent change, otherwise bg to fg update. type sessionUpdate struct { sess *Session userAgent string } var ( nilPresParams = &presParams{} nilPresFilters = &presFilters{} ) func (t *Topic) run(hub *Hub) { if !t.isProxy { t.runLocal(hub) } else { t.runProxy(hub) } } // getPerUserAcs returns `want` and `given` permissions for the given user id. func (t *Topic) getPerUserAcs(uid types.Uid) (types.AccessMode, types.AccessMode) { if uid.IsZero() { // For zero uids (typically for proxy sessions), return the union of all permissions. return t.modeWantUnion, t.modeGivenUnion } pud := t.perUser[uid] return pud.modeWant, pud.modeGiven } // passesPresenceFilters applies presence filters to `msg` // depending on per-user want and given acls for the provided `uid`. func (t *Topic) passesPresenceFilters(pres *MsgServerPres, uid types.Uid) bool { modeWant, modeGiven := t.getPerUserAcs(uid) // "gone" and "acs" notifications are sent even if the topic is muted. return ((modeGiven & modeWant).IsPresencer() || pres.What == "gone" || pres.What == "acs") && (pres.FilterIn == 0 || int(modeGiven&modeWant)&pres.FilterIn != 0) && (pres.FilterOut == 0 || int(modeGiven&modeWant)&pres.FilterOut == 0) } // userIsReader returns true if the user (specified by `uid`) may read the given topic. func (t *Topic) userIsReader(uid types.Uid) bool { modeWant, modeGiven := t.getPerUserAcs(uid) return (modeGiven & modeWant).IsReader() } // prepareBroadcastableMessage sets the topic field in `msg` depending on the uid and subscription type. func (t *Topic) prepareBroadcastableMessage(msg *ServerComMessage, uid types.Uid, isChanSub bool) { // We are only interested in broadcastable messages. if msg.Data == nil && msg.Pres == nil && msg.Info == nil { return } if (t.cat == types.TopicCatP2P && !uid.IsZero()) || (t.cat == types.TopicCatGrp && t.isChan) { // For p2p topics topic name is dependent on receiver. // Channel topics may be presented as grpXXX or chnXXX. var topicName string if isChanSub { topicName = types.GrpToChn(t.xoriginal) } else { topicName = t.original(uid) } switch { case msg.Data != nil: msg.Data.Topic = topicName case msg.Pres != nil: msg.Pres.Topic = topicName case msg.Info != nil: msg.Info.Topic = topicName } } // Send channel messages anonymously. if isChanSub && msg.Data != nil { msg.Data.From = "" } } // computePerUserAcsUnion computes want and given permissions unions over all topic's subscribers. func (t *Topic) computePerUserAcsUnion() { wantUnion := types.ModeNone givenUnion := types.ModeNone for _, pud := range t.perUser { if pud.isChan { continue } wantUnion |= pud.modeWant givenUnion |= pud.modeGiven } if t.isChan { // Apply standard channel permissions to channel topics. wantUnion |= types.ModeCChnReader givenUnion |= types.ModeCChnReader } t.modeWantUnion = wantUnion t.modeGivenUnion = givenUnion } // unregisterSession implements all logic following receipt of a leave // request via the Topic.unreg channel. func (t *Topic) unregisterSession(msg *ClientComMessage) { if t.currentCall != nil { shouldTerminateCall := false if msg.sess.isMultiplex() { // Check if any of the call party sessions is multiplexed over msg.sess. for _, p := range t.currentCall.parties { if p.sess.isProxy() && p.sess.multi == msg.sess { shouldTerminateCall = true break } } } else if _, found := t.currentCall.parties[msg.sess.sid]; found { // Normal session disconnecting from topic. Just terminate the call. shouldTerminateCall = true } if shouldTerminateCall { t.terminateCallInProgress(false) } } t.handleLeaveRequest(msg, msg.sess) if msg.init && msg.sess.inflightReqs != nil { // If it's a client initiated request. msg.sess.inflightReqs.Done() } // If there are no more subscriptions to this topic, start a kill timer if len(t.sessions) == 0 && t.cat != types.TopicCatSys { t.killTimer.Reset(idleMasterTopicTimeout) } } // registerSession handles a session join (registration) request // received via the Topic.reg channel. func (t *Topic) registerSession(msg *ClientComMessage) { // Request to add a connection to this topic if t.isInactive() { msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow())) } else if msg.sess.getSub(t.name) != nil { // Session is already subscribed to topic. Subscription is checked in session.go, // but there is a gap between topic creation/un-pausing and processing the // first subscription request, before the topic is linked to session: a client // may send several subscription requests in that gap. msg.sess.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp)) } else { // The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die // while processing the call. t.killTimer.Stop() if err := t.handleSubscription(msg); err == nil { if msg.Sub.Created { // Call plugins with the new topic pluginTopic(t, plgActCreate) } } else { if len(t.sessions) == 0 && t.cat != types.TopicCatSys { // Failed to subscribe, the topic is still inactive t.killTimer.Reset(idleMasterTopicTimeout) } logs.Warn.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, msg.sess.sid) } } if msg.sess.inflightReqs != nil { msg.sess.inflightReqs.Done() } } func (t *Topic) handleMetaGet(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) { if msg.MetaWhat&constMsgMetaDesc != 0 { if err := t.replyGetDesc(msg.sess, asUid, asChan, msg.Get.Desc, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Desc failed: %s", t.name, err) } } if msg.MetaWhat&constMsgMetaSub != 0 { if err := t.replyGetSub(msg.sess, asUid, authLevel, asChan, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Sub failed: %s", t.name, err) } } if msg.MetaWhat&constMsgMetaData != 0 { if err := t.replyGetData(msg.sess, asUid, asChan, msg.Get.Data, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Data failed: %s", t.name, err) } } if msg.MetaWhat&constMsgMetaDel != 0 { if err := t.replyGetDel(msg.sess, asUid, msg.Get.Del, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Del failed: %s", t.name, err) } } if msg.MetaWhat&constMsgMetaTags != 0 { if err := t.replyGetTags(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Tags failed: %s", t.name, err) } } if msg.MetaWhat&constMsgMetaCred != 0 { if err := t.replyGetCreds(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Creds failed: %s", t.name, err) } } if msg.MetaWhat&constMsgMetaAux != 0 { logs.Warn.Printf("topic[%s] handle getAux", t.name) if err := t.replyGetAux(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Get.Aux failed: %s", t.name, err) } } } func (t *Topic) handleMetaSet(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) { if msg.MetaWhat&constMsgMetaDesc != 0 { if err := t.replySetDesc(msg.sess, asUid, asChan, authLevel, msg); err == nil { // Notify plugins of the update pluginTopic(t, plgActUpd) } else { logs.Warn.Printf("topic[%s] meta.Set.Desc failed: %v", t.name, err) } } if msg.MetaWhat&constMsgMetaSub != 0 { if err := t.replySetSub(msg.sess, msg, asChan); err != nil { logs.Warn.Printf("topic[%s] meta.Set.Sub failed: %v", t.name, err) } } if msg.MetaWhat&constMsgMetaTags != 0 { if err := t.replySetTags(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Set.Tags failed: %v", t.name, err) } } if msg.MetaWhat&constMsgMetaCred != 0 { if err := t.replySetCred(msg.sess, asUid, authLevel, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Set.Cred failed: %v", t.name, err) } } if msg.MetaWhat&constMsgMetaAux != 0 { if err := t.replySetAux(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] meta.Set.Aux failed: %v", t.name, err) } } } func (t *Topic) handleMetaDel(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) { var err error switch msg.MetaWhat { case constMsgDelMsg: err = t.replyDelMsg(msg.sess, asUid, asChan, msg) case constMsgDelSub: err = t.replyDelSub(msg.sess, asUid, msg) case constMsgDelTopic: err = t.replyDelTopic(msg.sess, asUid, msg) case constMsgDelCred: err = t.replyDelCred(msg.sess, asUid, authLevel, msg) } if err != nil { logs.Warn.Printf("topic[%s] meta.Del failed: %v", t.name, err) } } // handleMeta implements logic handling meta requests // received via the Topic.meta channel. func (t *Topic) handleMeta(msg *ClientComMessage) { // Request to get/set topic metadata asUid := types.ParseUserId(msg.AsUser) authLevel := auth.Level(msg.AuthLvl) asChan, err := t.verifyChannelAccess(msg.Original) if err != nil { // User should not be able to address non-channel topic as channel. msg.sess.queueOut(ErrNotFoundReply(msg, types.TimeNow())) return } switch { case msg.Get != nil: // Get request t.handleMetaGet(msg, asUid, asChan, authLevel) case msg.Set != nil: // Set request t.handleMetaSet(msg, asUid, asChan, authLevel) case msg.Del != nil: // Del request t.handleMetaDel(msg, asUid, asChan, authLevel) } } func (t *Topic) handleSessionUpdate(upd *sessionUpdate, currentUA *string, uaTimer *time.Timer) { if upd.sess != nil { // 'me' & 'grp' only. Background session timed out and came online. t.sessToForeground(upd.sess) } else if *currentUA != upd.userAgent { if t.cat != types.TopicCatMe { logs.Warn.Panicln("invalid topic category in UA update", t.name) } // 'me' only. Process an update to user agent from one of the sessions. *currentUA = upd.userAgent uaTimer.Reset(uaTimerDelay) } } func (t *Topic) handleUATimerEvent(currentUA string) { // Publish user agent changes after a delay if currentUA == "" || currentUA == t.userAgent { return } t.userAgent = currentUA t.presUsersOfInterest("ua", t.userAgent) } func (t *Topic) handleTopicTimeout(hub *Hub, currentUA string, uaTimer, defrNotifTimer *time.Timer) { // Topic timeout hub.unreg <- &topicUnreg{rcptTo: t.name} defrNotifTimer.Stop() switch t.cat { case types.TopicCatMe: uaTimer.Stop() t.presUsersOfInterest("off", currentUA) case types.TopicCatGrp: t.presSubsOffline("off", nilPresParams, nilPresFilters, nilPresFilters, "", false) } } func (t *Topic) handleTopicTermination(sd *shutDown) { // Handle four cases: // 1. Topic is shutting down by timer due to inactivity (reason == StopNone) // 2. Topic is being deleted (reason == StopDeleted) // 3. System shutdown (reason == StopShutdown, done != nil). // 4. Cluster rehashing (reason == StopRehashing) switch sd.reason { case StopDeleted: if t.cat == types.TopicCatGrp { t.presSubsOffline("gone", nilPresParams, nilPresFilters, nilPresFilters, "", false) } // P2P users get "off+remove" earlier in the process // Inform plugins that the topic is deleted pluginTopic(t, plgActDel) case StopRehashing: // Must send individual messages to sessions because normal sending through the topic's // broadcast channel won't work - it will be shut down too soon. t.presSubsOnlineDirect("term", nilPresParams, nilPresFilters, "") } // In case of a system shutdown don't bother with notifications. They won't be delivered anyway. // Tell sessions to remove the topic for s := range t.sessions { s.detachSession(t.name) } if t.cat == types.TopicCatGrp { // Update topic subscriber count. if err := store.Topics.UpdateSubCnt(t.name); err != nil { logs.Warn.Println("topic update sub cnt:", err) } } usersRegisterTopic(t, false) // Report completion back to sender, if 'done' is not nil. if sd.done != nil { sd.done <- true } } func (t *Topic) runLocal(hub *Hub) { // Kills topic after a period of inactivity. t.killTimer = time.NewTimer(time.Hour) t.killTimer.Stop() // Notifies about user agent change. 'me' only uaTimer := time.NewTimer(time.Minute) var currentUA string uaTimer.Stop() // Ticker for deferred presence notifications. defrNotifTimer := time.NewTimer(time.Millisecond * 500) t.callEstablishmentTimer = time.NewTimer(time.Second) t.callEstablishmentTimer.Stop() for { select { case msg := <-t.reg: t.registerSession(msg) case msg := <-t.unreg: t.unregisterSession(msg) case msg := <-t.clientMsg: t.handleClientMsg(msg) case msg := <-t.serverMsg: t.handleServerMsg(msg) case meta := <-t.meta: t.handleMeta(meta) case upd := <-t.supd: t.handleSessionUpdate(upd, ¤tUA, uaTimer) case <-uaTimer.C: t.handleUATimerEvent(currentUA) case <-t.killTimer.C: t.handleTopicTimeout(hub, currentUA, uaTimer, defrNotifTimer) case <-t.callEstablishmentTimer.C: t.terminateCallInProgress(true) case sd := <-t.exit: t.handleTopicTermination(sd) return } } } // handleClientMsg is the top-level handler of messages received by the topic from sessions. func (t *Topic) handleClientMsg(msg *ClientComMessage) { if msg.Pub != nil { t.handlePubBroadcast(msg) } else if msg.Note != nil { t.handleNoteBroadcast(msg) } else { // TODO(gene): maybe remove this panic. logs.Err.Panic("topic: wrong client message type for broadcasting", t.name) } } // handleServerMsg is the top-level handler of messages generated at the server. func (t *Topic) handleServerMsg(msg *ServerComMessage) { // Server-generated message: {info} or {pres}. if t.isInactive() { // Ignore message - the topic is paused or being deleted. return } if msg.Pres != nil { t.handlePresence(msg) } else if msg.Info != nil { t.broadcastToSessions(msg) } else { // TODO(gene): maybe remove this panic. logs.Err.Panic("topic: wrong server message type for broadcasting", t.name) } } // Session subscribed to a topic, created == true if topic was just created and {pres} needs to be announced func (t *Topic) handleSubscription(msg *ClientComMessage) error { asUid := types.ParseUserId(msg.AsUser) authLevel := auth.Level(msg.AuthLvl) asChan, err := t.verifyChannelAccess(msg.Original) if err != nil { // User should not be able to address non-channel topic as channel. msg.sess.queueOut(ErrNotFoundReply(msg, types.TimeNow())) return err } if err := t.subscriptionReply(asChan, msg); err != nil { return err } msgsub := msg.Sub getWhat := 0 if msgsub.Get != nil { getWhat = parseMsgClientMeta(msgsub.Get.What) } if getWhat&constMsgMetaDesc != 0 { // Send get.desc as a {meta} packet. if err := t.replyGetDesc(msg.sess, asUid, asChan, msgsub.Get.Desc, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Desc failed: %v sid=%s", t.name, err, msg.sess.sid) } } if getWhat&constMsgMetaSub != 0 { // Send get.sub response as a separate {meta} packet if err := t.replyGetSub(msg.sess, asUid, authLevel, asChan, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Sub failed: %v sid=%s", t.name, err, msg.sess.sid) } } if getWhat&constMsgMetaTags != 0 { // Send get.tags response as a separate {meta} packet if err := t.replyGetTags(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Tags failed: %v sid=%s", t.name, err, msg.sess.sid) } } if getWhat&constMsgMetaCred != 0 { // Send get.tags response as a separate {meta} packet if err := t.replyGetCreds(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Cred failed: %v sid=%s", t.name, err, msg.sess.sid) } } if getWhat&constMsgMetaAux != 0 { // Send get.aux response as a separate {meta} packet if err := t.replyGetAux(msg.sess, asUid, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Aux failed: %v sid=%s", t.name, err, msg.sess.sid) } } if getWhat&constMsgMetaData != 0 { // Send get.data response as {data} packets if err := t.replyGetData(msg.sess, asUid, asChan, msgsub.Get.Data, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Data failed: %v sid=%s", t.name, err, msg.sess.sid) } } if getWhat&constMsgMetaDel != 0 { // Send get.del response as a separate {meta} packet if err := t.replyGetDel(msg.sess, asUid, msgsub.Get.Del, msg); err != nil { logs.Warn.Printf("topic[%s] handleSubscription Get.Del failed: %v sid=%s", t.name, err, msg.sess.sid) } } return nil } // handleLeaveRequest processes a session leave request. func (t *Topic) handleLeaveRequest(msg *ClientComMessage, sess *Session) { // Remove connection from topic; session may continue to function now := types.TimeNow() var asUid types.Uid var asChan bool if msg.init { asUid = types.ParseUserId(msg.AsUser) var err error asChan, err = t.verifyChannelAccess(msg.Original) if err != nil { // Group topic cannot be addressed as channel unless channel functionality is enabled. sess.queueOut(ErrNotFoundReply(msg, now)) } } if t.isInactive() { if !asUid.IsZero() && msg.init { sess.queueOut(ErrLockedReply(msg, now)) } return } // User wants to leave and unsubscribe. if msg.init && msg.Leave.Unsub { // asUid must not be Zero. if err := t.replyLeaveUnsub(sess, msg, asUid); err != nil { logs.Err.Println("failed to unsub", err, sess.sid) } return } // User wants to leave without unsubscribing. if pssd, _ := t.remSession(sess, asUid); pssd != nil { if !sess.isProxy() { sess.delSub(t.name) } if pssd.isChanSub != asChan { // Cannot address non-channel subscription as channel and vice versa. if msg.init { // Group topic cannot be addressed as channel unless channel functionality is enabled. sess.queueOut(ErrNotFoundReply(msg, now)) } return } var uid types.Uid if sess.isProxy() { // Multiplexing session, multiple UIDs. uid = asUid } else { // Simple session, single UID. uid = pssd.uid } var pud perUserData // uid may be zero when a proxy session is trying to terminate (it called unsubAll). if !uid.IsZero() { // UID not zero: one user removed. pud = t.perUser[uid] if !sess.background { pud.online-- t.perUser[uid] = pud } } else if len(pssd.muids) > 0 { // UID is zero: multiplexing session is dropped altogether. // Using new 'uid' and 'pud' variables. for _, uid := range pssd.muids { pud := t.perUser[uid] pud.online-- t.perUser[uid] = pud } } else if !sess.isCluster() { logs.Warn.Panic("cannot determine uid: leave req", msg, sess) } switch t.cat { case types.TopicCatMe: mrs := t.mostRecentSession() if mrs == nil { // Last session mrs = sess } else { // Change UA to the most recent live session and announce it. Don't block. select { case t.supd <- &sessionUpdate{userAgent: mrs.userAgent}: default: } } meUid := uid if meUid.IsZero() && len(pssd.muids) > 0 { // The entire multiplexing session is being dropped. Need to find owner's UID. // len(pssd.muids) could be zero if the session was a background session. meUid = pssd.muids[0] } if !meUid.IsZero() { // Update user's last online timestamp & user agent. Only one user can be subscribed to 'me' topic. if err := store.Users.UpdateLastSeen(meUid, mrs.userAgent, now); err != nil { logs.Warn.Println("user update last seen:", err) } } case types.TopicCatFnd: // FIXME: this does not work correctly in case of a multiplexing session. // Remove ephemeral query. t.fndRemovePublic(sess) case types.TopicCatGrp: // Subscriber is going offline in the topic: notify other subscribers who are currently online. readFilter := &presFilters{filterIn: types.ModeRead} if !uid.IsZero() { if pud.online == 0 { if asChan { // Simply delete record from perUserData delete(t.perUser, uid) } else { t.presSubsOnline("off", uid.UserId(), nilPresParams, readFilter, "") } } } else if len(pssd.muids) > 0 { for _, uid := range pssd.muids { if t.perUser[uid].online == 0 { if asChan { // delete record from perUserData delete(t.perUser, uid) } else { t.presSubsOnline("off", uid.UserId(), nilPresParams, readFilter, "") } } } } } if !uid.IsZero() { // Respond if contains an id. if msg.init { sess.queueOut(NoErrReply(msg, now)) } } } } // sessToForeground updates perUser online status accounting and fires due // deferred notifications for the provided session. func (t *Topic) sessToForeground(sess *Session) { s := sess if s.multi != nil { s = s.multi } if pssd, ok := t.sessions[s]; ok && !pssd.isChanSub { uid := pssd.uid if s.isMultiplex() { // If 's' is a multiplexing session, then sess is a proxy and it contains correct UID. // Add UID to the list of online users. uid = sess.uid pssd.muids = append(pssd.muids, uid) } // Mark user as online pud := t.perUser[uid] pud.online++ t.perUser[uid] = pud t.sendSubNotifications(uid, sess.sid, sess.userAgent) } } // Send immediate presence notification in response to a subscription. // Send push notification to the P2P counterpart. // In case of a new channel subscription subscribe user to an FCM topic. // These notifications are always sent immediately even if background is requested. func (t *Topic) sendImmediateSubNotifications(asUid types.Uid, acs *MsgAccessMode, sreg *ClientComMessage, now time.Time) { modeWant, _ := types.ParseAcs([]byte(acs.Want)) modeGiven, _ := types.ParseAcs([]byte(acs.Given)) mode := modeWant & modeGiven asChan := t.isChan && types.IsChannel(sreg.Original) if t.cat == types.TopicCatP2P { uid2 := t.p2pOtherUser(asUid) pud2 := t.perUser[uid2] mode2 := pud2.modeGiven & pud2.modeWant if pud2.deleted { mode2 = types.ModeInvalid } // Inform the other user that the topic was just created. if sreg.Sub.Created { t.presSingleUserOffline(uid2, mode2, "acs", &presParams{ dWant: pud2.modeWant.String(), dGiven: pud2.modeGiven.String(), actor: asUid.UserId(), }, "", false) } if sreg.Sub.Newsub { // Notify current user's 'me' topic to accept notifications from user2 t.presSingleUserOffline(asUid, mode, "?none+en", nilPresParams, "", false) // Initiate exchange of 'online' status with the other user. // We don't know if the current user is online in the 'me' topic, // so sending an '?unkn' status to user2. His 'me' topic // will reply with user2's status and request an actual status from user1. status := "?unkn" if mode2.IsPresencer() { // If user2 should receive notifications, enable it. status += "+en" } t.presSingleUserOffline(uid2, mode2, status, nilPresParams, "", false) // Also send a push notification to the other user. sendPush(t.pushForP2PSub(asUid, uid2, pud2.modeWant, pud2.modeGiven, now)) } } else if t.cat == types.TopicCatGrp && !asChan && sreg.Sub.Newsub { // For new group subscriptions, notify other group members. sendPush(t.pushForGroupSub(asUid, now)) } // newsub could be true only for p2p and group topics, no need to check topic category explicitly. if sreg.Sub.Newsub { // Notify creator's other sessions that the subscription (or the entire topic) was created. t.presSingleUserOffline(asUid, mode, "acs", &presParams{ dWant: acs.Want, dGiven: acs.Given, actor: asUid.UserId(), }, sreg.sess.sid, false) if asChan { t.channelSubUnsub(asUid, true) } } } // Send immediate or deferred presence notification in response to a subscription. // Not used by channels. func (t *Topic) sendSubNotifications(asUid types.Uid, sid, userAgent string) { switch t.cat { case types.TopicCatMe: // Notify user's contact that the given user is online now. if !t.isLoaded() { t.markLoaded() if err := t.loadContacts(asUid); err != nil { logs.Err.Println("topic: failed to load contacts", t.name, err.Error()) } // User online: notify users of interest without forcing response (no +en here). t.presUsersOfInterest("on", userAgent) } case types.TopicCatGrp: pud := t.perUser[asUid] if pud.isChan { // Not sendng notifications for channel readers. return } // Enable notifications for a new group topic, if appropriate. if !t.isLoaded() { t.markLoaded() status := "on" if (pud.modeGiven & pud.modeWant).IsPresencer() { status += "+en" } // Notify topic subscribers that the topic is online now. t.presSubsOffline(status, nilPresParams, nilPresFilters, nilPresFilters, "", false) } else if pud.online == 1 { // If this is the first session of the user in the topic. // Notify other online group members that the user is online now. t.presSubsOnline("on", asUid.UserId(), nilPresParams, &presFilters{filterIn: types.ModeRead}, sid) } } } // Saves a new message (defined by head, content and attachments) in the topic // in response to a client request (msg, asUid) and broadcasts it to the attached sessions. func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid, noEcho bool, attachments []string, head map[string]any, content any) error { pud, userFound := t.perUser[asUid] // Anyone is allowed to post to 'sys' topic. if t.cat != types.TopicCatSys { // If it's not 'sys' check write permission. if !(pud.modeWant & pud.modeGiven).IsWriter() { msg.sess.queueOut(ErrPermissionDenied(msg.Id, t.original(asUid), msg.Timestamp)) return types.ErrPermissionDenied } } if msg.sess != nil && msg.sess.uid != asUid { // The "sender" header contains ID of the user who sent the message on behalf of asUid. if head == nil { head = map[string]any{} } head["sender"] = msg.sess.uid.UserId() } else if head != nil { // Make sure the received Head does not include a fake "sender" header. delete(head, "sender") } markedReadBySender := false if err, unreadUpdated := store.Messages.Save( &types.Message{ ObjHeader: types.ObjHeader{CreatedAt: msg.Timestamp}, SeqId: t.lastID + 1, Topic: t.name, From: asUid.String(), Head: head, Content: content, }, attachments, (pud.modeGiven & pud.modeWant).IsReader()); err != nil { logs.Warn.Printf("topic[%s]: failed to save message: %v", t.name, err) msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp)) return err } else { markedReadBySender = unreadUpdated } t.lastID++ t.touched = msg.Timestamp if userFound { pud.readID = t.lastID pud.recvID = t.lastID t.perUser[asUid] = pud } if msg.Id != "" && msg.sess != nil { reply := NoErrAccepted(msg.Id, t.original(asUid), msg.Timestamp) reply.Ctrl.Params = map[string]any{"seq": t.lastID} msg.sess.queueOut(reply) } data := &ServerComMessage{ Data: &MsgServerData{ Topic: msg.Original, From: msg.AsUser, Timestamp: msg.Timestamp, SeqId: t.lastID, Head: head, Content: content, }, // Internal-only values. Id: msg.Id, RcptTo: msg.RcptTo, AsUser: msg.AsUser, Timestamp: msg.Timestamp, sess: msg.sess, } if noEcho { data.SkipSid = msg.sess.sid } // Message sent: notify offline 'R' subscrbers on 'me'. t.presSubsOffline("msg", &presParams{seqID: t.lastID, actor: msg.AsUser}, &presFilters{filterIn: types.ModeRead}, nilPresFilters, "", true) // Tell the plugins that a message was accepted for delivery pluginMessage(data.Data, plgActCreate) t.broadcastToSessions(data) // sendPush will update unread message count and send push notification. if pushRcpt := t.pushForData(asUid, data.Data, markedReadBySender); pushRcpt != nil { sendPush(pushRcpt) } return nil } // handlePubBroadcast fans out {pub} -> {data} messages to recipients in a master topic. // This is a NON-proxy broadcast. func (t *Topic) handlePubBroadcast(msg *ClientComMessage) { asUid := types.ParseUserId(msg.AsUser) if t.isInactive() { // Ignore broadcast - topic is paused or being deleted. msg.sess.queueOut(ErrLocked(msg.Id, t.original(asUid), msg.Timestamp)) return } if t.isReadOnly() { msg.sess.queueOut(ErrPermissionDenied(msg.Id, t.original(asUid), msg.Timestamp)) return } isCall := msg.Pub.Head != nil && msg.Pub.Head["webrtc"] != nil if isCall { if len(globals.iceServers) == 0 { msg.sess.queueOut(ErrNotImplementedReply(msg, types.TimeNow())) return } if t.cat != types.TopicCatP2P { msg.sess.queueOut(ErrPermissionDeniedReply(msg, types.TimeNow())) return } if t.currentCall != nil { msg.sess.queueOut(ErrCallBusyReply(msg, types.TimeNow())) return } } // Save to DB at master topic. var attachments []string if msg.Extra != nil && len(msg.Extra.Attachments) > 0 { attachments = msg.Extra.Attachments } if err := t.saveAndBroadcastMessage(msg, asUid, msg.Pub.NoEcho, attachments, msg.Pub.Head, msg.Pub.Content); err != nil { logs.Err.Printf("topic[%s]: failed to save messagge - %s", t.name, err) return } if isCall { t.handleCallInvite(msg, asUid) } } // handleNoteBroadcast fans out {note} -> {info} messages to recipients in a master topic. // This is a NON-proxy broadcast (at master topic). func (t *Topic) handleNoteBroadcast(msg *ClientComMessage) { if t.isInactive() { // Ignore broadcast - topic is paused or being deleted. return } if msg.Note.SeqId > t.lastID { // Drop bogus read notification return } asChan, err := t.verifyChannelAccess(msg.Original) if err != nil { // Silently drop invalid notification. return } asUid := types.ParseUserId(msg.AsUser) pud := t.perUser[asUid] mode := pud.modeGiven & pud.modeWant if pud.deleted { mode = types.ModeInvalid } switch msg.Note.What { case "kp", "kpa", "kpv": // Filter out "kp*" from users with no 'W' permission (or people without a subscription). if !mode.IsWriter() || t.isReadOnly() { return } case "read", "recv": // Filter out "read/recv" from users with no 'R' permission (or people without a subscription). if !mode.IsReader() { return } case "call": // Handle calls separately. t.handleCallEvent(msg) return } var read, recv, unread, seq int switch msg.Note.What { case "read": if msg.Note.SeqId <= pud.readID { // No need to report stale or bogus read status. return } // The number of unread messages has decreased, negative value. unread = pud.readID - msg.Note.SeqId pud.readID = msg.Note.SeqId if pud.readID > pud.recvID { pud.recvID = pud.readID } read = pud.readID seq = read case "recv": if msg.Note.SeqId <= pud.recvID { // Stale or bogus recv status. return } pud.recvID = msg.Note.SeqId if pud.readID > pud.recvID { pud.recvID = pud.readID } recv = pud.recvID seq = recv } if seq > 0 { topicName := t.name if asChan { topicName = msg.Note.Topic } upd := map[string]any{} if recv > 0 { upd["RecvSeqId"] = recv } if read > 0 { upd["ReadSeqId"] = read } if err := store.Subs.Update(topicName, asUid, upd); err != nil { logs.Warn.Printf("topic[%s]: failed to update SeqRead/Recv counter: %v", t.name, err) return } // Read/recv updated: notify user's other sessions of the change t.presPubMessageCount(asUid, mode, read, recv, msg.sess.sid) if read > 0 { // Send push notification to other user devices. sendPush(t.pushForReadRcpt(asUid, read, msg.Timestamp)) } // Update cached count of unread messages (not tracking unread messages fror channels). if !asChan { usersUpdateUnread(asUid, unread, true) } } if asChan { // No need to forward {note} to other subscribers in channels return } if seq > 0 { t.perUser[asUid] = pud } // Read/recv/kp: notify users offline in the topic on their 'me'. t.infoSubsOffline(asUid, msg.Note.What, seq, msg.sess.sid) info := &ServerComMessage{ Info: &MsgServerInfo{ Topic: msg.Original, From: msg.AsUser, What: msg.Note.What, SeqId: msg.Note.SeqId, }, RcptTo: msg.RcptTo, AsUser: msg.AsUser, Timestamp: msg.Timestamp, SkipSid: msg.sess.sid, sess: msg.sess, } t.broadcastToSessions(info) } // handlePresence fans out {pres} messages to recipients in topic. func (t *Topic) handlePresence(msg *ServerComMessage) { what := t.procPresReq(msg.Pres.Src, msg.Pres.What, msg.Pres.WantReply) if t.xoriginal != msg.Pres.Topic || what == "" { // This is just a request for status, don't forward it to sessions return } // "what" may have changed, i.e. unset or "+command" removed ("on+en" -> "on") msg.Pres.What = what t.broadcastToSessions(msg) } // broadcastToSessions writes message to attached sessions. func (t *Topic) broadcastToSessions(msg *ServerComMessage) { // List of sessions to be dropped. var dropSessions []*Session // Broadcast the message. Only {data}, {pres}, {info} are broadcastable. // {meta} and {ctrl} are sent to the session only for sess, pssd := range t.sessions { // Send all messages to multiplexing session. if !sess.isMultiplex() { if sess.sid == msg.SkipSid { continue } if msg.Pres != nil { // Skip notifying - already notified on topic. if msg.Pres.SkipTopic != "" && sess.getSub(msg.Pres.SkipTopic) != nil { continue } // Notification addressed to a single user only. if msg.Pres.SingleUser != "" && pssd.uid.UserId() != msg.Pres.SingleUser { continue } // Notification should skip a single user. if msg.Pres.ExcludeUser != "" && pssd.uid.UserId() == msg.Pres.ExcludeUser { continue } // Check presence filters if !t.passesPresenceFilters(msg.Pres, pssd.uid) { continue } } else { if msg.Info != nil { // Don't forward read receipts and key presses to channel readers and those without the R permission. // OK to forward with Src != "" because it's sent from another topic to 'me', permissions already // checked there. if msg.Info.Src == "" && (pssd.isChanSub || !t.userIsReader(pssd.uid)) { continue } // Skip notifying - already notified on topic. if msg.Info.SkipTopic != "" && sess.getSub(msg.Info.SkipTopic) != nil { continue } // Don't send key presses from one user's session to the other sessions of the same user. if msg.Info.What == "kp" && msg.Info.From == pssd.uid.UserId() { continue } } else if !t.userIsReader(pssd.uid) && !pssd.isChanSub { // Skip {data} if the user has no Read permission and not a channel reader. continue } } } else if pssd.isChanSub && types.IsChannel(sess.sid) { // If it's a chnX multiplexing session, check if there's a corresponding // grpX multiplexing session as we don't want to send the message to both. grpSid := types.ChnToGrp(sess.sid) if grpSess := globals.sessionStore.Get(grpSid); grpSess != nil && grpSess.isMultiplex() { // If grpX multiplexing session's attached to topic, skip this chnX session // (message will be routed to the topic proxy via the grpX session). if _, attached := t.sessions[grpSess]; attached { continue } } } // Make a copy of msg since messages sent to sessions differ. msgCopy := msg.copy() // Topic name may be different depending on the user to which the `sess` belongs. t.prepareBroadcastableMessage(msgCopy, pssd.uid, pssd.isChanSub) // Send message to session. if !sess.queueOut(msgCopy) { logs.Warn.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid) dropSessions = append(dropSessions, sess) } } // Drop "bad" sessions. for _, sess := range dropSessions { // The whole session is being dropped, so ClientComMessage.init is false. // keep redundant init: false so it can be searched for. t.unregisterSession(&ClientComMessage{sess: sess, init: false}) } } // subscriptionReply generates a response to a subscription request func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error { // The topic is already initialized by the Hub msgsub := msg.Sub // For newly created topics report topic creation time. var now time.Time if msgsub.Created { now = t.updated } else { now = types.TimeNow() } asUid := types.ParseUserId(msg.AsUser) if !msgsub.Newsub && (t.cat == types.TopicCatP2P || t.cat == types.TopicCatGrp) { // Check if this is a new subscription (P2P & GRP only. SLF, SYS are excluded here). pud, found := t.perUser[asUid] msgsub.Newsub = !found || pud.deleted } var private any var mode string if msgsub.Set != nil { if msgsub.Set.Sub != nil { if msgsub.Set.Sub.User != "" { msg.sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("user id must not be specified") } mode = msgsub.Set.Sub.Mode } if msgsub.Set.Desc != nil { private = msgsub.Set.Desc.Private } } var err error var modeChanged *MsgAccessMode // Create new subscription or modify an existing one. if modeChanged, err = t.thisUserSub(msg.sess, msg, asUid, asChan, mode, private); err != nil { return err } hasJoined := true if modeChanged != nil { if acs, err := types.ParseAcs([]byte(modeChanged.Mode)); err == nil { hasJoined = acs.IsJoiner() } } if hasJoined { // Subscription successfully created. Link topic to session. msg.sess.addSub(t.name, &Subscription{ broadcast: t.clientMsg, done: t.unreg, meta: t.meta, supd: t.supd, }) t.addSession(msg.sess, asUid, asChan) // The user is online in the topic. Increment the counter if notifications are not deferred. if !msg.sess.background { userData := t.perUser[asUid] userData.online++ t.perUser[asUid] = userData } if t.cat == types.TopicCatGrp && msgsub.Newsub { // Increment subscriber count for new group subscriptions only. t.subCnt++ } } params := map[string]any{} // Report back the assigned access mode. if modeChanged != nil { params["acs"] = modeChanged } toriginal := t.original(asUid) // When a group topic is created, it's given a temporary name by the client. // Then this name changes. Report back the original name here. if msgsub.Created && msg.Original != toriginal { params["tmpname"] = msg.Original // The new123ABC name is no longer useful after this. msg.Original = toriginal } if len(params) == 0 { // Don't send empty params '{}' msg.sess.queueOut(NoErr(msg.Id, toriginal, now)) } else { msg.sess.queueOut(NoErrParams(msg.Id, toriginal, now, params)) } // Some notifications are always sent immediately. if modeChanged != nil { t.sendImmediateSubNotifications(asUid, modeChanged, msg, now) } if !msg.sess.background && hasJoined { // Other notifications are also sent immediately for foreground sessions. t.sendSubNotifications(asUid, msg.sess.sid, msg.sess.userAgent) } return nil } // User requests or updates a self-subscription to a topic. Called as a // result of {sub} or {meta set=sub}. // Returns new access mode as *MsgAccessMode if user's access mode has changed, nil otherwise. // // sess - originating session // pkt - client message which triggered this request; {sub} or {set} // asUid - id of the user making the request // asChan - true if the user is subscribing to a channel topic // want - requested access mode // private - private value to assign to the subscription // background - presence notifications are deferred // // Handle these cases: // A. User is trying to subscribe for the first time (no subscription). // A.1 Normal user is subscribing to the topic. // A.2 Reader is joining the channel. // B. User is already subscribed, just joining without changing anything. // C. User is responding to an earlier invite (modeWant was "N" in subscription). // D. User is already subscribed, changing modeWant. // E. User is accepting ownership transfer (requesting ownership transfer is not permitted). // In case of a group topic the user may be a reader or a full subscriber. func (t *Topic) thisUserSub(sess *Session, pkt *ClientComMessage, asUid types.Uid, asChan bool, want string, private any) (*MsgAccessMode, error) { now := types.TimeNow() asLvl := auth.Level(pkt.AuthLvl) // Access mode values as they were before this request was processed. oldWant := types.ModeNone oldGiven := types.ModeNone // Parse access mode requested by the user modeWant := types.ModeUnset if want != "" { if err := modeWant.UnmarshalText([]byte(want)); err != nil { sess.queueOut(ErrMalformedReply(pkt, now)) return nil, err } } var err error // Check if it's an attempt at a new subscription to the topic / a first connection of a channel reader // (channel readers are not permanently cached). // It could be an actual subscription (IsJoiner() == true) or a ban (IsJoiner() == false). userData, existingSub := t.perUser[asUid] if !existingSub || userData.deleted { // New subscription or a not yet cached channel reader, either new or existing. // Check if the max number of subscriptions is already reached. if t.cat == types.TopicCatGrp && !asChan && t.subsCount() >= globals.maxSubscriberCount { sess.queueOut(ErrPolicyReply(pkt, now)) return nil, errors.New("max subscription count exceeded") } var sub *types.Subscription tname := t.name if t.cat == types.TopicCatP2P { // P2P could be here only if it was previously deleted. I.e. existingSub is always true for P2P. if modeWant != types.ModeUnset { userData.modeWant = modeWant } // If no modeWant is provided, leave existing one unchanged. // Make sure the user is not asking for unreasonable permissions userData.modeWant = (userData.modeWant & globals.typesModeCP2P) | types.ModeApprove } else if t.cat == types.TopicCatSys { if asLvl != auth.LevelRoot { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("subscription to 'sys' topic requires root access level") } // Assign default access levels userData.modeWant = types.ModeCSys userData.modeGiven = types.ModeCSys if modeWant != types.ModeUnset { userData.modeWant = (modeWant & types.ModeCSys) | types.ModeWrite | types.ModeJoin } } else if asChan { userData.isChan = true // Check if user is already subscribed. sub, err = store.Subs.Get(pkt.Original, asUid, false) if err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } // Given mode is immutable. oldGiven = types.ModeCChnReader userData.modeGiven = types.ModeCChnReader if sub != nil { // Subscription exists, read old access mode. oldWant = sub.ModeWant } else { // Subscription not found, use default. oldWant = types.ModeCChnReader } if modeWant != types.ModeUnset { // New access mode is explicitly assigned. userData.modeWant = (modeWant & types.ModeCChnReader) | types.ModeRead | types.ModeJoin } else { // Default: unchanged. userData.modeWant = oldWant } // User is subscribed to chnXXX, not grpXXX. tname = pkt.Original } else { // All other topic types. if !existingSub { // Check if the user has been subscribed previously and if so, use previous modeGiven. // Otherwise the user may delete subscription and resubscribe to avoid being blocked. sub, err = store.Subs.Get(t.name, asUid, true) if err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } if sub != nil { userData.modeGiven = sub.ModeGiven } else { // If no mode was previously given, give default access. userData.modeGiven = types.ModeUnset } } if userData.modeGiven == types.ModeUnset { // New user: default access. userData.modeGiven = t.accessFor(asLvl) } if modeWant == types.ModeUnset { // User wants default access mode. userData.modeWant = t.accessFor(asLvl) } else { userData.modeWant = modeWant } } // Reject new subscription: 'given' permissions have no 'J'. if !userData.modeGiven.IsJoiner() { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("subscription rejected due to permissions") } // Undelete. if userData.deleted { userData.deleted = false userData.delID, userData.readID, userData.recvID = 0, 0, 0 } if isNullValue(private) { private = nil } userData.private = private // Add subscription to database, if missing. if sub == nil || sub.DeletedAt != nil { sub = &types.Subscription{ User: asUid.String(), Topic: tname, ModeWant: userData.modeWant, ModeGiven: userData.modeGiven, Private: userData.private, } if err := store.Subs.Create(sub); err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } } else if asChan && userData.modeWant != oldWant { // Channel reader changed access mode, save changed mode to db. if err := store.Subs.Update(tname, asUid, map[string]any{"ModeWant": userData.modeWant}); err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } // Enable or disable fcm push notifications for the subsciption. t.channelSubUnsub(asUid, userData.modeWant.IsPresencer()) } if asChan { if userData.modeWant != oldWant { pluginSubscription(sub, plgActCreate) } else { pluginSubscription(sub, plgActUpd) } } else { // Add subscribed user to cache. usersRegisterUser(asUid, true) // Notify plugins of a new subscription pluginSubscription(sub, plgActCreate) } } else { // Process update to existing subscription. It could be an incomplete subscription for a new topic. if !userData.isChan && asChan { // A normal subscriber is trying to access topic as a channel. // Direct the subscriber to use non-channel topic name. sess.queueOut(InfoUseOtherReply(pkt, t.name, now)) return nil, types.ErrNotFound } var ownerChange bool // Save old access values oldWant = userData.modeWant oldGiven = userData.modeGiven if modeWant != types.ModeUnset { // Explicit modeWant is provided // Make sure the current owner cannot unset the owner flag or ban himself. if t.owner == asUid && (!modeWant.IsOwner() || !modeWant.IsJoiner()) { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("cannot unset ownership or self-ban the owner") } // Perform sanity checks if userData.modeGiven.IsOwner() { // Check for possible ownership transfer. Handle the following cases: // 1. Acceptance or rejection of the ownership transfer // 2. Owner changing own settings // Ownership transfer ownerChange = modeWant.IsOwner() && !userData.modeWant.IsOwner() // The owner should be able to grant himself any access permissions. if modeWant.IsOwner() && !userData.modeGiven.BetterEqual(modeWant) { userData.modeGiven |= modeWant } } else if modeWant.IsOwner() { // Ownership transfer can only be initiated by the owner. sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("non-owner cannot request ownership transfer") } else if t.cat == types.TopicCatGrp && userData.modeGiven.IsAdmin() && modeWant.IsAdmin() { // A group topic Admin should be able to grant himself any permissions except // ownership (checked previously) & hard-deleting messages. if !userData.modeGiven.BetterEqual(modeWant & ^types.ModeDelete) { userData.modeGiven |= (modeWant & ^types.ModeDelete) } } switch t.cat { case types.TopicCatP2P: // For P2P topics ignore requests exceeding the maximum allowed. Otherwise it will generate // a useless announcement. modeWant = (modeWant & globals.typesModeCP2P) | types.ModeApprove case types.TopicCatSys: // Anyone can always write to Sys topic. modeWant &= (modeWant & types.ModeCSys) | types.ModeWrite } } // If user has not requested a new access mode, provide one by default. if modeWant == types.ModeUnset { // If the user has self-banned before, un-self-ban. Otherwise do not make a change. if !oldWant.IsJoiner() { // Set permissions NO WORSE than default, but possibly better (admin or owner banned himself). userData.modeWant = userData.modeGiven | t.accessFor(asLvl) } } else if userData.modeWant != modeWant { // The user has provided a new modeWant and it' different from the one before userData.modeWant = modeWant } // Create a subscription object to notify plugins. sub := types.Subscription{ User: asUid.String(), Topic: t.name, } // Save changes to DB update := map[string]any{} if isNullValue(private) { update["Private"] = nil userData.private = nil sub.Private = private } else if private != nil { update["Private"] = private userData.private = private sub.Private = private } if userData.modeWant != oldWant { update["ModeWant"] = userData.modeWant sub.ModeWant = userData.modeWant } if userData.modeGiven != oldGiven { update["ModeGiven"] = userData.modeGiven sub.ModeGiven = userData.modeGiven } if len(update) > 0 { if err := store.Subs.Update(t.name, asUid, update); err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } pluginSubscription(&sub, plgActUpd) } // No transactions in RethinkDB, but two owners are better than none if ownerChange { oldOwnerData := t.perUser[t.owner] oldOwnerOldWant, oldOwnerOldGiven := oldOwnerData.modeWant, oldOwnerData.modeGiven oldOwnerData.modeGiven = (oldOwnerData.modeGiven & ^types.ModeOwner) oldOwnerData.modeWant = (oldOwnerData.modeWant & ^types.ModeOwner) if err := store.Subs.Update(t.name, t.owner, map[string]any{ "ModeWant": oldOwnerData.modeWant, "ModeGiven": oldOwnerData.modeGiven, }); err != nil { return nil, err } if err := store.Topics.OwnerChange(t.name, asUid); err != nil { return nil, err } t.perUser[t.owner] = oldOwnerData // Send presence notifications. t.notifySubChange(t.owner, asUid, false, oldOwnerOldWant, oldOwnerOldGiven, oldOwnerData.modeWant, oldOwnerData.modeGiven, "") t.owner = asUid } } if !asChan { // If topic is being muted, send "off" notification and disable updates. // Do it before applying the new permissions. if (oldWant & oldGiven).IsPresencer() && !(userData.modeWant & userData.modeGiven).IsPresencer() { if t.cat == types.TopicCatMe { t.presUsersOfInterest("off+dis", t.userAgent) } else { t.presSingleUserOffline(asUid, userData.modeWant&userData.modeGiven, "off+dis", nilPresParams, "", false) } } } // Apply changes. t.perUser[asUid] = userData var modeChanged *MsgAccessMode // Send presence notifications and update cached unread count. if oldWant != userData.modeWant || oldGiven != userData.modeGiven { if !asChan { oldReader := (oldWant & oldGiven).IsReader() newReader := (userData.modeWant & userData.modeGiven).IsReader() if oldReader && !newReader { // Decrement unread count usersUpdateUnread(asUid, userData.readID-t.lastID, true) } else if !oldReader && newReader { // Increment unread count usersUpdateUnread(asUid, t.lastID-userData.readID, true) } } // Notify actor of the changes in access mode. t.notifySubChange(asUid, asUid, asChan, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid) } if (pkt.Sub != nil && pkt.Sub.Newsub) || oldWant != userData.modeWant || oldGiven != userData.modeGiven { modeChanged = &MsgAccessMode{ Want: userData.modeWant.String(), Given: userData.modeGiven.String(), Mode: (userData.modeGiven & userData.modeWant).String(), } } if !userData.modeWant.IsJoiner() { // The user is self-banning from the topic. Re-subscription will unban. t.evictUser(asUid, false, "") // The callee will send NoErrOK return modeChanged, nil } if !userData.modeGiven.IsJoiner() { // User was banned sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("topic access denied; user is banned") } return modeChanged, nil } // anotherUserSub processes a request to initiate an invite or approve a subscription request from another user. // Returns changed == true if user's access mode has changed. // Handle these cases: // A. Sharer or Approver is inviting another user for the first time (no prior subscription) // B. Sharer or Approver is re-inviting another user (adjusting modeGiven, modeWant is still Unset) // C. Approver is changing modeGiven for another user, modeWant != Unset func (t *Topic) anotherUserSub(sess *Session, asUid, target types.Uid, asChan bool, pkt *ClientComMessage) (*MsgAccessMode, error) { now := types.TimeNow() set := pkt.Set // Check if approver actually has permission to manage sharing hostData, ok := t.perUser[asUid] // Access mode of the person who is executing this approval process hostMode := hostData.modeGiven & hostData.modeWant if !ok || !hostMode.IsSharer() { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("topic access denied; approver has no permission") } if asChan { // TODO: need to implement promoting reader to subscriber. Rejecting for now. sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("topic access denied: cannot subscribe reader to channel") } // Check if topic is suspended. if t.isReadOnly() { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("topic is suspended") } // Parse the access mode granted modeGiven := types.ModeUnset if set.Sub.Mode != "" { if err := modeGiven.UnmarshalText([]byte(set.Sub.Mode)); err != nil { sess.queueOut(ErrMalformedReply(pkt, now)) return nil, err } // Make sure the new permissions are reasonable in P2P topics: permissions no greater than allowed, // approver permission cannot be removed. if t.cat == types.TopicCatP2P { modeGiven = (modeGiven & globals.typesModeCP2P) | types.ModeApprove } } // Make sure only the owner & approvers can set non-default access mode if modeGiven != types.ModeUnset && !hostMode.IsAdmin() { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("sharer cannot set explicit modeGiven") } // Make sure no one but the owner can do an ownership transfer if modeGiven.IsOwner() && t.owner != asUid { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("attempt to transfer ownership by non-owner") } // Access mode values as they were before this request was processed. oldWant := types.ModeUnset oldGiven := types.ModeUnset // Check if it's a new invite. If so, save it to database as a subscription. // Saved subscription does not mean the user is allowed to post/read userData, existingSub := t.perUser[target] if !existingSub || userData.deleted { // Check if the max number of subscriptions is already reached. if t.cat == types.TopicCatGrp && t.subsCount() >= globals.maxSubscriberCount { sess.queueOut(ErrPolicyReply(pkt, now)) return nil, errors.New("max subscription count exceeded") } if modeGiven == types.ModeUnset { // Request to use default access mode for the new subscriptions. // Assuming LevelAuth. Approver should use non-default access if that is not suitable. modeGiven = t.accessFor(auth.LevelAuth) // Enable new subscription even if default is no joiner. modeGiven |= types.ModeJoin } var modeWant types.AccessMode // Check if the invitee has been subscribed previously and if so, use previous modeWant. // Otherwise the inviter may delete blocked subscription and reinvite to spam the user. sub, err := store.Subs.Get(t.name, target, true) if err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } if sub != nil { // Existing deleted subscription. modeWant = sub.ModeWant } else { // Get user's default access mode to be used as modeWant if user, err := store.Users.Get(target); err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } else if user == nil { sess.queueOut(ErrUserNotFoundReply(pkt, now)) return nil, errors.New("user not found") } else if user.State != types.StateOK { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("user is suspended") } else { // Don't ask by default for more permissions than the granted ones. modeWant = user.Access.Auth & modeGiven } } // Reject invitation: 'want' permissions have no 'J'. if !modeWant.IsJoiner() { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("invitation rejected due to permissions") } // Add subscription to database sub = &types.Subscription{ User: target.String(), Topic: t.name, ModeWant: modeWant, ModeGiven: modeGiven, } if err := store.Subs.Create(sub); err != nil { sess.queueOut(ErrUnknownReply(pkt, now)) return nil, err } userData = perUserData{ modeGiven: sub.ModeGiven, modeWant: sub.ModeWant, private: nil, } t.perUser[target] = userData t.computePerUserAcsUnion() // Cache user's record usersRegisterUser(target, true) // Notify plugins of a new subscription. pluginSubscription(sub, plgActCreate) // Send push notification for the new subscription. // TODO: maybe skip user's devices which were online when this event has happened. sendPush(t.pushForP2PSub(asUid, target, userData.modeWant, userData.modeGiven, now)) } else { // Action on an existing subscription: re-invite, change existing permission, confirm/decline request. oldGiven = userData.modeGiven oldWant = userData.modeWant if modeGiven == types.ModeUnset { // Request to re-send invite without changing the access mode modeGiven = userData.modeGiven } else if modeGiven != userData.modeGiven { // Changing the previously assigned value. // Cannot strip owner of ownership or ban the owner. if t.owner == target && (!modeGiven.IsOwner() || !modeGiven.IsJoiner()) { sess.queueOut(ErrPermissionDeniedReply(pkt, now)) return nil, errors.New("cannot stip ownership or ban the owner") } // Save changed value to database if err := store.Subs.Update(t.name, target, map[string]any{"ModeGiven": modeGiven}); err != nil { return nil, err } userData.modeGiven = modeGiven t.perUser[target] = userData } } var modeChanged *MsgAccessMode // Access mode has changed. if oldGiven != userData.modeGiven { oldReader := (oldWant & oldGiven).IsReader() newReader := (userData.modeWant & userData.modeGiven).IsReader() if oldReader && !newReader { // Decrement unread count usersUpdateUnread(target, userData.readID-t.lastID, true) } else if !oldReader && newReader { // Increment unread count usersUpdateUnread(target, t.lastID-userData.readID, true) } t.notifySubChange(target, asUid, false, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid) modeChanged = &MsgAccessMode{ Given: userData.modeGiven.String(), Want: userData.modeWant.String(), Mode: (userData.modeGiven & userData.modeWant).String(), } } if !userData.modeGiven.IsJoiner() { // The user is banned from the topic. t.evictUser(target, false, "") } return modeChanged, nil } // replyGetDesc is a response to a get.desc request on a topic, sent to just the session as a {meta} packet func (t *Topic) replyGetDesc(sess *Session, asUid types.Uid, _ bool, opts *MsgGetOpts, msg *ClientComMessage) error { now := types.TimeNow() id := msg.Id if opts != nil && (opts.User != "" || opts.Limit != 0) { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("invalid GetDesc query") } // Check if user requested modified data ifUpdated := opts == nil || opts.IfModifiedSince == nil || opts.IfModifiedSince.Before(t.updated) desc := &MsgTopicDesc{} if opts == nil || opts.IfModifiedSince == nil { // Send CreatedAt only when the user requests full information (nothing is cached at the client). desc.CreatedAt = &t.created } if !t.updated.IsZero() { desc.UpdatedAt = &t.updated } pud, full := t.perUser[asUid] full = full || t.cat == types.TopicCatMe if t.cat == types.TopicCatGrp { desc.IsChan = t.isChan desc.SubCnt = t.subCnt logs.Info.Println("replyGetDesc: grp topic", t.name, "subs", t.subCnt) } if ifUpdated { if t.public != nil || t.trusted != nil { // Not a p2p topic. desc.Public = t.public desc.Trusted = t.trusted } else if full && t.cat == types.TopicCatP2P { // FIXME: when a P2P participant updates desc at 'me', these cached values are not updated. desc.Public = pud.public desc.Trusted = pud.trusted } } // Request may come from a subscriber (full == true) or a stranger. // Give subscriber a fuller description than to a stranger/channel reader. if full { if t.cat == types.TopicCatP2P { // For p2p topics default access mode makes no sense: only participants have access to topic. // Don't report it. } else if t.cat == types.TopicCatMe || (pud.modeGiven & pud.modeWant).IsSharer() { desc.DefaultAcs = &MsgDefaultAcsMode{ Auth: t.accessAuth.String(), Anon: t.accessAnon.String(), } } desc.Acs = &MsgAccessMode{ Want: pud.modeWant.String(), Given: pud.modeGiven.String(), Mode: (pud.modeGiven & pud.modeWant).String(), } if t.cat == types.TopicCatMe && sess.authLvl == auth.LevelRoot { // If 'me' is in memory then user account is invariably not suspended. desc.State = types.StateOK.String() } if (pud.modeGiven & pud.modeWant).IsPresencer() { switch t.cat { case types.TopicCatGrp: desc.Online = t.isOnline() case types.TopicCatP2P: // This is the timestamp when the other user logged off last time. // It does not change while the topic is loaded into memory and that's OK most of the time // because to stay in memory at least one of the users must be connected to topic. // FIXME(gene): it breaks when user A stays active in one session and connects-disconnects // from another session. The second session will not see correct LastSeen time and UserAgent. if pud.lastSeen != nil { desc.LastSeen = &MsgLastSeenInfo{ When: pud.lastSeen, UserAgent: pud.lastUA, } } } } if ifUpdated { desc.Private = pud.private } // Don't report message IDs to users without Read access. if (pud.modeGiven & pud.modeWant).IsReader() { desc.SeqId = t.lastID if !t.touched.IsZero() { desc.TouchedAt = &t.touched } // Make sure reported values are sane: // t.delID <= pud.delID; t.readID <= t.recvID <= t.lastID desc.DelId = max(pud.delID, t.delID) desc.ReadSeqId = pud.readID desc.RecvSeqId = max(pud.recvID, pud.readID) } else { // Send some sane value of touched. desc.TouchedAt = &t.updated } } sess.queueOut(&ServerComMessage{ Meta: &MsgServerMeta{ Id: id, Topic: msg.Original, Desc: desc, Timestamp: &now, }, }) return nil } // replySetDesc updates topic metadata, saves it to DB, replies to the caller as {ctrl} message, // generates {pres} update if necessary. func (t *Topic) replySetDesc(sess *Session, asUid types.Uid, asChan bool, authLevel auth.Level, msg *ClientComMessage) error { now := types.TimeNow() assignAccess := func(upd map[string]any, mode *MsgDefaultAcsMode) error { if mode == nil { return nil } if auth, anon, err := parseTopicAccess(mode, types.ModeUnset, types.ModeUnset); err != nil { return err } else if auth.IsOwner() || anon.IsOwner() { return errors.New("default 'owner' access is not permitted") } else { access := types.DefaultAccess{Auth: t.accessAuth, Anon: t.accessAnon} if auth != types.ModeUnset { if t.cat == types.TopicCatMe { auth &= types.ModeCAuth if auth != types.ModeNone { // This is the default access mode for P2P topics. // It must be either an N or must include an A permission. auth |= types.ModeApprove } } access.Auth = auth } if anon != types.ModeUnset { if t.cat == types.TopicCatMe { anon &= globals.typesModeCP2P if anon != types.ModeNone { anon |= types.ModeApprove } } access.Anon = anon } if access.Auth != t.accessAuth || access.Anon != t.accessAnon { upd["Access"] = access } } return nil } assignGenericValues := func(upd map[string]any, what string, dst, src any) (changed bool) { if dst, changed = mergeInterfaces(dst, src); changed { upd[what] = dst } return } // DefaultAccess and/or Public have chanegd var sendCommon bool // Private has changed var sendPriv bool var err error // Change to the main object (user or topic). core := make(map[string]any) // Change to subscription. sub := make(map[string]any) if set := msg.Set; set.Desc != nil { if set.Desc.Trusted != nil && authLevel != auth.LevelRoot { // Only ROOT can change Trusted. sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("attempt to change Trusted by non-root") } switch t.cat { case types.TopicCatMe: // Update current user err = assignAccess(core, set.Desc.DefaultAcs) sendCommon = assignGenericValues(core, "Public", t.public, set.Desc.Public) sendCommon = assignGenericValues(core, "Trusted", t.trusted, set.Desc.Trusted) || sendCommon case types.TopicCatFnd: // set.Desc.DefaultAcs is ignored. if set.Desc.Trusted != nil { // 'fnd' does not support Trusted. sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("attempt to assign Trusted in fnd topic") } // Do not send presence if fnd.Public has changed. assignGenericValues(core, "Public", t.fndGetPublic(sess), set.Desc.Public) case types.TopicCatP2P: // Reject direct changes to P2P topics. if set.Desc.Public != nil || set.Desc.Trusted != nil || set.Desc.DefaultAcs != nil { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("incorrect attempt to change metadata of a p2p topic") } case types.TopicCatGrp: // Update group topic if t.owner == asUid { err = assignAccess(core, set.Desc.DefaultAcs) sendCommon = assignGenericValues(core, "Public", t.public, set.Desc.Public) sendCommon = assignGenericValues(core, "Trusted", t.trusted, set.Desc.Trusted) || sendCommon } else if set.Desc.DefaultAcs != nil || set.Desc.Public != nil || set.Desc.Trusted != nil { // This is a request from non-owner sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("attempt to change public or permissions by non-owner") } } if err != nil { sess.queueOut(ErrMalformedReply(msg, now)) return err } sendPriv = assignGenericValues(sub, "Private", t.perUser[asUid].private, set.Desc.Private) } if len(core)+len(sub) == 0 { sess.queueOut(InfoNotModifiedReply(msg, now)) return errors.New("{set} generated no update to DB") } if len(core) > 0 { core["UpdatedAt"] = now switch t.cat { case types.TopicCatMe: err = store.Users.Update(asUid, core) case types.TopicCatFnd: // The only value to be stored in topic is Public, and Public for fnd is not saved according to specs. default: err = store.Topics.Update(t.name, core) } } if err == nil && len(sub) > 0 { tname := t.name if asChan { tname = types.GrpToChn(tname) } err = store.Subs.Update(tname, asUid, sub) } if err != nil { sess.queueOut(ErrUnknownReply(msg, now)) return err } if len(core) > 0 && msg.Extra != nil && len(msg.Extra.Attachments) > 0 { if err := store.Files.LinkAttachments(t.name, types.ZeroUid, msg.Extra.Attachments); err != nil { logs.Warn.Printf("topic[%s] failed to link avatar attachment: %v", t.name, err) // This is not a critical error, continue execution. } } // Update values cached in the topic object switch t.cat { case types.TopicCatMe, types.TopicCatGrp: if tmp, ok := core["Access"]; ok { access := tmp.(types.DefaultAccess) t.accessAuth = access.Auth t.accessAnon = access.Anon } if public, ok := core["Public"]; ok { t.public = public } if trusted, ok := core["Trusted"]; ok { t.trusted = trusted } case types.TopicCatFnd: // Assign per-session fnd.Public. t.fndSetPublic(sess, core["Public"]) } pud := t.perUser[asUid] mode := pud.modeGiven & pud.modeWant if private, ok := sub["Private"]; ok { pud.private = private t.perUser[asUid] = pud } if sendCommon || sendPriv { // t.public/t.trusted, t.accessAuth/Anon have changed, make an announcement if sendCommon { if t.cat == types.TopicCatMe { t.presUsersOfInterest("upd", "") } else { // Notify all subscribers on 'me' except the user who made the change and blocked users. // The user who made the change will be notified separately (see below). filter := &presFilters{excludeUser: asUid.UserId(), filterIn: types.ModeJoin} t.presSubsOffline("upd", nilPresParams, filter, filter, sess.sid, false) } t.updated = now } // Notify user's other sessions. t.presSingleUserOffline(asUid, mode, "upd", nilPresParams, sess.sid, false) } sess.queueOut(NoErrReply(msg, now)) return nil } // replyGetSub is a response to a get.sub request on a topic - load a list of subscriptions/subscribers, // send it just to the session as a {meta} packet func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level, asChan bool, msg *ClientComMessage) error { now := types.TimeNow() id := msg.Id incomingReqTs := msg.Timestamp var req *MsgGetOpts if msg.Sub != nil { req = msg.Sub.Get.Sub } else { req = msg.Get.Sub } if req != nil && (req.SinceId != 0 || req.BeforeId != 0) { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("invalid MsgGetOpts query") } var err error var ifModified time.Time if req != nil && req.IfModifiedSince != nil { ifModified = *req.IfModifiedSince } userData := t.perUser[asUid] var subs []types.Subscription switch t.cat { case types.TopicCatMe: if req != nil { // If topic is provided, it could be in the form of user ID 'usrAbCd'. // Convert it to P2P topic name. Likewise for Self topic 'slf' -> 'slfAbcD'. if uid2 := types.ParseUserId(req.Topic); !uid2.IsZero() { req.Topic = uid2.P2PName(asUid) } if req.Topic == "slf" { req.Topic = asUid.SlfName() } } // Fetch user's subscriptions, with Topic.Public+Topic.Trusted denormalized into subscription. if ifModified.IsZero() { // No cache management. Skip deleted subscriptions. subs, err = store.Users.GetTopics(asUid, msgOpts2storeOpts(req)) } else { // User manages cache. Include deleted subscriptions too. subs, err = store.Users.GetTopicsAny(asUid, msgOpts2storeOpts(req)) // Returned subscriptions do not contain topics which are online now but otherwise unchanged. // We need to add these topic to the list otherwise the user would see them as offline. selected := map[string]struct{}{} for i := range subs { sub := &subs[i] with := sub.GetWith() if with != "" { selected[with] = struct{}{} } else { selected[sub.Topic] = struct{}{} } } // Add dummy subscriptions for missing online topics. for topic, psd := range t.perSubs { _, present := selected[topic] if !present && psd.online { sub := types.Subscription{Topic: topic} sub.SetWith(topic) sub.SetDummy(true) subs = append(subs, sub) } } } case types.TopicCatFnd: // Select public or private query. Public is set interactively and has priority. query := t.fndGetPublic(sess) if query == "" { query, _ = userData.private.(string) } // Empty queries are ignored with "NoContent". if query != "" { query, subs, err = pluginFind(asUid, query) if err == nil && subs == nil && query != "" { if and, opt, err := parseSearchQuery(query); err == nil { var req [][]string for _, tag := range and { rewritten := rewriteTag(tag, sess.countryCode) if len(rewritten) > 0 { req = append(req, rewritten) } } opt = rewriteTagSlice(opt, sess.countryCode) // Check if the query contains terms that the user is not allowed to use. if restr, _, _ := stringSliceDelta(t.tags, filterTags(append(types.FlattenDoubleSlice(req), opt...), globals.maskedTagNS)); len(restr) > 0 { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("attempt to search by restricted tags") } // Ordinary users: find only active topics and accounts. // Root users: find all topics and accounts, including suspended and soft-deleted. subs, err = store.Users.FindSubs(asUid, globals.aliasTagNS, req, opt, sess.authLvl != auth.LevelRoot) if err != nil { sess.queueOut(decodeStoreErrorExplicitTs(err, id, msg.Original, now, incomingReqTs, nil)) return err } } } } case types.TopicCatP2P: // TODO(gene): don't load subs from DB, use perUserData - it already contains subscriptions. // No need to load Public for p2p topics. if ifModified.IsZero() { // No cache management. Skip deleted subscriptions. subs, err = store.Topics.GetSubs(t.name, msgOpts2storeOpts(req)) } else { // User manages cache. Include deleted subscriptions too. subs, err = store.Topics.GetSubsAny(t.name, msgOpts2storeOpts(req)) } case types.TopicCatGrp: topicName := t.name if asChan { // In case of a channel allow fetching the subscription of the current user only. if req == nil { req = &MsgGetOpts{} } req.User = asUid.UserId() // Channel subscribers are using chnXXX topic name rather than grpXXX. topicName = msg.Original } // Include sub.Public. if ifModified.IsZero() { // No cache management. Skip deleted subscriptions. subs, err = store.Topics.GetUsers(topicName, msgOpts2storeOpts(req)) } else { // User manages cache. Include deleted subscriptions too. subs, err = store.Topics.GetUsersAny(topicName, msgOpts2storeOpts(req)) } // Do nothing for all other topic types, like 'sys', 'slf'. } if err != nil { sess.queueOut(decodeStoreErrorExplicitTs(err, id, msg.Original, now, incomingReqTs, nil)) return err } if len(subs) == 0 { // Inform the client that there are no subscriptions. sess.queueOut(NoContentParamsReply(msg, now, map[string]any{"what": "sub"})) return nil } meta := &MsgServerMeta{ Id: id, Topic: msg.Original, Sub: make([]MsgTopicSub, 0, len(subs)), Timestamp: &now} presencer := (userData.modeGiven & userData.modeWant).IsPresencer() sharer := (userData.modeGiven & userData.modeWant).IsSharer() for i := range subs { sub := &subs[i] // Indicator if the requester has provided a cut off date for ts of pub & priv updates. var sendPubPriv bool var banned bool var mts MsgTopicSub deleted := sub.DeletedAt != nil if ifModified.IsZero() { sendPubPriv = true } else { // Skip sending deleted subscriptions if they were deleted before the cut off date. // If they are freshly deleted send minimum info if deleted { if !sub.DeletedAt.After(ifModified) { continue } mts.DeletedAt = sub.DeletedAt } sendPubPriv = !deleted && sub.UpdatedAt.After(ifModified) } uid := types.ParseUid(sub.User) subMode := sub.ModeGiven & sub.ModeWant isReader := subMode.IsReader() if t.cat == types.TopicCatMe { // Mark subscriptions that the user does not care about. if !subMode.IsJoiner() { banned = true } // Reporting user's subscriptions to other topics. P2P topic name is the // UID of the other user. with := sub.GetWith() if with != "" { mts.Topic = with mts.Online = t.perSubs[with].online && !deleted && presencer } else if strings.HasPrefix(sub.Topic, "slf") { mts.Topic = "slf" // Not reporting Online as it makes no sense for slf. } else { mts.Topic = sub.Topic mts.Online = t.perSubs[sub.Topic].online && !deleted && presencer } if !deleted && !banned { if isReader { touchedAt := sub.GetTouchedAt() if touchedAt.IsZero() { mts.TouchedAt = nil } else { mts.TouchedAt = &touchedAt } mts.SeqId = sub.GetSeqId() mts.DelId = sub.DelId } else if !sub.UpdatedAt.IsZero() { mts.TouchedAt = &sub.UpdatedAt } lastSeen := sub.GetLastSeen() if lastSeen != nil && !mts.Online { mts.LastSeen = &MsgLastSeenInfo{ When: lastSeen, UserAgent: sub.GetUserAgent(), } } mts.SubCnt = sub.GetSubCnt() } } else { // Mark subscriptions that the user does not care about. if t.cat == types.TopicCatGrp && !subMode.IsJoiner() { banned = true } // Reporting subscribers to fnd, a group or a p2p topic mts.User = uid.UserId() if t.cat == types.TopicCatFnd { mts.Topic = sub.Topic } if !deleted { if uid == asUid && isReader && !banned { // Report deleted ID for own subscriptions only mts.DelId = sub.DelId } if t.cat == types.TopicCatGrp { pud := t.perUser[uid] mts.Online = pud.online > 0 && presencer } } } if !deleted { if !sub.UpdatedAt.IsZero() { mts.UpdatedAt = &sub.UpdatedAt } if isReader && !banned { mts.ReadSeqId = sub.ReadSeqId mts.RecvSeqId = sub.RecvSeqId } if t.cat != types.TopicCatFnd { // p2p and grp if !sub.IsDummy() && (sharer || uid == asUid || subMode.IsAdmin()) { // If user is not a sharer, the access mode of other ordinary users if not accessible. // Own and admin permissions only are visible to non-sharers. mts.Acs.Mode = subMode.String() mts.Acs.Want = sub.ModeWant.String() mts.Acs.Given = sub.ModeGiven.String() } } else { // Topic 'fnd' // sub.ModeXXX may be defined by the plugin. if sub.ModeGiven.IsDefined() && sub.ModeWant.IsDefined() { mts.Acs.Mode = subMode.String() mts.Acs.Want = sub.ModeWant.String() mts.Acs.Given = sub.ModeGiven.String() } else if types.IsChannel(sub.Topic) { mts.Acs.Mode = types.ModeCChnReader.String() } else if defacs := sub.GetDefaultAccess(); defacs != nil { switch authLevel { case auth.LevelAnon: mts.Acs.Mode = defacs.Anon.String() case auth.LevelAuth, auth.LevelRoot: mts.Acs.Mode = defacs.Auth.String() } } mts.SubCnt = sub.GetSubCnt() } // Returning public and private only if they have changed since ifModified if sendPubPriv { // 'sub' has nil 'public'/'trusted' in P2P topics which is OK. mts.Public = sub.GetPublic() mts.Trusted = sub.GetTrusted() // Reporting 'private' only if it's user's own subscription. if uid == asUid { mts.Private = sub.Private } } // Always reporting 'private' for fnd topic. if t.cat == types.TopicCatFnd { mts.Private = sub.Private } } meta.Sub = append(meta.Sub, mts) } sess.queueOut(&ServerComMessage{Meta: meta}) return nil } // replySetSub is a response to new subscription request or an update to a subscription {set.sub}: // update topic metadata cache, save/update subs, reply to the caller as {ctrl} message, // generate a presence notification, if appropriate. func (t *Topic) replySetSub(sess *Session, pkt *ClientComMessage, asChan bool) error { now := types.TimeNow() asUid := types.ParseUserId(pkt.AsUser) set := pkt.Set var target types.Uid if target = types.ParseUserId(set.Sub.User); target.IsZero() && set.Sub.User != "" { // Invalid user ID sess.queueOut(ErrMalformedReply(pkt, now)) return errors.New("invalid user id") } // if set.User is not set, request is for the current user if target.IsZero() { target = asUid } var err error var modeChanged *MsgAccessMode if target == asUid { // Request new subscription or modify own subscription modeChanged, err = t.thisUserSub(sess, pkt, asUid, asChan, set.Sub.Mode, nil) } else { // Request to approve/change someone's subscription modeChanged, err = t.anotherUserSub(sess, asUid, target, asChan, pkt) } if err != nil { return err } var resp *ServerComMessage if modeChanged != nil { // Report resulting access mode. params := map[string]any{"acs": modeChanged} if target != asUid { params["user"] = target.UserId() } resp = NoErrParamsReply(pkt, now, params) } else { resp = InfoNotModifiedReply(pkt, now) } sess.queueOut(resp) return nil } // replyGetData is a response to a get.data request - load a list of stored messages, send them to session as {data} // response goes to a single session rather than all sessions in a topic func (t *Topic) replyGetData(sess *Session, asUid types.Uid, asChan bool, req *MsgGetOpts, msg *ClientComMessage) error { now := types.TimeNow() toriginal := t.original(asUid) if req != nil && (req.IfModifiedSince != nil || req.User != "" || req.Topic != "") { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("invalid MsgGetOpts query") } // Check if the user has permission to read the topic data count := 0 if userData := t.perUser[asUid]; (userData.modeGiven & userData.modeWant).IsReader() { // Read messages from DB messages, err := store.Messages.GetAll(t.name, asUid, msgOpts2storeOpts(req)) if err != nil { sess.queueOut(ErrUnknownReply(msg, now)) return err } // Push the list of messages to the client as {data}. if messages != nil { count = len(messages) if count > 0 { outgoingMessages := make([]*ServerComMessage, count) for i := range messages { mm := &messages[i] from := "" if !asChan { // Don't show sender for channel readers from = types.ParseUid(mm.From).UserId() } outgoingMessages[i] = &ServerComMessage{ Data: &MsgServerData{ Topic: toriginal, Head: mm.Head, SeqId: mm.SeqId, From: from, Timestamp: mm.CreatedAt, Content: mm.Content, }, } } sess.queueOutBatch(outgoingMessages) } } } else { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("attempt to get messages by non-reader") } // Inform the requester that all the data has been served. if count == 0 { sess.queueOut(NoContentParamsReply(msg, now, map[string]any{"what": "data"})) } else { sess.queueOut(NoErrDeliveredParams(msg.Id, msg.Original, now, map[string]any{"what": "data", "count": count})) } return nil } // replyGetTags returns topics' tags - tokens used for discovery. func (t *Topic) replyGetTags(sess *Session, asUid types.Uid, msg *ClientComMessage) error { now := types.TimeNow() if t.cat == types.TopicCatFnd { // Fnd: checking for alias availability. // Checking public (session) data only. if tag := t.fndGetPublic(sess); tag != "" { var found string tag, subs, err := pluginFind(asUid, tag) if err == nil { if subs == nil { if prefix, _ := validateTag(tag); prefix != "" { // Check only if a fully-qualified tag was sent. Otherwise ignore the request. found, err = store.Users.FindOne(tag) } } else { // The plugin returned a list of topics. Send the first one. found = subs[0].Topic } } if err != nil { sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil)) return err } if found != "" { sess.queueOut(&ServerComMessage{ Meta: &MsgServerMeta{ Id: msg.Id, Topic: msg.Original, Timestamp: &now, Tags: []string{found}, }, }) return nil } } // Inform the requester that there are no tags. sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "tags"})) return nil } if t.cat != types.TopicCatMe && t.cat != types.TopicCatGrp { sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("invalid topic category for getting tags") } if t.cat == types.TopicCatGrp && t.owner != asUid { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("request for tags from non-owner") } if len(t.tags) > 0 { sess.queueOut(&ServerComMessage{ Meta: &MsgServerMeta{ Id: msg.Id, Topic: t.original(asUid), Timestamp: &now, Tags: t.tags, }, }) return nil } // Inform the requester that there are no tags. sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "tags"})) return nil } // replySetTags updates topic's tags - tokens used for discovery. func (t *Topic) replySetTags(sess *Session, asUid types.Uid, msg *ClientComMessage) error { now := types.TimeNow() if t.cat != types.TopicCatMe && t.cat != types.TopicCatGrp { sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("invalid topic category to assign tags") } if t.cat == types.TopicCatGrp && t.owner != asUid { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("tags update by non-owner") } tags := normalizeTags(msg.Set.Tags, globals.maxTagCount) if len(tags) == 0 { sess.queueOut(InfoNotModifiedReply(msg, now)) return nil } if !restrictedTagsEqual(t.tags, tags, globals.immutableTagNS) { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("attempt to mutate restricted tags") } if hasDuplicateNamespaceTags(tags, globals.aliasTagNS) { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("duplicate unique tags") } added, removed, _ := stringSliceDelta(t.tags, tags) if t.cat == types.TopicCatMe && len(added) > 0 { // User tags must all be prefixed. Users are not rearchable by generic tags. var prefixed []string for _, tag := range added { if prefix, _ := validateTag(tag); prefix != "" { prefixed = append(prefixed, prefix) } } added = prefixed } if len(added) == 0 && len(removed) == 0 { sess.queueOut(InfoNotModifiedReply(msg, now)) return nil } // Remove unprefixed tags if unique := filterTags(added, map[string]bool{globals.aliasTagNS: true}); len(unique) > 0 { // Check for global uniqueness. // It's not inside a transaction, so a race may happen. for _, tag := range unique { result, err := store.Users.FindOne(tag) if err != nil { sess.queueOut(ErrUnknownReply(msg, now)) return err } if result != "" { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("globally duplicate unique tags") } } } update := map[string]any{"Tags": tags, "UpdatedAt": now} var err error switch t.cat { case types.TopicCatMe: err = store.Users.Update(asUid, update) case types.TopicCatGrp: err = store.Topics.Update(t.name, update) } if err != nil { sess.queueOut(ErrUnknownReply(msg, now)) return err } t.tags = tags t.presSubsOnline("tags", "", nilPresParams, &presFilters{singleUser: asUid.UserId()}, sess.sid) params := make(map[string]any) if len(added) > 0 { params["added"] = len(added) } if len(removed) > 0 { params["removed"] = len(removed) } sess.queueOut(NoErrParamsReply(msg, now, params)) return nil } // replyGetCreds returns user's credentials such as email and phone numbers. func (t *Topic) replyGetCreds(sess *Session, asUid types.Uid, msg *ClientComMessage) error { now := types.TimeNow() id := msg.Id if t.cat != types.TopicCatMe { sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("invalid topic category for getting credentials") } screds, err := store.Users.GetAllCreds(asUid, "", false) if err != nil { sess.queueOut(decodeStoreErrorExplicitTs(err, id, msg.Original, now, msg.Timestamp, nil)) return err } if len(screds) > 0 { creds := make([]*MsgCredServer, len(screds)) for i, sc := range screds { creds[i] = &MsgCredServer{Method: sc.Method, Value: sc.Value, Done: sc.Done} } sess.queueOut(&ServerComMessage{ Meta: &MsgServerMeta{ Id: id, Topic: t.original(asUid), Timestamp: &now, Cred: creds, }, }) return nil } // Inform the requester that there are no credentials. sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "creds"})) return nil } // replySetCred adds or validates user credentials such as email and phone numbers. func (t *Topic) replySetCred(sess *Session, asUid types.Uid, authLevel auth.Level, msg *ClientComMessage) error { now := types.TimeNow() set := msg.Set if t.cat != types.TopicCatMe { sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("invalid topic category for updating credentials") } var err error var tags []string creds := []MsgCredClient{*set.Cred} if set.Cred.Response != "" { // Credential is being validated. Return an arror if response is invalid. _, tags, err = validatedCreds(asUid, authLevel, creds, true) } else { // Credential is being added or updated. tmpToken, _, _ := store.Store.GetLogicalAuthHandler("token").GenSecret(&auth.Rec{ Uid: asUid, AuthLevel: auth.LevelNone, Lifetime: auth.Duration(time.Hour * 24), Features: auth.FeatureNoLogin, }) _, tags, err = addCreds(asUid, creds, nil, sess.lang, tmpToken) } if tags != nil { t.tags = tags t.presSubsOnline("tags", "", nilPresParams, nilPresFilters, "") } sess.queueOut(decodeStoreErrorExplicitTs(err, set.Id, t.original(asUid), now, msg.Timestamp, nil)) return err } // replyGetAux returns topic's auxiliary set of key-value pairs. func (t *Topic) replyGetAux(sess *Session, asUid types.Uid, msg *ClientComMessage) error { now := types.TimeNow() if t.cat != types.TopicCatP2P && t.cat != types.TopicCatGrp && t.cat != types.TopicCatSlf { sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("invalid topic category to query aux") } if len(t.aux) > 0 { sess.queueOut(&ServerComMessage{ Meta: &MsgServerMeta{ Id: msg.Id, Topic: t.original(asUid), Timestamp: &now, Aux: t.aux, }, }) return nil } // Inform the requester that there are no tags. sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "aux"})) return nil } // replyGetAux returns topic's auxiliary set of key-value pairs. func (t *Topic) replySetAux(sess *Session, asUid types.Uid, msg *ClientComMessage) error { now := types.TimeNow() if t.cat != types.TopicCatP2P && t.cat != types.TopicCatGrp && t.cat != types.TopicCatSlf { sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("invalid topic category to assign aux") } if userData := t.perUser[asUid]; !(userData.modeGiven & userData.modeWant).IsAdmin() { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("aux update by non-admin") } if aux, changed := mergeMaps(copyMap(t.aux), msg.Set.Aux); changed { err := store.Topics.Update(t.name, map[string]any{"Aux": aux, "UpdatedAt": now}) if err == nil { t.aux = aux t.presSubsOnline("aux", "", nilPresParams, nilPresFilters, sess.sid) } sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Set.Id, t.original(asUid), now, msg.Timestamp, nil)) return err } sess.queueOut(InfoNotModifiedReply(msg, now)) return nil } // replyGetDel is a response to a get[what=del] request: load a list of deleted message ids, send them to // a session as {meta} // response goes to a single session rather than all sessions in a topic func (t *Topic) replyGetDel(sess *Session, asUid types.Uid, req *MsgGetOpts, msg *ClientComMessage) error { now := types.TimeNow() toriginal := t.original(asUid) id := msg.Id incomingReqTs := msg.Timestamp if req != nil && (req.IfModifiedSince != nil || req.User != "" || req.Topic != "") { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("invalid MsgGetOpts query") } // Check if the user has permission to read the topic data and the request is valid. if userData := t.perUser[asUid]; (userData.modeGiven & userData.modeWant).IsReader() { ranges, delID, err := store.Messages.GetDeleted(t.name, asUid, msgOpts2storeOpts(req)) if err != nil { sess.queueOut(ErrUnknownReply(msg, now)) return err } if len(ranges) > 0 { sess.queueOut(&ServerComMessage{ Meta: &MsgServerMeta{ Id: id, Topic: toriginal, Del: &MsgDelValues{ DelId: delID, DelSeq: rangeDeserialize(ranges), }, Timestamp: &now, }, }) return nil } } sess.queueOut(NoContentParams(id, toriginal, now, incomingReqTs, map[string]string{"what": "del"})) return nil } // replyDelMsg deletes (soft or hard) messages in response to del.msg packet. func (t *Topic) replyDelMsg(sess *Session, asUid types.Uid, asChan bool, msg *ClientComMessage) error { now := types.TimeNow() if asChan { // Do not allow channel readers delete messages. sess.queueOut(ErrOperationNotAllowedReply(msg, now)) return errors.New("channel readers cannot delete messages") } del := msg.Del pud := t.perUser[asUid] if !(pud.modeGiven & pud.modeWant).IsDeleter() { // User must have an R permission: if the user cannot read messages, he has // no business of deleting them. if !(pud.modeGiven & pud.modeWant).IsReader() { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("del.msg: permission denied") } // User has just the R permission, cannot hard-delete messages, silently // switching to soft-deleting del.Hard = false } var err error var ranges []types.Range if len(del.DelSeq) == 0 { err = errors.New("del.msg: no IDs to delete") } else { count := 0 for _, dq := range del.DelSeq { if dq.LowId > t.lastID || dq.LowId < 0 || dq.HiId < 0 || (dq.HiId > 0 && dq.LowId > dq.HiId) || (dq.LowId == 0 && dq.HiId == 0) { err = errors.New("del.msg: invalid entry in list") break } if dq.HiId > t.lastID { // Range is inclusive - exclusive [low, hi), // to delete all messages hi must be lastId + 1 dq.HiId = t.lastID + 1 } else if dq.LowId == dq.HiId || dq.LowId+1 == dq.HiId { dq.HiId = 0 } if dq.HiId == 0 { count++ } else { count += dq.HiId - dq.LowId } ranges = append(ranges, types.Range{Low: dq.LowId, Hi: dq.HiId}) } if err == nil { // Sort by Low ascending then by Hi descending. sort.Sort(types.RangeSorter(ranges)) // Collapse overlapping ranges ranges = types.RangeSorter(ranges).Normalize() } if count > defaultMaxDeleteCount && len(ranges) > 1 { err = errors.New("del.msg: too many messages to delete") } } if err != nil { sess.queueOut(ErrMalformedReply(msg, now)) return err } forUser := asUid var age time.Duration if del.Hard { forUser = types.ZeroUid age = globals.msgDeleteAge } if err = store.Messages.DeleteList(t.name, t.delID+1, forUser, age, ranges); err != nil { sess.queueOut(ErrUnknownReply(msg, now)) return err } // Increment Delete transaction ID t.delID++ dr := rangeDeserialize(ranges) if del.Hard { for uid, pud := range t.perUser { pud.delID = t.delID t.perUser[uid] = pud // Update unread counters for all users who may have had these messages as unread if (pud.modeGiven & pud.modeWant).IsReader() { // Calculate how many unread messages were deleted for this user unreadDeleted := calculateUnreadInRanges(pud.readID, t.lastID, ranges) if unreadDeleted > 0 { // Decrease unread count (negative value) usersUpdateUnread(uid, -unreadDeleted, true) } } } // Broadcast the change to all, online and offline, exclude the session making the change. params := &presParams{delID: t.delID, delSeq: dr, actor: asUid.UserId()} filters := &presFilters{filterIn: types.ModeRead} t.presSubsOnline("del", params.actor, params, filters, sess.sid) t.presSubsOffline("del", params, filters, nilPresFilters, sess.sid, true) } else { pud := t.perUser[asUid] pud.delID = t.delID t.perUser[asUid] = pud // Notify user's other sessions t.presPubMessageDelete(asUid, pud.modeGiven&pud.modeWant, t.delID, dr, sess.sid) } sess.queueOut(NoErrParamsReply(msg, now, map[string]int{"del": t.delID})) return nil } // Handle request to delete the topic {del what="topic"}. // 1. If requester is the owner then it should have been handled at the hub, log an error. // 2. If requester is not the owner, treat it like {leave unsub=true}. func (t *Topic) replyDelTopic(sess *Session, asUid types.Uid, msg *ClientComMessage) error { if t.owner != asUid { return t.replyLeaveUnsub(sess, msg, asUid) } // This is an indication of a bug. logs.Err.Println("replyDelTopic called by owner (SHOULD NOT HAPPEN!)") return nil } // Delete credential func (t *Topic) replyDelCred(sess *Session, asUid types.Uid, authLvl auth.Level, msg *ClientComMessage) error { now := types.TimeNow() incomingReqTs := msg.Timestamp del := msg.Del if t.cat != types.TopicCatMe { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("del.cred: invalid topic category") } if del.Cred == nil || del.Cred.Method == "" { sess.queueOut(ErrMalformedReply(msg, now)) return errors.New("del.cred: missing method") } tags, err := deleteCred(asUid, authLvl, del.Cred) if tags != nil { // Check if anything has been actually removed. _, removed, _ := stringSliceDelta(t.tags, tags) if len(removed) > 0 { t.tags = tags t.presSubsOnline("tags", "", nilPresParams, nilPresFilters, "") } } else if err == nil { sess.queueOut(InfoNoActionReply(msg, now)) return nil } sess.queueOut(decodeStoreErrorExplicitTs(err, del.Id, del.Topic, now, incomingReqTs, nil)) return err } // Delete subscription. func (t *Topic) replyDelSub(sess *Session, asUid types.Uid, msg *ClientComMessage) error { now := types.TimeNow() del := msg.Del asChan, err := t.verifyChannelAccess(msg.Original) if err != nil { // User should not be able to address non-channel topic as channel. sess.queueOut(ErrNotFoundReply(msg, now)) return types.ErrNotFound } if asChan { // Don't allow channel readers to delete self-subscription. Use leave-unsub or del-topic. sess.queueOut(ErrPermissionDeniedReply(msg, now)) return errors.New("channel access denied: cannot delete subscription") } // Get ID of the affected user uid := types.ParseUserId(del.User) pud := t.perUser[asUid] if !(pud.modeGiven & pud.modeWant).IsAdmin() { err = errors.New("del.sub: permission denied") } else if uid.IsZero() || uid == asUid { // Cannot delete self-subscription. User [leave unsub] or [delete topic] err = errors.New("del.sub: cannot delete self-subscription") } else if t.cat == types.TopicCatP2P { // Don't try to delete the other P2P user err = errors.New("del.sub: cannot apply to a P2P topic") } if err != nil { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return err } pud, ok := t.perUser[uid] if !ok { sess.queueOut(InfoNoActionReply(msg, now)) return errors.New("del.sub: user not found") } // Check if the user being ejected is the owner. if (pud.modeGiven & pud.modeWant).IsOwner() { err = errors.New("del.sub: cannot evict topic owner") } else if !pud.modeWant.IsJoiner() { // If the user has banned the topic, subscription should not be deleted. Otherwise user may be re-invited // which defeats the purpose of banning. err = errors.New("del.sub: cannot delete banned subscription") } if err != nil { sess.queueOut(ErrPermissionDeniedReply(msg, now)) return err } // Delete user's subscription from the database if err := store.Subs.Delete(t.name, uid); err != nil { if err == types.ErrNotFound { sess.queueOut(InfoNoActionReply(msg, now)) } else { sess.queueOut(ErrUnknownReply(msg, now)) return err } } else { sess.queueOut(NoErrReply(msg, now)) } // Update cached unread count: negative value if (pud.modeWant & pud.modeGiven).IsReader() { usersUpdateUnread(uid, pud.readID-t.lastID, true) } // ModeUnset signifies deleted subscription as opposite to ModeNone - no access. t.notifySubChange(uid, asUid, false, pud.modeWant, pud.modeGiven, types.ModeUnset, types.ModeUnset, sess.sid) t.evictUser(uid, true, "") // Notify plugins. pluginSubscription(&types.Subscription{Topic: t.name, User: uid.String()}, plgActDel) // If all P2P users were deleted, suspend the topic to let it shut down. if t.cat == types.TopicCatP2P && t.subsCount() == 0 { t.markPaused(true) globals.hub.unreg <- &topicUnreg{del: true, sess: nil, rcptTo: t.name, pkt: nil} } return nil } // replyLeaveUnsub is a request to unsubscribe user and detach all user's sessions from topic. func (t *Topic) replyLeaveUnsub(sess *Session, msg *ClientComMessage, asUid types.Uid) error { now := types.TimeNow() if asUid.IsZero() { panic("replyLeaveUnsub: zero asUid") } if t.owner == asUid { if msg.init { sess.queueOut(ErrPermissionDeniedReply(msg, now)) } return errors.New("replyLeaveUnsub: owner cannot unsubscribe") } var err error var asChan bool if msg.init { asChan, err = t.verifyChannelAccess(msg.Original) if err != nil { sess.queueOut(ErrNotFoundReply(msg, now)) return errors.New("replyLeaveUnsub: incorrect addressing of channel") } } pud := t.perUser[asUid] // Delete user's subscription from the database; msg could be nil, so cannot use msg.Original. if pud.isChan { // Handle channel reader. err = store.Subs.Delete(types.GrpToChn(t.name), asUid) } else { // Handle subscriber. err = store.Subs.Delete(t.name, asUid) } if err != nil { if msg.init { if err == types.ErrNotFound { sess.queueOut(InfoNoActionReply(msg, now)) err = nil } else { sess.queueOut(ErrUnknownReply(msg, now)) } } return err } if msg.init { sess.queueOut(NoErrReply(msg, now)) } var oldWant types.AccessMode var oldGiven types.AccessMode if !asChan { // Update cached unread count: negative value if (pud.modeWant & pud.modeGiven).IsReader() { usersUpdateUnread(asUid, pud.readID-t.lastID, true) } oldWant, oldGiven = pud.modeWant, pud.modeGiven } else { oldWant, oldGiven = types.ModeCChnReader, types.ModeCChnReader // Unsubscribe user's devices from the channel (FCM topic). t.channelSubUnsub(asUid, false) } // Send prsence notifictions to admins, other users, and user's other sessions. t.notifySubChange(asUid, asUid, asChan, oldWant, oldGiven, types.ModeUnset, types.ModeUnset, sess.sid) // Evict all user's sessions, clear cached data, send notifications. t.evictUser(asUid, true, sess.sid) // Notify plugins. pluginSubscription(&types.Subscription{Topic: t.name, User: asUid.String()}, plgActDel) if t.cat == types.TopicCatGrp { // Decrement group's cached member count. t.subCnt-- } // If all P2P users were deleted, suspend the topic to let it shut down. if t.cat == types.TopicCatP2P && t.subsCount() == 0 { t.markPaused(true) globals.hub.unreg <- &topicUnreg{del: true, sess: nil, rcptTo: t.name, pkt: nil} } return nil } // evictUser evicts all given user's sessions from the topic and clears user's cached data, if appropriate. func (t *Topic) evictUser(uid types.Uid, unsub bool, skip string) { now := types.TimeNow() pud, ok := t.perUser[uid] // Detach user from topic if unsub { if t.cat == types.TopicCatP2P { // P2P: mark user as deleted pud.online = 0 pud.deleted = true t.perUser[uid] = pud } else if ok { // Grp: delete per-user data delete(t.perUser, uid) t.computePerUserAcsUnion() if !pud.isChan { usersRegisterUser(uid, false) } } } else if ok { if pud.isChan { delete(t.perUser, uid) // No need to call computePerUserAcsUnion because removal of a channel reader does not change union permissions. // No need to unregister user as we ignore unread channel messages. } else { // Clear online status pud.online = 0 t.perUser[uid] = pud } } // Detach all user's sessions msg := NoErrEvicted("", t.original(uid), now) msg.Ctrl.Params = map[string]any{"unsub": unsub} msg.SkipSid = skip msg.uid = uid msg.AsUser = uid.UserId() for s := range t.sessions { if pssd, removed := t.remSession(s, uid); pssd != nil { if removed { s.detachSession(t.name) } if s.sid != skip { s.queueOut(msg) } } } } // User's subscription to a topic has changed, send presence notifications. // 1. New subscription // 2. Deleted subscription // 3. Permissions changed // Sending to // (a) Topic admins online on topic itself. // (b) Topic admins offline on 'me' if approval is needed. // (c) If subscription is deleted, 'gone' to target. // (d) 'off' to topic members online if deleted or muted. // (e) To target user. func (t *Topic) notifySubChange(uid, actor types.Uid, isChan bool, oldWant, oldGiven, newWant, newGiven types.AccessMode, skip string) { unsub := newWant == types.ModeUnset || newGiven == types.ModeUnset target := uid.UserId() dWant := types.ModeNone.String() if newWant.IsDefined() { if oldWant.IsDefined() && !oldWant.IsZero() { dWant = oldWant.Delta(newWant) } else { dWant = newWant.String() } } dGiven := types.ModeNone.String() if newGiven.IsDefined() { if oldGiven.IsDefined() && !oldGiven.IsZero() { dGiven = oldGiven.Delta(newGiven) } else { dGiven = newGiven.String() } } params := &presParams{ target: target, actor: actor.UserId(), dWant: dWant, dGiven: dGiven, } filterSharers := &presFilters{ filterIn: types.ModeCSharer, excludeUser: target, } // Announce the change in permissions to the admins who are online in the topic, exclude the target // and exclude the actor's session. t.presSubsOnline("acs", target, params, filterSharers, skip) // If it's a new subscription or if the user asked for permissions in excess of what was granted, // announce the request to topic admins on 'me' so they can approve the request. The notification // is not sent to the target user or the actor's session. if newWant.BetterThan(newGiven) || oldWant == types.ModeNone { t.presSubsOffline("acs", params, filterSharers, filterSharers, skip, true) } // Handling of muting/unmuting. // Case A: subscription deleted. // Case B: subscription muted only. if unsub { // Subscription deleted. // In case of a P2P topic subscribe/unsubscribe users from each other's notifications. if t.cat == types.TopicCatP2P { uid2 := t.p2pOtherUser(uid) // Remove user1's subscription to user2 and notify user1's other sessions that he is gone. t.presSingleUserOffline(uid, newWant&newGiven, "gone", nilPresParams, skip, false) // Tell user2 that user1 is offline but let him keep sending updates in case user1 resubscribes. presSingleUserOfflineOffline(uid2, target, "off", nilPresParams, "") } else if t.cat == types.TopicCatGrp && !isChan { // Notify all sharers that the user is offline now. t.presSubsOnline("off", uid.UserId(), nilPresParams, filterSharers, skip) // Notify target that the subscription is gone. presSingleUserOfflineOffline(uid, t.name, "gone", nilPresParams, skip) } } else { // Subscription altered. if !(newWant & newGiven).IsPresencer() && (oldWant & oldGiven).IsPresencer() { // Subscription just muted. var source string if t.cat == types.TopicCatP2P { source = t.p2pOtherUser(uid).UserId() } else if t.cat == types.TopicCatGrp && !isChan { source = t.name } if source != "" { // Tell user1 to start discarding updates from muted topic/user. presSingleUserOfflineOffline(uid, source, "off+dis", nilPresParams, "") } } else if (newWant & newGiven).IsPresencer() && !(oldWant & oldGiven).IsPresencer() { // Subscription un-muted. // Notify subscriber of topic's online status. if t.cat == types.TopicCatGrp && !isChan { t.presSingleUserOffline(uid, newWant&newGiven, "?unkn+en", nilPresParams, "", false) } else if t.cat == types.TopicCatMe { // User is visible online now, notify subscribers. t.presUsersOfInterest("on+en", t.userAgent) } } // Notify target that permissions have changed. // Notify sessions online in the topic. t.presSubsOnlineDirect("acs", params, &presFilters{singleUser: target}, skip) // Notify target's other sessions on 'me'. t.presSingleUserOffline(uid, newWant&newGiven, "acs", params, skip, true) } } // FIXME: this won't work correctly with multiplexing sessions. func (t *Topic) mostRecentSession() *Session { var sess *Session var latest int64 for s := range t.sessions { sessionLastAction := atomic.LoadInt64(&s.lastAction) if sessionLastAction > latest { sess = s latest = sessionLastAction } } return sess } const ( // Topic is fully initialized. topicStatusLoaded = 0x1 // Topic is paused: all packets are rejected. topicStatusPaused = 0x2 // Topic is in the process of being deleted. This is irrecoverable. topicStatusMarkedDeleted = 0x10 // Topic is suspended: read-only mode. topicStatusReadOnly = 0x20 ) // statusChangeBits sets or removes given bits from t.status func (t *Topic) statusChangeBits(bits int32, set bool) { for { oldStatus := atomic.LoadInt32(&t.status) newStatus := oldStatus if set { newStatus |= bits } else { newStatus &= ^bits } if newStatus == oldStatus { break } if atomic.CompareAndSwapInt32(&t.status, oldStatus, newStatus) { break } } } // markLoaded indicates that topic subscribers have been loaded into memory. func (t *Topic) markLoaded() { t.statusChangeBits(topicStatusLoaded, true) } // markPaused pauses or unpauses the topic. When the topic is paused all // messages are rejected. func (t *Topic) markPaused(pause bool) { t.statusChangeBits(topicStatusPaused, pause) } // markDeleted marks topic as being deleted. func (t *Topic) markDeleted() { t.statusChangeBits(topicStatusMarkedDeleted, true) } // markReadOnly suspends/un-suspends the topic: adds or removes the 'read-only' flag. func (t *Topic) markReadOnly(readOnly bool) { t.statusChangeBits(topicStatusReadOnly, readOnly) } // isInactive checks if topic is paused or being deleted. func (t *Topic) isInactive() bool { return (atomic.LoadInt32(&t.status) & (topicStatusPaused | topicStatusMarkedDeleted)) != 0 } func (t *Topic) isReadOnly() bool { return (atomic.LoadInt32(&t.status) & topicStatusReadOnly) != 0 } func (t *Topic) isLoaded() bool { return (atomic.LoadInt32(&t.status) & topicStatusLoaded) != 0 } func (t *Topic) isDeleted() bool { return (atomic.LoadInt32(&t.status) & topicStatusMarkedDeleted) != 0 } // Get topic name suitable for the given client func (t *Topic) original(uid types.Uid) string { if t.cat == types.TopicCatP2P { if pud, ok := t.perUser[uid]; ok { return pud.topicName } panic("Invalid P2P topic") } if t.cat == types.TopicCatGrp && t.isChan { if t.perUser[uid].isChan { // This is a channel reader. return types.GrpToChn(t.xoriginal) } } return t.xoriginal } // Get ID of the other user in a P2P topic func (t *Topic) p2pOtherUser(uid types.Uid) types.Uid { if t.cat == types.TopicCatP2P { // Try to find user in subscribers. for u2 := range t.perUser { if u2.Compare(uid) != 0 { return u2 } } } // Even when one user is deleted, the subscription must be restored // before p2pOtherUser is called. panic("Not a valid P2P topic") } // Get per-session value of fnd.Public func (t *Topic) fndGetPublic(sess *Session) string { if t.cat == types.TopicCatFnd { if t.public == nil { return "" } if pubmap, ok := t.public.(map[string]any); ok { if public, ok := pubmap[sess.sid].(string); ok { return public } return "" } panic("Invalid Fnd.Public type") } panic("Not Fnd topic") } // Assign per-session fnd.Public. Returns true if value has been changed. func (t *Topic) fndSetPublic(sess *Session, public any) bool { if t.cat != types.TopicCatFnd { panic("Not Fnd topic") } var pubmap map[string]any var ok bool if t.public != nil { if pubmap, ok = t.public.(map[string]any); !ok { // This could only happen if fnd.public is assigned outside of this function. panic("Invalid Fnd.Public type") } } if pubmap == nil { pubmap = make(map[string]any) } if public != nil { pubmap[sess.sid] = public } else { ok = (pubmap[sess.sid] != nil) delete(pubmap, sess.sid) if len(pubmap) == 0 { pubmap = nil } } t.public = pubmap return ok } // Remove per-session value of fnd.Public. func (t *Topic) fndRemovePublic(sess *Session) { if t.public == nil { return } // FIXME: case of a multiplexing session won't work correctly. // Maybe handle it at the proxy topic. if pubmap, ok := t.public.(map[string]any); ok { delete(pubmap, sess.sid) return } panic("Invalid Fnd.Public type") } func (t *Topic) accessFor(authLvl auth.Level) types.AccessMode { return selectAccessMode(authLvl, t.accessAnon, t.accessAuth, getDefaultAccess(t.cat, true, false)) } // subsCount returns the number of topic subscribers. This method is different from subCnt with respect to channels: // * subsCount counts subscribers + attached channel users. // * subCnt counts all subscribers (including all channel users). func (t *Topic) subsCount() int { if t.cat == types.TopicCatP2P { count := 0 for uid := range t.perUser { if !t.perUser[uid].deleted { count++ } } return count } return len(t.perUser) } // Add session record. 'user' may be different from sess.uid. func (t *Topic) addSession(sess *Session, asUid types.Uid, isChanSub bool) { s := sess if sess.multi != nil { s = s.multi } if pssd, ok := t.sessions[s]; ok { // Subscription already exists. if s.isMultiplex() && !sess.background { // This slice is expected to be relatively short. // Not doing anything fancy here like maps or sorting. pssd.muids = append(pssd.muids, asUid) t.sessions[s] = pssd } // Maybe panic here. return } if s.isMultiplex() { if sess.background { t.sessions[s] = perSessionData{} } else { t.sessions[s] = perSessionData{muids: []types.Uid{asUid}, isChanSub: isChanSub} } } else { t.sessions[s] = perSessionData{uid: asUid, isChanSub: isChanSub} } } // Disconnects session from topic if either one of the following is true: // * 's' is an ordinary session AND ('asUid' is zero OR 'asUid' matches subscribed user). // * 's' is a multiplexing session and it's being dropped all together ('asUid' is zero ). // If 's' is a multiplexing session and asUid is not zero, it's removed from the list of session // users 'muids'. // Returns perSessionData if it was found and true if session was actually detached from topic. func (t *Topic) remSession(sess *Session, asUid types.Uid) (*perSessionData, bool) { s := sess if sess.multi != nil { s = s.multi } pssd, ok := t.sessions[s] if !ok { // Session not found at all. return nil, false } if pssd.uid == asUid || asUid.IsZero() { delete(t.sessions, s) return &pssd, true } for i := range pssd.muids { if pssd.muids[i] == asUid { pssd.muids[i] = pssd.muids[len(pssd.muids)-1] pssd.muids = pssd.muids[:len(pssd.muids)-1] t.sessions[s] = pssd if len(pssd.muids) == 0 { delete(t.sessions, s) return &pssd, true } return &pssd, false } } return nil, false } // Check if topic has any online (non-background) users. func (t *Topic) isOnline() bool { // Find at least one non-background session. for s, pssd := range t.sessions { if s.isMultiplex() && len(pssd.muids) > 0 { return true } if !s.background { return true } } return false } // Verifies if topic can be access by the provided name: access any topic as non-channel, access channel as channel. // Returns true if access is for channel, false if not and error if access is invalid. func (t *Topic) verifyChannelAccess(asTopic string) (bool, error) { if !types.IsChannel(asTopic) { return false, nil } if t.isChan { return true, nil } return false, types.ErrNotFound } // Infer topic category from name. func topicCat(name string) types.TopicCat { return types.GetTopicCat(name) } // Generate the name of the group topic as a "grp" followed by random-looking // unique string. func genTopicName() string { return "grp" + store.Store.GetUidString() } // Convert expanded (routable) topic name into name suitable for sending to the user. // For example p2pAbCDef123 -> usrAbCDef func topicNameForUser(name string, uid types.Uid, isChan bool) string { switch topicCat(name) { case types.TopicCatMe: return "me" case types.TopicCatFnd: return "fnd" case types.TopicCatP2P: topic, _ := types.P2PNameForUser(uid, name) return topic case types.TopicCatGrp: if isChan { return types.GrpToChn(name) } } return name } // calculateUnreadInRanges calculates how many unread messages are within the given ranges. // unreadStart is the first unread message SeqId (readID + 1), unreadEnd is the last possible message SeqId. // Assumes ranges are sorted by Low ascending. func calculateUnreadInRanges(readID, lastID int, ranges []types.Range) int { if readID >= lastID { // No unread messages return 0 } unreadStart := readID + 1 unreadEnd := lastID // Sum up unread messages. count := 0 for i := 0; i < len(ranges); i++ { rangeStart := ranges[i].Low rangeEnd := ranges[i].Hi if rangeEnd == 0 { rangeEnd = rangeStart + 1 } // Find the first range where rangeEnd > readID if rangeEnd <= readID { continue } // Find intersection of [unreadStart, unreadEnd] and [rangeStart, rangeEnd) intersectionStart := max(unreadStart, rangeStart) intersectionEnd := min(unreadEnd+1, rangeEnd) // +1 because unreadEnd is inclusive if intersectionStart < intersectionEnd { count += intersectionEnd - intersectionStart } } return count }