From 8f7a1e2bf1536be8a44cecbd1cf982663183f3cb Mon Sep 17 00:00:00 2001 From: aforge Date: Tue, 25 Aug 2020 23:13:45 -0700 Subject: [PATCH] Fail subscribe requests when the Hub's join queue is full. Handle subscribe responses in tsung.xml in such cases. Also, gofmt on session.go. --- loadtest/tsung.xml | 1 + server/session.go | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/loadtest/tsung.xml b/loadtest/tsung.xml index 8a163d5f..adac3260 100644 --- a/loadtest/tsung.xml +++ b/loadtest/tsung.xml @@ -105,6 +105,7 @@ + {"ctrl":.*"code":200.*} {"sub":{"id":"%%_baseid%%%%_topicx%%%%_ctr%%","topic":"%%_topicx%%","get":{"what":"desc sub data del"}}} diff --git a/server/session.go b/server/session.go index 6d9656fa..82df370b 100644 --- a/server/session.go +++ b/server/session.go @@ -261,7 +261,7 @@ func (s *Session) queueOut(msg *ServerComMessage) bool { statsAddHistSample("RequestLatency", float64(duration)) } if 200 <= msg.Ctrl.Code && msg.Ctrl.Code < 600 { - statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code / 100), 1) + statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code/100), 1) } else { log.Println("Invalid response code: ", msg.Ctrl.Code) } @@ -487,9 +487,15 @@ func (s *Session) subscribe(msg *ClientComMessage) { if sub := s.getSub(msg.RcptTo); sub != nil { s.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp)) } else { - globals.hub.join <- &sessionJoin{ + select { + case globals.hub.join <- &sessionJoin{ pkt: msg, - sess: s} + sess: s}: + default: + // Reply with a 500 to the user. + s.queueOut(ErrUnknownReply(msg, msg.Timestamp)) + log.Println("s.subscribe: join queue full, topic ", msg.RcptTo, s.sid) + } // Hub will send Ctrl success/failure packets back to session } }