Subscribe to more than one topic

This commit is contained in:
Philipp Heckel 2021-11-15 07:56:58 -05:00
parent a481f4c448
commit 52136030be

View File

@ -78,9 +78,9 @@ const (
var ( var (
topicRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app! topicRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
jsonRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/json$`) jsonRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
sseRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/sse$`) sseRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
rawRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/raw$`) rawRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
staticRegex = regexp.MustCompile(`^/static/.+`) staticRegex = regexp.MustCompile(`^/static/.+`)
@ -223,7 +223,7 @@ func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request) error {
} }
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error { func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
t, err := s.topic(r.URL.Path[1:]) t, err := s.topicFromID(r.URL.Path[1:])
if err != nil { if err != nil {
return err return err
} }
@ -289,7 +289,9 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
return errHTTPTooManyRequests return errHTTPTooManyRequests
} }
defer v.RemoveSubscription() defer v.RemoveSubscription()
t, err := s.topic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack topicsStr := strings.TrimSuffix(r.URL.Path[1:], "/"+format) // Hack
topicIDs := strings.Split(topicsStr, ",")
topics, err := s.topicsFromIDs(topicIDs...)
if err != nil { if err != nil {
return err return err
} }
@ -314,14 +316,21 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset! w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
if poll { if poll {
return s.sendOldMessages(t, since, sub) return s.sendOldMessages(topics, since, sub)
} }
subscriberID := t.Subscribe(sub) subscriberIDs := make([]int, 0)
defer t.Unsubscribe(subscriberID) for _, t := range topics {
if err := sub(newOpenMessage(t.id)); err != nil { // Send out open message subscriberIDs = append(subscriberIDs, t.Subscribe(sub))
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message
return err return err
} }
if err := s.sendOldMessages(t, since, sub); err != nil { if err := s.sendOldMessages(topics, since, sub); err != nil {
return err return err
} }
for { for {
@ -330,25 +339,27 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
return nil return nil
case <-time.After(s.config.KeepaliveInterval): case <-time.After(s.config.KeepaliveInterval):
v.Keepalive() v.Keepalive()
if err := sub(newKeepaliveMessage(t.id)); err != nil { // Send keepalive message if err := sub(newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
return err return err
} }
} }
} }
} }
func (s *Server) sendOldMessages(t *topic, since sinceTime, sub subscriber) error { func (s *Server) sendOldMessages(topics []*topic, since sinceTime, sub subscriber) error {
if since.IsNone() { if since.IsNone() {
return nil return nil
} }
messages, err := s.cache.Messages(t.id, since) for _, t := range topics {
if err != nil { messages, err := s.cache.Messages(t.id, since)
return err if err != nil {
}
for _, m := range messages {
if err := sub(m); err != nil {
return err return err
} }
for _, m := range messages {
if err := sub(m); err != nil {
return err
}
}
} }
return nil return nil
} }
@ -382,19 +393,31 @@ func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
return nil return nil
} }
func (s *Server) topic(id string) (*topic, error) { func (s *Server) topicFromID(id string) (*topic, error) {
topics, err := s.topicsFromIDs(id)
if err != nil {
return nil, err
}
return topics[0], nil
}
func (s *Server) topicsFromIDs(ids... string) ([]*topic, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if _, ok := s.topics[id]; !ok { topics := make([]*topic, 0)
if len(s.topics) >= s.config.GlobalTopicLimit { for _, id := range ids {
return nil, errHTTPTooManyRequests if _, ok := s.topics[id]; !ok {
} if len(s.topics) >= s.config.GlobalTopicLimit {
s.topics[id] = newTopic(id, time.Now()) return nil, errHTTPTooManyRequests
if s.firebase != nil { }
s.topics[id].Subscribe(s.firebase) s.topics[id] = newTopic(id, time.Now())
if s.firebase != nil {
s.topics[id].Subscribe(s.firebase)
}
} }
topics = append(topics, s.topics[id])
} }
return s.topics[id], nil return topics, nil
} }
func (s *Server) updateStatsAndExpire() { func (s *Server) updateStatsAndExpire() {