Fail subscribe requests when the Hub's join queue is full.

Handle subscribe responses in tsung.xml in such cases.
Also, gofmt on session.go.
This commit is contained in:
aforge
2020-08-25 23:13:45 -07:00
parent 53216d94b3
commit 8f7a1e2bf1
2 changed files with 10 additions and 3 deletions
+1
View File
@@ -105,6 +105,7 @@
<thinktime value="3" random="true"></thinktime>
<request subst="true">
<match do="abort" when="nomatch">{"ctrl":.*"code":200.*}</match>
<websocket type="message" frame="text">{"sub":{"id":"%%_baseid%%%%_topicx%%%%_ctr%%","topic":"%%_topicx%%","get":{"what":"desc sub data del"}}}</websocket>
</request>
+9 -3
View File
@@ -261,7 +261,7 @@ func (s *Session) queueOut(msg *ServerComMessage) bool {
statsAddHistSample("RequestLatency", float64(duration))
}
if 200 <= msg.Ctrl.Code && msg.Ctrl.Code < 600 {
statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code / 100), 1)
statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code/100), 1)
} else {
log.Println("Invalid response code: ", msg.Ctrl.Code)
}
@@ -487,9 +487,15 @@ func (s *Session) subscribe(msg *ClientComMessage) {
if sub := s.getSub(msg.RcptTo); sub != nil {
s.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp))
} else {
globals.hub.join <- &sessionJoin{
select {
case globals.hub.join <- &sessionJoin{
pkt: msg,
sess: s}
sess: s}:
default:
// Reply with a 500 to the user.
s.queueOut(ErrUnknownReply(msg, msg.Timestamp))
log.Println("s.subscribe: join queue full, topic ", msg.RcptTo, s.sid)
}
// Hub will send Ctrl success/failure packets back to session
}
}