mirror of
https://github.com/tinode/chat.git
synced 2026-05-07 20:12:42 +00:00
846 lines
23 KiB
Go
846 lines
23 KiB
Go
/******************************************************************************
|
|
*
|
|
* Description :
|
|
*
|
|
* Topic initilization routines.
|
|
*
|
|
*****************************************************************************/
|
|
|
|
package main
|
|
|
|
import (
|
|
"strings"
|
|
|
|
"github.com/tinode/chat/server/auth"
|
|
"github.com/tinode/chat/server/logs"
|
|
"github.com/tinode/chat/server/store"
|
|
"github.com/tinode/chat/server/store/types"
|
|
)
|
|
|
|
// topicInit reads an existing topic from database or creates a new topic
|
|
func topicInit(t *Topic, join *ClientComMessage, h *Hub) {
|
|
var subscribeReqIssued bool
|
|
defer func() {
|
|
if !subscribeReqIssued && join.Sub != nil && join.sess.inflightReqs != nil {
|
|
// If it was a client initiated subscribe request and we failed it.
|
|
join.sess.inflightReqs.Done()
|
|
}
|
|
}()
|
|
|
|
timestamp := types.TimeNow()
|
|
|
|
var err error
|
|
switch {
|
|
case t.xoriginal == "me":
|
|
// Request to load a 'me' topic. The topic always exists, the subscription is never new.
|
|
err = initTopicMe(t, join)
|
|
case t.xoriginal == "fnd":
|
|
// Request to load a 'find' topic. The topic always exists, the subscription is never new.
|
|
err = initTopicFnd(t, join)
|
|
case strings.HasPrefix(t.xoriginal, "usr") || strings.HasPrefix(t.xoriginal, "p2p"):
|
|
// Request to load an existing or create a new p2p topic, then attach to it.
|
|
err = initTopicP2P(t, join)
|
|
case strings.HasPrefix(t.xoriginal, "new"):
|
|
// Processing request to create a new group topic.
|
|
err = initTopicNewGrp(t, join, false)
|
|
case strings.HasPrefix(t.xoriginal, "nch"):
|
|
// Processing request to create a new channel.
|
|
err = initTopicNewGrp(t, join, true)
|
|
case strings.HasPrefix(t.xoriginal, "grp") || strings.HasPrefix(t.xoriginal, "chn"):
|
|
// Load existing group topic (or channel).
|
|
err = initTopicGrp(t)
|
|
case t.xoriginal == "sys":
|
|
// Initialize system topic.
|
|
err = initTopicSys(t)
|
|
case t.xoriginal == "slf":
|
|
// Initialize self (notes and saved messages) topic.
|
|
err = initTopicSlf(t, join)
|
|
default:
|
|
// Unrecognized topic name
|
|
err = types.ErrTopicNotFound
|
|
}
|
|
|
|
// Failed to create or load the topic.
|
|
if err != nil {
|
|
// Remove topic from cache to prevent hub from forwarding more messages to it.
|
|
h.topicDel(join.RcptTo)
|
|
|
|
logs.Err.Println("init_topic: failed to load or create topic:", join.RcptTo, err)
|
|
join.sess.queueOut(decodeStoreErrorExplicitTs(err, join.Id, t.xoriginal, timestamp, join.Timestamp, nil))
|
|
|
|
// Re-queue pending requests to join the topic.
|
|
for len(t.reg) > 0 {
|
|
h.join <- (<-t.reg)
|
|
}
|
|
|
|
// Reject all other pending requests
|
|
for len(t.clientMsg) > 0 {
|
|
msg := <-t.clientMsg
|
|
if msg.init {
|
|
msg.sess.queueOut(ErrLockedExplicitTs(msg.Id, t.xoriginal, timestamp, join.Timestamp))
|
|
}
|
|
}
|
|
for len(t.unreg) > 0 {
|
|
msg := <-t.unreg
|
|
if msg.sess != nil && msg.sess.inflightReqs != nil {
|
|
msg.sess.inflightReqs.Done()
|
|
}
|
|
if msg.init {
|
|
msg.sess.queueOut(ErrLockedReply(msg, timestamp))
|
|
}
|
|
}
|
|
for len(t.meta) > 0 {
|
|
msg := <-t.meta
|
|
if msg.init {
|
|
msg.sess.queueOut(ErrLockedReply(msg, timestamp))
|
|
}
|
|
}
|
|
if len(t.exit) > 0 {
|
|
msg := <-t.exit
|
|
msg.done <- true
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
t.computePerUserAcsUnion()
|
|
|
|
// prevent newly initialized topics to go live while shutdown in progress
|
|
if globals.shuttingDown {
|
|
h.topicDel(join.RcptTo)
|
|
return
|
|
}
|
|
|
|
if t.isDeleted() {
|
|
// Someone deleted the topic while we were trying to create it.
|
|
return
|
|
}
|
|
|
|
statsInc("LiveTopics", 1)
|
|
statsInc("TotalTopics", 1)
|
|
usersRegisterTopic(t, true)
|
|
|
|
// Topic will check access rights, send invite to p2p user, send {ctrl} message to the initiator session
|
|
if join.Sub != nil {
|
|
subscribeReqIssued = true
|
|
t.reg <- join
|
|
}
|
|
|
|
t.markPaused(false)
|
|
if t.cat == types.TopicCatFnd || t.cat == types.TopicCatSys {
|
|
t.markLoaded()
|
|
}
|
|
|
|
go t.run(h)
|
|
}
|
|
|
|
// Initialize 'me' topic.
|
|
func initTopicMe(t *Topic, sreg *ClientComMessage) error {
|
|
t.cat = types.TopicCatMe
|
|
|
|
user, err := store.Users.Get(types.ParseUserId(t.name))
|
|
if err != nil {
|
|
// Log out the session
|
|
sreg.sess.uid = types.ZeroUid
|
|
return err
|
|
} else if user == nil {
|
|
// Log out the session
|
|
sreg.sess.uid = types.ZeroUid
|
|
return types.ErrUserNotFound
|
|
}
|
|
|
|
// User's default access for p2p topics
|
|
t.accessAuth = user.Access.Auth
|
|
t.accessAnon = user.Access.Anon
|
|
|
|
// Assign tags
|
|
t.tags = user.Tags
|
|
|
|
if err = t.loadSubscribers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.public = user.Public
|
|
t.trusted = user.Trusted
|
|
|
|
t.created = user.CreatedAt
|
|
t.updated = user.UpdatedAt
|
|
|
|
// The following values are exlicitly not set for 'me'.
|
|
// t.touched, t.lastId, t.delId
|
|
|
|
// 'me' has no owner, t.owner = nil
|
|
|
|
// Initiate User Agent with the UA of the creating session to report it later
|
|
t.userAgent = sreg.sess.userAgent
|
|
// Initialize channel for receiving user agent and session online updates.
|
|
t.supd = make(chan *sessionUpdate, 32)
|
|
|
|
if !t.isProxy {
|
|
// Allocate storage for contacts.
|
|
t.perSubs = make(map[string]perSubsData)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Initialize 'fnd' topic
|
|
func initTopicFnd(t *Topic, sreg *ClientComMessage) error {
|
|
t.cat = types.TopicCatFnd
|
|
|
|
uid := types.ParseUserId(sreg.AsUser)
|
|
if uid.IsZero() {
|
|
return types.ErrNotFound
|
|
}
|
|
|
|
user, err := store.Users.Get(uid)
|
|
if err != nil {
|
|
return err
|
|
} else if user == nil {
|
|
if !sreg.sess.isMultiplex() {
|
|
sreg.sess.uid = types.ZeroUid
|
|
}
|
|
return types.ErrNotFound
|
|
}
|
|
|
|
// Make sure no one can join the topic.
|
|
t.accessAuth = getDefaultAccess(t.cat, true, false)
|
|
t.accessAnon = getDefaultAccess(t.cat, false, false)
|
|
|
|
if err = t.loadSubscribers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.created = user.CreatedAt
|
|
t.updated = user.UpdatedAt
|
|
|
|
// 'fnd' has no owner, t.owner = nil
|
|
|
|
// Publishing to fnd is not supported
|
|
// t.lastId = 0, t.delId = 0, t.touched = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
// Load or create a P2P topic.
|
|
// There is a reace condition when two users try to create a p2p topic at the same time.
|
|
func initTopicP2P(t *Topic, sreg *ClientComMessage) error {
|
|
pktsub := sreg.Sub
|
|
|
|
// Handle the following cases:
|
|
// 1. Neither topic nor subscriptions exist: create a new p2p topic & subscriptions.
|
|
// 2. Topic exists, one of the subscriptions is missing:
|
|
// 2.1 Requester's subscription is missing, recreate it.
|
|
// 2.2 Other user's subscription is missing, treat like a new request for user 2.
|
|
// 3. Topic exists, both subscriptions are missing: should not happen, fail.
|
|
// 4. Topic and both subscriptions exist: attach to topic
|
|
|
|
t.cat = types.TopicCatP2P
|
|
|
|
// Check if the topic already exists
|
|
stopic, err := store.Topics.Get(t.name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If topic exists, load subscriptions
|
|
var subs []types.Subscription
|
|
if stopic != nil {
|
|
// Subs already have Public swapped
|
|
if subs, err = store.Topics.GetUsers(t.name, nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Case 3, fail
|
|
if len(subs) == 0 {
|
|
logs.Err.Println("hub: missing both subscriptions for '" + t.name + "' (SHOULD NEVER HAPPEN!)")
|
|
return types.ErrInternal
|
|
}
|
|
|
|
t.created = stopic.CreatedAt
|
|
t.updated = stopic.UpdatedAt
|
|
if !stopic.TouchedAt.IsZero() {
|
|
t.touched = stopic.TouchedAt
|
|
}
|
|
t.aux = stopic.Aux
|
|
t.lastID = stopic.SeqId
|
|
t.delID = stopic.DelId
|
|
}
|
|
|
|
// t.owner is blank for p2p topics
|
|
|
|
// Default user access to P2P topics is not set because it's unused.
|
|
// Other users cannot join the topic because of how topic name is constructed.
|
|
// The two participants set each other's access instead.
|
|
// t.accessAuth = getDefaultAccess(t.cat, true)
|
|
// t.accessAnon = getDefaultAccess(t.cat, false)
|
|
|
|
// t.public and t.trusted are not used for p2p topics since each user get a different public/trusted.
|
|
|
|
if stopic != nil && len(subs) == 2 {
|
|
// Case 4.
|
|
for i := range 2 {
|
|
uid := types.ParseUid(subs[i].User)
|
|
t.perUser[uid] = perUserData{
|
|
// Adapter has already swapped the state, public, defaultAccess, lastSeen values.
|
|
public: subs[i].GetPublic(),
|
|
lastSeen: subs[i].GetLastSeen(),
|
|
lastUA: subs[i].GetUserAgent(),
|
|
topicName: types.ParseUid(subs[(i+1)%2].User).UserId(),
|
|
|
|
private: subs[i].Private,
|
|
modeWant: subs[i].ModeWant,
|
|
modeGiven: subs[i].ModeGiven,
|
|
delID: subs[i].DelId,
|
|
recvID: subs[i].RecvSeqId,
|
|
readID: subs[i].ReadSeqId,
|
|
}
|
|
}
|
|
} else {
|
|
// Cases 1 (new topic), 2 (one of the two subscriptions is missing: either it's a new request
|
|
// or the subscription was deleted)
|
|
var userData perUserData
|
|
|
|
// Fetching records for both users.
|
|
// Requester.
|
|
userID1 := types.ParseUserId(sreg.AsUser)
|
|
// The other user.
|
|
userID2 := types.ParseUserId(t.xoriginal)
|
|
|
|
// User index: u1 - requester, u2 - responder, the other user
|
|
var u1, u2 int
|
|
users, err := store.Users.GetAll(userID1, userID2)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(users) != 2 {
|
|
// Invited user does not exist
|
|
return types.ErrUserNotFound
|
|
}
|
|
|
|
// User records are unsorted, make sure we know who is who.
|
|
if users[0].Uid() == userID1 {
|
|
u1, u2 = 0, 1
|
|
} else {
|
|
u1, u2 = 1, 0
|
|
}
|
|
|
|
// Figure out which subscriptions are missing: User1's, User2's or both.
|
|
var sub1, sub2 *types.Subscription
|
|
// Set to true if only requester's subscription has to be created.
|
|
var user1only bool
|
|
if len(subs) == 1 {
|
|
if subs[0].User == userID1.String() {
|
|
// User2's subscription is missing, user1's exists
|
|
sub1 = &subs[0]
|
|
} else {
|
|
// User1's is missing, user2's exists
|
|
sub2 = &subs[0]
|
|
user1only = true
|
|
}
|
|
}
|
|
|
|
// Other user's (responder's) subscription is missing
|
|
if sub2 == nil {
|
|
sub2 = &types.Subscription{
|
|
User: userID2.String(),
|
|
Topic: t.name,
|
|
Private: nil,
|
|
}
|
|
|
|
// Assign user2's ModeGiven based on what user1 has provided.
|
|
// We don't know access mode for user2, assume it's Auth.
|
|
if pktsub.Set != nil && pktsub.Set.Desc != nil && pktsub.Set.Desc.DefaultAcs != nil {
|
|
// Use provided DefaultAcs as non-default modeGiven for the other user.
|
|
// The other user is assumed to have auth level "Auth".
|
|
sub2.ModeGiven = users[u1].Access.Auth
|
|
if err := sub2.ModeGiven.UnmarshalText([]byte(pktsub.Set.Desc.DefaultAcs.Auth)); err != nil {
|
|
logs.Err.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Desc.DefaultAcs.Auth)
|
|
}
|
|
} else {
|
|
// Use user1.Auth as modeGiven for the other user
|
|
sub2.ModeGiven = users[u1].Access.Auth
|
|
}
|
|
// Sanity check
|
|
sub2.ModeGiven = sub2.ModeGiven&globals.typesModeCP2P | types.ModeApprove
|
|
|
|
// Swap Public+Trusted to match swapped Public+Trusted in subs returned from store.Topics.GetSubs
|
|
sub2.SetPublic(users[u1].Public)
|
|
sub2.SetTrusted(users[u1].Trusted)
|
|
|
|
// Mark the entire topic as new.
|
|
pktsub.Created = true
|
|
}
|
|
|
|
// Requester's subscription is missing:
|
|
// a. requester is starting a new topic
|
|
// b. requester's subscription is missing: deleted or creation failed
|
|
if sub1 == nil {
|
|
// Set user1's ModeGiven from user2's default values
|
|
userData.modeGiven = selectAccessMode(auth.Level(sreg.AuthLvl),
|
|
users[u2].Access.Anon,
|
|
users[u2].Access.Auth,
|
|
globals.typesModeCP2P)
|
|
|
|
// By default assign the same mode that user1 gave to user2 (could be changed below)
|
|
userData.modeWant = sub2.ModeGiven
|
|
|
|
if pktsub.Set != nil {
|
|
if pktsub.Set.Sub != nil {
|
|
uid := userID1
|
|
if pktsub.Set.Sub.User != "" {
|
|
uid = types.ParseUserId(pktsub.Set.Sub.User)
|
|
}
|
|
|
|
if uid != userID1 {
|
|
// Report the error and ignore the value
|
|
logs.Err.Println("hub: setting mode for another user is not supported '" + t.name + "'")
|
|
} else {
|
|
// user1 is setting non-default modeWant
|
|
if err := userData.modeWant.UnmarshalText([]byte(pktsub.Set.Sub.Mode)); err != nil {
|
|
logs.Err.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode)
|
|
}
|
|
// Ensure sanity
|
|
userData.modeWant = userData.modeWant&globals.typesModeCP2P | types.ModeApprove
|
|
}
|
|
|
|
// Since user1 issued a {sub} request, make sure the user can join
|
|
userData.modeWant |= types.ModeJoin
|
|
}
|
|
|
|
// user1 sets non-default Private
|
|
if pktsub.Set.Desc != nil {
|
|
if !isNullValue(pktsub.Set.Desc.Private) {
|
|
userData.private = pktsub.Set.Desc.Private
|
|
}
|
|
// Public, if present, is ignored
|
|
}
|
|
}
|
|
|
|
sub1 = &types.Subscription{
|
|
User: userID1.String(),
|
|
Topic: t.name,
|
|
ModeWant: userData.modeWant,
|
|
ModeGiven: userData.modeGiven,
|
|
Private: userData.private,
|
|
}
|
|
// Swap Public+Trsuted to match swapped Public+Trusted in subs returned from store.Topics.GetSubs
|
|
sub1.SetPublic(users[u2].Public)
|
|
sub1.SetTrusted(users[u2].Trusted)
|
|
|
|
// Mark this subscription as new
|
|
pktsub.Newsub = true
|
|
}
|
|
|
|
if !user1only {
|
|
// sub2 is being created, assign sub2.modeWant to what user2 gave to user1 (sub1.modeGiven)
|
|
sub2.ModeWant = selectAccessMode(auth.Level(sreg.AuthLvl),
|
|
users[u2].Access.Anon,
|
|
users[u2].Access.Auth,
|
|
globals.typesModeCP2P)
|
|
// Ensure sanity
|
|
sub2.ModeWant = sub2.ModeWant&globals.typesModeCP2P | types.ModeApprove
|
|
}
|
|
|
|
// Create everything
|
|
if stopic == nil {
|
|
if err = store.Topics.CreateP2P(sub1, sub2); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.created = sub1.CreatedAt
|
|
t.updated = sub1.UpdatedAt
|
|
t.touched = t.updated
|
|
|
|
// t.lastId is not set (default 0) for new topics
|
|
|
|
} else {
|
|
// TODO possibly update subscription, if changed
|
|
|
|
// Recreate one of the subscriptions
|
|
var subToMake *types.Subscription
|
|
if user1only {
|
|
subToMake = sub1
|
|
} else {
|
|
subToMake = sub2
|
|
}
|
|
if err = store.Subs.Create(subToMake); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Public and Trusted are already swapped.
|
|
userData.public = sub1.GetPublic()
|
|
userData.trusted = sub1.GetTrusted()
|
|
userData.topicName = userID2.UserId()
|
|
userData.modeWant = sub1.ModeWant
|
|
userData.modeGiven = sub1.ModeGiven
|
|
userData.delID = sub1.DelId
|
|
userData.readID = sub1.ReadSeqId
|
|
userData.recvID = sub1.RecvSeqId
|
|
t.perUser[userID1] = userData
|
|
|
|
t.perUser[userID2] = perUserData{
|
|
public: sub2.GetPublic(),
|
|
trusted: sub2.GetTrusted(),
|
|
topicName: userID1.UserId(),
|
|
modeWant: sub2.ModeWant,
|
|
modeGiven: sub2.ModeGiven,
|
|
delID: sub2.DelId,
|
|
readID: sub2.ReadSeqId,
|
|
recvID: sub2.RecvSeqId,
|
|
}
|
|
}
|
|
|
|
// Clear original topic name.
|
|
t.xoriginal = ""
|
|
|
|
return nil
|
|
}
|
|
|
|
// Create a new group topic
|
|
func initTopicNewGrp(t *Topic, sreg *ClientComMessage, isChan bool) error {
|
|
timestamp := types.TimeNow()
|
|
pktsub := sreg.Sub
|
|
|
|
t.cat = types.TopicCatGrp
|
|
t.isChan = isChan
|
|
|
|
// Generic topics have parameters stored in the topic object
|
|
t.owner = types.ParseUserId(sreg.AsUser)
|
|
authLevel := auth.Level(sreg.AuthLvl)
|
|
|
|
t.accessAuth = getDefaultAccess(t.cat, true, isChan)
|
|
t.accessAnon = getDefaultAccess(t.cat, false, isChan)
|
|
|
|
// Owner/creator gets full access to the topic. Owner may change the default modeWant through 'set'.
|
|
userData := perUserData{
|
|
modeGiven: types.ModeCFull,
|
|
modeWant: types.ModeCFull,
|
|
}
|
|
|
|
if pktsub.Set != nil {
|
|
// User sent initialization parameters
|
|
if pktsub.Set.Desc != nil {
|
|
if pktsub.Set.Desc.Trusted != nil && authLevel != auth.LevelRoot {
|
|
logs.Err.Println("hub: attempt to assign Trusted by non-ROOT", t.name)
|
|
return types.ErrPermissionDenied
|
|
}
|
|
|
|
if !isNullValue(pktsub.Set.Desc.Public) {
|
|
t.public = pktsub.Set.Desc.Public
|
|
}
|
|
if !isNullValue(pktsub.Set.Desc.Trusted) {
|
|
t.trusted = pktsub.Set.Desc.Trusted
|
|
}
|
|
if !isNullValue(pktsub.Set.Desc.Private) {
|
|
userData.private = pktsub.Set.Desc.Private
|
|
}
|
|
|
|
// set default access
|
|
if pktsub.Set.Desc.DefaultAcs != nil {
|
|
if authMode, anonMode, err := parseTopicAccess(pktsub.Set.Desc.DefaultAcs,
|
|
t.accessAuth, t.accessAnon); err != nil {
|
|
|
|
// Invalid access for one or both. Make it explicitly None
|
|
if authMode.IsInvalid() {
|
|
t.accessAuth = types.ModeNone
|
|
} else {
|
|
t.accessAuth = authMode
|
|
}
|
|
if anonMode.IsInvalid() {
|
|
t.accessAnon = types.ModeNone
|
|
} else {
|
|
t.accessAnon = anonMode
|
|
}
|
|
logs.Err.Println("hub: invalid access mode for topic '" + t.name + "': '" + err.Error() + "'")
|
|
} else if authMode.IsOwner() || anonMode.IsOwner() {
|
|
logs.Err.Println("hub: OWNER default access in topic", t.name)
|
|
t.accessAuth, t.accessAnon = authMode & ^types.ModeOwner, anonMode & ^types.ModeOwner
|
|
} else {
|
|
t.accessAuth, t.accessAnon = authMode, anonMode
|
|
}
|
|
}
|
|
}
|
|
|
|
// Owner/creator may restrict own access to topic
|
|
if pktsub.Set.Sub != nil && pktsub.Set.Sub.Mode != "" {
|
|
userData.modeWant = types.ModeCFull
|
|
if err := userData.modeWant.UnmarshalText([]byte(pktsub.Set.Sub.Mode)); err != nil {
|
|
logs.Err.Println("hub: invalid access mode", t.xoriginal, pktsub.Set.Sub.Mode)
|
|
}
|
|
// User must not unset ModeJoin or the owner flags
|
|
userData.modeWant |= types.ModeJoin | types.ModeOwner
|
|
}
|
|
|
|
if tags := normalizeTags(pktsub.Set.Tags, globals.maxTagCount); len(tags) > 0 {
|
|
if !restrictedTagsEqual(tags, nil, globals.immutableTagNS) {
|
|
return types.ErrPermissionDenied
|
|
}
|
|
// Assign tags
|
|
t.tags = tags
|
|
}
|
|
}
|
|
|
|
t.perUser[t.owner] = userData
|
|
|
|
t.created = timestamp
|
|
t.updated = timestamp
|
|
t.touched = timestamp
|
|
|
|
// t.lastId & t.delId are not set for new topics
|
|
|
|
stopic := &types.Topic{
|
|
ObjHeader: types.ObjHeader{Id: sreg.RcptTo, CreatedAt: timestamp},
|
|
Access: types.DefaultAccess{Auth: t.accessAuth, Anon: t.accessAnon},
|
|
Tags: t.tags,
|
|
UseBt: isChan,
|
|
Public: t.public,
|
|
Trusted: t.trusted,
|
|
}
|
|
|
|
// store.Topics.Create will add a subscription record for the topic creator
|
|
stopic.GiveAccess(t.owner, userData.modeWant, userData.modeGiven)
|
|
err := store.Topics.Create(stopic, t.owner, t.perUser[t.owner].private)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Link uploaded avatar to topic.
|
|
if sreg.Extra != nil && len(sreg.Extra.Attachments) > 0 {
|
|
if err := store.Files.LinkAttachments(t.name, types.ZeroUid, sreg.Extra.Attachments); err != nil {
|
|
logs.Warn.Printf("topic[%s] failed to link avatar attachment: %v", t.name, err)
|
|
// This is not a critical error, continue execution.
|
|
}
|
|
}
|
|
|
|
t.xoriginal = t.name // keeping 'new' or 'nch' as original has no value to the client
|
|
t.subCnt = 1 // One subscription, the owner.
|
|
|
|
pktsub.Created = true
|
|
pktsub.Newsub = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// Initialize existing group topic. There is a race condition when two users attempt to load
|
|
// the same topic at the same time. It's prevented at hub level.
|
|
func initTopicGrp(t *Topic) error {
|
|
t.cat = types.TopicCatGrp
|
|
|
|
// TODO(gene): check and validate topic name
|
|
stopic, err := store.Topics.Get(t.name)
|
|
if err != nil {
|
|
return err
|
|
} else if stopic == nil {
|
|
return types.ErrTopicNotFound
|
|
}
|
|
|
|
if err = t.loadSubscribers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.isChan = stopic.UseBt
|
|
|
|
// t.owner is set by loadSubscriptions
|
|
|
|
t.accessAuth = stopic.Access.Auth
|
|
t.accessAnon = stopic.Access.Anon
|
|
|
|
// Assign tags & auxiliary data.
|
|
t.tags = stopic.Tags
|
|
t.aux = stopic.Aux
|
|
|
|
t.public = stopic.Public
|
|
t.trusted = stopic.Trusted
|
|
|
|
t.created = stopic.CreatedAt
|
|
t.updated = stopic.UpdatedAt
|
|
if !stopic.TouchedAt.IsZero() {
|
|
t.touched = stopic.TouchedAt
|
|
}
|
|
t.lastID = stopic.SeqId
|
|
t.delID = stopic.DelId
|
|
t.subCnt = stopic.SubCnt
|
|
|
|
// Initialize channel for receiving session online updates.
|
|
t.supd = make(chan *sessionUpdate, 32)
|
|
|
|
t.xoriginal = t.name // topic may have been loaded by a channel reader; make sure it's grpXXX, not chnXXX.
|
|
|
|
return nil
|
|
}
|
|
|
|
// Initialize system topic. System topic is a singleton, always in memory.
|
|
func initTopicSys(t *Topic) error {
|
|
t.cat = types.TopicCatSys
|
|
|
|
stopic, err := store.Topics.Get(t.name)
|
|
if err != nil {
|
|
return err
|
|
} else if stopic == nil {
|
|
return types.ErrTopicNotFound
|
|
}
|
|
|
|
if err = t.loadSubscribers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// There is no t.owner
|
|
|
|
// Default permissions are 'W'
|
|
t.accessAuth = types.ModeWrite
|
|
t.accessAnon = types.ModeWrite
|
|
|
|
t.public = stopic.Public
|
|
t.trusted = stopic.Trusted
|
|
|
|
t.created = stopic.CreatedAt
|
|
t.updated = stopic.UpdatedAt
|
|
if !stopic.TouchedAt.IsZero() {
|
|
t.touched = stopic.TouchedAt
|
|
}
|
|
t.lastID = stopic.SeqId
|
|
|
|
return nil
|
|
}
|
|
|
|
// Initialize or load a self-topic 'slf'.
|
|
func initTopicSlf(t *Topic, sreg *ClientComMessage) error {
|
|
t.cat = types.TopicCatSlf
|
|
|
|
stopic, err := store.Topics.Get(t.name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If topic exists, load subscriptions
|
|
if stopic != nil {
|
|
if err = t.loadSubscribers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// t.owner is set by loadSubscriptions
|
|
|
|
// Topic exists but subscription is missing. Fail.
|
|
if len(t.perUser) == 0 {
|
|
logs.Err.Println("hub: missing subscription for '" + t.name + "' (SHOULD NEVER HAPPEN!)")
|
|
return types.ErrInternal
|
|
}
|
|
|
|
t.created = stopic.CreatedAt
|
|
t.updated = stopic.UpdatedAt
|
|
if !stopic.TouchedAt.IsZero() {
|
|
t.touched = stopic.TouchedAt
|
|
}
|
|
t.aux = stopic.Aux
|
|
t.lastID = stopic.SeqId
|
|
t.delID = stopic.DelId
|
|
|
|
} else {
|
|
// Get topic owner.
|
|
userID := types.ParseUserId(sreg.AsUser)
|
|
user, err := store.Users.Get(userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if user == nil {
|
|
// User not found. Really should not happen.
|
|
return types.ErrUserNotFound
|
|
}
|
|
|
|
t.owner = userID
|
|
|
|
t.accessAuth = getDefaultAccess(t.cat, true, false)
|
|
t.accessAnon = getDefaultAccess(t.cat, false, false)
|
|
|
|
// Default access for the self-owner.
|
|
userData := perUserData{
|
|
modeGiven: t.accessAuth,
|
|
modeWant: t.accessAuth,
|
|
}
|
|
|
|
// Mark the topic as new.
|
|
sreg.Sub.Created = true
|
|
|
|
if sreg.Sub.Set != nil {
|
|
// User sets non-default Private
|
|
if sreg.Sub.Set.Desc != nil {
|
|
if !isNullValue(sreg.Sub.Set.Desc.Private) {
|
|
userData.private = sreg.Sub.Set.Desc.Private
|
|
}
|
|
// Public, trusted are ignored.
|
|
}
|
|
|
|
if tags := normalizeTags(sreg.Sub.Set.Tags, globals.maxTagCount); len(tags) > 0 {
|
|
if !restrictedTagsEqual(tags, nil, globals.immutableTagNS) {
|
|
return types.ErrPermissionDenied
|
|
}
|
|
|
|
// Assign tags
|
|
t.tags = tags
|
|
}
|
|
}
|
|
|
|
// Mark this subscription as new
|
|
sreg.Sub.Newsub = true
|
|
|
|
t.perUser[t.owner] = userData
|
|
|
|
timestamp := types.TimeNow()
|
|
|
|
t.created = timestamp
|
|
t.updated = timestamp
|
|
t.touched = timestamp
|
|
|
|
stopic = &types.Topic{
|
|
ObjHeader: types.ObjHeader{Id: sreg.RcptTo, CreatedAt: timestamp},
|
|
Access: types.DefaultAccess{Auth: t.accessAuth, Anon: t.accessAnon},
|
|
Tags: t.tags,
|
|
}
|
|
|
|
// store.Topics.Create will add a subscription record for the topic creator
|
|
stopic.GiveAccess(t.owner, userData.modeWant, userData.modeGiven)
|
|
err = store.Topics.Create(stopic, t.owner, t.perUser[t.owner].private)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sreg.Sub.Created = true
|
|
sreg.Sub.Newsub = true
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// loadSubscribers loads topic subscribers, sets topic owner.
|
|
func (t *Topic) loadSubscribers() error {
|
|
subs, err := store.Topics.GetSubs(t.name, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if subs == nil {
|
|
return nil
|
|
}
|
|
|
|
for i := range subs {
|
|
sub := &subs[i]
|
|
uid := types.ParseUid(sub.User)
|
|
t.perUser[uid] = perUserData{
|
|
delID: sub.DelId,
|
|
readID: sub.ReadSeqId,
|
|
recvID: sub.RecvSeqId,
|
|
private: sub.Private,
|
|
modeWant: sub.ModeWant,
|
|
modeGiven: sub.ModeGiven,
|
|
}
|
|
|
|
if (sub.ModeGiven & sub.ModeWant).IsOwner() {
|
|
t.owner = uid
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|