mirror of
https://github.com/tinode/chat.git
synced 2026-05-07 20:12:42 +00:00
e776c4d466
Instead of writing to Session.detach.
311 lines
10 KiB
Go
311 lines
10 KiB
Go
/******************************************************************************
|
|
* Description :
|
|
* Topic in a cluster which serves as a local representation of the master
|
|
* topic hosted at another node.
|
|
*****************************************************************************/
|
|
|
|
package main
|
|
|
|
import (
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/tinode/chat/server/logs"
|
|
"github.com/tinode/chat/server/store/types"
|
|
)
|
|
|
|
func (t *Topic) runProxy(hub *Hub) {
|
|
killTimer := time.NewTimer(time.Hour)
|
|
killTimer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case msg := <-t.reg:
|
|
// Request to add a connection to this topic
|
|
if t.isInactive() {
|
|
msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow()))
|
|
} else if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, msg, t.name, msg.sess); err != nil {
|
|
// Response (ctrl message) will be handled when it's received via the proxy channel.
|
|
logs.Warn.Printf("proxy topic[%s]: route join request from proxy to master failed - %s", t.name, err)
|
|
msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
|
|
}
|
|
if msg.sess.inflightReqs != nil {
|
|
msg.sess.inflightReqs.Done()
|
|
}
|
|
|
|
case msg := <-t.unreg:
|
|
if !t.handleProxyLeaveRequest(msg, killTimer) {
|
|
sid := "nil"
|
|
if msg.sess != nil {
|
|
sid = msg.sess.sid
|
|
}
|
|
logs.Warn.Printf("proxy topic[%s]: failed to update proxy topic state for leave request - sid %s", t.name, sid)
|
|
msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
|
|
}
|
|
if msg.init && msg.sess.inflightReqs != nil {
|
|
// If it's a client initiated request.
|
|
msg.sess.inflightReqs.Done()
|
|
}
|
|
|
|
case msg := <-t.clientMsg:
|
|
// Content message intended for broadcasting to recipients
|
|
if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil {
|
|
logs.Warn.Printf("topic proxy[%s]: route broadcast request from proxy to master failed - %s", t.name, err)
|
|
msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
|
|
}
|
|
|
|
case msg := <-t.serverMsg:
|
|
if msg.Info != nil || msg.Pres != nil {
|
|
globals.cluster.routeToTopicIntraCluster(t.name, msg, msg.sess)
|
|
} else {
|
|
// FIXME: should something be done here?
|
|
logs.Err.Printf("ERROR!!! topic proxy[%s]: unexpected server-side message in proxy topic %s", t.name, msg.describe())
|
|
}
|
|
|
|
case msg := <-t.meta:
|
|
// Request to get/set topic metadata
|
|
if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, msg, t.name, msg.sess); err != nil {
|
|
logs.Warn.Printf("proxy topic[%s]: route meta request from proxy to master failed - %s", t.name, err)
|
|
msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
|
|
}
|
|
|
|
case upd := <-t.supd:
|
|
// Either an update to 'me' user agent from one of the sessions or
|
|
// background session comes to foreground.
|
|
req := ProxyReqMeUserAgent
|
|
tmpSess := &Session{userAgent: upd.userAgent}
|
|
if upd.sess != nil {
|
|
// Subscribed user may not match session user. Find out who is subscribed
|
|
pssd, ok := t.sessions[upd.sess]
|
|
if !ok {
|
|
logs.Warn.Printf("proxy topic[%s]: sess update request from detached session - sid %s", t.name, upd.sess.sid)
|
|
continue
|
|
}
|
|
req = ProxyReqBgSession
|
|
tmpSess.uid = pssd.uid
|
|
tmpSess.sid = upd.sess.sid
|
|
tmpSess.userAgent = upd.sess.userAgent
|
|
}
|
|
if err := globals.cluster.routeToTopicMaster(req, nil, t.name, tmpSess); err != nil {
|
|
logs.Warn.Printf("proxy topic[%s]: route sess update request from proxy to master failed - %s", t.name, err)
|
|
}
|
|
|
|
case msg := <-t.proxy:
|
|
t.proxyMasterResponse(msg, killTimer)
|
|
|
|
case sd := <-t.exit:
|
|
// Tell sessions to remove the topic
|
|
for s := range t.sessions {
|
|
s.detachSession(t.name)
|
|
}
|
|
|
|
if err := globals.cluster.topicProxyGone(t.name); err != nil {
|
|
logs.Warn.Printf("proxy topic[%s] shutdown: failed to notify master - %s", t.name, err)
|
|
}
|
|
|
|
// Report completion back to sender, if 'done' is not nil.
|
|
if sd.done != nil {
|
|
sd.done <- true
|
|
}
|
|
return
|
|
|
|
case <-killTimer.C:
|
|
// Topic timeout
|
|
hub.unreg <- &topicUnreg{rcptTo: t.name}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Takes a session leave request, forwards it to the topic master and
|
|
// modifies the local state accordingly.
|
|
// Returns whether the operation was successful.
|
|
func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.Timer) bool {
|
|
// Detach session from topic; session may continue to function.
|
|
var asUid types.Uid
|
|
if msg.init {
|
|
asUid = types.ParseUserId(msg.AsUser)
|
|
}
|
|
|
|
if asUid.IsZero() {
|
|
if pssd, ok := t.sessions[msg.sess]; ok {
|
|
asUid = pssd.uid
|
|
} else {
|
|
logs.Warn.Printf("proxy topic[%s]: leave request sent for unknown session", t.name)
|
|
return false
|
|
}
|
|
}
|
|
// Remove the session from the topic without waiting for a response from the master node
|
|
// because by the time the response arrives this session may be already gone from the session store
|
|
// and we won't be able to find and remove it by its sid.
|
|
pssd, result := t.remSession(msg.sess, asUid)
|
|
if result {
|
|
msg.sess.delSub(t.name)
|
|
}
|
|
if !msg.init {
|
|
// Explicitly specify the uid because the master multiplex session needs to know which
|
|
// of its multiple hosted sessions to delete.
|
|
msg.AsUser = asUid.UserId()
|
|
msg.Leave = &MsgClientLeave{}
|
|
msg.init = true
|
|
}
|
|
// Make sure we set the Original field if it's empty (e.g. when session is terminating altogether).
|
|
if msg.Original == "" {
|
|
if t.cat == types.TopicCatGrp && t.isChan {
|
|
// It's a channel topic. Original topic name depends the subscription type.
|
|
if result && pssd.isChanSub {
|
|
msg.Original = types.GrpToChn(t.xoriginal)
|
|
} else {
|
|
msg.Original = t.xoriginal
|
|
}
|
|
} else {
|
|
msg.Original = t.original(asUid)
|
|
}
|
|
}
|
|
|
|
if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, msg, t.name, msg.sess); err != nil {
|
|
logs.Warn.Printf("proxy topic[%s]: route leave request from proxy to master failed - %s", t.name, err)
|
|
}
|
|
if len(t.sessions) == 0 {
|
|
// No more sessions attached. Start the countdown.
|
|
killTimer.Reset(idleProxyTopicTimeout)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// proxyMasterResponse at proxy topic processes a master topic response to an earlier request.
|
|
func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
|
|
// Kills topic after a period of inactivity.
|
|
keepAlive := idleProxyTopicTimeout
|
|
|
|
if msg.SrvMsg.Pres != nil && msg.SrvMsg.Pres.What == "acs" && msg.SrvMsg.Pres.Acs != nil {
|
|
// If the server changed acs on this topic, update the internal state.
|
|
t.updateAcsFromPresMsg(msg.SrvMsg.Pres)
|
|
}
|
|
|
|
if msg.OrigSid == "*" {
|
|
// It is a broadcast.
|
|
switch {
|
|
case msg.SrvMsg.Pres != nil || msg.SrvMsg.Data != nil || msg.SrvMsg.Info != nil:
|
|
// Regular broadcast.
|
|
t.handleProxyBroadcast(msg.SrvMsg)
|
|
case msg.SrvMsg.Ctrl != nil:
|
|
// Ctrl broadcast. E.g. for user eviction.
|
|
t.proxyCtrlBroadcast(msg.SrvMsg)
|
|
default:
|
|
}
|
|
} else {
|
|
sess := globals.sessionStore.Get(msg.OrigSid)
|
|
if sess == nil {
|
|
logs.Warn.Printf("proxy topic[%s]: session %s not found; already terminated?", t.name, msg.OrigSid)
|
|
}
|
|
switch msg.OrigReqType {
|
|
case ProxyReqJoin:
|
|
if sess != nil && msg.SrvMsg.Ctrl != nil {
|
|
// TODO: do we need to let the master topic know that the subscription is not longer valid
|
|
// or is it already informed by the session when it terminated?
|
|
|
|
// Subscription result.
|
|
if msg.SrvMsg.Ctrl.Code < 300 {
|
|
sess.sessionStoreLock.Lock()
|
|
// Make sure the session isn't gone yet.
|
|
if session := globals.sessionStore.Get(msg.OrigSid); session != nil {
|
|
// Successful subscriptions.
|
|
t.addSession(session, msg.SrvMsg.uid, types.IsChannel(msg.SrvMsg.Ctrl.Topic))
|
|
session.addSub(t.name, &Subscription{
|
|
broadcast: t.clientMsg,
|
|
done: t.unreg,
|
|
meta: t.meta,
|
|
supd: t.supd,
|
|
})
|
|
}
|
|
sess.sessionStoreLock.Unlock()
|
|
|
|
killTimer.Stop()
|
|
} else if len(t.sessions) == 0 {
|
|
killTimer.Reset(keepAlive)
|
|
}
|
|
}
|
|
case ProxyReqBroadcast, ProxyReqMeta, ProxyReqCall:
|
|
// no processing
|
|
case ProxyReqLeave:
|
|
if msg.SrvMsg != nil && msg.SrvMsg.Ctrl != nil {
|
|
if msg.SrvMsg.Ctrl.Code < 300 {
|
|
if sess != nil {
|
|
t.remSession(sess, sess.uid)
|
|
}
|
|
}
|
|
// All sessions are gone. Start the kill timer.
|
|
if len(t.sessions) == 0 {
|
|
killTimer.Reset(keepAlive)
|
|
}
|
|
}
|
|
|
|
default:
|
|
logs.Err.Printf("proxy topic[%s] received response referencing unexpected request type %d",
|
|
t.name, msg.OrigReqType)
|
|
}
|
|
|
|
if sess != nil && !sess.queueOut(msg.SrvMsg) {
|
|
logs.Err.Printf("proxy topic[%s]: timeout in sending response - sid %s", t.name, sess.sid)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleProxyBroadcast broadcasts a Data, Info or Pres message to sessions attached to this proxy topic.
|
|
func (t *Topic) handleProxyBroadcast(msg *ServerComMessage) {
|
|
if t.isInactive() {
|
|
// Ignore broadcast - topic is paused or being deleted.
|
|
return
|
|
}
|
|
|
|
if msg.Data != nil {
|
|
t.lastID = msg.Data.SeqId
|
|
}
|
|
|
|
t.broadcastToSessions(msg)
|
|
}
|
|
|
|
// proxyCtrlBroadcast broadcasts a ctrl command to certain sessions attached to this proxy topic.
|
|
func (t *Topic) proxyCtrlBroadcast(msg *ServerComMessage) {
|
|
if msg.Ctrl.Code == http.StatusResetContent && msg.Ctrl.Text == "evicted" {
|
|
// We received a ctrl command for evicting a user.
|
|
if msg.uid.IsZero() {
|
|
logs.Err.Panicf("proxy topic[%s]: proxy received evict message with empty uid", t.name)
|
|
}
|
|
for sess := range t.sessions {
|
|
// Proxy topic may only have ordinary sessions. No multiplexing or proxy sessions here.
|
|
if _, removed := t.remSession(sess, msg.uid); removed {
|
|
sess.detachSession(t.name)
|
|
if sess.sid != msg.SkipSid {
|
|
sess.queueOut(msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateAcsFromPresMsg modifies user acs in Topic's perUser struct based on the data in `pres`.
|
|
func (t *Topic) updateAcsFromPresMsg(pres *MsgServerPres) {
|
|
uid := types.ParseUserId(pres.Src)
|
|
if uid.IsZero() {
|
|
if t.cat != types.TopicCatMe {
|
|
logs.Warn.Printf("proxy topic[%s]: received acs change for invalid user id '%s'", t.name, pres.Src)
|
|
}
|
|
return
|
|
}
|
|
|
|
// If t.perUser[uid] does not exist, pud is initialized with blanks, otherwise it gets existing values.
|
|
pud := t.perUser[uid]
|
|
dacs := pres.Acs
|
|
if err := pud.modeWant.ApplyMutation(dacs.Want); err != nil {
|
|
logs.Warn.Printf("proxy topic[%s]: could not process acs change - want: %s", t.name, err)
|
|
return
|
|
}
|
|
if err := pud.modeGiven.ApplyMutation(dacs.Given); err != nil {
|
|
logs.Warn.Printf("proxy topic[%s]: could not process acs change - given: %s", t.name, err)
|
|
return
|
|
}
|
|
// Update existing or add new.
|
|
t.perUser[uid] = pud
|
|
}
|