add option to fetch messages by ranges of seq IDs

This commit is contained in:
or-else
2023-04-18 17:06:30 -07:00
parent 41b35d8046
commit 126d5f522f
11 changed files with 915 additions and 831 deletions
+650 -637
View File
File diff suppressed because it is too large Load Diff
+7 -5
View File
@@ -95,6 +95,11 @@ message SetDesc {
bytes trusted = 4;
}
message SeqRange {
int32 low = 1;
int32 hi = 2;
}
message GetOpts {
// Timestamp in milliseconds since epoch 01/01/1970
int64 if_modified_since = 1;
@@ -108,6 +113,8 @@ message GetOpts {
int32 before_id = 5;
// Maximum number of results to return
int32 limit = 6;
// Load messages by id or ranges of ids
repeated SeqRange ranges = 7;
}
message GetQuery {
@@ -134,11 +141,6 @@ message SetQuery {
map<string, bytes> aux = 5;
}
message SeqRange {
int32 low = 1;
int32 hi = 2;
}
// Client handshake
message ClientHi {
string id = 1;
File diff suppressed because one or more lines are too long
+29 -23
View File
@@ -26,12 +26,14 @@ type MsgGetOpts struct {
Topic string `json:"topic,omitempty"`
// Return results modified since this timespamp.
IfModifiedSince *time.Time `json:"ims,omitempty"`
// Load messages/ranges with IDs equal or greater than this (inclusive or closed)
// Load messages/ranges with IDs equal or greater than this (inclusive or closed).
SinceId int `json:"since,omitempty"`
// Load messages/ranges with IDs lower than this (exclusive or open)
// Load messages/ranges with IDs lower than this (exclusive or open).
BeforeId int `json:"before,omitempty"`
// Limit the number of messages loaded
// Limit the number of messages loaded.
Limit int `json:"limit,omitempty"`
// Fetch messages with IDs in these ranges.
IdRanges []MsgRange `json:"ranges,omitempty"`
}
// MsgGetQuery is a topic metadata or data query.
@@ -50,19 +52,23 @@ type MsgGetQuery struct {
// MsgSetSub is a payload in set.sub request to update current subscription or invite another user, {sub.what} == "sub".
type MsgSetSub struct {
// User affected by this request. Default (empty): current user
// User affected by this request. Default (empty): current user.
User string `json:"user,omitempty"`
// Access mode change, either Given or Want depending on context
// Access mode change, either Given or Want depending on context.
Mode string `json:"mode,omitempty"`
}
// MsgSetDesc is a C2S in set.what == "desc", acc, sub message.
type MsgSetDesc struct {
DefaultAcs *MsgDefaultAcsMode `json:"defacs,omitempty"` // default access mode
Public any `json:"public,omitempty"` // description of the user or topic
Trusted any `json:"trusted,omitempty"` // trusted (system-provided) user or topic data
Private any `json:"private,omitempty"` // per-subscription private data
// Default access mode.
DefaultAcs *MsgDefaultAcsMode `json:"defacs,omitempty"`
// Description of the user or topic.
Public any `json:"public,omitempty"`
// Trusted (system-provided) user or topic data.
Trusted any `json:"trusted,omitempty"`
// Per-subscription private data.
Private any `json:"private,omitempty"`
}
// MsgCredClient is an account credential such as email or phone number.
@@ -91,9 +97,9 @@ type MsgSetQuery struct {
Aux map[string]any
}
// MsgDelRange is either an individual ID (HiId=0) or a randge of deleted IDs, low end inclusive (closed),
// MsgRange is either an individual ID (HiId=0) or a randge of IDs, low end inclusive (closed),
// high-end exclusive (open): [LowId .. HiId), e.g. 1..5 -> 1, 2, 3, 4.
type MsgDelRange struct {
type MsgRange struct {
LowId int `json:"low,omitempty"`
HiId int `json:"hi,omitempty"`
}
@@ -291,7 +297,7 @@ type MsgClientDel struct {
// * "cred" to delete credential (email or phone)
What string `json:"what"`
// Delete messages with these IDs (either one by one or a set of ranges)
DelSeq []MsgDelRange `json:"delseq,omitempty"`
DelSeq []MsgRange `json:"delseq,omitempty"`
// User ID of the user or subscription to delete
User string `json:"user,omitempty"`
// Credential to delete
@@ -562,8 +568,8 @@ func (src *MsgTopicSub) describe() string {
// MsgDelValues describes request to delete messages.
type MsgDelValues struct {
DelId int `json:"clear,omitempty"`
DelSeq []MsgDelRange `json:"delseq,omitempty"`
DelId int `json:"clear,omitempty"`
DelSeq []MsgRange `json:"delseq,omitempty"`
}
// MsgServerCtrl is a server control message {ctrl}.
@@ -626,15 +632,15 @@ func (src *MsgServerData) describe() string {
// MsgServerPres is presence notification {pres} (authoritative update).
type MsgServerPres struct {
Topic string `json:"topic"`
Src string `json:"src,omitempty"`
What string `json:"what"`
UserAgent string `json:"ua,omitempty"`
SeqId int `json:"seq,omitempty"`
DelId int `json:"clear,omitempty"`
DelSeq []MsgDelRange `json:"delseq,omitempty"`
AcsTarget string `json:"tgt,omitempty"`
AcsActor string `json:"act,omitempty"`
Topic string `json:"topic"`
Src string `json:"src,omitempty"`
What string `json:"what"`
UserAgent string `json:"ua,omitempty"`
SeqId int `json:"seq,omitempty"`
DelId int `json:"clear,omitempty"`
DelSeq []MsgRange `json:"delseq,omitempty"`
AcsTarget string `json:"tgt,omitempty"`
AcsActor string `json:"act,omitempty"`
// Acs or a delta Acs. Need to marshal it to json under a name different than 'acs'
// to allow different handling on the client
Acs *MsgAccessMode `json:"dacs,omitempty"`
+34 -11
View File
@@ -2561,16 +2561,37 @@ func (a *adapter) MessageSave(msg *t.Message) error {
func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ([]t.Message, error) {
var limit = a.maxMessageResults
var lower = 0
var upper = 1<<31 - 1
args := []any{store.DecodeUid(forUser), topic}
seqIdConstraint := ""
if opts != nil {
if opts.Since > 0 {
lower = opts.Since
}
if opts.Before > 0 {
// MySQL BETWEEN is inclusive-inclusive, Tinode API requires inclusive-exclusive, thus -1
upper = opts.Before - 1
if len(opts.IdRanges) > 0 {
seqCount := 0
for _, r := range opts.IdRanges {
if r.Hi == 0 {
args = append(args, r.Low)
seqCount++
} else {
for i := r.Low; i < r.Hi; i++ {
args = append(args, i)
seqCount++
}
}
}
seqIdConstraint = "AND m.seqid IN (?" + strings.Repeat(",?", seqCount-1) + ")"
} else {
seqIdConstraint = "AND m.seqid BETWEEN ? AND ?"
if opts.Since > 0 {
args = append(args, opts.Since)
} else {
args = append(args, 0)
}
if opts.Before > 0 {
// MySQL BETWEEN is inclusive-inclusive, Tinode API requires inclusive-exclusive, thus -1
args = append(args, opts.Before-1)
} else {
args = append(args, 1<<31-1)
}
}
if opts.Limit > 0 && opts.Limit < limit {
@@ -2578,19 +2599,21 @@ func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) (
}
}
unum := store.DecodeUid(forUser)
args = append(args, limit)
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
rows, err := a.db.QueryxContext(
ctx,
"SELECT m.createdat,m.updatedat,m.deletedat,m.delid,m.seqid,m.topic,m.`from`,m.head,m.content"+
" FROM messages AS m LEFT JOIN dellog AS d"+
" ON d.topic=m.topic AND m.seqid BETWEEN d.low AND d.hi-1 AND d.deletedfor=?"+
" WHERE m.delid=0 AND m.topic=? AND m.seqid BETWEEN ? AND ? AND d.deletedfor IS NULL"+
" WHERE m.delid=0 AND m.topic=? "+seqIdConstraint+" AND d.deletedfor IS NULL"+
" ORDER BY m.seqid DESC LIMIT ?",
unum, topic, lower, upper, limit)
args...)
if err != nil {
return nil, err
+18 -3
View File
@@ -586,6 +586,13 @@ func pbGetQuerySerialize(in *MsgGetQuery) *pbx.GetQuery {
SinceId: int32(in.Data.SinceId),
Limit: int32(in.Data.Limit),
}
if len(in.Data.IdRanges) > 0 {
out.Data.Ranges = make([]*pbx.SeqRange, len(in.Data.IdRanges))
for i, dq := range in.Data.IdRanges {
out.Data.Ranges[i] = &pbx.SeqRange{Low: int32(dq.LowId), Hi: int32(dq.HiId)}
}
}
}
return out
}
@@ -617,6 +624,14 @@ func pbGetQueryDeserialize(in *pbx.GetQuery) *MsgGetQuery {
SinceId: int(data.GetSinceId()),
Limit: int(data.GetLimit()),
}
if ranges := data.GetRanges(); len(ranges) > 0 {
msg.Data.IdRanges = make([]MsgRange, len(ranges))
for i, sr := range ranges {
msg.Data.IdRanges[i].LowId = int(sr.GetLow())
msg.Data.IdRanges[i].HiId = int(sr.GetHi())
}
}
}
return &msg
@@ -1036,7 +1051,7 @@ func pbSubSliceDeserialize(subs []*pbx.TopicSub) []types.Subscription {
return out
}
func pbDelQuerySerialize(in []MsgDelRange) []*pbx.SeqRange {
func pbDelQuerySerialize(in []MsgRange) []*pbx.SeqRange {
if in == nil {
return nil
}
@@ -1049,12 +1064,12 @@ func pbDelQuerySerialize(in []MsgDelRange) []*pbx.SeqRange {
return out
}
func pbDelQueryDeserialize(in []*pbx.SeqRange) []MsgDelRange {
func pbDelQueryDeserialize(in []*pbx.SeqRange) []MsgRange {
if in == nil {
return nil
}
out := make([]MsgDelRange, len(in))
out := make([]MsgRange, len(in))
for i, sr := range in {
out[i].LowId = int(sr.GetLow())
out[i].HiId = int(sr.GetHi())
+2 -2
View File
@@ -14,7 +14,7 @@ type presParams struct {
userAgent string
seqID int
delID int
delSeq []MsgDelRange
delSeq []MsgRange
// Uid who performed the action
actor string
@@ -681,7 +681,7 @@ func (t *Topic) presPubMessageCount(uid types.Uid, mode types.AccessMode, read,
// Let other sessions of a given user know that messages are now deleted
// Cases V.1, V.2
func (t *Topic) presPubMessageDelete(uid types.Uid, mode types.AccessMode, delID int, list []MsgDelRange, skip string) {
func (t *Topic) presPubMessageDelete(uid types.Uid, mode types.AccessMode, delID int, list []MsgRange, skip string) {
if len(list) == 0 && delID <= 0 {
logs.Warn.Printf("Case V.1, V.2: topic[%s] invalid request - missing payload", t.name)
return
+3 -3
View File
@@ -911,7 +911,7 @@ func TestDispatchDelMsg(t *testing.T) {
Id: "123",
Topic: destUid.UserId(),
What: "msg",
DelSeq: []MsgDelRange{{LowId: 3, HiId: 4}},
DelSeq: []MsgRange{{LowId: 3, HiId: 4}},
Hard: true,
},
}
@@ -994,7 +994,7 @@ func TestDispatchDelMetaChanFull(t *testing.T) {
Id: "123",
Topic: destUid.UserId(),
What: "msg",
DelSeq: []MsgDelRange{{LowId: 3, HiId: 4}},
DelSeq: []MsgRange{{LowId: 3, HiId: 4}},
Hard: true,
},
}
@@ -1028,7 +1028,7 @@ func TestDispatchDelUnsubscribedSession(t *testing.T) {
Id: "123",
Topic: destUid.UserId(),
What: "msg",
DelSeq: []MsgDelRange{{LowId: 3, HiId: 4}},
DelSeq: []MsgRange{{LowId: 3, HiId: 4}},
Hard: true,
},
}
+2
View File
@@ -1278,6 +1278,8 @@ type QueryOpt struct {
Before int
// Common parameter
Limit int
// Ranges of IDs.
IdRanges []Range
}
// TopicCat is an enum of topic categories.
+2 -2
View File
@@ -3030,7 +3030,7 @@ func (t *Topic) replyGetDel(sess *Session, asUid types.Uid, req *MsgGetOpts, msg
Topic: toriginal,
Del: &MsgDelValues{
DelId: delID,
DelSeq: delrangeDeserialize(ranges),
DelSeq: rangeDeserialize(ranges),
},
Timestamp: &now,
},
@@ -3130,7 +3130,7 @@ func (t *Topic) replyDelMsg(sess *Session, asUid types.Uid, asChan bool, msg *Cl
// Increment Delete transaction ID
t.delID++
dr := delrangeDeserialize(ranges)
dr := rangeDeserialize(ranges)
if del.Hard {
for uid, pud := range t.perUser {
pud.delID = t.delID
+19 -4
View File
@@ -37,15 +37,29 @@ var tagRegexp = regexp.MustCompile(`^[-_+.!?#@\pL\pN]{1,96}$`)
const nullValue = "\u2421"
// Convert a list of IDs into ranges
func delrangeDeserialize(in []types.Range) []MsgDelRange {
// Convert database ranges into wire protocol ranges.
func rangeDeserialize(in []types.Range) []MsgRange {
if len(in) == 0 {
return nil
}
out := make([]MsgDelRange, 0, len(in))
out := make([]MsgRange, 0, len(in))
for _, r := range in {
out = append(out, MsgDelRange{LowId: r.Low, HiId: r.Hi})
out = append(out, MsgRange{LowId: r.Low, HiId: r.Hi})
}
return out
}
// Convert wire protocol ranges into database ranges.
func rangeSerialize(in []MsgRange) []types.Range {
if len(in) == 0 {
return nil
}
out := make([]types.Range, 0, len(in))
for _, r := range in {
out = append(out, types.Range{Low: r.LowId, Hi: r.HiId})
}
return out
@@ -215,6 +229,7 @@ func msgOpts2storeOpts(req *MsgGetOpts) *types.QueryOpt {
Limit: req.Limit,
Since: req.SinceId,
Before: req.BeforeId,
IdRanges: rangeSerialize(req.IdRanges),
}
}
return opts