From 7845eb01243d5866146101c78363c58f510e72e3 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 1 Jun 2022 23:24:44 -0400 Subject: [PATCH] So much logging --- docs/privacy.md | 4 +- docs/releases.md | 5 ++ log/log.go | 39 +++++++-- server/server.go | 170 +++++++++++++++++++++----------------- server/server.yml | 6 +- server/server_firebase.go | 12 +-- server/server_test.go | 16 ++-- server/smtp_sender.go | 51 +++++++++--- server/smtp_server.go | 45 +++++++--- server/topic.go | 6 +- server/util.go | 30 +++++++ web/public/home.html | 2 +- 12 files changed, 264 insertions(+), 122 deletions(-) diff --git a/docs/privacy.md b/docs/privacy.md index 5a36a1c8..f89f9aaa 100644 --- a/docs/privacy.md +++ b/docs/privacy.md @@ -8,5 +8,5 @@ any outside service. All data is exclusively used to make the service function p I use is Firebase Cloud Messaging (FCM) service, which is required to provide instant Android notifications (see [FAQ](faq.md) for details). To avoid FCM altogether, download the F-Droid version. -The web server does not log or otherwise store request paths, remote IP addresses or even topics or messages, -aside from a short on-disk cache to support service restarts. +For debugging purposes, the ntfy server may temporarily log request paths, remote IP addresses or even topics +or messages, though typically this is turned off. diff --git a/docs/releases.md b/docs/releases.md index 1a3fa4d6..a8d3fe10 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -13,6 +13,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release ## ntfy server v1.25.0 (UNRELEASED) +**Features:** + +* Advanced logging, with different log levels and hot reloading of the log level (no ticket) + **Bugs**: * Respect Firebase "quota exceeded" response for topics, block Firebase publishing for user for 10min ([#289](https://github.com/binwiederhier/ntfy/issues/289)) @@ -27,6 +31,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release * [Examples](examples.md) for [Home Assistant](https://www.home-assistant.io/) ([#282](https://github.com/binwiederhier/ntfy/pull/282), thanks to [@poblabs](https://github.com/poblabs)) * Install instructions for [NixOS/Nix](https://ntfy.sh/docs/install/#nixos-nix) ([#282](https://github.com/binwiederhier/ntfy/pull/282), thanks to [@arjan-s](https://github.com/arjan-s)) * Clarify `poll_request` wording for [iOS push notifications](https://ntfy.sh/docs/config/#ios-instant-notifications) ([#300](https://github.com/binwiederhier/ntfy/issues/300), thanks to [@prabirshrestha](https://github.com/prabirshrestha) for reporting) +* Example for using ntfy with docker-compose.yml without root privileges ([#304](https://github.com/binwiederhier/ntfy/pull/304), thanks to [@ksurl](https://github.com/ksurl)) **Additional translations:** diff --git a/log/log.go b/log/log.go index 36abc0e3..f105f720 100644 --- a/log/log.go +++ b/log/log.go @@ -11,7 +11,8 @@ type Level int // Well known log levels const ( - DebugLevel Level = iota + TraceLevel Level = iota + DebugLevel InfoLevel WarnLevel ErrorLevel @@ -19,6 +20,8 @@ const ( func (l Level) String() string { switch l { + case TraceLevel: + return "TRACE" case DebugLevel: return "DEBUG" case InfoLevel: @@ -36,7 +39,12 @@ var ( mu = &sync.Mutex{} ) -// Debug prints the given message, if the current log level is DEBUG +// Trace prints the given message, if the current log level is TRACE +func Trace(message string, v ...interface{}) { + logIf(TraceLevel, message, v...) +} + +// Debug prints the given message, if the current log level is DEBUG or lower func Debug(message string, v ...interface{}) { logIf(DebugLevel, message, v...) } @@ -78,20 +86,37 @@ func SetLevel(newLevel Level) { // ToLevel converts a string to a Level. It returns InfoLevel if the string // does not match any known log levels. func ToLevel(s string) Level { - switch strings.ToLower(s) { - case "debug": + switch strings.ToUpper(s) { + case "TRACE": + return TraceLevel + case "DEBUG": return DebugLevel - case "info": + case "INFO": return InfoLevel - case "warn", "warning": + case "WARN", "WARNING": return WarnLevel - case "error": + case "ERROR": return ErrorLevel default: return InfoLevel } } +// Loggable returns true if the given log level is lower or equal to the current log level +func Loggable(l Level) bool { + return CurrentLevel() <= l +} + +// IsTrace returns true if the current log level is TraceLevel +func IsTrace() bool { + return Loggable(TraceLevel) +} + +// IsDebug returns true if the current log level is DebugLevel or below +func IsDebug() bool { + return Loggable(DebugLevel) +} + func logIf(l Level, message string, v ...interface{}) { if CurrentLevel() <= l { log.Printf(l.String()+" "+message, v...) diff --git a/server/server.go b/server/server.go index 4688a03f..98299213 100644 --- a/server/server.go +++ b/server/server.go @@ -32,22 +32,22 @@ import ( // Server is the main server, providing the UI and API for ntfy type Server struct { - config *Config - httpServer *http.Server - httpsServer *http.Server - unixListener net.Listener - smtpServer *smtp.Server - smtpBackend *smtpBackend - topics map[string]*topic - visitors map[string]*visitor - firebaseClient *firebaseClient - mailer mailer - messages int64 - auth auth.Auther - messageCache *messageCache - fileCache *fileCache - closeChan chan bool - mu sync.Mutex + config *Config + httpServer *http.Server + httpsServer *http.Server + unixListener net.Listener + smtpServer *smtp.Server + smtpServerBackend *smtpBackend + smtpSender mailer + topics map[string]*topic + visitors map[string]*visitor + firebaseClient *firebaseClient + messages int64 + auth auth.Auther + messageCache *messageCache + fileCache *fileCache + closeChan chan bool + mu sync.Mutex } // handleFunc extends the normal http.HandlerFunc to be able to easily return errors @@ -147,7 +147,7 @@ func New(conf *Config) (*Server, error) { messageCache: messageCache, fileCache: fileCache, firebaseClient: firebaseClient, - mailer: mailer, + smtpSender: mailer, topics: topics, auth: auther, visitors: make(map[string]*visitor), @@ -246,14 +246,14 @@ func (s *Server) Stop() { func (s *Server) handle(w http.ResponseWriter, r *http.Request) { v := s.visitor(r) - log.Debug("%s HTTP %s %s", v.ip, r.Method, r.URL.Path) + log.Debug("%s Dispatching request", logHTTPPrefix(v, r)) if err := s.handleInternal(w, r, v); err != nil { if websocket.IsWebSocketUpgrade(r) { - isNormalError := websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || strings.Contains(err.Error(), "i/o timeout") + isNormalError := strings.Contains(err.Error(), "i/o timeout") if isNormalError { - log.Debug("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error()) + log.Debug("%s WebSocket error (this error is okay, it happens a lot): %s", logHTTPPrefix(v, r), err.Error()) } else { - log.Warn("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error()) + log.Warn("%s WebSocket error: %s", logHTTPPrefix(v, r), err.Error()) } return // Do not attempt to write to upgraded connection } @@ -261,13 +261,12 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) { if !ok { httpErr = errHTTPInternalError } - isNormalError := httpErr.Code == 404 + isNormalError := httpErr.HTTPCode == http.StatusNotFound if isNormalError { - log.Debug("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error()) + log.Debug("%s Connection closed with HTTP %d (ntfy error %d): %s", logHTTPPrefix(v, r), httpErr.HTTPCode, httpErr.Code, err.Error()) } else { - log.Info("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error()) + log.Info("%s Connection closed with HTTP %d (ntfy error %d): %s", logHTTPPrefix(v, r), httpErr.HTTPCode, httpErr.Code, err.Error()) } - w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests w.WriteHeader(httpErr.HTTPCode) @@ -444,8 +443,11 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito m.Message = emptyMessageBody } delayed := m.Time > time.Now().Unix() - log.Debug("%s Received message: ev=%s, body=%d bytes, delayed=%t, fb=%t, cache=%t, up=%t, email=%s", - logPrefix(v, m), m.Event, len(body.PeekedBytes), delayed, firebase, cache, unifiedpush, email) + log.Debug("%s Received message: event=%s, body=%d byte(s), delayed=%t, firebase=%t, cache=%t, up=%t, email=%s", + logMessagePrefix(v, m), m.Event, len(m.Message), delayed, firebase, cache, unifiedpush, email) + if log.IsTrace() { + log.Trace("%s Message body: %s", logMessagePrefix(v, m), maybeMarshalJSON(m)) + } if !delayed { if err := t.Publish(v, m); err != nil { return err @@ -453,14 +455,14 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito if s.firebaseClient != nil && firebase { go s.sendToFirebase(v, m) } - if s.mailer != nil && email != "" { + if s.smtpSender != nil && email != "" { go s.sendEmail(v, m, email) } if s.config.UpstreamBaseURL != "" { go s.forwardPollRequest(v, m) } } else { - log.Debug("%s Message delayed, will process later", logPrefix(v, m)) + log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m)) } if cache { if err := s.messageCache.AddMessage(m); err != nil { @@ -479,16 +481,16 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito } func (s *Server) sendToFirebase(v *visitor, m *message) { - log.Debug("%s Publishing to Firebase", logPrefix(v, m)) + log.Debug("%s Publishing to Firebase", logMessagePrefix(v, m)) if err := s.firebaseClient.Send(v, m); err != nil { - log.Warn("%s Unable to publish to Firebase: %v", logPrefix(v, m), err.Error()) + log.Warn("%s Unable to publish to Firebase: %v", logMessagePrefix(v, m), err.Error()) } } func (s *Server) sendEmail(v *visitor, m *message, email string) { - log.Debug("%s Sending email to %s", logPrefix(v, m), email) - if err := s.mailer.Send(v.ip, email, m); err != nil { - log.Warn("%s Unable to send email: %v", logPrefix(v, m), err.Error()) + log.Debug("%s Sending email to %s", logMessagePrefix(v, m), email) + if err := s.smtpSender.Send(v, m, email); err != nil { + log.Warn("%s Unable to send email: %v", logMessagePrefix(v, m), err.Error()) } } @@ -496,10 +498,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) { topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic) topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL))) forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash) - log.Debug("%s Publishing poll request to %s", logPrefix(v, m), forwardURL) + log.Debug("%s Publishing poll request to %s", logMessagePrefix(v, m), forwardURL) req, err := http.NewRequest("POST", forwardURL, strings.NewReader("")) if err != nil { - log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error()) + log.Warn("%s Unable to publish poll request: %v", logMessagePrefix(v, m), err.Error()) return } req.Header.Set("X-Poll-ID", m.ID) @@ -508,10 +510,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) { } response, err := httpClient.Do(req) if err != nil { - log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error()) + log.Warn("%s Unable to publish poll request: %v", logMessagePrefix(v, m), err.Error()) return } else if response.StatusCode != http.StatusOK { - log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logPrefix(v, m), response.StatusCode) + log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logMessagePrefix(v, m), response.StatusCode) return } } @@ -553,7 +555,7 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca return false, false, "", false, errHTTPTooManyRequestsLimitEmails } } - if s.mailer == nil && email != "" { + if s.smtpSender == nil && email != "" { return false, false, "", false, errHTTPBadRequestEmailDisabled } messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n") @@ -627,7 +629,7 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca // If file.txt is > message limit, treat it as an attachment func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, unifiedpush bool) error { if m.Event == pollRequestEvent { // Case 1 - return nil + return s.handleBodyDiscard(body) } else if unifiedpush { return s.handleBodyAsMessageAutoDetect(m, body) // Case 2 } else if m.Attachment != nil && m.Attachment.URL != "" { @@ -640,6 +642,12 @@ func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body return s.handleBodyAsAttachment(r, v, m, body) // Case 6 } +func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error { + _, err := io.Copy(io.Discard, body) + _ = body.Close() + return err +} + func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error { if utf8.Valid(body.PeekedBytes) { m.Message = string(body.PeekedBytes) // Do not trim @@ -739,6 +747,8 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v } func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error { + log.Debug("%s HTTP stream connection opened", logHTTPPrefix(v, r)) + defer log.Debug("%s HTTP stream connection closed", logHTTPPrefix(v, r)) if err := v.SubscriptionAllowed(); err != nil { return errHTTPTooManyRequestsLimitSubscriptions } @@ -795,6 +805,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v * case <-r.Context().Done(): return nil case <-time.After(s.config.KeepaliveInterval): + log.Trace("%s Sending keepalive message", logHTTPPrefix(v, r)) v.Keepalive() if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message return err @@ -811,6 +822,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return errHTTPTooManyRequestsLimitSubscriptions } defer v.RemoveSubscription() + log.Debug("%s WebSocket connection opened", logHTTPPrefix(v, r)) + defer log.Debug("%s WebSocket connection closed", logHTTPPrefix(v, r)) topics, topicsStr, err := s.topicsFromPath(r.URL.Path) if err != nil { return err @@ -840,6 +853,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return err } conn.SetPongHandler(func(appData string) error { + log.Trace("%s Received WebSocket pong", logHTTPPrefix(v, r)) return conn.SetReadDeadline(time.Now().Add(pongWait)) }) for { @@ -856,6 +870,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { return err } + log.Trace("%s Sending WebSocket ping", logHTTPPrefix(v, r)) return conn.WriteMessage(websocket.PingMessage, nil) } for { @@ -901,8 +916,9 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return err } err = g.Wait() - if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { - return nil // Normal closures are not errors + if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Trace("%s WebSocket connection closed: %s", logHTTPPrefix(v, r), err.Error()) + return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot } return err } @@ -1025,12 +1041,15 @@ func (s *Server) updateStatsAndPrune() { defer s.mu.Unlock() // Expire visitors from rate visitors map + staleVisitors := 0 for ip, v := range s.visitors { if v.Stale() { log.Debug("Deleting stale visitor %s", v.ip) delete(s.visitors, ip) + staleVisitors++ } } + log.Debug("Manager: Deleted %d stale visitor(s)", staleVisitors) // Delete expired attachments if s.fileCache != nil { @@ -1038,20 +1057,20 @@ func (s *Server) updateStatsAndPrune() { if err != nil { log.Warn("Error retrieving expired attachments: %s", err.Error()) } else if len(ids) > 0 { - log.Debug("Deleting expired attachments: %v", ids) + log.Debug("Manager: Deleting expired attachments: %v", ids) if err := s.fileCache.Remove(ids...); err != nil { log.Warn("Error deleting attachments: %s", err.Error()) } } else { - log.Debug("No expired attachments to delete") + log.Debug("Manager: No expired attachments to delete") } } // Prune message cache olderThan := time.Now().Add(-1 * s.config.CacheDuration) - log.Debug("Pruning messages older tha %v", olderThan) + log.Debug("Manager: Pruning messages older than %s", olderThan.Format("2006-01-02 15:04:05")) if err := s.messageCache.Prune(olderThan); err != nil { - log.Warn("Error pruning cache: %s", err.Error()) + log.Warn("Manager: Error pruning cache: %s", err.Error()) } // Prune old topics, remove subscriptions without subscribers @@ -1060,7 +1079,7 @@ func (s *Server) updateStatsAndPrune() { subs := t.Subscribers() msgs, err := s.messageCache.MessageCount(t.ID) if err != nil { - log.Warn("Cannot get stats for topic %s: %s", t.ID, err.Error()) + log.Warn("Manager: Cannot get stats for topic %s: %s", t.ID, err.Error()) continue } if msgs == 0 && subs == 0 { @@ -1072,19 +1091,25 @@ func (s *Server) updateStatsAndPrune() { } // Mail stats - var mailSuccess, mailFailure int64 - if s.smtpBackend != nil { - mailSuccess, mailFailure = s.smtpBackend.Counts() + var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64 + if s.smtpServerBackend != nil { + receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts() + } + var sentMailTotal, sentMailSuccess, sentMailFailure int64 + if s.smtpSender != nil { + sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts() } // Print stats - log.Info("Stats: %d message(s) published, %d in cache, %d successful mails, %d failed, %d topic(s) active, %d subscriber(s), %d visitor(s)", - s.messages, messages, mailSuccess, mailFailure, len(s.topics), subscribers, len(s.visitors)) + log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)", + s.messages, messages, len(s.topics), subscribers, len(s.visitors), + receivedMailTotal, receivedMailSuccess, receivedMailFailure, + sentMailTotal, sentMailSuccess, sentMailFailure) } func (s *Server) runSMTPServer() error { - s.smtpBackend = newMailBackend(s.config, s.handle) - s.smtpServer = smtp.NewServer(s.smtpBackend) + s.smtpServerBackend = newMailBackend(s.config, s.handle) + s.smtpServer = smtp.NewServer(s.smtpServerBackend) s.smtpServer.Addr = s.config.SMTPServerListen s.smtpServer.Domain = s.config.SMTPServerDomain s.smtpServer.ReadTimeout = 10 * time.Second @@ -1099,7 +1124,6 @@ func (s *Server) runManager() { for { select { case <-time.After(s.config.ManagerInterval): - log.Debug("Running manager") s.updateStatsAndPrune() case <-s.closeChan: return @@ -1107,19 +1131,6 @@ func (s *Server) runManager() { } } -func (s *Server) runDelayedSender() { - for { - select { - case <-time.After(s.config.DelayedSenderInterval): - if err := s.sendDelayedMessages(); err != nil { - log.Warn("error sending scheduled messages: %s", err.Error()) - } - case <-s.closeChan: - return - } - } -} - func (s *Server) runFirebaseKeepaliver() { if s.firebaseClient == nil { return @@ -1137,6 +1148,19 @@ func (s *Server) runFirebaseKeepaliver() { } } +func (s *Server) runDelayedSender() { + for { + select { + case <-time.After(s.config.DelayedSenderInterval): + if err := s.sendDelayedMessages(); err != nil { + log.Warn("Error sending delayed messages: %s", err.Error()) + } + case <-s.closeChan: + return + } + } +} + func (s *Server) sendDelayedMessages() error { messages, err := s.messageCache.MessagesDue() if err != nil { @@ -1145,7 +1169,7 @@ func (s *Server) sendDelayedMessages() error { for _, m := range messages { v := s.visitorFromIP(m.Sender) if err := s.sendDelayedMessage(v, m); err != nil { - log.Warn("%s Error sending delayed message: %s", logPrefix(v, m), err.Error()) + log.Warn("%s Error sending delayed message: %s", logMessagePrefix(v, m), err.Error()) } } return nil @@ -1154,13 +1178,13 @@ func (s *Server) sendDelayedMessages() error { func (s *Server) sendDelayedMessage(v *visitor, m *message) error { s.mu.Lock() defer s.mu.Unlock() - log.Debug("%s Sending delayed message", logPrefix(v, m)) + log.Debug("%s Sending delayed message", logMessagePrefix(v, m)) t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published if ok { go func() { // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler if err := t.Publish(v, m); err != nil { - log.Warn("%s Unable to publish message: %v", logPrefix(v, m), err.Error()) + log.Warn("%s Unable to publish message: %v", logMessagePrefix(v, m), err.Error()) } }() } @@ -1333,7 +1357,3 @@ func (s *Server) visitorFromIP(ip string) *visitor { v.Keepalive() return v } - -func logPrefix(v *visitor, m *message) string { - return fmt.Sprintf("%s/%s/%s", v.ip, m.Topic, m.ID) -} diff --git a/server/server.yml b/server/server.yml index f6c14a64..2d62b704 100644 --- a/server/server.yml +++ b/server/server.yml @@ -51,6 +51,7 @@ # cache-file: # cache-duration: "12h" + # If set, access to the ntfy server and API can be controlled on a granular level using # the 'ntfy user' and 'ntfy access' commands. See the --help pages for details, or check the docs. # @@ -179,7 +180,10 @@ # visitor-attachment-total-size-limit: "100M" # visitor-attachment-daily-bandwidth-limit: "500M" -# Log level, can be DEBUG, INFO, WARN or ERROR +# Log level, can be TRACE, DEBUG, INFO, WARN or ERROR # This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy". # +# Be aware that DEBUG (and particularly TRACE) can be VERY CHATTY. Only turn them on for +# debugging purposes, or your disk will fill up quickly. +# # log-level: INFO diff --git a/server/server_firebase.go b/server/server_firebase.go index b34348d6..2abc10de 100644 --- a/server/server_firebase.go +++ b/server/server_firebase.go @@ -4,14 +4,13 @@ import ( "context" "encoding/json" "errors" - "fmt" - "log" - "strings" - firebase "firebase.google.com/go/v4" "firebase.google.com/go/v4/messaging" + "fmt" "google.golang.org/api/option" "heckel.io/ntfy/auth" + "heckel.io/ntfy/log" + "strings" ) const ( @@ -45,9 +44,12 @@ func (c *firebaseClient) Send(v *visitor, m *message) error { if err != nil { return err } + if log.IsTrace() { + log.Trace("%s Firebase message: %s", logMessagePrefix(v, m), maybeMarshalJSON(fbm)) + } err = c.sender.Send(fbm) if err == errFirebaseQuotaExceeded { - log.Printf("[%s] FB quota exceeded for topic %s, temporarily denying FB access to visitor", v.ip, m.Topic) + log.Warn("%s Firebase quota exceeded (likely for topic), temporarily denying Firebase access to visitor", logMessagePrefix(v, m)) v.FirebaseTemporarilyDeny() } return err diff --git a/server/server_test.go b/server/server_test.go index ce63f272..8010643f 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -477,7 +477,7 @@ func TestServer_PublishMessageInHeaderWithNewlines(t *testing.T) { func TestServer_PublishInvalidTopic(t *testing.T) { s := newTestServer(t, newTestConfig(t)) - s.mailer = &testMailer{} + s.smtpSender = &testMailer{} response := request(t, s, "PUT", "/docs", "fail", nil) require.Equal(t, 40010, toHTTPError(t, response.Body.String()).Code) } @@ -743,13 +743,17 @@ type testMailer struct { mu sync.Mutex } -func (t *testMailer) Send(from, to string, m *message) error { +func (t *testMailer) Send(v *visitor, m *message, to string) error { t.mu.Lock() defer t.mu.Unlock() t.count++ return nil } +func (t *testMailer) Counts() (total int64, success int64, failure int64) { + return 0, 0, 0 +} + func (t *testMailer) Count() int { t.mu.Lock() defer t.mu.Unlock() @@ -795,7 +799,7 @@ func TestServer_PublishTooRequests_ShortReplenish(t *testing.T) { func TestServer_PublishTooManyEmails_Defaults(t *testing.T) { s := newTestServer(t, newTestConfig(t)) - s.mailer = &testMailer{} + s.smtpSender = &testMailer{} for i := 0; i < 16; i++ { response := request(t, s, "PUT", "/mytopic", fmt.Sprintf("message %d", i), map[string]string{ "E-Mail": "test@example.com", @@ -812,7 +816,7 @@ func TestServer_PublishTooManyEmails_Replenish(t *testing.T) { c := newTestConfig(t) c.VisitorEmailLimitReplenish = 500 * time.Millisecond s := newTestServer(t, c) - s.mailer = &testMailer{} + s.smtpSender = &testMailer{} for i := 0; i < 16; i++ { response := request(t, s, "PUT", "/mytopic", fmt.Sprintf("message %d", i), map[string]string{ "E-Mail": "test@example.com", @@ -838,7 +842,7 @@ func TestServer_PublishTooManyEmails_Replenish(t *testing.T) { func TestServer_PublishDelayedEmail_Fail(t *testing.T) { s := newTestServer(t, newTestConfig(t)) - s.mailer = &testMailer{} + s.smtpSender = &testMailer{} response := request(t, s, "PUT", "/mytopic", "fail", map[string]string{ "E-Mail": "test@example.com", "Delay": "20 min", @@ -956,7 +960,7 @@ func TestServer_PublishAsJSON(t *testing.T) { func TestServer_PublishAsJSON_WithEmail(t *testing.T) { mailer := &testMailer{} s := newTestServer(t, newTestConfig(t)) - s.mailer = mailer + s.smtpSender = mailer body := `{"topic":"mytopic","message":"A message","email":"phil@example.com"}` response := request(t, s, "PUT", "/", body, nil) require.Equal(t, 200, response.Code) diff --git a/server/smtp_sender.go b/server/smtp_sender.go index 15f004c1..1ccaf084 100644 --- a/server/smtp_sender.go +++ b/server/smtp_sender.go @@ -4,33 +4,62 @@ import ( _ "embed" // required by go:embed "encoding/json" "fmt" + "heckel.io/ntfy/log" "heckel.io/ntfy/util" "mime" "net" "net/smtp" "strings" + "sync" "time" ) type mailer interface { - Send(from, to string, m *message) error + Send(v *visitor, m *message, to string) error + Counts() (total int64, success int64, failure int64) } type smtpSender struct { - config *Config + config *Config + success int64 + failure int64 + mu sync.Mutex } -func (s *smtpSender) Send(senderIP, to string, m *message) error { - host, _, err := net.SplitHostPort(s.config.SMTPSenderAddr) +func (s *smtpSender) Send(v *visitor, m *message, to string) error { + return s.withCount(v, m, func() error { + host, _, err := net.SplitHostPort(s.config.SMTPSenderAddr) + if err != nil { + return err + } + message, err := formatMail(s.config.BaseURL, v.ip, s.config.SMTPSenderFrom, to, m) + if err != nil { + return err + } + auth := smtp.PlainAuth("", s.config.SMTPSenderUser, s.config.SMTPSenderPass, host) + log.Debug("%s Sending mail: via=%s, user=%s, pass=***, to=%s", logMessagePrefix(v, m), s.config.SMTPSenderAddr, s.config.SMTPSenderUser, to) + log.Trace("%s Mail body: %s", logMessagePrefix(v, m), message) + return smtp.SendMail(s.config.SMTPSenderAddr, auth, s.config.SMTPSenderFrom, []string{to}, []byte(message)) + }) +} + +func (s *smtpSender) Counts() (total int64, success int64, failure int64) { + s.mu.Lock() + defer s.mu.Unlock() + return s.success + s.failure, s.success, s.failure +} + +func (s *smtpSender) withCount(v *visitor, m *message, fn func() error) error { + err := fn() + s.mu.Lock() + defer s.mu.Unlock() if err != nil { - return err + log.Debug("%s Sending mail failed: %s", logMessagePrefix(v, m), err.Error()) + s.failure++ + } else { + s.success++ } - message, err := formatMail(s.config.BaseURL, senderIP, s.config.SMTPSenderFrom, to, m) - if err != nil { - return err - } - auth := smtp.PlainAuth("", s.config.SMTPSenderUser, s.config.SMTPSenderPass, host) - return smtp.SendMail(s.config.SMTPSenderAddr, auth, s.config.SMTPSenderFrom, []string{to}, []byte(message)) + return err } func formatMail(baseURL, senderIP, from, to string, m *message) (string, error) { diff --git a/server/smtp_server.go b/server/smtp_server.go index 7812371e..3f4b9b68 100644 --- a/server/smtp_server.go +++ b/server/smtp_server.go @@ -5,9 +5,11 @@ import ( "errors" "fmt" "github.com/emersion/go-smtp" + "heckel.io/ntfy/log" "io" "mime" "mime/multipart" + "net" "net/http" "net/http/httptest" "net/mail" @@ -40,36 +42,41 @@ func newMailBackend(conf *Config, handler func(http.ResponseWriter, *http.Reques } func (b *smtpBackend) Login(state *smtp.ConnectionState, username, password string) (smtp.Session, error) { - return &smtpSession{backend: b, remoteAddr: state.RemoteAddr.String()}, nil + log.Debug("%s Incoming mail, login with user %s", logSMTPPrefix(state), username) + return &smtpSession{backend: b, state: state}, nil } func (b *smtpBackend) AnonymousLogin(state *smtp.ConnectionState) (smtp.Session, error) { - return &smtpSession{backend: b, remoteAddr: state.RemoteAddr.String()}, nil + log.Debug("%s Incoming mail, anonymous login", logSMTPPrefix(state)) + return &smtpSession{backend: b, state: state}, nil } -func (b *smtpBackend) Counts() (success int64, failure int64) { +func (b *smtpBackend) Counts() (total int64, success int64, failure int64) { b.mu.Lock() defer b.mu.Unlock() - return b.success, b.failure + return b.success + b.failure, b.success, b.failure } // smtpSession is returned after EHLO. type smtpSession struct { - backend *smtpBackend - remoteAddr string - topic string - mu sync.Mutex + backend *smtpBackend + state *smtp.ConnectionState + topic string + mu sync.Mutex } func (s *smtpSession) AuthPlain(username, password string) error { + log.Debug("%s AUTH PLAIN (with username %s)", logSMTPPrefix(s.state), username) return nil } func (s *smtpSession) Mail(from string, opts smtp.MailOptions) error { + log.Debug("%s MAIL FROM: %s (with options: %#v)", logSMTPPrefix(s.state), from, opts) return nil } func (s *smtpSession) Rcpt(to string) error { + log.Debug("%s RCPT TO: %s", logSMTPPrefix(s.state), to) return s.withFailCount(func() error { conf := s.backend.config addressList, err := mail.ParseAddressList(to) @@ -106,6 +113,11 @@ func (s *smtpSession) Data(r io.Reader) error { if err != nil { return err } + if log.IsTrace() { + log.Trace("%s DATA: %s", logSMTPPrefix(s.state), string(b)) + } else if log.IsDebug() { + log.Debug("%s DATA: %d byte(s)", logSMTPPrefix(s.state), len(b)) + } msg, err := mail.ReadMessage(bytes.NewReader(b)) if err != nil { return err @@ -143,10 +155,18 @@ func (s *smtpSession) Data(r io.Reader) error { } func (s *smtpSession) publishMessage(m *message) error { + // Extract remote address (for rate limiting) + remoteAddr, _, err := net.SplitHostPort(s.state.RemoteAddr.String()) + if err != nil { + remoteAddr = s.state.RemoteAddr.String() + } + + // Call HTTP handler with fake HTTP request url := fmt.Sprintf("%s/%s", s.backend.config.BaseURL, m.Topic) - req, err := http.NewRequest("PUT", url, strings.NewReader(m.Message)) - req.RemoteAddr = s.remoteAddr // rate limiting!! - req.Header.Set("X-Forwarded-For", s.remoteAddr) + req, err := http.NewRequest("POST", url, strings.NewReader(m.Message)) + req.RequestURI = "/" + m.Topic // just for the logs + req.RemoteAddr = remoteAddr // rate limiting!! + req.Header.Set("X-Forwarded-For", remoteAddr) if err != nil { return err } @@ -176,6 +196,9 @@ func (s *smtpSession) withFailCount(fn func() error) error { s.backend.mu.Lock() defer s.backend.mu.Unlock() if err != nil { + // Almost all of these errors are parse errors, and user input errors. + // We do not want to spam the log with WARN messages. + log.Debug("%s Incoming mail error: %s", logSMTPPrefix(s.state), err.Error()) s.backend.failure++ } return err diff --git a/server/topic.go b/server/topic.go index 3cb11394..889f1eb7 100644 --- a/server/topic.go +++ b/server/topic.go @@ -47,14 +47,14 @@ func (t *topic) Publish(v *visitor, m *message) error { t.mu.Lock() defer t.mu.Unlock() if len(t.subscribers) > 0 { - log.Debug("%s Forwarding to %d subscriber(s)", logPrefix(v, m), len(t.subscribers)) + log.Debug("%s Forwarding to %d subscriber(s)", logMessagePrefix(v, m), len(t.subscribers)) for _, s := range t.subscribers { if err := s(v, m); err != nil { - log.Warn("%s Error forwarding to subscriber", logPrefix(v, m)) + log.Warn("%s Error forwarding to subscriber", logMessagePrefix(v, m)) } } } else { - log.Debug("%s No subscribers, not forwarding", logPrefix(v, m)) + log.Trace("%s No stream or WebSocket subscribers, not forwarding", logMessagePrefix(v, m)) } }() return nil diff --git a/server/util.go b/server/util.go index 7c596344..ffd07d15 100644 --- a/server/util.go +++ b/server/util.go @@ -1,6 +1,9 @@ package server import ( + "encoding/json" + "fmt" + "github.com/emersion/go-smtp" "net/http" "strings" ) @@ -40,3 +43,30 @@ func readQueryParam(r *http.Request, names ...string) string { } return "" } + +func logMessagePrefix(v *visitor, m *message) string { + return fmt.Sprintf("%s/%s/%s", v.ip, m.Topic, m.ID) +} + +func logHTTPPrefix(v *visitor, r *http.Request) string { + requestURI := r.RequestURI + if requestURI == "" { + requestURI = r.URL.Path + } + return fmt.Sprintf("%s HTTP %s %s", v.ip, r.Method, requestURI) +} + +func logSMTPPrefix(state *smtp.ConnectionState) string { + return fmt.Sprintf("%s/%s SMTP", state.Hostname, state.RemoteAddr.String()) +} + +func maybeMarshalJSON(v interface{}) string { + messageJSON, err := json.MarshalIndent(v, "", " ") + if err != nil { + return "" + } + if len(messageJSON) > 5000 { + return string(messageJSON)[:5000] + } + return string(messageJSON) +} diff --git a/web/public/home.html b/web/public/home.html index 3e3a95d8..43007ca3 100644 --- a/web/public/home.html +++ b/web/public/home.html @@ -110,7 +110,7 @@

- +

Here's a video showing the app in action: