Files
2025-11-08 11:11:14 +03:00

4044 lines
121 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/******************************************************************************
*
* Description :
* An isolated communication channel (chat room, 1:1 conversation) for
* usually multiple users. There is no communication across topics.
*
*****************************************************************************/
package main
import (
"errors"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/tinode/chat/server/auth"
"github.com/tinode/chat/server/logs"
"github.com/tinode/chat/server/store"
"github.com/tinode/chat/server/store/types"
)
// Topic is an isolated communication channel
type Topic struct {
// Еxpanded/unique name of the topic.
name string
// For single-user topics session-specific topic name, such as 'me',
// otherwise the same as 'name'.
xoriginal string
// Topic category
cat types.TopicCat
// Name of the master node for this topic if isProxy is true.
masterNode string
// Time when the topic was first created.
created time.Time
// Time when the topic was last updated.
updated time.Time
// Time of the last outgoing message.
touched time.Time
// Server-side ID of the last data message
lastID int
// ID of the deletion operation. Not an ID of the message.
delID int
// Total count of subscribers (excluding deleted).
// This is different from subsCount() for channels.
subCnt int
// Last published userAgent ('me' topic only)
userAgent string
// User ID of the topic owner/creator. Could be zero.
owner types.Uid
// Default access mode
accessAuth types.AccessMode
accessAnon types.AccessMode
// Topic discovery tags
tags []string
// Auxiliary set of key-value pairs
aux map[string]any
// Topic's public data
public any
// Topic's trusted data
trusted any
// Topic's per-subscriber data
perUser map[types.Uid]perUserData
// Union of permissions across all users (used by proxy sessions with uid = 0).
// These are used by master topics only (in the proxy-master topic context)
// as a coarse-grained attempt to perform acs checks since proxy sessions "impersonate"
// multiple normal sessions (uids) which may have different uids.
modeWantUnion types.AccessMode
modeGivenUnion types.AccessMode
// User's contact list (not nil for 'me' topic only).
// The map keys are UserIds for P2P topics and grpXXX for group topics.
perSubs map[string]perSubsData
// Sessions attached to this topic. The UID kept here may not match Session.uid if session is
// subscribed on behalf of another user.
sessions map[*Session]perSessionData
// Present video call data. Null when there's no call in progress or being established.
// Only available for p2p topics.
currentCall *videoCall
// Channel for receiving client messages from sessions or other topics, buffered = 256.
clientMsg chan *ClientComMessage
// Channel for receiving server messages generated on the server or received from other cluster nodes, buffered = 64.
serverMsg chan *ServerComMessage
// Channel for receiving {get}/{set}/{del} requests, buffered = 64
meta chan *ClientComMessage
// Subscribe requests from sessions, buffered = 256
reg chan *ClientComMessage
// Unsubscribe requests from sessions, buffered = 256
unreg chan *ClientComMessage
// Session updates: background sessions coming online, User Agent changes. Buffered = 32
supd chan *sessionUpdate
// Channel to terminate topic -- either the topic is deleted or system is being shut down. Buffered = 1.
exit chan *shutDown
// Channel to receive topic master responses (used only by proxy topics).
proxy chan *ClusterResp
// Channel to receive topic proxy service requests, e.g. sending deferred notifications.
master chan *ClusterSessUpdate
// Flag which tells topic lifecycle status: new, ready, paused, marked for deletion.
status int32
// Channel functionality is enabled for the group topic.
isChan bool
// If isProxy == true, the actual topic is hosted by another cluster member.
// The topic should:
// 1. forward all messages to master
// 2. route replies from the master to sessions.
// 3. disconnect sessions at master's request.
// 4. shut down the topic at master's request.
// 5. aggregate access permissions on behalf of attached sessions.
isProxy bool
// Countdown timer for destroying the topic when there are no more attached sessions to it.
killTimer *time.Timer
// Countdown timer for terminating iniatated (but not established) calls.
callEstablishmentTimer *time.Timer
}
// perUserData holds topic's cache of per-subscriber data
type perUserData struct {
// Count of subscription online and announced (presence not deferred).
online int
// Last t.lastId reported by user through {pres} as received or read
recvID int
readID int
// ID of the latest Delete operation
delID int
private any
modeWant types.AccessMode
modeGiven types.AccessMode
// P2P only:
public any
trusted any
lastSeen *time.Time
lastUA string
topicName string
deleted bool
// The user is a channel subscriber.
isChan bool
}
// perSubsData holds user's (on 'me' topic) cache of subscription data
type perSubsData struct {
// The other user's/topic's online status as seen by this user.
online bool
// True if we care about the updates from the other user/topic: (want&given).IsPresencer().
// Does not affect sending notifications from this user to other users.
enabled bool
}
// Data related to a subscription of a session to a topic.
type perSessionData struct {
// ID of the subscribed user (asUid); not necessarily the session owner.
// Could be zero for multiplexed sessions in cluster.
uid types.Uid
// This is a channel subscription
isChanSub bool
// IDs of subscribed users in a multiplexing session.
muids []types.Uid
}
// Reasons why topic is being shut down.
const (
// StopNone no reason given/default.
StopNone = iota
// StopShutdown terminated due to system shutdown.
StopShutdown
// StopDeleted terminated due to being deleted.
StopDeleted
// StopRehashing terminated due to cluster rehashing (moved to a different node).
StopRehashing
)
// Topic shutdown
type shutDown struct {
// Channel to report back completion of topic shutdown. Could be nil
done chan<- bool
// Topic is being deleted as opposite to total system shutdown
reason int
}
// Session update: user agent change or background session becoming normal.
// If sess is nil then user agent change, otherwise bg to fg update.
type sessionUpdate struct {
sess *Session
userAgent string
}
var (
nilPresParams = &presParams{}
nilPresFilters = &presFilters{}
)
func (t *Topic) run(hub *Hub) {
if !t.isProxy {
t.runLocal(hub)
} else {
t.runProxy(hub)
}
}
// getPerUserAcs returns `want` and `given` permissions for the given user id.
func (t *Topic) getPerUserAcs(uid types.Uid) (types.AccessMode, types.AccessMode) {
if uid.IsZero() {
// For zero uids (typically for proxy sessions), return the union of all permissions.
return t.modeWantUnion, t.modeGivenUnion
}
pud := t.perUser[uid]
return pud.modeWant, pud.modeGiven
}
// passesPresenceFilters applies presence filters to `msg`
// depending on per-user want and given acls for the provided `uid`.
func (t *Topic) passesPresenceFilters(pres *MsgServerPres, uid types.Uid) bool {
modeWant, modeGiven := t.getPerUserAcs(uid)
// "gone" and "acs" notifications are sent even if the topic is muted.
return ((modeGiven & modeWant).IsPresencer() || pres.What == "gone" || pres.What == "acs") &&
(pres.FilterIn == 0 || int(modeGiven&modeWant)&pres.FilterIn != 0) &&
(pres.FilterOut == 0 || int(modeGiven&modeWant)&pres.FilterOut == 0)
}
// userIsReader returns true if the user (specified by `uid`) may read the given topic.
func (t *Topic) userIsReader(uid types.Uid) bool {
modeWant, modeGiven := t.getPerUserAcs(uid)
return (modeGiven & modeWant).IsReader()
}
// prepareBroadcastableMessage sets the topic field in `msg` depending on the uid and subscription type.
func (t *Topic) prepareBroadcastableMessage(msg *ServerComMessage, uid types.Uid, isChanSub bool) {
// We are only interested in broadcastable messages.
if msg.Data == nil && msg.Pres == nil && msg.Info == nil {
return
}
if (t.cat == types.TopicCatP2P && !uid.IsZero()) || (t.cat == types.TopicCatGrp && t.isChan) {
// For p2p topics topic name is dependent on receiver.
// Channel topics may be presented as grpXXX or chnXXX.
var topicName string
if isChanSub {
topicName = types.GrpToChn(t.xoriginal)
} else {
topicName = t.original(uid)
}
switch {
case msg.Data != nil:
msg.Data.Topic = topicName
case msg.Pres != nil:
msg.Pres.Topic = topicName
case msg.Info != nil:
msg.Info.Topic = topicName
}
}
// Send channel messages anonymously.
if isChanSub && msg.Data != nil {
msg.Data.From = ""
}
}
// computePerUserAcsUnion computes want and given permissions unions over all topic's subscribers.
func (t *Topic) computePerUserAcsUnion() {
wantUnion := types.ModeNone
givenUnion := types.ModeNone
for _, pud := range t.perUser {
if pud.isChan {
continue
}
wantUnion |= pud.modeWant
givenUnion |= pud.modeGiven
}
if t.isChan {
// Apply standard channel permissions to channel topics.
wantUnion |= types.ModeCChnReader
givenUnion |= types.ModeCChnReader
}
t.modeWantUnion = wantUnion
t.modeGivenUnion = givenUnion
}
// unregisterSession implements all logic following receipt of a leave
// request via the Topic.unreg channel.
func (t *Topic) unregisterSession(msg *ClientComMessage) {
if t.currentCall != nil {
shouldTerminateCall := false
if msg.sess.isMultiplex() {
// Check if any of the call party sessions is multiplexed over msg.sess.
for _, p := range t.currentCall.parties {
if p.sess.isProxy() && p.sess.multi == msg.sess {
shouldTerminateCall = true
break
}
}
} else if _, found := t.currentCall.parties[msg.sess.sid]; found {
// Normal session disconnecting from topic. Just terminate the call.
shouldTerminateCall = true
}
if shouldTerminateCall {
t.terminateCallInProgress(false)
}
}
t.handleLeaveRequest(msg, msg.sess)
if msg.init && msg.sess.inflightReqs != nil {
// If it's a client initiated request.
msg.sess.inflightReqs.Done()
}
// If there are no more subscriptions to this topic, start a kill timer
if len(t.sessions) == 0 && t.cat != types.TopicCatSys {
t.killTimer.Reset(idleMasterTopicTimeout)
}
}
// registerSession handles a session join (registration) request
// received via the Topic.reg channel.
func (t *Topic) registerSession(msg *ClientComMessage) {
// Request to add a connection to this topic
if t.isInactive() {
msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow()))
} else if msg.sess.getSub(t.name) != nil {
// Session is already subscribed to topic. Subscription is checked in session.go,
// but there is a gap between topic creation/un-pausing and processing the
// first subscription request, before the topic is linked to session: a client
// may send several subscription requests in that gap.
msg.sess.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp))
} else {
// The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die
// while processing the call.
t.killTimer.Stop()
if err := t.handleSubscription(msg); err == nil {
if msg.Sub.Created {
// Call plugins with the new topic
pluginTopic(t, plgActCreate)
}
} else {
if len(t.sessions) == 0 && t.cat != types.TopicCatSys {
// Failed to subscribe, the topic is still inactive
t.killTimer.Reset(idleMasterTopicTimeout)
}
logs.Warn.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, msg.sess.sid)
}
}
if msg.sess.inflightReqs != nil {
msg.sess.inflightReqs.Done()
}
}
func (t *Topic) handleMetaGet(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) {
if msg.MetaWhat&constMsgMetaDesc != 0 {
if err := t.replyGetDesc(msg.sess, asUid, asChan, msg.Get.Desc, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Desc failed: %s", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaSub != 0 {
if err := t.replyGetSub(msg.sess, asUid, authLevel, asChan, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Sub failed: %s", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaData != 0 {
if err := t.replyGetData(msg.sess, asUid, asChan, msg.Get.Data, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Data failed: %s", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaDel != 0 {
if err := t.replyGetDel(msg.sess, asUid, msg.Get.Del, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Del failed: %s", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaTags != 0 {
if err := t.replyGetTags(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Tags failed: %s", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaCred != 0 {
if err := t.replyGetCreds(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Creds failed: %s", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaAux != 0 {
logs.Warn.Printf("topic[%s] handle getAux", t.name)
if err := t.replyGetAux(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Get.Aux failed: %s", t.name, err)
}
}
}
func (t *Topic) handleMetaSet(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) {
if msg.MetaWhat&constMsgMetaDesc != 0 {
if err := t.replySetDesc(msg.sess, asUid, asChan, authLevel, msg); err == nil {
// Notify plugins of the update
pluginTopic(t, plgActUpd)
} else {
logs.Warn.Printf("topic[%s] meta.Set.Desc failed: %v", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaSub != 0 {
if err := t.replySetSub(msg.sess, msg, asChan); err != nil {
logs.Warn.Printf("topic[%s] meta.Set.Sub failed: %v", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaTags != 0 {
if err := t.replySetTags(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Set.Tags failed: %v", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaCred != 0 {
if err := t.replySetCred(msg.sess, asUid, authLevel, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Set.Cred failed: %v", t.name, err)
}
}
if msg.MetaWhat&constMsgMetaAux != 0 {
if err := t.replySetAux(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] meta.Set.Aux failed: %v", t.name, err)
}
}
}
func (t *Topic) handleMetaDel(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) {
var err error
switch msg.MetaWhat {
case constMsgDelMsg:
err = t.replyDelMsg(msg.sess, asUid, asChan, msg)
case constMsgDelSub:
err = t.replyDelSub(msg.sess, asUid, msg)
case constMsgDelTopic:
err = t.replyDelTopic(msg.sess, asUid, msg)
case constMsgDelCred:
err = t.replyDelCred(msg.sess, asUid, authLevel, msg)
}
if err != nil {
logs.Warn.Printf("topic[%s] meta.Del failed: %v", t.name, err)
}
}
// handleMeta implements logic handling meta requests
// received via the Topic.meta channel.
func (t *Topic) handleMeta(msg *ClientComMessage) {
// Request to get/set topic metadata
asUid := types.ParseUserId(msg.AsUser)
authLevel := auth.Level(msg.AuthLvl)
asChan, err := t.verifyChannelAccess(msg.Original)
if err != nil {
// User should not be able to address non-channel topic as channel.
msg.sess.queueOut(ErrNotFoundReply(msg, types.TimeNow()))
return
}
switch {
case msg.Get != nil:
// Get request
t.handleMetaGet(msg, asUid, asChan, authLevel)
case msg.Set != nil:
// Set request
t.handleMetaSet(msg, asUid, asChan, authLevel)
case msg.Del != nil:
// Del request
t.handleMetaDel(msg, asUid, asChan, authLevel)
}
}
func (t *Topic) handleSessionUpdate(upd *sessionUpdate, currentUA *string, uaTimer *time.Timer) {
if upd.sess != nil {
// 'me' & 'grp' only. Background session timed out and came online.
t.sessToForeground(upd.sess)
} else if *currentUA != upd.userAgent {
if t.cat != types.TopicCatMe {
logs.Warn.Panicln("invalid topic category in UA update", t.name)
}
// 'me' only. Process an update to user agent from one of the sessions.
*currentUA = upd.userAgent
uaTimer.Reset(uaTimerDelay)
}
}
func (t *Topic) handleUATimerEvent(currentUA string) {
// Publish user agent changes after a delay
if currentUA == "" || currentUA == t.userAgent {
return
}
t.userAgent = currentUA
t.presUsersOfInterest("ua", t.userAgent)
}
func (t *Topic) handleTopicTimeout(hub *Hub, currentUA string, uaTimer, defrNotifTimer *time.Timer) {
// Topic timeout
hub.unreg <- &topicUnreg{rcptTo: t.name}
defrNotifTimer.Stop()
switch t.cat {
case types.TopicCatMe:
uaTimer.Stop()
t.presUsersOfInterest("off", currentUA)
case types.TopicCatGrp:
t.presSubsOffline("off", nilPresParams, nilPresFilters, nilPresFilters, "", false)
}
}
func (t *Topic) handleTopicTermination(sd *shutDown) {
// Handle four cases:
// 1. Topic is shutting down by timer due to inactivity (reason == StopNone)
// 2. Topic is being deleted (reason == StopDeleted)
// 3. System shutdown (reason == StopShutdown, done != nil).
// 4. Cluster rehashing (reason == StopRehashing)
switch sd.reason {
case StopDeleted:
if t.cat == types.TopicCatGrp {
t.presSubsOffline("gone", nilPresParams, nilPresFilters, nilPresFilters, "", false)
}
// P2P users get "off+remove" earlier in the process
// Inform plugins that the topic is deleted
pluginTopic(t, plgActDel)
case StopRehashing:
// Must send individual messages to sessions because normal sending through the topic's
// broadcast channel won't work - it will be shut down too soon.
t.presSubsOnlineDirect("term", nilPresParams, nilPresFilters, "")
}
// In case of a system shutdown don't bother with notifications. They won't be delivered anyway.
// Tell sessions to remove the topic
for s := range t.sessions {
s.detachSession(t.name)
}
if t.cat == types.TopicCatGrp {
// Update topic subscriber count.
if err := store.Topics.UpdateSubCnt(t.name); err != nil {
logs.Warn.Println("topic update sub cnt:", err)
}
}
usersRegisterTopic(t, false)
// Report completion back to sender, if 'done' is not nil.
if sd.done != nil {
sd.done <- true
}
}
func (t *Topic) runLocal(hub *Hub) {
// Kills topic after a period of inactivity.
t.killTimer = time.NewTimer(time.Hour)
t.killTimer.Stop()
// Notifies about user agent change. 'me' only
uaTimer := time.NewTimer(time.Minute)
var currentUA string
uaTimer.Stop()
// Ticker for deferred presence notifications.
defrNotifTimer := time.NewTimer(time.Millisecond * 500)
t.callEstablishmentTimer = time.NewTimer(time.Second)
t.callEstablishmentTimer.Stop()
for {
select {
case msg := <-t.reg:
t.registerSession(msg)
case msg := <-t.unreg:
t.unregisterSession(msg)
case msg := <-t.clientMsg:
t.handleClientMsg(msg)
case msg := <-t.serverMsg:
t.handleServerMsg(msg)
case meta := <-t.meta:
t.handleMeta(meta)
case upd := <-t.supd:
t.handleSessionUpdate(upd, &currentUA, uaTimer)
case <-uaTimer.C:
t.handleUATimerEvent(currentUA)
case <-t.killTimer.C:
t.handleTopicTimeout(hub, currentUA, uaTimer, defrNotifTimer)
case <-t.callEstablishmentTimer.C:
t.terminateCallInProgress(true)
case sd := <-t.exit:
t.handleTopicTermination(sd)
return
}
}
}
// handleClientMsg is the top-level handler of messages received by the topic from sessions.
func (t *Topic) handleClientMsg(msg *ClientComMessage) {
if msg.Pub != nil {
t.handlePubBroadcast(msg)
} else if msg.Note != nil {
t.handleNoteBroadcast(msg)
} else {
// TODO(gene): maybe remove this panic.
logs.Err.Panic("topic: wrong client message type for broadcasting", t.name)
}
}
// handleServerMsg is the top-level handler of messages generated at the server.
func (t *Topic) handleServerMsg(msg *ServerComMessage) {
// Server-generated message: {info} or {pres}.
if t.isInactive() {
// Ignore message - the topic is paused or being deleted.
return
}
if msg.Pres != nil {
t.handlePresence(msg)
} else if msg.Info != nil {
t.broadcastToSessions(msg)
} else {
// TODO(gene): maybe remove this panic.
logs.Err.Panic("topic: wrong server message type for broadcasting", t.name)
}
}
// Session subscribed to a topic, created == true if topic was just created and {pres} needs to be announced
func (t *Topic) handleSubscription(msg *ClientComMessage) error {
asUid := types.ParseUserId(msg.AsUser)
authLevel := auth.Level(msg.AuthLvl)
asChan, err := t.verifyChannelAccess(msg.Original)
if err != nil {
// User should not be able to address non-channel topic as channel.
msg.sess.queueOut(ErrNotFoundReply(msg, types.TimeNow()))
return err
}
if err := t.subscriptionReply(asChan, msg); err != nil {
return err
}
msgsub := msg.Sub
getWhat := 0
if msgsub.Get != nil {
getWhat = parseMsgClientMeta(msgsub.Get.What)
}
if getWhat&constMsgMetaDesc != 0 {
// Send get.desc as a {meta} packet.
if err := t.replyGetDesc(msg.sess, asUid, asChan, msgsub.Get.Desc, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Desc failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
if getWhat&constMsgMetaSub != 0 {
// Send get.sub response as a separate {meta} packet
if err := t.replyGetSub(msg.sess, asUid, authLevel, asChan, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Sub failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
if getWhat&constMsgMetaTags != 0 {
// Send get.tags response as a separate {meta} packet
if err := t.replyGetTags(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Tags failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
if getWhat&constMsgMetaCred != 0 {
// Send get.tags response as a separate {meta} packet
if err := t.replyGetCreds(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Cred failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
if getWhat&constMsgMetaAux != 0 {
// Send get.aux response as a separate {meta} packet
if err := t.replyGetAux(msg.sess, asUid, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Aux failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
if getWhat&constMsgMetaData != 0 {
// Send get.data response as {data} packets
if err := t.replyGetData(msg.sess, asUid, asChan, msgsub.Get.Data, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Data failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
if getWhat&constMsgMetaDel != 0 {
// Send get.del response as a separate {meta} packet
if err := t.replyGetDel(msg.sess, asUid, msgsub.Get.Del, msg); err != nil {
logs.Warn.Printf("topic[%s] handleSubscription Get.Del failed: %v sid=%s", t.name, err, msg.sess.sid)
}
}
return nil
}
// handleLeaveRequest processes a session leave request.
func (t *Topic) handleLeaveRequest(msg *ClientComMessage, sess *Session) {
// Remove connection from topic; session may continue to function
now := types.TimeNow()
var asUid types.Uid
var asChan bool
if msg.init {
asUid = types.ParseUserId(msg.AsUser)
var err error
asChan, err = t.verifyChannelAccess(msg.Original)
if err != nil {
// Group topic cannot be addressed as channel unless channel functionality is enabled.
sess.queueOut(ErrNotFoundReply(msg, now))
}
}
if t.isInactive() {
if !asUid.IsZero() && msg.init {
sess.queueOut(ErrLockedReply(msg, now))
}
return
}
// User wants to leave and unsubscribe.
if msg.init && msg.Leave.Unsub {
// asUid must not be Zero.
if err := t.replyLeaveUnsub(sess, msg, asUid); err != nil {
logs.Err.Println("failed to unsub", err, sess.sid)
}
return
}
// User wants to leave without unsubscribing.
if pssd, _ := t.remSession(sess, asUid); pssd != nil {
if !sess.isProxy() {
sess.delSub(t.name)
}
if pssd.isChanSub != asChan {
// Cannot address non-channel subscription as channel and vice versa.
if msg.init {
// Group topic cannot be addressed as channel unless channel functionality is enabled.
sess.queueOut(ErrNotFoundReply(msg, now))
}
return
}
var uid types.Uid
if sess.isProxy() {
// Multiplexing session, multiple UIDs.
uid = asUid
} else {
// Simple session, single UID.
uid = pssd.uid
}
var pud perUserData
// uid may be zero when a proxy session is trying to terminate (it called unsubAll).
if !uid.IsZero() {
// UID not zero: one user removed.
pud = t.perUser[uid]
if !sess.background {
pud.online--
t.perUser[uid] = pud
}
} else if len(pssd.muids) > 0 {
// UID is zero: multiplexing session is dropped altogether.
// Using new 'uid' and 'pud' variables.
for _, uid := range pssd.muids {
pud := t.perUser[uid]
pud.online--
t.perUser[uid] = pud
}
} else if !sess.isCluster() {
logs.Warn.Panic("cannot determine uid: leave req", msg, sess)
}
switch t.cat {
case types.TopicCatMe:
mrs := t.mostRecentSession()
if mrs == nil {
// Last session
mrs = sess
} else {
// Change UA to the most recent live session and announce it. Don't block.
select {
case t.supd <- &sessionUpdate{userAgent: mrs.userAgent}:
default:
}
}
meUid := uid
if meUid.IsZero() && len(pssd.muids) > 0 {
// The entire multiplexing session is being dropped. Need to find owner's UID.
// len(pssd.muids) could be zero if the session was a background session.
meUid = pssd.muids[0]
}
if !meUid.IsZero() {
// Update user's last online timestamp & user agent. Only one user can be subscribed to 'me' topic.
if err := store.Users.UpdateLastSeen(meUid, mrs.userAgent, now); err != nil {
logs.Warn.Println("user update last seen:", err)
}
}
case types.TopicCatFnd:
// FIXME: this does not work correctly in case of a multiplexing session.
// Remove ephemeral query.
t.fndRemovePublic(sess)
case types.TopicCatGrp:
// Subscriber is going offline in the topic: notify other subscribers who are currently online.
readFilter := &presFilters{filterIn: types.ModeRead}
if !uid.IsZero() {
if pud.online == 0 {
if asChan {
// Simply delete record from perUserData
delete(t.perUser, uid)
} else {
t.presSubsOnline("off", uid.UserId(), nilPresParams, readFilter, "")
}
}
} else if len(pssd.muids) > 0 {
for _, uid := range pssd.muids {
if t.perUser[uid].online == 0 {
if asChan {
// delete record from perUserData
delete(t.perUser, uid)
} else {
t.presSubsOnline("off", uid.UserId(), nilPresParams, readFilter, "")
}
}
}
}
}
if !uid.IsZero() {
// Respond if contains an id.
if msg.init {
sess.queueOut(NoErrReply(msg, now))
}
}
}
}
// sessToForeground updates perUser online status accounting and fires due
// deferred notifications for the provided session.
func (t *Topic) sessToForeground(sess *Session) {
s := sess
if s.multi != nil {
s = s.multi
}
if pssd, ok := t.sessions[s]; ok && !pssd.isChanSub {
uid := pssd.uid
if s.isMultiplex() {
// If 's' is a multiplexing session, then sess is a proxy and it contains correct UID.
// Add UID to the list of online users.
uid = sess.uid
pssd.muids = append(pssd.muids, uid)
}
// Mark user as online
pud := t.perUser[uid]
pud.online++
t.perUser[uid] = pud
t.sendSubNotifications(uid, sess.sid, sess.userAgent)
}
}
// Send immediate presence notification in response to a subscription.
// Send push notification to the P2P counterpart.
// In case of a new channel subscription subscribe user to an FCM topic.
// These notifications are always sent immediately even if background is requested.
func (t *Topic) sendImmediateSubNotifications(asUid types.Uid, acs *MsgAccessMode, sreg *ClientComMessage, now time.Time) {
modeWant, _ := types.ParseAcs([]byte(acs.Want))
modeGiven, _ := types.ParseAcs([]byte(acs.Given))
mode := modeWant & modeGiven
asChan := t.isChan && types.IsChannel(sreg.Original)
if t.cat == types.TopicCatP2P {
uid2 := t.p2pOtherUser(asUid)
pud2 := t.perUser[uid2]
mode2 := pud2.modeGiven & pud2.modeWant
if pud2.deleted {
mode2 = types.ModeInvalid
}
// Inform the other user that the topic was just created.
if sreg.Sub.Created {
t.presSingleUserOffline(uid2, mode2, "acs", &presParams{
dWant: pud2.modeWant.String(),
dGiven: pud2.modeGiven.String(),
actor: asUid.UserId(),
}, "", false)
}
if sreg.Sub.Newsub {
// Notify current user's 'me' topic to accept notifications from user2
t.presSingleUserOffline(asUid, mode, "?none+en", nilPresParams, "", false)
// Initiate exchange of 'online' status with the other user.
// We don't know if the current user is online in the 'me' topic,
// so sending an '?unkn' status to user2. His 'me' topic
// will reply with user2's status and request an actual status from user1.
status := "?unkn"
if mode2.IsPresencer() {
// If user2 should receive notifications, enable it.
status += "+en"
}
t.presSingleUserOffline(uid2, mode2, status, nilPresParams, "", false)
// Also send a push notification to the other user.
sendPush(t.pushForP2PSub(asUid, uid2, pud2.modeWant, pud2.modeGiven, now))
}
} else if t.cat == types.TopicCatGrp && !asChan && sreg.Sub.Newsub {
// For new group subscriptions, notify other group members.
sendPush(t.pushForGroupSub(asUid, now))
}
// newsub could be true only for p2p and group topics, no need to check topic category explicitly.
if sreg.Sub.Newsub {
// Notify creator's other sessions that the subscription (or the entire topic) was created.
t.presSingleUserOffline(asUid, mode, "acs",
&presParams{
dWant: acs.Want,
dGiven: acs.Given,
actor: asUid.UserId(),
},
sreg.sess.sid, false)
if asChan {
t.channelSubUnsub(asUid, true)
}
}
}
// Send immediate or deferred presence notification in response to a subscription.
// Not used by channels.
func (t *Topic) sendSubNotifications(asUid types.Uid, sid, userAgent string) {
switch t.cat {
case types.TopicCatMe:
// Notify user's contact that the given user is online now.
if !t.isLoaded() {
t.markLoaded()
if err := t.loadContacts(asUid); err != nil {
logs.Err.Println("topic: failed to load contacts", t.name, err.Error())
}
// User online: notify users of interest without forcing response (no +en here).
t.presUsersOfInterest("on", userAgent)
}
case types.TopicCatGrp:
pud := t.perUser[asUid]
if pud.isChan {
// Not sendng notifications for channel readers.
return
}
// Enable notifications for a new group topic, if appropriate.
if !t.isLoaded() {
t.markLoaded()
status := "on"
if (pud.modeGiven & pud.modeWant).IsPresencer() {
status += "+en"
}
// Notify topic subscribers that the topic is online now.
t.presSubsOffline(status, nilPresParams, nilPresFilters, nilPresFilters, "", false)
} else if pud.online == 1 {
// If this is the first session of the user in the topic.
// Notify other online group members that the user is online now.
t.presSubsOnline("on", asUid.UserId(), nilPresParams,
&presFilters{filterIn: types.ModeRead}, sid)
}
}
}
// Saves a new message (defined by head, content and attachments) in the topic
// in response to a client request (msg, asUid) and broadcasts it to the attached sessions.
func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid, noEcho bool, attachments []string, head map[string]any, content any) error {
pud, userFound := t.perUser[asUid]
// Anyone is allowed to post to 'sys' topic.
if t.cat != types.TopicCatSys {
// If it's not 'sys' check write permission.
if !(pud.modeWant & pud.modeGiven).IsWriter() {
msg.sess.queueOut(ErrPermissionDenied(msg.Id, t.original(asUid), msg.Timestamp))
return types.ErrPermissionDenied
}
}
if msg.sess != nil && msg.sess.uid != asUid {
// The "sender" header contains ID of the user who sent the message on behalf of asUid.
if head == nil {
head = map[string]any{}
}
head["sender"] = msg.sess.uid.UserId()
} else if head != nil {
// Make sure the received Head does not include a fake "sender" header.
delete(head, "sender")
}
markedReadBySender := false
if err, unreadUpdated := store.Messages.Save(
&types.Message{
ObjHeader: types.ObjHeader{CreatedAt: msg.Timestamp},
SeqId: t.lastID + 1,
Topic: t.name,
From: asUid.String(),
Head: head,
Content: content,
}, attachments, (pud.modeGiven & pud.modeWant).IsReader()); err != nil {
logs.Warn.Printf("topic[%s]: failed to save message: %v", t.name, err)
msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp))
return err
} else {
markedReadBySender = unreadUpdated
}
t.lastID++
t.touched = msg.Timestamp
if userFound {
pud.readID = t.lastID
pud.recvID = t.lastID
t.perUser[asUid] = pud
}
if msg.Id != "" && msg.sess != nil {
reply := NoErrAccepted(msg.Id, t.original(asUid), msg.Timestamp)
reply.Ctrl.Params = map[string]any{"seq": t.lastID}
msg.sess.queueOut(reply)
}
data := &ServerComMessage{
Data: &MsgServerData{
Topic: msg.Original,
From: msg.AsUser,
Timestamp: msg.Timestamp,
SeqId: t.lastID,
Head: head,
Content: content,
},
// Internal-only values.
Id: msg.Id,
RcptTo: msg.RcptTo,
AsUser: msg.AsUser,
Timestamp: msg.Timestamp,
sess: msg.sess,
}
if noEcho {
data.SkipSid = msg.sess.sid
}
// Message sent: notify offline 'R' subscrbers on 'me'.
t.presSubsOffline("msg", &presParams{seqID: t.lastID, actor: msg.AsUser},
&presFilters{filterIn: types.ModeRead}, nilPresFilters, "", true)
// Tell the plugins that a message was accepted for delivery
pluginMessage(data.Data, plgActCreate)
t.broadcastToSessions(data)
// sendPush will update unread message count and send push notification.
if pushRcpt := t.pushForData(asUid, data.Data, markedReadBySender); pushRcpt != nil {
sendPush(pushRcpt)
}
return nil
}
// handlePubBroadcast fans out {pub} -> {data} messages to recipients in a master topic.
// This is a NON-proxy broadcast.
func (t *Topic) handlePubBroadcast(msg *ClientComMessage) {
asUid := types.ParseUserId(msg.AsUser)
if t.isInactive() {
// Ignore broadcast - topic is paused or being deleted.
msg.sess.queueOut(ErrLocked(msg.Id, t.original(asUid), msg.Timestamp))
return
}
if t.isReadOnly() {
msg.sess.queueOut(ErrPermissionDenied(msg.Id, t.original(asUid), msg.Timestamp))
return
}
isCall := msg.Pub.Head != nil && msg.Pub.Head["webrtc"] != nil
if isCall {
if len(globals.iceServers) == 0 {
msg.sess.queueOut(ErrNotImplementedReply(msg, types.TimeNow()))
return
}
if t.cat != types.TopicCatP2P {
msg.sess.queueOut(ErrPermissionDeniedReply(msg, types.TimeNow()))
return
}
if t.currentCall != nil {
msg.sess.queueOut(ErrCallBusyReply(msg, types.TimeNow()))
return
}
}
// Save to DB at master topic.
var attachments []string
if msg.Extra != nil && len(msg.Extra.Attachments) > 0 {
attachments = msg.Extra.Attachments
}
if err := t.saveAndBroadcastMessage(msg, asUid, msg.Pub.NoEcho, attachments, msg.Pub.Head, msg.Pub.Content); err != nil {
logs.Err.Printf("topic[%s]: failed to save messagge - %s", t.name, err)
return
}
if isCall {
t.handleCallInvite(msg, asUid)
}
}
// handleNoteBroadcast fans out {note} -> {info} messages to recipients in a master topic.
// This is a NON-proxy broadcast (at master topic).
func (t *Topic) handleNoteBroadcast(msg *ClientComMessage) {
if t.isInactive() {
// Ignore broadcast - topic is paused or being deleted.
return
}
if msg.Note.SeqId > t.lastID {
// Drop bogus read notification
return
}
asChan, err := t.verifyChannelAccess(msg.Original)
if err != nil {
// Silently drop invalid notification.
return
}
asUid := types.ParseUserId(msg.AsUser)
pud := t.perUser[asUid]
mode := pud.modeGiven & pud.modeWant
if pud.deleted {
mode = types.ModeInvalid
}
switch msg.Note.What {
case "kp", "kpa", "kpv":
// Filter out "kp*" from users with no 'W' permission (or people without a subscription).
if !mode.IsWriter() || t.isReadOnly() {
return
}
case "read", "recv":
// Filter out "read/recv" from users with no 'R' permission (or people without a subscription).
if !mode.IsReader() {
return
}
case "call":
// Handle calls separately.
t.handleCallEvent(msg)
return
}
var read, recv, unread, seq int
switch msg.Note.What {
case "read":
if msg.Note.SeqId <= pud.readID {
// No need to report stale or bogus read status.
return
}
// The number of unread messages has decreased, negative value.
unread = pud.readID - msg.Note.SeqId
pud.readID = msg.Note.SeqId
if pud.readID > pud.recvID {
pud.recvID = pud.readID
}
read = pud.readID
seq = read
case "recv":
if msg.Note.SeqId <= pud.recvID {
// Stale or bogus recv status.
return
}
pud.recvID = msg.Note.SeqId
if pud.readID > pud.recvID {
pud.recvID = pud.readID
}
recv = pud.recvID
seq = recv
}
if seq > 0 {
topicName := t.name
if asChan {
topicName = msg.Note.Topic
}
upd := map[string]any{}
if recv > 0 {
upd["RecvSeqId"] = recv
}
if read > 0 {
upd["ReadSeqId"] = read
}
if err := store.Subs.Update(topicName, asUid, upd); err != nil {
logs.Warn.Printf("topic[%s]: failed to update SeqRead/Recv counter: %v", t.name, err)
return
}
// Read/recv updated: notify user's other sessions of the change
t.presPubMessageCount(asUid, mode, read, recv, msg.sess.sid)
if read > 0 {
// Send push notification to other user devices.
sendPush(t.pushForReadRcpt(asUid, read, msg.Timestamp))
}
// Update cached count of unread messages (not tracking unread messages fror channels).
if !asChan {
usersUpdateUnread(asUid, unread, true)
}
}
if asChan {
// No need to forward {note} to other subscribers in channels
return
}
if seq > 0 {
t.perUser[asUid] = pud
}
// Read/recv/kp: notify users offline in the topic on their 'me'.
t.infoSubsOffline(asUid, msg.Note.What, seq, msg.sess.sid)
info := &ServerComMessage{
Info: &MsgServerInfo{
Topic: msg.Original,
From: msg.AsUser,
What: msg.Note.What,
SeqId: msg.Note.SeqId,
},
RcptTo: msg.RcptTo,
AsUser: msg.AsUser,
Timestamp: msg.Timestamp,
SkipSid: msg.sess.sid,
sess: msg.sess,
}
t.broadcastToSessions(info)
}
// handlePresence fans out {pres} messages to recipients in topic.
func (t *Topic) handlePresence(msg *ServerComMessage) {
what := t.procPresReq(msg.Pres.Src, msg.Pres.What, msg.Pres.WantReply)
if t.xoriginal != msg.Pres.Topic || what == "" {
// This is just a request for status, don't forward it to sessions
return
}
// "what" may have changed, i.e. unset or "+command" removed ("on+en" -> "on")
msg.Pres.What = what
t.broadcastToSessions(msg)
}
// broadcastToSessions writes message to attached sessions.
func (t *Topic) broadcastToSessions(msg *ServerComMessage) {
// List of sessions to be dropped.
var dropSessions []*Session
// Broadcast the message. Only {data}, {pres}, {info} are broadcastable.
// {meta} and {ctrl} are sent to the session only
for sess, pssd := range t.sessions {
// Send all messages to multiplexing session.
if !sess.isMultiplex() {
if sess.sid == msg.SkipSid {
continue
}
if msg.Pres != nil {
// Skip notifying - already notified on topic.
if msg.Pres.SkipTopic != "" && sess.getSub(msg.Pres.SkipTopic) != nil {
continue
}
// Notification addressed to a single user only.
if msg.Pres.SingleUser != "" && pssd.uid.UserId() != msg.Pres.SingleUser {
continue
}
// Notification should skip a single user.
if msg.Pres.ExcludeUser != "" && pssd.uid.UserId() == msg.Pres.ExcludeUser {
continue
}
// Check presence filters
if !t.passesPresenceFilters(msg.Pres, pssd.uid) {
continue
}
} else {
if msg.Info != nil {
// Don't forward read receipts and key presses to channel readers and those without the R permission.
// OK to forward with Src != "" because it's sent from another topic to 'me', permissions already
// checked there.
if msg.Info.Src == "" && (pssd.isChanSub || !t.userIsReader(pssd.uid)) {
continue
}
// Skip notifying - already notified on topic.
if msg.Info.SkipTopic != "" && sess.getSub(msg.Info.SkipTopic) != nil {
continue
}
// Don't send key presses from one user's session to the other sessions of the same user.
if msg.Info.What == "kp" && msg.Info.From == pssd.uid.UserId() {
continue
}
} else if !t.userIsReader(pssd.uid) && !pssd.isChanSub {
// Skip {data} if the user has no Read permission and not a channel reader.
continue
}
}
} else if pssd.isChanSub && types.IsChannel(sess.sid) {
// If it's a chnX multiplexing session, check if there's a corresponding
// grpX multiplexing session as we don't want to send the message to both.
grpSid := types.ChnToGrp(sess.sid)
if grpSess := globals.sessionStore.Get(grpSid); grpSess != nil && grpSess.isMultiplex() {
// If grpX multiplexing session's attached to topic, skip this chnX session
// (message will be routed to the topic proxy via the grpX session).
if _, attached := t.sessions[grpSess]; attached {
continue
}
}
}
// Make a copy of msg since messages sent to sessions differ.
msgCopy := msg.copy()
// Topic name may be different depending on the user to which the `sess` belongs.
t.prepareBroadcastableMessage(msgCopy, pssd.uid, pssd.isChanSub)
// Send message to session.
if !sess.queueOut(msgCopy) {
logs.Warn.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid)
dropSessions = append(dropSessions, sess)
}
}
// Drop "bad" sessions.
for _, sess := range dropSessions {
// The whole session is being dropped, so ClientComMessage.init is false.
// keep redundant init: false so it can be searched for.
t.unregisterSession(&ClientComMessage{sess: sess, init: false})
}
}
// subscriptionReply generates a response to a subscription request
func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error {
// The topic is already initialized by the Hub
msgsub := msg.Sub
// For newly created topics report topic creation time.
var now time.Time
if msgsub.Created {
now = t.updated
} else {
now = types.TimeNow()
}
asUid := types.ParseUserId(msg.AsUser)
if !msgsub.Newsub && (t.cat == types.TopicCatP2P || t.cat == types.TopicCatGrp) {
// Check if this is a new subscription (P2P & GRP only. SLF, SYS are excluded here).
pud, found := t.perUser[asUid]
msgsub.Newsub = !found || pud.deleted
}
var private any
var mode string
if msgsub.Set != nil {
if msgsub.Set.Sub != nil {
if msgsub.Set.Sub.User != "" {
msg.sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("user id must not be specified")
}
mode = msgsub.Set.Sub.Mode
}
if msgsub.Set.Desc != nil {
private = msgsub.Set.Desc.Private
}
}
var err error
var modeChanged *MsgAccessMode
// Create new subscription or modify an existing one.
if modeChanged, err = t.thisUserSub(msg.sess, msg, asUid, asChan, mode, private); err != nil {
return err
}
hasJoined := true
if modeChanged != nil {
if acs, err := types.ParseAcs([]byte(modeChanged.Mode)); err == nil {
hasJoined = acs.IsJoiner()
}
}
if hasJoined {
// Subscription successfully created. Link topic to session.
msg.sess.addSub(t.name, &Subscription{
broadcast: t.clientMsg,
done: t.unreg,
meta: t.meta,
supd: t.supd,
})
t.addSession(msg.sess, asUid, asChan)
// The user is online in the topic. Increment the counter if notifications are not deferred.
if !msg.sess.background {
userData := t.perUser[asUid]
userData.online++
t.perUser[asUid] = userData
}
if t.cat == types.TopicCatGrp && msgsub.Newsub {
// Increment subscriber count for new group subscriptions only.
t.subCnt++
}
}
params := map[string]any{}
// Report back the assigned access mode.
if modeChanged != nil {
params["acs"] = modeChanged
}
toriginal := t.original(asUid)
// When a group topic is created, it's given a temporary name by the client.
// Then this name changes. Report back the original name here.
if msgsub.Created && msg.Original != toriginal {
params["tmpname"] = msg.Original
// The new123ABC name is no longer useful after this.
msg.Original = toriginal
}
if len(params) == 0 {
// Don't send empty params '{}'
msg.sess.queueOut(NoErr(msg.Id, toriginal, now))
} else {
msg.sess.queueOut(NoErrParams(msg.Id, toriginal, now, params))
}
// Some notifications are always sent immediately.
if modeChanged != nil {
t.sendImmediateSubNotifications(asUid, modeChanged, msg, now)
}
if !msg.sess.background && hasJoined {
// Other notifications are also sent immediately for foreground sessions.
t.sendSubNotifications(asUid, msg.sess.sid, msg.sess.userAgent)
}
return nil
}
// User requests or updates a self-subscription to a topic. Called as a
// result of {sub} or {meta set=sub}.
// Returns new access mode as *MsgAccessMode if user's access mode has changed, nil otherwise.
//
// sess - originating session
// pkt - client message which triggered this request; {sub} or {set}
// asUid - id of the user making the request
// asChan - true if the user is subscribing to a channel topic
// want - requested access mode
// private - private value to assign to the subscription
// background - presence notifications are deferred
//
// Handle these cases:
// A. User is trying to subscribe for the first time (no subscription).
// A.1 Normal user is subscribing to the topic.
// A.2 Reader is joining the channel.
// B. User is already subscribed, just joining without changing anything.
// C. User is responding to an earlier invite (modeWant was "N" in subscription).
// D. User is already subscribed, changing modeWant.
// E. User is accepting ownership transfer (requesting ownership transfer is not permitted).
// In case of a group topic the user may be a reader or a full subscriber.
func (t *Topic) thisUserSub(sess *Session, pkt *ClientComMessage, asUid types.Uid, asChan bool, want string,
private any) (*MsgAccessMode, error) {
now := types.TimeNow()
asLvl := auth.Level(pkt.AuthLvl)
// Access mode values as they were before this request was processed.
oldWant := types.ModeNone
oldGiven := types.ModeNone
// Parse access mode requested by the user
modeWant := types.ModeUnset
if want != "" {
if err := modeWant.UnmarshalText([]byte(want)); err != nil {
sess.queueOut(ErrMalformedReply(pkt, now))
return nil, err
}
}
var err error
// Check if it's an attempt at a new subscription to the topic / a first connection of a channel reader
// (channel readers are not permanently cached).
// It could be an actual subscription (IsJoiner() == true) or a ban (IsJoiner() == false).
userData, existingSub := t.perUser[asUid]
if !existingSub || userData.deleted {
// New subscription or a not yet cached channel reader, either new or existing.
// Check if the max number of subscriptions is already reached.
if t.cat == types.TopicCatGrp && !asChan && t.subsCount() >= globals.maxSubscriberCount {
sess.queueOut(ErrPolicyReply(pkt, now))
return nil, errors.New("max subscription count exceeded")
}
var sub *types.Subscription
tname := t.name
if t.cat == types.TopicCatP2P {
// P2P could be here only if it was previously deleted. I.e. existingSub is always true for P2P.
if modeWant != types.ModeUnset {
userData.modeWant = modeWant
}
// If no modeWant is provided, leave existing one unchanged.
// Make sure the user is not asking for unreasonable permissions
userData.modeWant = (userData.modeWant & globals.typesModeCP2P) | types.ModeApprove
} else if t.cat == types.TopicCatSys {
if asLvl != auth.LevelRoot {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("subscription to 'sys' topic requires root access level")
}
// Assign default access levels
userData.modeWant = types.ModeCSys
userData.modeGiven = types.ModeCSys
if modeWant != types.ModeUnset {
userData.modeWant = (modeWant & types.ModeCSys) | types.ModeWrite | types.ModeJoin
}
} else if asChan {
userData.isChan = true
// Check if user is already subscribed.
sub, err = store.Subs.Get(pkt.Original, asUid, false)
if err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
// Given mode is immutable.
oldGiven = types.ModeCChnReader
userData.modeGiven = types.ModeCChnReader
if sub != nil {
// Subscription exists, read old access mode.
oldWant = sub.ModeWant
} else {
// Subscription not found, use default.
oldWant = types.ModeCChnReader
}
if modeWant != types.ModeUnset {
// New access mode is explicitly assigned.
userData.modeWant = (modeWant & types.ModeCChnReader) | types.ModeRead | types.ModeJoin
} else {
// Default: unchanged.
userData.modeWant = oldWant
}
// User is subscribed to chnXXX, not grpXXX.
tname = pkt.Original
} else {
// All other topic types.
if !existingSub {
// Check if the user has been subscribed previously and if so, use previous modeGiven.
// Otherwise the user may delete subscription and resubscribe to avoid being blocked.
sub, err = store.Subs.Get(t.name, asUid, true)
if err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
if sub != nil {
userData.modeGiven = sub.ModeGiven
} else {
// If no mode was previously given, give default access.
userData.modeGiven = types.ModeUnset
}
}
if userData.modeGiven == types.ModeUnset {
// New user: default access.
userData.modeGiven = t.accessFor(asLvl)
}
if modeWant == types.ModeUnset {
// User wants default access mode.
userData.modeWant = t.accessFor(asLvl)
} else {
userData.modeWant = modeWant
}
}
// Reject new subscription: 'given' permissions have no 'J'.
if !userData.modeGiven.IsJoiner() {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("subscription rejected due to permissions")
}
// Undelete.
if userData.deleted {
userData.deleted = false
userData.delID, userData.readID, userData.recvID = 0, 0, 0
}
if isNullValue(private) {
private = nil
}
userData.private = private
// Add subscription to database, if missing.
if sub == nil || sub.DeletedAt != nil {
sub = &types.Subscription{
User: asUid.String(),
Topic: tname,
ModeWant: userData.modeWant,
ModeGiven: userData.modeGiven,
Private: userData.private,
}
if err := store.Subs.Create(sub); err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
} else if asChan && userData.modeWant != oldWant {
// Channel reader changed access mode, save changed mode to db.
if err := store.Subs.Update(tname, asUid,
map[string]any{"ModeWant": userData.modeWant}); err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
// Enable or disable fcm push notifications for the subsciption.
t.channelSubUnsub(asUid, userData.modeWant.IsPresencer())
}
if asChan {
if userData.modeWant != oldWant {
pluginSubscription(sub, plgActCreate)
} else {
pluginSubscription(sub, plgActUpd)
}
} else {
// Add subscribed user to cache.
usersRegisterUser(asUid, true)
// Notify plugins of a new subscription
pluginSubscription(sub, plgActCreate)
}
} else {
// Process update to existing subscription. It could be an incomplete subscription for a new topic.
if !userData.isChan && asChan {
// A normal subscriber is trying to access topic as a channel.
// Direct the subscriber to use non-channel topic name.
sess.queueOut(InfoUseOtherReply(pkt, t.name, now))
return nil, types.ErrNotFound
}
var ownerChange bool
// Save old access values
oldWant = userData.modeWant
oldGiven = userData.modeGiven
if modeWant != types.ModeUnset {
// Explicit modeWant is provided
// Make sure the current owner cannot unset the owner flag or ban himself.
if t.owner == asUid && (!modeWant.IsOwner() || !modeWant.IsJoiner()) {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("cannot unset ownership or self-ban the owner")
}
// Perform sanity checks
if userData.modeGiven.IsOwner() {
// Check for possible ownership transfer. Handle the following cases:
// 1. Acceptance or rejection of the ownership transfer
// 2. Owner changing own settings
// Ownership transfer
ownerChange = modeWant.IsOwner() && !userData.modeWant.IsOwner()
// The owner should be able to grant himself any access permissions.
if modeWant.IsOwner() && !userData.modeGiven.BetterEqual(modeWant) {
userData.modeGiven |= modeWant
}
} else if modeWant.IsOwner() {
// Ownership transfer can only be initiated by the owner.
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("non-owner cannot request ownership transfer")
} else if t.cat == types.TopicCatGrp && userData.modeGiven.IsAdmin() && modeWant.IsAdmin() {
// A group topic Admin should be able to grant himself any permissions except
// ownership (checked previously) & hard-deleting messages.
if !userData.modeGiven.BetterEqual(modeWant & ^types.ModeDelete) {
userData.modeGiven |= (modeWant & ^types.ModeDelete)
}
}
switch t.cat {
case types.TopicCatP2P:
// For P2P topics ignore requests exceeding the maximum allowed. Otherwise it will generate
// a useless announcement.
modeWant = (modeWant & globals.typesModeCP2P) | types.ModeApprove
case types.TopicCatSys:
// Anyone can always write to Sys topic.
modeWant &= (modeWant & types.ModeCSys) | types.ModeWrite
}
}
// If user has not requested a new access mode, provide one by default.
if modeWant == types.ModeUnset {
// If the user has self-banned before, un-self-ban. Otherwise do not make a change.
if !oldWant.IsJoiner() {
// Set permissions NO WORSE than default, but possibly better (admin or owner banned himself).
userData.modeWant = userData.modeGiven | t.accessFor(asLvl)
}
} else if userData.modeWant != modeWant {
// The user has provided a new modeWant and it' different from the one before
userData.modeWant = modeWant
}
// Create a subscription object to notify plugins.
sub := types.Subscription{
User: asUid.String(),
Topic: t.name,
}
// Save changes to DB
update := map[string]any{}
if isNullValue(private) {
update["Private"] = nil
userData.private = nil
sub.Private = private
} else if private != nil {
update["Private"] = private
userData.private = private
sub.Private = private
}
if userData.modeWant != oldWant {
update["ModeWant"] = userData.modeWant
sub.ModeWant = userData.modeWant
}
if userData.modeGiven != oldGiven {
update["ModeGiven"] = userData.modeGiven
sub.ModeGiven = userData.modeGiven
}
if len(update) > 0 {
if err := store.Subs.Update(t.name, asUid, update); err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
pluginSubscription(&sub, plgActUpd)
}
// No transactions in RethinkDB, but two owners are better than none
if ownerChange {
oldOwnerData := t.perUser[t.owner]
oldOwnerOldWant, oldOwnerOldGiven := oldOwnerData.modeWant, oldOwnerData.modeGiven
oldOwnerData.modeGiven = (oldOwnerData.modeGiven & ^types.ModeOwner)
oldOwnerData.modeWant = (oldOwnerData.modeWant & ^types.ModeOwner)
if err := store.Subs.Update(t.name, t.owner,
map[string]any{
"ModeWant": oldOwnerData.modeWant,
"ModeGiven": oldOwnerData.modeGiven,
}); err != nil {
return nil, err
}
if err := store.Topics.OwnerChange(t.name, asUid); err != nil {
return nil, err
}
t.perUser[t.owner] = oldOwnerData
// Send presence notifications.
t.notifySubChange(t.owner, asUid, false,
oldOwnerOldWant, oldOwnerOldGiven, oldOwnerData.modeWant, oldOwnerData.modeGiven, "")
t.owner = asUid
}
}
if !asChan {
// If topic is being muted, send "off" notification and disable updates.
// Do it before applying the new permissions.
if (oldWant & oldGiven).IsPresencer() && !(userData.modeWant & userData.modeGiven).IsPresencer() {
if t.cat == types.TopicCatMe {
t.presUsersOfInterest("off+dis", t.userAgent)
} else {
t.presSingleUserOffline(asUid, userData.modeWant&userData.modeGiven,
"off+dis", nilPresParams, "", false)
}
}
}
// Apply changes.
t.perUser[asUid] = userData
var modeChanged *MsgAccessMode
// Send presence notifications and update cached unread count.
if oldWant != userData.modeWant || oldGiven != userData.modeGiven {
if !asChan {
oldReader := (oldWant & oldGiven).IsReader()
newReader := (userData.modeWant & userData.modeGiven).IsReader()
if oldReader && !newReader {
// Decrement unread count
usersUpdateUnread(asUid, userData.readID-t.lastID, true)
} else if !oldReader && newReader {
// Increment unread count
usersUpdateUnread(asUid, t.lastID-userData.readID, true)
}
}
// Notify actor of the changes in access mode.
t.notifySubChange(asUid, asUid, asChan, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid)
}
if (pkt.Sub != nil && pkt.Sub.Newsub) || oldWant != userData.modeWant || oldGiven != userData.modeGiven {
modeChanged = &MsgAccessMode{
Want: userData.modeWant.String(),
Given: userData.modeGiven.String(),
Mode: (userData.modeGiven & userData.modeWant).String(),
}
}
if !userData.modeWant.IsJoiner() {
// The user is self-banning from the topic. Re-subscription will unban.
t.evictUser(asUid, false, "")
// The callee will send NoErrOK
return modeChanged, nil
}
if !userData.modeGiven.IsJoiner() {
// User was banned
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("topic access denied; user is banned")
}
return modeChanged, nil
}
// anotherUserSub processes a request to initiate an invite or approve a subscription request from another user.
// Returns changed == true if user's access mode has changed.
// Handle these cases:
// A. Sharer or Approver is inviting another user for the first time (no prior subscription)
// B. Sharer or Approver is re-inviting another user (adjusting modeGiven, modeWant is still Unset)
// C. Approver is changing modeGiven for another user, modeWant != Unset
func (t *Topic) anotherUserSub(sess *Session, asUid, target types.Uid, asChan bool,
pkt *ClientComMessage) (*MsgAccessMode, error) {
now := types.TimeNow()
set := pkt.Set
// Check if approver actually has permission to manage sharing
hostData, ok := t.perUser[asUid]
// Access mode of the person who is executing this approval process
hostMode := hostData.modeGiven & hostData.modeWant
if !ok || !hostMode.IsSharer() {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("topic access denied; approver has no permission")
}
if asChan {
// TODO: need to implement promoting reader to subscriber. Rejecting for now.
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("topic access denied: cannot subscribe reader to channel")
}
// Check if topic is suspended.
if t.isReadOnly() {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("topic is suspended")
}
// Parse the access mode granted
modeGiven := types.ModeUnset
if set.Sub.Mode != "" {
if err := modeGiven.UnmarshalText([]byte(set.Sub.Mode)); err != nil {
sess.queueOut(ErrMalformedReply(pkt, now))
return nil, err
}
// Make sure the new permissions are reasonable in P2P topics: permissions no greater than allowed,
// approver permission cannot be removed.
if t.cat == types.TopicCatP2P {
modeGiven = (modeGiven & globals.typesModeCP2P) | types.ModeApprove
}
}
// Make sure only the owner & approvers can set non-default access mode
if modeGiven != types.ModeUnset && !hostMode.IsAdmin() {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("sharer cannot set explicit modeGiven")
}
// Make sure no one but the owner can do an ownership transfer
if modeGiven.IsOwner() && t.owner != asUid {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("attempt to transfer ownership by non-owner")
}
// Access mode values as they were before this request was processed.
oldWant := types.ModeUnset
oldGiven := types.ModeUnset
// Check if it's a new invite. If so, save it to database as a subscription.
// Saved subscription does not mean the user is allowed to post/read
userData, existingSub := t.perUser[target]
if !existingSub || userData.deleted {
// Check if the max number of subscriptions is already reached.
if t.cat == types.TopicCatGrp && t.subsCount() >= globals.maxSubscriberCount {
sess.queueOut(ErrPolicyReply(pkt, now))
return nil, errors.New("max subscription count exceeded")
}
if modeGiven == types.ModeUnset {
// Request to use default access mode for the new subscriptions.
// Assuming LevelAuth. Approver should use non-default access if that is not suitable.
modeGiven = t.accessFor(auth.LevelAuth)
// Enable new subscription even if default is no joiner.
modeGiven |= types.ModeJoin
}
var modeWant types.AccessMode
// Check if the invitee has been subscribed previously and if so, use previous modeWant.
// Otherwise the inviter may delete blocked subscription and reinvite to spam the user.
sub, err := store.Subs.Get(t.name, target, true)
if err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
if sub != nil {
// Existing deleted subscription.
modeWant = sub.ModeWant
} else {
// Get user's default access mode to be used as modeWant
if user, err := store.Users.Get(target); err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
} else if user == nil {
sess.queueOut(ErrUserNotFoundReply(pkt, now))
return nil, errors.New("user not found")
} else if user.State != types.StateOK {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("user is suspended")
} else {
// Don't ask by default for more permissions than the granted ones.
modeWant = user.Access.Auth & modeGiven
}
}
// Reject invitation: 'want' permissions have no 'J'.
if !modeWant.IsJoiner() {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("invitation rejected due to permissions")
}
// Add subscription to database
sub = &types.Subscription{
User: target.String(),
Topic: t.name,
ModeWant: modeWant,
ModeGiven: modeGiven,
}
if err := store.Subs.Create(sub); err != nil {
sess.queueOut(ErrUnknownReply(pkt, now))
return nil, err
}
userData = perUserData{
modeGiven: sub.ModeGiven,
modeWant: sub.ModeWant,
private: nil,
}
t.perUser[target] = userData
t.computePerUserAcsUnion()
// Cache user's record
usersRegisterUser(target, true)
// Notify plugins of a new subscription.
pluginSubscription(sub, plgActCreate)
// Send push notification for the new subscription.
// TODO: maybe skip user's devices which were online when this event has happened.
sendPush(t.pushForP2PSub(asUid, target, userData.modeWant, userData.modeGiven, now))
} else {
// Action on an existing subscription: re-invite, change existing permission, confirm/decline request.
oldGiven = userData.modeGiven
oldWant = userData.modeWant
if modeGiven == types.ModeUnset {
// Request to re-send invite without changing the access mode
modeGiven = userData.modeGiven
} else if modeGiven != userData.modeGiven {
// Changing the previously assigned value.
// Cannot strip owner of ownership or ban the owner.
if t.owner == target && (!modeGiven.IsOwner() || !modeGiven.IsJoiner()) {
sess.queueOut(ErrPermissionDeniedReply(pkt, now))
return nil, errors.New("cannot stip ownership or ban the owner")
}
// Save changed value to database
if err := store.Subs.Update(t.name, target,
map[string]any{"ModeGiven": modeGiven}); err != nil {
return nil, err
}
userData.modeGiven = modeGiven
t.perUser[target] = userData
}
}
var modeChanged *MsgAccessMode
// Access mode has changed.
if oldGiven != userData.modeGiven {
oldReader := (oldWant & oldGiven).IsReader()
newReader := (userData.modeWant & userData.modeGiven).IsReader()
if oldReader && !newReader {
// Decrement unread count
usersUpdateUnread(target, userData.readID-t.lastID, true)
} else if !oldReader && newReader {
// Increment unread count
usersUpdateUnread(target, t.lastID-userData.readID, true)
}
t.notifySubChange(target, asUid, false,
oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid)
modeChanged = &MsgAccessMode{
Given: userData.modeGiven.String(),
Want: userData.modeWant.String(),
Mode: (userData.modeGiven & userData.modeWant).String(),
}
}
if !userData.modeGiven.IsJoiner() {
// The user is banned from the topic.
t.evictUser(target, false, "")
}
return modeChanged, nil
}
// replyGetDesc is a response to a get.desc request on a topic, sent to just the session as a {meta} packet
func (t *Topic) replyGetDesc(sess *Session, asUid types.Uid, _ bool, opts *MsgGetOpts, msg *ClientComMessage) error {
now := types.TimeNow()
id := msg.Id
if opts != nil && (opts.User != "" || opts.Limit != 0) {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("invalid GetDesc query")
}
// Check if user requested modified data
ifUpdated := opts == nil || opts.IfModifiedSince == nil || opts.IfModifiedSince.Before(t.updated)
desc := &MsgTopicDesc{}
if opts == nil || opts.IfModifiedSince == nil {
// Send CreatedAt only when the user requests full information (nothing is cached at the client).
desc.CreatedAt = &t.created
}
if !t.updated.IsZero() {
desc.UpdatedAt = &t.updated
}
pud, full := t.perUser[asUid]
full = full || t.cat == types.TopicCatMe
if t.cat == types.TopicCatGrp {
desc.IsChan = t.isChan
desc.SubCnt = t.subCnt
logs.Info.Println("replyGetDesc: grp topic", t.name, "subs", t.subCnt)
}
if ifUpdated {
if t.public != nil || t.trusted != nil {
// Not a p2p topic.
desc.Public = t.public
desc.Trusted = t.trusted
} else if full && t.cat == types.TopicCatP2P {
// FIXME: when a P2P participant updates desc at 'me', these cached values are not updated.
desc.Public = pud.public
desc.Trusted = pud.trusted
}
}
// Request may come from a subscriber (full == true) or a stranger.
// Give subscriber a fuller description than to a stranger/channel reader.
if full {
if t.cat == types.TopicCatP2P {
// For p2p topics default access mode makes no sense: only participants have access to topic.
// Don't report it.
} else if t.cat == types.TopicCatMe || (pud.modeGiven & pud.modeWant).IsSharer() {
desc.DefaultAcs = &MsgDefaultAcsMode{
Auth: t.accessAuth.String(),
Anon: t.accessAnon.String(),
}
}
desc.Acs = &MsgAccessMode{
Want: pud.modeWant.String(),
Given: pud.modeGiven.String(),
Mode: (pud.modeGiven & pud.modeWant).String(),
}
if t.cat == types.TopicCatMe && sess.authLvl == auth.LevelRoot {
// If 'me' is in memory then user account is invariably not suspended.
desc.State = types.StateOK.String()
}
if (pud.modeGiven & pud.modeWant).IsPresencer() {
switch t.cat {
case types.TopicCatGrp:
desc.Online = t.isOnline()
case types.TopicCatP2P:
// This is the timestamp when the other user logged off last time.
// It does not change while the topic is loaded into memory and that's OK most of the time
// because to stay in memory at least one of the users must be connected to topic.
// FIXME(gene): it breaks when user A stays active in one session and connects-disconnects
// from another session. The second session will not see correct LastSeen time and UserAgent.
if pud.lastSeen != nil {
desc.LastSeen = &MsgLastSeenInfo{
When: pud.lastSeen,
UserAgent: pud.lastUA,
}
}
}
}
if ifUpdated {
desc.Private = pud.private
}
// Don't report message IDs to users without Read access.
if (pud.modeGiven & pud.modeWant).IsReader() {
desc.SeqId = t.lastID
if !t.touched.IsZero() {
desc.TouchedAt = &t.touched
}
// Make sure reported values are sane:
// t.delID <= pud.delID; t.readID <= t.recvID <= t.lastID
desc.DelId = max(pud.delID, t.delID)
desc.ReadSeqId = pud.readID
desc.RecvSeqId = max(pud.recvID, pud.readID)
} else {
// Send some sane value of touched.
desc.TouchedAt = &t.updated
}
}
sess.queueOut(&ServerComMessage{
Meta: &MsgServerMeta{
Id: id,
Topic: msg.Original,
Desc: desc,
Timestamp: &now,
},
})
return nil
}
// replySetDesc updates topic metadata, saves it to DB, replies to the caller as {ctrl} message,
// generates {pres} update if necessary.
func (t *Topic) replySetDesc(sess *Session, asUid types.Uid, asChan bool,
authLevel auth.Level, msg *ClientComMessage) error {
now := types.TimeNow()
assignAccess := func(upd map[string]any, mode *MsgDefaultAcsMode) error {
if mode == nil {
return nil
}
if auth, anon, err := parseTopicAccess(mode, types.ModeUnset, types.ModeUnset); err != nil {
return err
} else if auth.IsOwner() || anon.IsOwner() {
return errors.New("default 'owner' access is not permitted")
} else {
access := types.DefaultAccess{Auth: t.accessAuth, Anon: t.accessAnon}
if auth != types.ModeUnset {
if t.cat == types.TopicCatMe {
auth &= types.ModeCAuth
if auth != types.ModeNone {
// This is the default access mode for P2P topics.
// It must be either an N or must include an A permission.
auth |= types.ModeApprove
}
}
access.Auth = auth
}
if anon != types.ModeUnset {
if t.cat == types.TopicCatMe {
anon &= globals.typesModeCP2P
if anon != types.ModeNone {
anon |= types.ModeApprove
}
}
access.Anon = anon
}
if access.Auth != t.accessAuth || access.Anon != t.accessAnon {
upd["Access"] = access
}
}
return nil
}
assignGenericValues := func(upd map[string]any, what string, dst, src any) (changed bool) {
if dst, changed = mergeInterfaces(dst, src); changed {
upd[what] = dst
}
return
}
// DefaultAccess and/or Public have chanegd
var sendCommon bool
// Private has changed
var sendPriv bool
var err error
// Change to the main object (user or topic).
core := make(map[string]any)
// Change to subscription.
sub := make(map[string]any)
if set := msg.Set; set.Desc != nil {
if set.Desc.Trusted != nil && authLevel != auth.LevelRoot {
// Only ROOT can change Trusted.
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("attempt to change Trusted by non-root")
}
switch t.cat {
case types.TopicCatMe:
// Update current user
err = assignAccess(core, set.Desc.DefaultAcs)
sendCommon = assignGenericValues(core, "Public", t.public, set.Desc.Public)
sendCommon = assignGenericValues(core, "Trusted", t.trusted, set.Desc.Trusted) || sendCommon
case types.TopicCatFnd:
// set.Desc.DefaultAcs is ignored.
if set.Desc.Trusted != nil {
// 'fnd' does not support Trusted.
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("attempt to assign Trusted in fnd topic")
}
// Do not send presence if fnd.Public has changed.
assignGenericValues(core, "Public", t.fndGetPublic(sess), set.Desc.Public)
case types.TopicCatP2P:
// Reject direct changes to P2P topics.
if set.Desc.Public != nil || set.Desc.Trusted != nil || set.Desc.DefaultAcs != nil {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("incorrect attempt to change metadata of a p2p topic")
}
case types.TopicCatGrp:
// Update group topic
if t.owner == asUid {
err = assignAccess(core, set.Desc.DefaultAcs)
sendCommon = assignGenericValues(core, "Public", t.public, set.Desc.Public)
sendCommon = assignGenericValues(core, "Trusted", t.trusted, set.Desc.Trusted) || sendCommon
} else if set.Desc.DefaultAcs != nil || set.Desc.Public != nil || set.Desc.Trusted != nil {
// This is a request from non-owner
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("attempt to change public or permissions by non-owner")
}
}
if err != nil {
sess.queueOut(ErrMalformedReply(msg, now))
return err
}
sendPriv = assignGenericValues(sub, "Private", t.perUser[asUid].private, set.Desc.Private)
}
if len(core)+len(sub) == 0 {
sess.queueOut(InfoNotModifiedReply(msg, now))
return errors.New("{set} generated no update to DB")
}
if len(core) > 0 {
core["UpdatedAt"] = now
switch t.cat {
case types.TopicCatMe:
err = store.Users.Update(asUid, core)
case types.TopicCatFnd:
// The only value to be stored in topic is Public, and Public for fnd is not saved according to specs.
default:
err = store.Topics.Update(t.name, core)
}
}
if err == nil && len(sub) > 0 {
tname := t.name
if asChan {
tname = types.GrpToChn(tname)
}
err = store.Subs.Update(tname, asUid, sub)
}
if err != nil {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
if len(core) > 0 && msg.Extra != nil && len(msg.Extra.Attachments) > 0 {
if err := store.Files.LinkAttachments(t.name, types.ZeroUid, msg.Extra.Attachments); err != nil {
logs.Warn.Printf("topic[%s] failed to link avatar attachment: %v", t.name, err)
// This is not a critical error, continue execution.
}
}
// Update values cached in the topic object
switch t.cat {
case types.TopicCatMe, types.TopicCatGrp:
if tmp, ok := core["Access"]; ok {
access := tmp.(types.DefaultAccess)
t.accessAuth = access.Auth
t.accessAnon = access.Anon
}
if public, ok := core["Public"]; ok {
t.public = public
}
if trusted, ok := core["Trusted"]; ok {
t.trusted = trusted
}
case types.TopicCatFnd:
// Assign per-session fnd.Public.
t.fndSetPublic(sess, core["Public"])
}
pud := t.perUser[asUid]
mode := pud.modeGiven & pud.modeWant
if private, ok := sub["Private"]; ok {
pud.private = private
t.perUser[asUid] = pud
}
if sendCommon || sendPriv {
// t.public/t.trusted, t.accessAuth/Anon have changed, make an announcement
if sendCommon {
if t.cat == types.TopicCatMe {
t.presUsersOfInterest("upd", "")
} else {
// Notify all subscribers on 'me' except the user who made the change and blocked users.
// The user who made the change will be notified separately (see below).
filter := &presFilters{excludeUser: asUid.UserId(), filterIn: types.ModeJoin}
t.presSubsOffline("upd", nilPresParams, filter, filter, sess.sid, false)
}
t.updated = now
}
// Notify user's other sessions.
t.presSingleUserOffline(asUid, mode, "upd", nilPresParams, sess.sid, false)
}
sess.queueOut(NoErrReply(msg, now))
return nil
}
// replyGetSub is a response to a get.sub request on a topic - load a list of subscriptions/subscribers,
// send it just to the session as a {meta} packet
func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level, asChan bool, msg *ClientComMessage) error {
now := types.TimeNow()
id := msg.Id
incomingReqTs := msg.Timestamp
var req *MsgGetOpts
if msg.Sub != nil {
req = msg.Sub.Get.Sub
} else {
req = msg.Get.Sub
}
if req != nil && (req.SinceId != 0 || req.BeforeId != 0) {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("invalid MsgGetOpts query")
}
var err error
var ifModified time.Time
if req != nil && req.IfModifiedSince != nil {
ifModified = *req.IfModifiedSince
}
userData := t.perUser[asUid]
var subs []types.Subscription
switch t.cat {
case types.TopicCatMe:
if req != nil {
// If topic is provided, it could be in the form of user ID 'usrAbCd'.
// Convert it to P2P topic name. Likewise for Self topic 'slf' -> 'slfAbcD'.
if uid2 := types.ParseUserId(req.Topic); !uid2.IsZero() {
req.Topic = uid2.P2PName(asUid)
}
if req.Topic == "slf" {
req.Topic = asUid.SlfName()
}
}
// Fetch user's subscriptions, with Topic.Public+Topic.Trusted denormalized into subscription.
if ifModified.IsZero() {
// No cache management. Skip deleted subscriptions.
subs, err = store.Users.GetTopics(asUid, msgOpts2storeOpts(req))
} else {
// User manages cache. Include deleted subscriptions too.
subs, err = store.Users.GetTopicsAny(asUid, msgOpts2storeOpts(req))
// Returned subscriptions do not contain topics which are online now but otherwise unchanged.
// We need to add these topic to the list otherwise the user would see them as offline.
selected := map[string]struct{}{}
for i := range subs {
sub := &subs[i]
with := sub.GetWith()
if with != "" {
selected[with] = struct{}{}
} else {
selected[sub.Topic] = struct{}{}
}
}
// Add dummy subscriptions for missing online topics.
for topic, psd := range t.perSubs {
_, present := selected[topic]
if !present && psd.online {
sub := types.Subscription{Topic: topic}
sub.SetWith(topic)
sub.SetDummy(true)
subs = append(subs, sub)
}
}
}
case types.TopicCatFnd:
// Select public or private query. Public is set interactively and has priority.
query := t.fndGetPublic(sess)
if query == "" {
query, _ = userData.private.(string)
}
// Empty queries are ignored with "NoContent".
if query != "" {
query, subs, err = pluginFind(asUid, query)
if err == nil && subs == nil && query != "" {
if and, opt, err := parseSearchQuery(query); err == nil {
var req [][]string
for _, tag := range and {
rewritten := rewriteTag(tag, sess.countryCode)
if len(rewritten) > 0 {
req = append(req, rewritten)
}
}
opt = rewriteTagSlice(opt, sess.countryCode)
// Check if the query contains terms that the user is not allowed to use.
if restr, _, _ := stringSliceDelta(t.tags,
filterTags(append(types.FlattenDoubleSlice(req), opt...), globals.maskedTagNS)); len(restr) > 0 {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("attempt to search by restricted tags")
}
// Ordinary users: find only active topics and accounts.
// Root users: find all topics and accounts, including suspended and soft-deleted.
subs, err = store.Users.FindSubs(asUid, globals.aliasTagNS, req, opt, sess.authLvl != auth.LevelRoot)
if err != nil {
sess.queueOut(decodeStoreErrorExplicitTs(err, id, msg.Original, now, incomingReqTs, nil))
return err
}
}
}
}
case types.TopicCatP2P:
// TODO(gene): don't load subs from DB, use perUserData - it already contains subscriptions.
// No need to load Public for p2p topics.
if ifModified.IsZero() {
// No cache management. Skip deleted subscriptions.
subs, err = store.Topics.GetSubs(t.name, msgOpts2storeOpts(req))
} else {
// User manages cache. Include deleted subscriptions too.
subs, err = store.Topics.GetSubsAny(t.name, msgOpts2storeOpts(req))
}
case types.TopicCatGrp:
topicName := t.name
if asChan {
// In case of a channel allow fetching the subscription of the current user only.
if req == nil {
req = &MsgGetOpts{}
}
req.User = asUid.UserId()
// Channel subscribers are using chnXXX topic name rather than grpXXX.
topicName = msg.Original
}
// Include sub.Public.
if ifModified.IsZero() {
// No cache management. Skip deleted subscriptions.
subs, err = store.Topics.GetUsers(topicName, msgOpts2storeOpts(req))
} else {
// User manages cache. Include deleted subscriptions too.
subs, err = store.Topics.GetUsersAny(topicName, msgOpts2storeOpts(req))
}
// Do nothing for all other topic types, like 'sys', 'slf'.
}
if err != nil {
sess.queueOut(decodeStoreErrorExplicitTs(err, id, msg.Original, now, incomingReqTs, nil))
return err
}
if len(subs) == 0 {
// Inform the client that there are no subscriptions.
sess.queueOut(NoContentParamsReply(msg, now, map[string]any{"what": "sub"}))
return nil
}
meta := &MsgServerMeta{
Id: id,
Topic: msg.Original,
Sub: make([]MsgTopicSub, 0, len(subs)),
Timestamp: &now}
presencer := (userData.modeGiven & userData.modeWant).IsPresencer()
sharer := (userData.modeGiven & userData.modeWant).IsSharer()
for i := range subs {
sub := &subs[i]
// Indicator if the requester has provided a cut off date for ts of pub & priv updates.
var sendPubPriv bool
var banned bool
var mts MsgTopicSub
deleted := sub.DeletedAt != nil
if ifModified.IsZero() {
sendPubPriv = true
} else {
// Skip sending deleted subscriptions if they were deleted before the cut off date.
// If they are freshly deleted send minimum info
if deleted {
if !sub.DeletedAt.After(ifModified) {
continue
}
mts.DeletedAt = sub.DeletedAt
}
sendPubPriv = !deleted && sub.UpdatedAt.After(ifModified)
}
uid := types.ParseUid(sub.User)
subMode := sub.ModeGiven & sub.ModeWant
isReader := subMode.IsReader()
if t.cat == types.TopicCatMe {
// Mark subscriptions that the user does not care about.
if !subMode.IsJoiner() {
banned = true
}
// Reporting user's subscriptions to other topics. P2P topic name is the
// UID of the other user.
with := sub.GetWith()
if with != "" {
mts.Topic = with
mts.Online = t.perSubs[with].online && !deleted && presencer
} else if strings.HasPrefix(sub.Topic, "slf") {
mts.Topic = "slf"
// Not reporting Online as it makes no sense for slf.
} else {
mts.Topic = sub.Topic
mts.Online = t.perSubs[sub.Topic].online && !deleted && presencer
}
if !deleted && !banned {
if isReader {
touchedAt := sub.GetTouchedAt()
if touchedAt.IsZero() {
mts.TouchedAt = nil
} else {
mts.TouchedAt = &touchedAt
}
mts.SeqId = sub.GetSeqId()
mts.DelId = sub.DelId
} else if !sub.UpdatedAt.IsZero() {
mts.TouchedAt = &sub.UpdatedAt
}
lastSeen := sub.GetLastSeen()
if lastSeen != nil && !mts.Online {
mts.LastSeen = &MsgLastSeenInfo{
When: lastSeen,
UserAgent: sub.GetUserAgent(),
}
}
mts.SubCnt = sub.GetSubCnt()
}
} else {
// Mark subscriptions that the user does not care about.
if t.cat == types.TopicCatGrp && !subMode.IsJoiner() {
banned = true
}
// Reporting subscribers to fnd, a group or a p2p topic
mts.User = uid.UserId()
if t.cat == types.TopicCatFnd {
mts.Topic = sub.Topic
}
if !deleted {
if uid == asUid && isReader && !banned {
// Report deleted ID for own subscriptions only
mts.DelId = sub.DelId
}
if t.cat == types.TopicCatGrp {
pud := t.perUser[uid]
mts.Online = pud.online > 0 && presencer
}
}
}
if !deleted {
if !sub.UpdatedAt.IsZero() {
mts.UpdatedAt = &sub.UpdatedAt
}
if isReader && !banned {
mts.ReadSeqId = sub.ReadSeqId
mts.RecvSeqId = sub.RecvSeqId
}
if t.cat != types.TopicCatFnd {
// p2p and grp
if !sub.IsDummy() && (sharer || uid == asUid || subMode.IsAdmin()) {
// If user is not a sharer, the access mode of other ordinary users if not accessible.
// Own and admin permissions only are visible to non-sharers.
mts.Acs.Mode = subMode.String()
mts.Acs.Want = sub.ModeWant.String()
mts.Acs.Given = sub.ModeGiven.String()
}
} else {
// Topic 'fnd'
// sub.ModeXXX may be defined by the plugin.
if sub.ModeGiven.IsDefined() && sub.ModeWant.IsDefined() {
mts.Acs.Mode = subMode.String()
mts.Acs.Want = sub.ModeWant.String()
mts.Acs.Given = sub.ModeGiven.String()
} else if types.IsChannel(sub.Topic) {
mts.Acs.Mode = types.ModeCChnReader.String()
} else if defacs := sub.GetDefaultAccess(); defacs != nil {
switch authLevel {
case auth.LevelAnon:
mts.Acs.Mode = defacs.Anon.String()
case auth.LevelAuth, auth.LevelRoot:
mts.Acs.Mode = defacs.Auth.String()
}
}
mts.SubCnt = sub.GetSubCnt()
}
// Returning public and private only if they have changed since ifModified
if sendPubPriv {
// 'sub' has nil 'public'/'trusted' in P2P topics which is OK.
mts.Public = sub.GetPublic()
mts.Trusted = sub.GetTrusted()
// Reporting 'private' only if it's user's own subscription.
if uid == asUid {
mts.Private = sub.Private
}
}
// Always reporting 'private' for fnd topic.
if t.cat == types.TopicCatFnd {
mts.Private = sub.Private
}
}
meta.Sub = append(meta.Sub, mts)
}
sess.queueOut(&ServerComMessage{Meta: meta})
return nil
}
// replySetSub is a response to new subscription request or an update to a subscription {set.sub}:
// update topic metadata cache, save/update subs, reply to the caller as {ctrl} message,
// generate a presence notification, if appropriate.
func (t *Topic) replySetSub(sess *Session, pkt *ClientComMessage, asChan bool) error {
now := types.TimeNow()
asUid := types.ParseUserId(pkt.AsUser)
set := pkt.Set
var target types.Uid
if target = types.ParseUserId(set.Sub.User); target.IsZero() && set.Sub.User != "" {
// Invalid user ID
sess.queueOut(ErrMalformedReply(pkt, now))
return errors.New("invalid user id")
}
// if set.User is not set, request is for the current user
if target.IsZero() {
target = asUid
}
var err error
var modeChanged *MsgAccessMode
if target == asUid {
// Request new subscription or modify own subscription
modeChanged, err = t.thisUserSub(sess, pkt, asUid, asChan, set.Sub.Mode, nil)
} else {
// Request to approve/change someone's subscription
modeChanged, err = t.anotherUserSub(sess, asUid, target, asChan, pkt)
}
if err != nil {
return err
}
var resp *ServerComMessage
if modeChanged != nil {
// Report resulting access mode.
params := map[string]any{"acs": modeChanged}
if target != asUid {
params["user"] = target.UserId()
}
resp = NoErrParamsReply(pkt, now, params)
} else {
resp = InfoNotModifiedReply(pkt, now)
}
sess.queueOut(resp)
return nil
}
// replyGetData is a response to a get.data request - load a list of stored messages, send them to session as {data}
// response goes to a single session rather than all sessions in a topic
func (t *Topic) replyGetData(sess *Session, asUid types.Uid, asChan bool, req *MsgGetOpts, msg *ClientComMessage) error {
now := types.TimeNow()
toriginal := t.original(asUid)
if req != nil && (req.IfModifiedSince != nil || req.User != "" || req.Topic != "") {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("invalid MsgGetOpts query")
}
// Check if the user has permission to read the topic data
count := 0
if userData := t.perUser[asUid]; (userData.modeGiven & userData.modeWant).IsReader() {
// Read messages from DB
messages, err := store.Messages.GetAll(t.name, asUid, msgOpts2storeOpts(req))
if err != nil {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
// Push the list of messages to the client as {data}.
if messages != nil {
count = len(messages)
if count > 0 {
outgoingMessages := make([]*ServerComMessage, count)
for i := range messages {
mm := &messages[i]
from := ""
if !asChan {
// Don't show sender for channel readers
from = types.ParseUid(mm.From).UserId()
}
outgoingMessages[i] = &ServerComMessage{
Data: &MsgServerData{
Topic: toriginal,
Head: mm.Head,
SeqId: mm.SeqId,
From: from,
Timestamp: mm.CreatedAt,
Content: mm.Content,
},
}
}
sess.queueOutBatch(outgoingMessages)
}
}
} else {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("attempt to get messages by non-reader")
}
// Inform the requester that all the data has been served.
if count == 0 {
sess.queueOut(NoContentParamsReply(msg, now, map[string]any{"what": "data"}))
} else {
sess.queueOut(NoErrDeliveredParams(msg.Id, msg.Original, now,
map[string]any{"what": "data", "count": count}))
}
return nil
}
// replyGetTags returns topics' tags - tokens used for discovery.
func (t *Topic) replyGetTags(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
now := types.TimeNow()
if t.cat == types.TopicCatFnd {
// Fnd: checking for alias availability.
// Checking public (session) data only.
if tag := t.fndGetPublic(sess); tag != "" {
var found string
tag, subs, err := pluginFind(asUid, tag)
if err == nil {
if subs == nil {
if prefix, _ := validateTag(tag); prefix != "" {
// Check only if a fully-qualified tag was sent. Otherwise ignore the request.
found, err = store.Users.FindOne(tag)
}
} else {
// The plugin returned a list of topics. Send the first one.
found = subs[0].Topic
}
}
if err != nil {
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Id, msg.Original, now, msg.Timestamp, nil))
return err
}
if found != "" {
sess.queueOut(&ServerComMessage{
Meta: &MsgServerMeta{
Id: msg.Id,
Topic: msg.Original,
Timestamp: &now,
Tags: []string{found},
},
})
return nil
}
}
// Inform the requester that there are no tags.
sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "tags"}))
return nil
}
if t.cat != types.TopicCatMe && t.cat != types.TopicCatGrp {
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("invalid topic category for getting tags")
}
if t.cat == types.TopicCatGrp && t.owner != asUid {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("request for tags from non-owner")
}
if len(t.tags) > 0 {
sess.queueOut(&ServerComMessage{
Meta: &MsgServerMeta{
Id: msg.Id,
Topic: t.original(asUid),
Timestamp: &now,
Tags: t.tags,
},
})
return nil
}
// Inform the requester that there are no tags.
sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "tags"}))
return nil
}
// replySetTags updates topic's tags - tokens used for discovery.
func (t *Topic) replySetTags(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
now := types.TimeNow()
if t.cat != types.TopicCatMe && t.cat != types.TopicCatGrp {
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("invalid topic category to assign tags")
}
if t.cat == types.TopicCatGrp && t.owner != asUid {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("tags update by non-owner")
}
tags := normalizeTags(msg.Set.Tags, globals.maxTagCount)
if len(tags) == 0 {
sess.queueOut(InfoNotModifiedReply(msg, now))
return nil
}
if !restrictedTagsEqual(t.tags, tags, globals.immutableTagNS) {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("attempt to mutate restricted tags")
}
if hasDuplicateNamespaceTags(tags, globals.aliasTagNS) {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("duplicate unique tags")
}
added, removed, _ := stringSliceDelta(t.tags, tags)
if t.cat == types.TopicCatMe && len(added) > 0 {
// User tags must all be prefixed. Users are not rearchable by generic tags.
var prefixed []string
for _, tag := range added {
if prefix, _ := validateTag(tag); prefix != "" {
prefixed = append(prefixed, prefix)
}
}
added = prefixed
}
if len(added) == 0 && len(removed) == 0 {
sess.queueOut(InfoNotModifiedReply(msg, now))
return nil
}
// Remove unprefixed tags
if unique := filterTags(added, map[string]bool{globals.aliasTagNS: true}); len(unique) > 0 {
// Check for global uniqueness.
// It's not inside a transaction, so a race may happen.
for _, tag := range unique {
result, err := store.Users.FindOne(tag)
if err != nil {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
if result != "" {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("globally duplicate unique tags")
}
}
}
update := map[string]any{"Tags": tags, "UpdatedAt": now}
var err error
switch t.cat {
case types.TopicCatMe:
err = store.Users.Update(asUid, update)
case types.TopicCatGrp:
err = store.Topics.Update(t.name, update)
}
if err != nil {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
t.tags = tags
t.presSubsOnline("tags", "", nilPresParams, &presFilters{singleUser: asUid.UserId()}, sess.sid)
params := make(map[string]any)
if len(added) > 0 {
params["added"] = len(added)
}
if len(removed) > 0 {
params["removed"] = len(removed)
}
sess.queueOut(NoErrParamsReply(msg, now, params))
return nil
}
// replyGetCreds returns user's credentials such as email and phone numbers.
func (t *Topic) replyGetCreds(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
now := types.TimeNow()
id := msg.Id
if t.cat != types.TopicCatMe {
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("invalid topic category for getting credentials")
}
screds, err := store.Users.GetAllCreds(asUid, "", false)
if err != nil {
sess.queueOut(decodeStoreErrorExplicitTs(err, id, msg.Original, now, msg.Timestamp, nil))
return err
}
if len(screds) > 0 {
creds := make([]*MsgCredServer, len(screds))
for i, sc := range screds {
creds[i] = &MsgCredServer{Method: sc.Method, Value: sc.Value, Done: sc.Done}
}
sess.queueOut(&ServerComMessage{
Meta: &MsgServerMeta{
Id: id,
Topic: t.original(asUid),
Timestamp: &now,
Cred: creds,
},
})
return nil
}
// Inform the requester that there are no credentials.
sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "creds"}))
return nil
}
// replySetCred adds or validates user credentials such as email and phone numbers.
func (t *Topic) replySetCred(sess *Session, asUid types.Uid, authLevel auth.Level, msg *ClientComMessage) error {
now := types.TimeNow()
set := msg.Set
if t.cat != types.TopicCatMe {
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("invalid topic category for updating credentials")
}
var err error
var tags []string
creds := []MsgCredClient{*set.Cred}
if set.Cred.Response != "" {
// Credential is being validated. Return an arror if response is invalid.
_, tags, err = validatedCreds(asUid, authLevel, creds, true)
} else {
// Credential is being added or updated.
tmpToken, _, _ := store.Store.GetLogicalAuthHandler("token").GenSecret(&auth.Rec{
Uid: asUid,
AuthLevel: auth.LevelNone,
Lifetime: auth.Duration(time.Hour * 24),
Features: auth.FeatureNoLogin,
})
_, tags, err = addCreds(asUid, creds, nil, sess.lang, tmpToken)
}
if tags != nil {
t.tags = tags
t.presSubsOnline("tags", "", nilPresParams, nilPresFilters, "")
}
sess.queueOut(decodeStoreErrorExplicitTs(err, set.Id, t.original(asUid), now, msg.Timestamp, nil))
return err
}
// replyGetAux returns topic's auxiliary set of key-value pairs.
func (t *Topic) replyGetAux(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
now := types.TimeNow()
if t.cat != types.TopicCatP2P && t.cat != types.TopicCatGrp && t.cat != types.TopicCatSlf {
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("invalid topic category to query aux")
}
if len(t.aux) > 0 {
sess.queueOut(&ServerComMessage{
Meta: &MsgServerMeta{
Id: msg.Id,
Topic: t.original(asUid),
Timestamp: &now,
Aux: t.aux,
},
})
return nil
}
// Inform the requester that there are no tags.
sess.queueOut(NoContentParamsReply(msg, now, map[string]string{"what": "aux"}))
return nil
}
// replyGetAux returns topic's auxiliary set of key-value pairs.
func (t *Topic) replySetAux(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
now := types.TimeNow()
if t.cat != types.TopicCatP2P && t.cat != types.TopicCatGrp && t.cat != types.TopicCatSlf {
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("invalid topic category to assign aux")
}
if userData := t.perUser[asUid]; !(userData.modeGiven & userData.modeWant).IsAdmin() {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("aux update by non-admin")
}
if aux, changed := mergeMaps(copyMap(t.aux), msg.Set.Aux); changed {
err := store.Topics.Update(t.name, map[string]any{"Aux": aux, "UpdatedAt": now})
if err == nil {
t.aux = aux
t.presSubsOnline("aux", "", nilPresParams, nilPresFilters, sess.sid)
}
sess.queueOut(decodeStoreErrorExplicitTs(err, msg.Set.Id, t.original(asUid), now, msg.Timestamp, nil))
return err
}
sess.queueOut(InfoNotModifiedReply(msg, now))
return nil
}
// replyGetDel is a response to a get[what=del] request: load a list of deleted message ids, send them to
// a session as {meta}
// response goes to a single session rather than all sessions in a topic
func (t *Topic) replyGetDel(sess *Session, asUid types.Uid, req *MsgGetOpts, msg *ClientComMessage) error {
now := types.TimeNow()
toriginal := t.original(asUid)
id := msg.Id
incomingReqTs := msg.Timestamp
if req != nil && (req.IfModifiedSince != nil || req.User != "" || req.Topic != "") {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("invalid MsgGetOpts query")
}
// Check if the user has permission to read the topic data and the request is valid.
if userData := t.perUser[asUid]; (userData.modeGiven & userData.modeWant).IsReader() {
ranges, delID, err := store.Messages.GetDeleted(t.name, asUid, msgOpts2storeOpts(req))
if err != nil {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
if len(ranges) > 0 {
sess.queueOut(&ServerComMessage{
Meta: &MsgServerMeta{
Id: id,
Topic: toriginal,
Del: &MsgDelValues{
DelId: delID,
DelSeq: rangeDeserialize(ranges),
},
Timestamp: &now,
},
})
return nil
}
}
sess.queueOut(NoContentParams(id, toriginal, now, incomingReqTs, map[string]string{"what": "del"}))
return nil
}
// replyDelMsg deletes (soft or hard) messages in response to del.msg packet.
func (t *Topic) replyDelMsg(sess *Session, asUid types.Uid, asChan bool, msg *ClientComMessage) error {
now := types.TimeNow()
if asChan {
// Do not allow channel readers delete messages.
sess.queueOut(ErrOperationNotAllowedReply(msg, now))
return errors.New("channel readers cannot delete messages")
}
del := msg.Del
pud := t.perUser[asUid]
if !(pud.modeGiven & pud.modeWant).IsDeleter() {
// User must have an R permission: if the user cannot read messages, he has
// no business of deleting them.
if !(pud.modeGiven & pud.modeWant).IsReader() {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("del.msg: permission denied")
}
// User has just the R permission, cannot hard-delete messages, silently
// switching to soft-deleting
del.Hard = false
}
var err error
var ranges []types.Range
if len(del.DelSeq) == 0 {
err = errors.New("del.msg: no IDs to delete")
} else {
count := 0
for _, dq := range del.DelSeq {
if dq.LowId > t.lastID || dq.LowId < 0 || dq.HiId < 0 ||
(dq.HiId > 0 && dq.LowId > dq.HiId) ||
(dq.LowId == 0 && dq.HiId == 0) {
err = errors.New("del.msg: invalid entry in list")
break
}
if dq.HiId > t.lastID {
// Range is inclusive - exclusive [low, hi),
// to delete all messages hi must be lastId + 1
dq.HiId = t.lastID + 1
} else if dq.LowId == dq.HiId || dq.LowId+1 == dq.HiId {
dq.HiId = 0
}
if dq.HiId == 0 {
count++
} else {
count += dq.HiId - dq.LowId
}
ranges = append(ranges, types.Range{Low: dq.LowId, Hi: dq.HiId})
}
if err == nil {
// Sort by Low ascending then by Hi descending.
sort.Sort(types.RangeSorter(ranges))
// Collapse overlapping ranges
ranges = types.RangeSorter(ranges).Normalize()
}
if count > defaultMaxDeleteCount && len(ranges) > 1 {
err = errors.New("del.msg: too many messages to delete")
}
}
if err != nil {
sess.queueOut(ErrMalformedReply(msg, now))
return err
}
forUser := asUid
var age time.Duration
if del.Hard {
forUser = types.ZeroUid
age = globals.msgDeleteAge
}
if err = store.Messages.DeleteList(t.name, t.delID+1, forUser, age, ranges); err != nil {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
// Increment Delete transaction ID
t.delID++
dr := rangeDeserialize(ranges)
if del.Hard {
for uid, pud := range t.perUser {
pud.delID = t.delID
t.perUser[uid] = pud
// Update unread counters for all users who may have had these messages as unread
if (pud.modeGiven & pud.modeWant).IsReader() {
// Calculate how many unread messages were deleted for this user
unreadDeleted := calculateUnreadInRanges(pud.readID, t.lastID, ranges)
if unreadDeleted > 0 {
// Decrease unread count (negative value)
usersUpdateUnread(uid, -unreadDeleted, true)
}
}
}
// Broadcast the change to all, online and offline, exclude the session making the change.
params := &presParams{delID: t.delID, delSeq: dr, actor: asUid.UserId()}
filters := &presFilters{filterIn: types.ModeRead}
t.presSubsOnline("del", params.actor, params, filters, sess.sid)
t.presSubsOffline("del", params, filters, nilPresFilters, sess.sid, true)
} else {
pud := t.perUser[asUid]
pud.delID = t.delID
t.perUser[asUid] = pud
// Notify user's other sessions
t.presPubMessageDelete(asUid, pud.modeGiven&pud.modeWant, t.delID, dr, sess.sid)
}
sess.queueOut(NoErrParamsReply(msg, now, map[string]int{"del": t.delID}))
return nil
}
// Handle request to delete the topic {del what="topic"}.
// 1. If requester is the owner then it should have been handled at the hub, log an error.
// 2. If requester is not the owner, treat it like {leave unsub=true}.
func (t *Topic) replyDelTopic(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
if t.owner != asUid {
return t.replyLeaveUnsub(sess, msg, asUid)
}
// This is an indication of a bug.
logs.Err.Println("replyDelTopic called by owner (SHOULD NOT HAPPEN!)")
return nil
}
// Delete credential
func (t *Topic) replyDelCred(sess *Session, asUid types.Uid, authLvl auth.Level, msg *ClientComMessage) error {
now := types.TimeNow()
incomingReqTs := msg.Timestamp
del := msg.Del
if t.cat != types.TopicCatMe {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("del.cred: invalid topic category")
}
if del.Cred == nil || del.Cred.Method == "" {
sess.queueOut(ErrMalformedReply(msg, now))
return errors.New("del.cred: missing method")
}
tags, err := deleteCred(asUid, authLvl, del.Cred)
if tags != nil {
// Check if anything has been actually removed.
_, removed, _ := stringSliceDelta(t.tags, tags)
if len(removed) > 0 {
t.tags = tags
t.presSubsOnline("tags", "", nilPresParams, nilPresFilters, "")
}
} else if err == nil {
sess.queueOut(InfoNoActionReply(msg, now))
return nil
}
sess.queueOut(decodeStoreErrorExplicitTs(err, del.Id, del.Topic, now, incomingReqTs, nil))
return err
}
// Delete subscription.
func (t *Topic) replyDelSub(sess *Session, asUid types.Uid, msg *ClientComMessage) error {
now := types.TimeNow()
del := msg.Del
asChan, err := t.verifyChannelAccess(msg.Original)
if err != nil {
// User should not be able to address non-channel topic as channel.
sess.queueOut(ErrNotFoundReply(msg, now))
return types.ErrNotFound
}
if asChan {
// Don't allow channel readers to delete self-subscription. Use leave-unsub or del-topic.
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return errors.New("channel access denied: cannot delete subscription")
}
// Get ID of the affected user
uid := types.ParseUserId(del.User)
pud := t.perUser[asUid]
if !(pud.modeGiven & pud.modeWant).IsAdmin() {
err = errors.New("del.sub: permission denied")
} else if uid.IsZero() || uid == asUid {
// Cannot delete self-subscription. User [leave unsub] or [delete topic]
err = errors.New("del.sub: cannot delete self-subscription")
} else if t.cat == types.TopicCatP2P {
// Don't try to delete the other P2P user
err = errors.New("del.sub: cannot apply to a P2P topic")
}
if err != nil {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return err
}
pud, ok := t.perUser[uid]
if !ok {
sess.queueOut(InfoNoActionReply(msg, now))
return errors.New("del.sub: user not found")
}
// Check if the user being ejected is the owner.
if (pud.modeGiven & pud.modeWant).IsOwner() {
err = errors.New("del.sub: cannot evict topic owner")
} else if !pud.modeWant.IsJoiner() {
// If the user has banned the topic, subscription should not be deleted. Otherwise user may be re-invited
// which defeats the purpose of banning.
err = errors.New("del.sub: cannot delete banned subscription")
}
if err != nil {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
return err
}
// Delete user's subscription from the database
if err := store.Subs.Delete(t.name, uid); err != nil {
if err == types.ErrNotFound {
sess.queueOut(InfoNoActionReply(msg, now))
} else {
sess.queueOut(ErrUnknownReply(msg, now))
return err
}
} else {
sess.queueOut(NoErrReply(msg, now))
}
// Update cached unread count: negative value
if (pud.modeWant & pud.modeGiven).IsReader() {
usersUpdateUnread(uid, pud.readID-t.lastID, true)
}
// ModeUnset signifies deleted subscription as opposite to ModeNone - no access.
t.notifySubChange(uid, asUid, false,
pud.modeWant, pud.modeGiven, types.ModeUnset, types.ModeUnset, sess.sid)
t.evictUser(uid, true, "")
// Notify plugins.
pluginSubscription(&types.Subscription{Topic: t.name, User: uid.String()}, plgActDel)
// If all P2P users were deleted, suspend the topic to let it shut down.
if t.cat == types.TopicCatP2P && t.subsCount() == 0 {
t.markPaused(true)
globals.hub.unreg <- &topicUnreg{del: true, sess: nil, rcptTo: t.name, pkt: nil}
}
return nil
}
// replyLeaveUnsub is a request to unsubscribe user and detach all user's sessions from topic.
func (t *Topic) replyLeaveUnsub(sess *Session, msg *ClientComMessage, asUid types.Uid) error {
now := types.TimeNow()
if asUid.IsZero() {
panic("replyLeaveUnsub: zero asUid")
}
if t.owner == asUid {
if msg.init {
sess.queueOut(ErrPermissionDeniedReply(msg, now))
}
return errors.New("replyLeaveUnsub: owner cannot unsubscribe")
}
var err error
var asChan bool
if msg.init {
asChan, err = t.verifyChannelAccess(msg.Original)
if err != nil {
sess.queueOut(ErrNotFoundReply(msg, now))
return errors.New("replyLeaveUnsub: incorrect addressing of channel")
}
}
pud := t.perUser[asUid]
// Delete user's subscription from the database; msg could be nil, so cannot use msg.Original.
if pud.isChan {
// Handle channel reader.
err = store.Subs.Delete(types.GrpToChn(t.name), asUid)
} else {
// Handle subscriber.
err = store.Subs.Delete(t.name, asUid)
}
if err != nil {
if msg.init {
if err == types.ErrNotFound {
sess.queueOut(InfoNoActionReply(msg, now))
err = nil
} else {
sess.queueOut(ErrUnknownReply(msg, now))
}
}
return err
}
if msg.init {
sess.queueOut(NoErrReply(msg, now))
}
var oldWant types.AccessMode
var oldGiven types.AccessMode
if !asChan {
// Update cached unread count: negative value
if (pud.modeWant & pud.modeGiven).IsReader() {
usersUpdateUnread(asUid, pud.readID-t.lastID, true)
}
oldWant, oldGiven = pud.modeWant, pud.modeGiven
} else {
oldWant, oldGiven = types.ModeCChnReader, types.ModeCChnReader
// Unsubscribe user's devices from the channel (FCM topic).
t.channelSubUnsub(asUid, false)
}
// Send prsence notifictions to admins, other users, and user's other sessions.
t.notifySubChange(asUid, asUid, asChan, oldWant, oldGiven, types.ModeUnset, types.ModeUnset, sess.sid)
// Evict all user's sessions, clear cached data, send notifications.
t.evictUser(asUid, true, sess.sid)
// Notify plugins.
pluginSubscription(&types.Subscription{Topic: t.name, User: asUid.String()}, plgActDel)
if t.cat == types.TopicCatGrp {
// Decrement group's cached member count.
t.subCnt--
}
// If all P2P users were deleted, suspend the topic to let it shut down.
if t.cat == types.TopicCatP2P && t.subsCount() == 0 {
t.markPaused(true)
globals.hub.unreg <- &topicUnreg{del: true, sess: nil, rcptTo: t.name, pkt: nil}
}
return nil
}
// evictUser evicts all given user's sessions from the topic and clears user's cached data, if appropriate.
func (t *Topic) evictUser(uid types.Uid, unsub bool, skip string) {
now := types.TimeNow()
pud, ok := t.perUser[uid]
// Detach user from topic
if unsub {
if t.cat == types.TopicCatP2P {
// P2P: mark user as deleted
pud.online = 0
pud.deleted = true
t.perUser[uid] = pud
} else if ok {
// Grp: delete per-user data
delete(t.perUser, uid)
t.computePerUserAcsUnion()
if !pud.isChan {
usersRegisterUser(uid, false)
}
}
} else if ok {
if pud.isChan {
delete(t.perUser, uid)
// No need to call computePerUserAcsUnion because removal of a channel reader does not change union permissions.
// No need to unregister user as we ignore unread channel messages.
} else {
// Clear online status
pud.online = 0
t.perUser[uid] = pud
}
}
// Detach all user's sessions
msg := NoErrEvicted("", t.original(uid), now)
msg.Ctrl.Params = map[string]any{"unsub": unsub}
msg.SkipSid = skip
msg.uid = uid
msg.AsUser = uid.UserId()
for s := range t.sessions {
if pssd, removed := t.remSession(s, uid); pssd != nil {
if removed {
s.detachSession(t.name)
}
if s.sid != skip {
s.queueOut(msg)
}
}
}
}
// User's subscription to a topic has changed, send presence notifications.
// 1. New subscription
// 2. Deleted subscription
// 3. Permissions changed
// Sending to
// (a) Topic admins online on topic itself.
// (b) Topic admins offline on 'me' if approval is needed.
// (c) If subscription is deleted, 'gone' to target.
// (d) 'off' to topic members online if deleted or muted.
// (e) To target user.
func (t *Topic) notifySubChange(uid, actor types.Uid, isChan bool,
oldWant, oldGiven, newWant, newGiven types.AccessMode, skip string) {
unsub := newWant == types.ModeUnset || newGiven == types.ModeUnset
target := uid.UserId()
dWant := types.ModeNone.String()
if newWant.IsDefined() {
if oldWant.IsDefined() && !oldWant.IsZero() {
dWant = oldWant.Delta(newWant)
} else {
dWant = newWant.String()
}
}
dGiven := types.ModeNone.String()
if newGiven.IsDefined() {
if oldGiven.IsDefined() && !oldGiven.IsZero() {
dGiven = oldGiven.Delta(newGiven)
} else {
dGiven = newGiven.String()
}
}
params := &presParams{
target: target,
actor: actor.UserId(),
dWant: dWant,
dGiven: dGiven,
}
filterSharers := &presFilters{
filterIn: types.ModeCSharer,
excludeUser: target,
}
// Announce the change in permissions to the admins who are online in the topic, exclude the target
// and exclude the actor's session.
t.presSubsOnline("acs", target, params, filterSharers, skip)
// If it's a new subscription or if the user asked for permissions in excess of what was granted,
// announce the request to topic admins on 'me' so they can approve the request. The notification
// is not sent to the target user or the actor's session.
if newWant.BetterThan(newGiven) || oldWant == types.ModeNone {
t.presSubsOffline("acs", params, filterSharers, filterSharers, skip, true)
}
// Handling of muting/unmuting.
// Case A: subscription deleted.
// Case B: subscription muted only.
if unsub {
// Subscription deleted.
// In case of a P2P topic subscribe/unsubscribe users from each other's notifications.
if t.cat == types.TopicCatP2P {
uid2 := t.p2pOtherUser(uid)
// Remove user1's subscription to user2 and notify user1's other sessions that he is gone.
t.presSingleUserOffline(uid, newWant&newGiven, "gone", nilPresParams, skip, false)
// Tell user2 that user1 is offline but let him keep sending updates in case user1 resubscribes.
presSingleUserOfflineOffline(uid2, target, "off", nilPresParams, "")
} else if t.cat == types.TopicCatGrp && !isChan {
// Notify all sharers that the user is offline now.
t.presSubsOnline("off", uid.UserId(), nilPresParams, filterSharers, skip)
// Notify target that the subscription is gone.
presSingleUserOfflineOffline(uid, t.name, "gone", nilPresParams, skip)
}
} else {
// Subscription altered.
if !(newWant & newGiven).IsPresencer() && (oldWant & oldGiven).IsPresencer() {
// Subscription just muted.
var source string
if t.cat == types.TopicCatP2P {
source = t.p2pOtherUser(uid).UserId()
} else if t.cat == types.TopicCatGrp && !isChan {
source = t.name
}
if source != "" {
// Tell user1 to start discarding updates from muted topic/user.
presSingleUserOfflineOffline(uid, source, "off+dis", nilPresParams, "")
}
} else if (newWant & newGiven).IsPresencer() && !(oldWant & oldGiven).IsPresencer() {
// Subscription un-muted.
// Notify subscriber of topic's online status.
if t.cat == types.TopicCatGrp && !isChan {
t.presSingleUserOffline(uid, newWant&newGiven, "?unkn+en", nilPresParams, "", false)
} else if t.cat == types.TopicCatMe {
// User is visible online now, notify subscribers.
t.presUsersOfInterest("on+en", t.userAgent)
}
}
// Notify target that permissions have changed.
// Notify sessions online in the topic.
t.presSubsOnlineDirect("acs", params, &presFilters{singleUser: target}, skip)
// Notify target's other sessions on 'me'.
t.presSingleUserOffline(uid, newWant&newGiven, "acs", params, skip, true)
}
}
// FIXME: this won't work correctly with multiplexing sessions.
func (t *Topic) mostRecentSession() *Session {
var sess *Session
var latest int64
for s := range t.sessions {
sessionLastAction := atomic.LoadInt64(&s.lastAction)
if sessionLastAction > latest {
sess = s
latest = sessionLastAction
}
}
return sess
}
const (
// Topic is fully initialized.
topicStatusLoaded = 0x1
// Topic is paused: all packets are rejected.
topicStatusPaused = 0x2
// Topic is in the process of being deleted. This is irrecoverable.
topicStatusMarkedDeleted = 0x10
// Topic is suspended: read-only mode.
topicStatusReadOnly = 0x20
)
// statusChangeBits sets or removes given bits from t.status
func (t *Topic) statusChangeBits(bits int32, set bool) {
for {
oldStatus := atomic.LoadInt32(&t.status)
newStatus := oldStatus
if set {
newStatus |= bits
} else {
newStatus &= ^bits
}
if newStatus == oldStatus {
break
}
if atomic.CompareAndSwapInt32(&t.status, oldStatus, newStatus) {
break
}
}
}
// markLoaded indicates that topic subscribers have been loaded into memory.
func (t *Topic) markLoaded() {
t.statusChangeBits(topicStatusLoaded, true)
}
// markPaused pauses or unpauses the topic. When the topic is paused all
// messages are rejected.
func (t *Topic) markPaused(pause bool) {
t.statusChangeBits(topicStatusPaused, pause)
}
// markDeleted marks topic as being deleted.
func (t *Topic) markDeleted() {
t.statusChangeBits(topicStatusMarkedDeleted, true)
}
// markReadOnly suspends/un-suspends the topic: adds or removes the 'read-only' flag.
func (t *Topic) markReadOnly(readOnly bool) {
t.statusChangeBits(topicStatusReadOnly, readOnly)
}
// isInactive checks if topic is paused or being deleted.
func (t *Topic) isInactive() bool {
return (atomic.LoadInt32(&t.status) & (topicStatusPaused | topicStatusMarkedDeleted)) != 0
}
func (t *Topic) isReadOnly() bool {
return (atomic.LoadInt32(&t.status) & topicStatusReadOnly) != 0
}
func (t *Topic) isLoaded() bool {
return (atomic.LoadInt32(&t.status) & topicStatusLoaded) != 0
}
func (t *Topic) isDeleted() bool {
return (atomic.LoadInt32(&t.status) & topicStatusMarkedDeleted) != 0
}
// Get topic name suitable for the given client
func (t *Topic) original(uid types.Uid) string {
if t.cat == types.TopicCatP2P {
if pud, ok := t.perUser[uid]; ok {
return pud.topicName
}
panic("Invalid P2P topic")
}
if t.cat == types.TopicCatGrp && t.isChan {
if t.perUser[uid].isChan {
// This is a channel reader.
return types.GrpToChn(t.xoriginal)
}
}
return t.xoriginal
}
// Get ID of the other user in a P2P topic
func (t *Topic) p2pOtherUser(uid types.Uid) types.Uid {
if t.cat == types.TopicCatP2P {
// Try to find user in subscribers.
for u2 := range t.perUser {
if u2.Compare(uid) != 0 {
return u2
}
}
}
// Even when one user is deleted, the subscription must be restored
// before p2pOtherUser is called.
panic("Not a valid P2P topic")
}
// Get per-session value of fnd.Public
func (t *Topic) fndGetPublic(sess *Session) string {
if t.cat == types.TopicCatFnd {
if t.public == nil {
return ""
}
if pubmap, ok := t.public.(map[string]any); ok {
if public, ok := pubmap[sess.sid].(string); ok {
return public
}
return ""
}
panic("Invalid Fnd.Public type")
}
panic("Not Fnd topic")
}
// Assign per-session fnd.Public. Returns true if value has been changed.
func (t *Topic) fndSetPublic(sess *Session, public any) bool {
if t.cat != types.TopicCatFnd {
panic("Not Fnd topic")
}
var pubmap map[string]any
var ok bool
if t.public != nil {
if pubmap, ok = t.public.(map[string]any); !ok {
// This could only happen if fnd.public is assigned outside of this function.
panic("Invalid Fnd.Public type")
}
}
if pubmap == nil {
pubmap = make(map[string]any)
}
if public != nil {
pubmap[sess.sid] = public
} else {
ok = (pubmap[sess.sid] != nil)
delete(pubmap, sess.sid)
if len(pubmap) == 0 {
pubmap = nil
}
}
t.public = pubmap
return ok
}
// Remove per-session value of fnd.Public.
func (t *Topic) fndRemovePublic(sess *Session) {
if t.public == nil {
return
}
// FIXME: case of a multiplexing session won't work correctly.
// Maybe handle it at the proxy topic.
if pubmap, ok := t.public.(map[string]any); ok {
delete(pubmap, sess.sid)
return
}
panic("Invalid Fnd.Public type")
}
func (t *Topic) accessFor(authLvl auth.Level) types.AccessMode {
return selectAccessMode(authLvl, t.accessAnon, t.accessAuth, getDefaultAccess(t.cat, true, false))
}
// subsCount returns the number of topic subscribers. This method is different from subCnt with respect to channels:
// * subsCount counts subscribers + attached channel users.
// * subCnt counts all subscribers (including all channel users).
func (t *Topic) subsCount() int {
if t.cat == types.TopicCatP2P {
count := 0
for uid := range t.perUser {
if !t.perUser[uid].deleted {
count++
}
}
return count
}
return len(t.perUser)
}
// Add session record. 'user' may be different from sess.uid.
func (t *Topic) addSession(sess *Session, asUid types.Uid, isChanSub bool) {
s := sess
if sess.multi != nil {
s = s.multi
}
if pssd, ok := t.sessions[s]; ok {
// Subscription already exists.
if s.isMultiplex() && !sess.background {
// This slice is expected to be relatively short.
// Not doing anything fancy here like maps or sorting.
pssd.muids = append(pssd.muids, asUid)
t.sessions[s] = pssd
}
// Maybe panic here.
return
}
if s.isMultiplex() {
if sess.background {
t.sessions[s] = perSessionData{}
} else {
t.sessions[s] = perSessionData{muids: []types.Uid{asUid}, isChanSub: isChanSub}
}
} else {
t.sessions[s] = perSessionData{uid: asUid, isChanSub: isChanSub}
}
}
// Disconnects session from topic if either one of the following is true:
// * 's' is an ordinary session AND ('asUid' is zero OR 'asUid' matches subscribed user).
// * 's' is a multiplexing session and it's being dropped all together ('asUid' is zero ).
// If 's' is a multiplexing session and asUid is not zero, it's removed from the list of session
// users 'muids'.
// Returns perSessionData if it was found and true if session was actually detached from topic.
func (t *Topic) remSession(sess *Session, asUid types.Uid) (*perSessionData, bool) {
s := sess
if sess.multi != nil {
s = s.multi
}
pssd, ok := t.sessions[s]
if !ok {
// Session not found at all.
return nil, false
}
if pssd.uid == asUid || asUid.IsZero() {
delete(t.sessions, s)
return &pssd, true
}
for i := range pssd.muids {
if pssd.muids[i] == asUid {
pssd.muids[i] = pssd.muids[len(pssd.muids)-1]
pssd.muids = pssd.muids[:len(pssd.muids)-1]
t.sessions[s] = pssd
if len(pssd.muids) == 0 {
delete(t.sessions, s)
return &pssd, true
}
return &pssd, false
}
}
return nil, false
}
// Check if topic has any online (non-background) users.
func (t *Topic) isOnline() bool {
// Find at least one non-background session.
for s, pssd := range t.sessions {
if s.isMultiplex() && len(pssd.muids) > 0 {
return true
}
if !s.background {
return true
}
}
return false
}
// Verifies if topic can be access by the provided name: access any topic as non-channel, access channel as channel.
// Returns true if access is for channel, false if not and error if access is invalid.
func (t *Topic) verifyChannelAccess(asTopic string) (bool, error) {
if !types.IsChannel(asTopic) {
return false, nil
}
if t.isChan {
return true, nil
}
return false, types.ErrNotFound
}
// Infer topic category from name.
func topicCat(name string) types.TopicCat {
return types.GetTopicCat(name)
}
// Generate the name of the group topic as a "grp" followed by random-looking
// unique string.
func genTopicName() string {
return "grp" + store.Store.GetUidString()
}
// Convert expanded (routable) topic name into name suitable for sending to the user.
// For example p2pAbCDef123 -> usrAbCDef
func topicNameForUser(name string, uid types.Uid, isChan bool) string {
switch topicCat(name) {
case types.TopicCatMe:
return "me"
case types.TopicCatFnd:
return "fnd"
case types.TopicCatP2P:
topic, _ := types.P2PNameForUser(uid, name)
return topic
case types.TopicCatGrp:
if isChan {
return types.GrpToChn(name)
}
}
return name
}
// calculateUnreadInRanges calculates how many unread messages are within the given ranges.
// unreadStart is the first unread message SeqId (readID + 1), unreadEnd is the last possible message SeqId.
// Assumes ranges are sorted by Low ascending.
func calculateUnreadInRanges(readID, lastID int, ranges []types.Range) int {
if readID >= lastID {
// No unread messages
return 0
}
unreadStart := readID + 1
unreadEnd := lastID
// Sum up unread messages.
count := 0
for i := 0; i < len(ranges); i++ {
rangeStart := ranges[i].Low
rangeEnd := ranges[i].Hi
if rangeEnd == 0 {
rangeEnd = rangeStart + 1
}
// Find the first range where rangeEnd > readID
if rangeEnd <= readID {
continue
}
// Find intersection of [unreadStart, unreadEnd] and [rangeStart, rangeEnd)
intersectionStart := max(unreadStart, rangeStart)
intersectionEnd := min(unreadEnd+1, rangeEnd) // +1 because unreadEnd is inclusive
if intersectionStart < intersectionEnd {
count += intersectionEnd - intersectionStart
}
}
return count
}