Merge pull request #48 from binwiederhier/delay

WIP: Add 'At:'/'In:' headers to support scheduled messages
This commit is contained in:
Philipp C. Heckel 2021-12-11 00:14:17 -05:00 committed by GitHub
commit 281faeff3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 827 additions and 231 deletions

View File

@ -11,6 +11,10 @@ const (
DefaultCacheDuration = 12 * time.Hour DefaultCacheDuration = 12 * time.Hour
DefaultKeepaliveInterval = 30 * time.Second DefaultKeepaliveInterval = 30 * time.Second
DefaultManagerInterval = time.Minute DefaultManagerInterval = time.Minute
DefaultAtSenderInterval = 10 * time.Second
DefaultMinDelay = 10 * time.Second
DefaultMaxDelay = 3 * 24 * time.Hour
DefaultMessageLimit = 512
) )
// Defines all the limits // Defines all the limits
@ -35,6 +39,10 @@ type Config struct {
CacheDuration time.Duration CacheDuration time.Duration
KeepaliveInterval time.Duration KeepaliveInterval time.Duration
ManagerInterval time.Duration ManagerInterval time.Duration
AtSenderInterval time.Duration
MessageLimit int
MinDelay time.Duration
MaxDelay time.Duration
GlobalTopicLimit int GlobalTopicLimit int
VisitorRequestLimitBurst int VisitorRequestLimitBurst int
VisitorRequestLimitReplenish time.Duration VisitorRequestLimitReplenish time.Duration
@ -54,6 +62,10 @@ func New(listenHTTP string) *Config {
CacheDuration: DefaultCacheDuration, CacheDuration: DefaultCacheDuration,
KeepaliveInterval: DefaultKeepaliveInterval, KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval, ManagerInterval: DefaultManagerInterval,
MessageLimit: DefaultMessageLimit,
MinDelay: DefaultMinDelay,
MaxDelay: DefaultMaxDelay,
AtSenderInterval: DefaultAtSenderInterval,
GlobalTopicLimit: DefaultGlobalTopicLimit, GlobalTopicLimit: DefaultGlobalTopicLimit,
VisitorRequestLimitBurst: DefaultVisitorRequestLimitBurst, VisitorRequestLimitBurst: DefaultVisitorRequestLimitBurst,
VisitorRequestLimitReplenish: DefaultVisitorRequestLimitReplenish, VisitorRequestLimitReplenish: DefaultVisitorRequestLimitReplenish,

View File

@ -32,7 +32,7 @@ You can also entirely disable the cache by setting `cache-duration` to `0`. When
passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward
the message to the subscribers. the message to the subscribers.
Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling), as well as the Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling-for-messages), as well as the
[`since=` parameter](subscribe/api.md#fetching-cached-messages). [`since=` parameter](subscribe/api.md#fetching-cached-messages).
## Behind a proxy (TLS, etc.) ## Behind a proxy (TLS, etc.)

View File

@ -332,6 +332,85 @@ them with a comma, e.g. `tag1,tag2,tag3`.
<figcaption>Detail view of notifications with tags</figcaption> <figcaption>Detail view of notifications with tags</figcaption>
</figure> </figure>
## Scheduled delivery
You can delay the delivery of messages and let ntfy send them at a later date. This can be used to send yourself
reminders or even to execute commands at a later date (if your subscriber acts on messages).
Usage is pretty straight forward. You can set the delivery time using the `X-Delay` header (or any of its aliases: `Delay`,
`X-At`, `At`, `X-In` or `In`), either by specifying a Unix timestamp (e.g. `1639194738`), a duration (e.g. `30m`,
`3h`, `2 days`), or a natural language time string (e.g. `10am`, `8:30pm`, `tomorrow, 3pm`, `Tuesday, 7am`,
[and more](https://github.com/olebedev/when)).
As of today, the minimum delay you can set is **10 seconds** and the maximum delay is **3 days**. This can currently
not be configured otherwise ([let me know](https://github.com/binwiederhier/ntfy/issues) if you'd like to change
these limits).
For the purposes of [message caching](config.md#message-cache), scheduled messages are kept in the cache until 12 hours
after they were delivered (or whatever the server-side cache duration is set to). For instance, if a message is scheduled
to be delivered in 3 days, it'll remain in the cache for 3 days and 12 hours. Also note that naturally,
[turning off server-side caching](#message-caching) is not possible in combination with this feature.
=== "Command line (curl)"
```
curl -H "At: tomorrow, 10am" -d "Good morning" ntfy.sh/hello
curl -H "In: 30min" -d "It's 30 minutes later now" ntfy.sh/reminder
curl -H "Delay: 1639194738" -d "Unix timestamps are awesome" ntfy.sh/itsaunixsystem
```
=== "HTTP"
``` http
POST /hello HTTP/1.1
Host: ntfy.sh
At: tomorrow, 10am
Good morning
```
=== "JavaScript"
``` javascript
fetch('https://ntfy.sh/hello', {
method: 'POST',
body: 'Good morning',
headers: { 'At': 'tomorrow, 10am' }
})
```
=== "Go"
``` go
req, _ := http.NewRequest("POST", "https://ntfy.sh/hello", strings.NewReader("Good morning"))
req.Header.Set("At", "tomorrow, 10am")
http.DefaultClient.Do(req)
```
=== "PHP"
``` php-inline
file_get_contents('https://ntfy.sh/backups', false, stream_context_create([
'http' => [
'method' => 'POST',
'header' =>
"Content-Type: text/plain\r\n" .
"At: tomorrow, 10am",
'content' => 'Good morning'
]
]));
```
Here are a few examples (assuming today's date is **12/10/2021, 9am, Eastern Time Zone**):
<table class="remove-md-box"><tr>
<td>
<table><thead><tr><th><code>Delay/At/In</code> header</th><th>Message will be delivered at</th><th>Explanation</th></tr></thead><tbody>
<tr><td><code>30m</code></td><td>12/10/2021, 9:<b>30</b>am</td><td>30 minutes from now</td></tr>
<tr><td><code>2 hours</code></td><td>12/10/2021, <b>11:30</b>am</td><td>2 hours from now</td></tr>
<tr><td><code>1 day</code></td><td>12/<b>11</b>/2021, 9am</td><td>24 hours from now</td></tr>
<tr><td><code>10am</code></td><td>12/10/2021, <b>10am</b></td><td>Today at 10am (same day, because it's only 9am)</td></tr>
<tr><td><code>8am</code></td><td>12/<b>11</b>/2021, <b>8am</b></td><td>Tomorrow at 8am (because it's 9am already)</td></tr>
<tr><td><code>1639152000</code></td><td>12/10/2021, 11am (EST)</td><td> Today at 11am (EST)</td></tr>
</tbody></table>
</td>
</tr></table>
## Advanced features ## Advanced features
### Message caching ### Message caching
@ -347,7 +426,7 @@ client-side network disruptions, but arguably this feature also may raise privac
To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`. To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`.
This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages
are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetching-cached-messages) and are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetching-cached-messages) and
[`poll=1`](subscribe/api.md#polling) won't return the message anymore. [`poll=1`](subscribe/api.md#polling-for-messages) won't return the message anymore.
=== "Command line (curl)" === "Command line (curl)"
``` ```
@ -393,7 +472,7 @@ are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fe
])); ]));
``` ```
### Firebase ### Disable Firebase
!!! info !!! info
If `Firebase: no` is used and [instant delivery](subscribe/phone.md#instant-delivery) isn't enabled in the Android If `Firebase: no` is used and [instant delivery](subscribe/phone.md#instant-delivery) isn't enabled in the Android
app (Google Play variant only), **message delivery will be significantly delayed (up to 15 minutes)**. To overcome app (Google Play variant only), **message delivery will be significantly delayed (up to 15 minutes)**. To overcome

View File

@ -239,7 +239,7 @@ or `all` (all cached messages).
curl -s "ntfy.sh/mytopic/json?since=10m" curl -s "ntfy.sh/mytopic/json?since=10m"
``` ```
### Polling ### Polling for messages
You can also just poll for messages if you don't like the long-standing connection using the `poll=1` You can also just poll for messages if you don't like the long-standing connection using the `poll=1`
query parameter. The connection will end after all available messages have been read. This parameter can be query parameter. The connection will end after all available messages have been read. This parameter can be
combined with `since=` (defaults to `since=all`). combined with `since=` (defaults to `since=all`).
@ -248,6 +248,16 @@ combined with `since=` (defaults to `since=all`).
curl -s "ntfy.sh/mytopic/json?poll=1" curl -s "ntfy.sh/mytopic/json?poll=1"
``` ```
### Fetching scheduled messages
Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically
returned when subscribing via the API, which makes sense, because after all, the messages have technically not been
delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`)
parameter (makes most sense with the `poll=1` parameter):
```
curl -s "ntfy.sh/mytopic/json?poll=1&sched=1"
```
### Subscribing to multiple topics ### Subscribing to multiple topics
It's possible to subscribe to multiple topics in one HTTP call by providing a It's possible to subscribe to multiple topics in one HTTP call by providing a
comma-separated list of topics in the URL. This allows you to reduce the number of connections you have to maintain: comma-separated list of topics in the URL. This allows you to reduce the number of connections you have to maintain:

13
go.mod
View File

@ -2,6 +2,8 @@ module heckel.io/ntfy
go 1.17 go 1.17
replace github.com/olebedev/when => github.com/binwiederhier/when v0.0.1-binwiederhier2
require ( require (
cloud.google.com/go/firestore v1.6.1 // indirect cloud.google.com/go/firestore v1.6.1 // indirect
cloud.google.com/go/storage v1.18.2 // indirect cloud.google.com/go/storage v1.18.2 // indirect
@ -9,36 +11,39 @@ require (
github.com/BurntSushi/toml v0.4.1 // indirect github.com/BurntSushi/toml v0.4.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/mattn/go-sqlite3 v1.14.9 github.com/mattn/go-sqlite3 v1.14.9
github.com/olebedev/when v0.0.0-20190311101825-c3b538a97254
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/urfave/cli/v2 v2.3.0 github.com/urfave/cli/v2 v2.3.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
google.golang.org/api v0.61.0 google.golang.org/api v0.62.0
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
) )
require ( require (
cloud.google.com/go v0.99.0 // indirect cloud.google.com/go v0.99.0 // indirect
github.com/AlekSi/pointer v1.0.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 // indirect github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/envoyproxy/go-control-plane v0.10.1 // indirect github.com/envoyproxy/go-control-plane v0.10.1 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect github.com/google/go-cmp v0.5.6 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
go.opencensus.io v0.23.0 // indirect go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.42.0 // indirect google.golang.org/grpc v1.42.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect

24
go.sum
View File

@ -25,6 +25,7 @@ cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aD
cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI= cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI=
cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4= cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4=
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go v0.98.0/go.mod h1:ua6Ush4NALrHk5QXDWnjvZHN93OuF0HfuEPq9I1X0cM=
cloud.google.com/go v0.99.0 h1:y/cM2iqGgGi5D5DQZl6D9STN/3dR/Vx5Mp8s752oJTY= cloud.google.com/go v0.99.0 h1:y/cM2iqGgGi5D5DQZl6D9STN/3dR/Vx5Mp8s752oJTY=
cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA= cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
@ -51,12 +52,16 @@ cloud.google.com/go/storage v1.18.2/go.mod h1:AiIj7BWXyhO5gGVmYJ+S8tbkCx3yb0IMju
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/AlekSi/pointer v1.0.0 h1:KWCWzsvFxNLcmM5XmiqHsGTTsuwZMsLFwWF9Y+//bNE=
github.com/AlekSi/pointer v1.0.0/go.mod h1:1kjywbfcPFCmncIxtk6fIEub6LKrfMz3gc5QKVOSOA8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/binwiederhier/when v0.0.1-binwiederhier2 h1:BjQC7OQI4MK0vXeltn2BEuf0Tdh/M6YNh1JrepnVr2I=
github.com/binwiederhier/when v0.0.1-binwiederhier2/go.mod h1:DPucAeQGDPUzYUt+NaWw6qsF5SFapWWToxEiVDh2aV0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk= github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -84,8 +89,9 @@ github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWH
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@ -199,6 +205,8 @@ github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc8
github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA= github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA=
github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -213,6 +221,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -390,8 +399,9 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d h1:FjkYO/PPp4Wi0EAUOVLxePm7qVW4r4ctbWpURyuOD0E=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -495,8 +505,9 @@ google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqiv
google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E= google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU= google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8=
google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I=
google.golang.org/api v0.62.0 h1:PhGymJMXfGBzc4lBRmrx9+1w4w2wEzURHNGF/sD/xGc=
google.golang.org/api v0.62.0/go.mod h1:dKmwPCydfsad4qCH08MSdgWjfHOyfpd4VtDGgRFdavw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -566,9 +577,11 @@ google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20211016002631-37fc39342514/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211016002631-37fc39342514/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211028162531-8db9c33dc351/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211028162531-8db9c33dc351/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f h1:QH7+Ym+7e2XV1dZIHapkXoeqHyNaCzn6MNp3JBaYYUc= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa h1:I0YcKz0I7OAhddo7ya8kMnvprhcWM045PmkBdMO9zN0=
google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -594,6 +607,7 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A= google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=

View File

@ -14,8 +14,10 @@ var (
// i.e. message structs with the Event messageEvent. // i.e. message structs with the Event messageEvent.
type cache interface { type cache interface {
AddMessage(m *message) error AddMessage(m *message) error
Messages(topic string, since sinceTime) ([]*message, error) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error)
MessagesDue() ([]*message, error)
MessageCount(topic string) (int, error) MessageCount(topic string) (int, error)
Topics() (map[string]*topic, error) Topics() (map[string]*topic, error)
Prune(olderThan time.Time) error Prune(olderThan time.Time) error
MarkPublished(m *message) error
} }

View File

@ -1,14 +1,16 @@
package server package server
import ( import (
"sort"
"sync" "sync"
"time" "time"
) )
type memCache struct { type memCache struct {
messages map[string][]*message messages map[string][]*message
nop bool scheduled map[string]*message // Message ID -> message
mu sync.Mutex nop bool
mu sync.Mutex
} }
var _ cache = (*memCache)(nil) var _ cache = (*memCache)(nil)
@ -16,8 +18,9 @@ var _ cache = (*memCache)(nil)
// newMemCache creates an in-memory cache // newMemCache creates an in-memory cache
func newMemCache() *memCache { func newMemCache() *memCache {
return &memCache{ return &memCache{
messages: make(map[string][]*message), messages: make(map[string][]*message),
nop: false, scheduled: make(map[string]*message),
nop: false,
} }
} }
@ -25,77 +28,109 @@ func newMemCache() *memCache {
// it is always empty and can be used if caching is entirely disabled // it is always empty and can be used if caching is entirely disabled
func newNopCache() *memCache { func newNopCache() *memCache {
return &memCache{ return &memCache{
messages: make(map[string][]*message), messages: make(map[string][]*message),
nop: true, scheduled: make(map[string]*message),
nop: true,
} }
} }
func (s *memCache) AddMessage(m *message) error { func (c *memCache) AddMessage(m *message) error {
s.mu.Lock() c.mu.Lock()
defer s.mu.Unlock() defer c.mu.Unlock()
if s.nop { if c.nop {
return nil return nil
} }
if m.Event != messageEvent { if m.Event != messageEvent {
return errUnexpectedMessageType return errUnexpectedMessageType
} }
if _, ok := s.messages[m.Topic]; !ok { if _, ok := c.messages[m.Topic]; !ok {
s.messages[m.Topic] = make([]*message, 0) c.messages[m.Topic] = make([]*message, 0)
} }
s.messages[m.Topic] = append(s.messages[m.Topic], m) delayed := m.Time > time.Now().Unix()
if delayed {
c.scheduled[m.ID] = m
}
c.messages[m.Topic] = append(c.messages[m.Topic], m)
return nil return nil
} }
func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) { func (c *memCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
s.mu.Lock() c.mu.Lock()
defer s.mu.Unlock() defer c.mu.Unlock()
if _, ok := s.messages[topic]; !ok || since.IsNone() { if _, ok := c.messages[topic]; !ok || since.IsNone() {
return make([]*message, 0), nil return make([]*message, 0), nil
} }
messages := make([]*message, 0) // copy! messages := make([]*message, 0)
for _, m := range s.messages[topic] { for _, m := range c.messages[topic] {
msgTime := time.Unix(m.Time, 0) _, messageScheduled := c.scheduled[m.ID]
if msgTime == since.Time() || msgTime.After(since.Time()) { include := m.Time >= since.Time().Unix() && (!messageScheduled || scheduled)
if include {
messages = append(messages, m) messages = append(messages, m)
} }
} }
sort.Slice(messages, func(i, j int) bool {
return messages[i].Time < messages[j].Time
})
return messages, nil return messages, nil
} }
func (s *memCache) MessageCount(topic string) (int, error) { func (c *memCache) MessagesDue() ([]*message, error) {
s.mu.Lock() c.mu.Lock()
defer s.mu.Unlock() defer c.mu.Unlock()
if _, ok := s.messages[topic]; !ok { messages := make([]*message, 0)
return 0, nil for _, m := range c.scheduled {
due := time.Now().Unix() >= m.Time
if due {
messages = append(messages, m)
}
} }
return len(s.messages[topic]), nil sort.Slice(messages, func(i, j int) bool {
return messages[i].Time < messages[j].Time
})
return messages, nil
} }
func (s *memCache) Topics() (map[string]*topic, error) { func (c *memCache) MarkPublished(m *message) error {
s.mu.Lock() c.mu.Lock()
defer s.mu.Unlock() delete(c.scheduled, m.ID)
c.mu.Unlock()
return nil
}
func (c *memCache) MessageCount(topic string) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.messages[topic]; !ok {
return 0, nil
}
return len(c.messages[topic]), nil
}
func (c *memCache) Topics() (map[string]*topic, error) {
c.mu.Lock()
defer c.mu.Unlock()
topics := make(map[string]*topic) topics := make(map[string]*topic)
for topic := range s.messages { for topic := range c.messages {
topics[topic] = newTopic(topic) topics[topic] = newTopic(topic)
} }
return topics, nil return topics, nil
} }
func (s *memCache) Prune(olderThan time.Time) error { func (c *memCache) Prune(olderThan time.Time) error {
s.mu.Lock() c.mu.Lock()
defer s.mu.Unlock() defer c.mu.Unlock()
for topic := range s.messages { for topic := range c.messages {
s.pruneTopic(topic, olderThan) c.pruneTopic(topic, olderThan)
} }
return nil return nil
} }
func (s *memCache) pruneTopic(topic string, olderThan time.Time) { func (c *memCache) pruneTopic(topic string, olderThan time.Time) {
messages := make([]*message, 0) messages := make([]*message, 0)
for _, m := range s.messages[topic] { for _, m := range c.messages[topic] {
if m.Time >= olderThan.Unix() { if m.Time >= olderThan.Unix() {
messages = append(messages, m) messages = append(messages, m)
} }
} }
s.messages[topic] = messages c.messages[topic] = messages
} }

View File

@ -9,6 +9,10 @@ func TestMemCache_Messages(t *testing.T) {
testCacheMessages(t, newMemCache()) testCacheMessages(t, newMemCache())
} }
func TestMemCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newMemCache())
}
func TestMemCache_Topics(t *testing.T) { func TestMemCache_Topics(t *testing.T) {
testCacheTopics(t, newMemCache()) testCacheTopics(t, newMemCache())
} }
@ -25,7 +29,7 @@ func TestMemCache_NopCache(t *testing.T) {
c := newNopCache() c := newNopCache()
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message"))) assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
messages, err := c.Messages("mytopic", sinceAllMessages) messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err) assert.Nil(t, err)
assert.Empty(t, messages) assert.Empty(t, messages)

View File

@ -21,19 +21,32 @@ const (
message VARCHAR(512) NOT NULL, message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL, title VARCHAR(256) NOT NULL,
priority INT NOT NULL, priority INT NOT NULL,
tags VARCHAR(256) NOT NULL tags VARCHAR(256) NOT NULL,
published INT NOT NULL
); );
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT; COMMIT;
` `
insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)` insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
pruneMessagesQuery = `DELETE FROM messages WHERE time < ?` pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
selectMessagesSinceTimeQuery = ` selectMessagesSinceTimeQuery = `
SELECT id, time, message, title, priority, tags SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time ASC
`
selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages FROM messages
WHERE topic = ? AND time >= ? WHERE topic = ? AND time >= ?
ORDER BY time ASC ORDER BY time ASC
` `
selectMessagesDueQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE time <= ? AND published = 0
`
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?` selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
@ -41,7 +54,7 @@ const (
// Schema management queries // Schema management queries
const ( const (
currentSchemaVersion = 1 currentSchemaVersion = 2
createSchemaVersionTableQuery = ` createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion ( CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY, id INT PRIMARY KEY,
@ -49,6 +62,7 @@ const (
); );
` `
insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)` insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1` selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
// 0 -> 1 // 0 -> 1
@ -59,6 +73,11 @@ const (
ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
COMMIT; COMMIT;
` `
// 1 -> 2
migrate1To2AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
`
) )
type sqliteCache struct { type sqliteCache struct {
@ -84,46 +103,39 @@ func (c *sqliteCache) AddMessage(m *message) error {
if m.Event != messageEvent { if m.Event != messageEvent {
return errUnexpectedMessageType return errUnexpectedMessageType
} }
_, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ",")) published := m.Time <= time.Now().Unix()
_, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published)
return err return err
} }
func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) { func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
if since.IsNone() { if since.IsNone() {
return make([]*message, 0), nil return make([]*message, 0), nil
} }
rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix()) var rows *sql.Rows
var err error
if scheduled {
rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
} else {
rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() return readMessages(rows)
messages := make([]*message, 0) }
for rows.Next() {
var timestamp int64 func (c *sqliteCache) MessagesDue() ([]*message, error) {
var priority int rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
var id, msg, title, tagsStr string if err != nil {
if err := rows.Scan(&id, &timestamp, &msg, &title, &priority, &tagsStr); err != nil {
return nil, err
}
var tags []string
if tagsStr != "" {
tags = strings.Split(tagsStr, ",")
}
messages = append(messages, &message{
ID: id,
Time: timestamp,
Event: messageEvent,
Topic: topic,
Message: msg,
Title: title,
Priority: priority,
Tags: tags,
})
}
if err := rows.Err(); err != nil {
return nil, err return nil, err
} }
return messages, nil return readMessages(rows)
}
func (c *sqliteCache) MarkPublished(m *message) error {
_, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
return err
} }
func (c *sqliteCache) MessageCount(topic string) (int, error) { func (c *sqliteCache) MessageCount(topic string) (int, error) {
@ -169,13 +181,44 @@ func (c *sqliteCache) Prune(olderThan time.Time) error {
return err return err
} }
func readMessages(rows *sql.Rows) ([]*message, error) {
defer rows.Close()
messages := make([]*message, 0)
for rows.Next() {
var timestamp int64
var priority int
var id, topic, msg, title, tagsStr string
if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr); err != nil {
return nil, err
}
var tags []string
if tagsStr != "" {
tags = strings.Split(tagsStr, ",")
}
messages = append(messages, &message{
ID: id,
Time: timestamp,
Event: messageEvent,
Topic: topic,
Message: msg,
Title: title,
Priority: priority,
Tags: tags,
})
}
if err := rows.Err(); err != nil {
return nil, err
}
return messages, nil
}
func setupDB(db *sql.DB) error { func setupDB(db *sql.DB) error {
// If 'messages' table does not exist, this must be a new database // If 'messages' table does not exist, this must be a new database
rowsMC, err := db.Query(selectMessagesCountQuery) rowsMC, err := db.Query(selectMessagesCountQuery)
if err != nil { if err != nil {
return setupNewDB(db) return setupNewDB(db)
} }
defer rowsMC.Close() rowsMC.Close()
// If 'messages' table exists, check 'schemaVersion' table // If 'messages' table exists, check 'schemaVersion' table
schemaVersion := 0 schemaVersion := 0
@ -188,13 +231,16 @@ func setupDB(db *sql.DB) error {
if err := rowsSV.Scan(&schemaVersion); err != nil { if err := rowsSV.Scan(&schemaVersion); err != nil {
return err return err
} }
rowsSV.Close()
} }
// Do migrations // Do migrations
if schemaVersion == currentSchemaVersion { if schemaVersion == currentSchemaVersion {
return nil return nil
} else if schemaVersion == 0 { } else if schemaVersion == 0 {
return migrateFrom0To1(db) return migrateFrom0(db)
} else if schemaVersion == 1 {
return migrateFrom1(db)
} }
return fmt.Errorf("unexpected schema version found: %d", schemaVersion) return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
} }
@ -212,7 +258,7 @@ func setupNewDB(db *sql.DB) error {
return nil return nil
} }
func migrateFrom0To1(db *sql.DB) error { func migrateFrom0(db *sql.DB) error {
log.Print("Migrating cache database schema: from 0 to 1") log.Print("Migrating cache database schema: from 0 to 1")
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil { if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
return err return err
@ -223,5 +269,16 @@ func migrateFrom0To1(db *sql.DB) error {
if _, err := db.Exec(insertSchemaVersion, 1); err != nil { if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
return err return err
} }
return nil return migrateFrom1(db)
}
func migrateFrom1(db *sql.DB) error {
log.Print("Migrating cache database schema: from 1 to 2")
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
return err
}
return nil // Update this when a new version is added
} }

View File

@ -3,16 +3,20 @@ package server
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require"
"path/filepath" "path/filepath"
"testing" "testing"
"time" "time"
) )
func TestSqliteCache_AddMessage(t *testing.T) { func TestSqliteCache_Messages(t *testing.T) {
testCacheMessages(t, newSqliteTestCache(t)) testCacheMessages(t, newSqliteTestCache(t))
} }
func TestSqliteCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newSqliteTestCache(t))
}
func TestSqliteCache_Topics(t *testing.T) { func TestSqliteCache_Topics(t *testing.T) {
testCacheTopics(t, newSqliteTestCache(t)) testCacheTopics(t, newSqliteTestCache(t))
} }
@ -25,10 +29,10 @@ func TestSqliteCache_Prune(t *testing.T) {
testCachePrune(t, newSqliteTestCache(t)) testCachePrune(t, newSqliteTestCache(t))
} }
func TestSqliteCache_Migration_0to1(t *testing.T) { func TestSqliteCache_Migration_From0(t *testing.T) {
filename := newSqliteTestCacheFile(t) filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename) db, err := sql.Open("sqlite3", filename)
assert.Nil(t, err) require.Nil(t, err)
// Create "version 0" schema // Create "version 0" schema
_, err = db.Exec(` _, err = db.Exec(`
@ -42,32 +46,91 @@ func TestSqliteCache_Migration_0to1(t *testing.T) {
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT; COMMIT;
`) `)
assert.Nil(t, err) require.Nil(t, err)
// Insert a bunch of messages // Insert a bunch of messages
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`, _, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i)) fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
assert.Nil(t, err) require.Nil(t, err)
} }
require.Nil(t, db.Close())
// Create cache to trigger migration // Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename) c := newSqliteTestCacheFromFile(t, filename)
messages, err := c.Messages("mytopic", sinceAllMessages) checkSchemaVersion(t, c.db)
assert.Nil(t, err)
assert.Equal(t, 10, len(messages))
assert.Equal(t, "some message 5", messages[5].Message)
assert.Equal(t, "", messages[5].Title)
assert.Nil(t, messages[5].Tags)
assert.Equal(t, 0, messages[5].Priority)
rows, err := c.db.Query(`SELECT version FROM schemaVersion`) messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err) require.Nil(t, err)
assert.True(t, rows.Next()) require.Equal(t, 10, len(messages))
require.Equal(t, "some message 5", messages[5].Message)
require.Equal(t, "", messages[5].Title)
require.Nil(t, messages[5].Tags)
require.Equal(t, 0, messages[5].Priority)
}
func TestSqliteCache_Migration_From1(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
require.Nil(t, err)
// Create "version 1" schema
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
tags VARCHAR(256) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
INSERT INTO schemaVersion (id, version) VALUES (1, 1);
`)
require.Nil(t, err)
// Insert a bunch of messages
for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
require.Nil(t, err)
}
require.Nil(t, db.Close())
// Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename)
checkSchemaVersion(t, c.db)
// Add delayed message
delayedMessage := newDefaultMessage("mytopic", "some delayed message")
delayedMessage.Time = time.Now().Add(time.Minute).Unix()
require.Nil(t, c.AddMessage(delayedMessage))
// 10, not 11!
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 10, len(messages))
// 11!
messages, err = c.Messages("mytopic", sinceAllMessages, true)
require.Nil(t, err)
require.Equal(t, 11, len(messages))
}
func checkSchemaVersion(t *testing.T, db *sql.DB) {
rows, err := db.Query(`SELECT version FROM schemaVersion`)
require.Nil(t, err)
require.True(t, rows.Next())
var schemaVersion int var schemaVersion int
assert.Nil(t, rows.Scan(&schemaVersion)) require.Nil(t, rows.Scan(&schemaVersion))
assert.Equal(t, 1, schemaVersion) require.Equal(t, currentSchemaVersion, schemaVersion)
require.Nil(t, rows.Close())
} }
func newSqliteTestCache(t *testing.T) *sqliteCache { func newSqliteTestCache(t *testing.T) *sqliteCache {

View File

@ -27,7 +27,7 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, 2, count) assert.Equal(t, 2, count)
// mytopic: since all // mytopic: since all
messages, _ := c.Messages("mytopic", sinceAllMessages) messages, _ := c.Messages("mytopic", sinceAllMessages, false)
assert.Equal(t, 2, len(messages)) assert.Equal(t, 2, len(messages))
assert.Equal(t, "my message", messages[0].Message) assert.Equal(t, "my message", messages[0].Message)
assert.Equal(t, "mytopic", messages[0].Topic) assert.Equal(t, "mytopic", messages[0].Topic)
@ -38,11 +38,11 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, "my other message", messages[1].Message) assert.Equal(t, "my other message", messages[1].Message)
// mytopic: since none // mytopic: since none
messages, _ = c.Messages("mytopic", sinceNoMessages) messages, _ = c.Messages("mytopic", sinceNoMessages, false)
assert.Empty(t, messages) assert.Empty(t, messages)
// mytopic: since 2 // mytopic: since 2
messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0))) messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)), false)
assert.Equal(t, 1, len(messages)) assert.Equal(t, 1, len(messages))
assert.Equal(t, "my other message", messages[0].Message) assert.Equal(t, "my other message", messages[0].Message)
@ -52,7 +52,7 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, 1, count) assert.Equal(t, 1, count)
// example: since all // example: since all
messages, _ = c.Messages("example", sinceAllMessages) messages, _ = c.Messages("example", sinceAllMessages, false)
assert.Equal(t, "my example message", messages[0].Message) assert.Equal(t, "my example message", messages[0].Message)
// non-existing: count // non-existing: count
@ -61,7 +61,7 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, 0, count) assert.Equal(t, 0, count)
// non-existing: since all // non-existing: since all
messages, _ = c.Messages("doesnotexist", sinceAllMessages) messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
assert.Empty(t, messages) assert.Empty(t, messages)
} }
@ -103,7 +103,7 @@ func testCachePrune(t *testing.T, c cache) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 0, count) assert.Equal(t, 0, count)
messages, err := c.Messages("mytopic", sinceAllMessages) messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(messages)) assert.Equal(t, 1, len(messages))
assert.Equal(t, "my other message", messages[0].Message) assert.Equal(t, "my other message", messages[0].Message)
@ -116,8 +116,34 @@ func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
m.Title = "some title" m.Title = "some title"
assert.Nil(t, c.AddMessage(m)) assert.Nil(t, c.AddMessage(m))
messages, _ := c.Messages("mytopic", sinceAllMessages) messages, _ := c.Messages("mytopic", sinceAllMessages, false)
assert.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags) assert.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
assert.Equal(t, 5, messages[0].Priority) assert.Equal(t, 5, messages[0].Priority)
assert.Equal(t, "some title", messages[0].Title) assert.Equal(t, "some title", messages[0].Title)
} }
func testCacheMessagesScheduled(t *testing.T, c cache) {
m1 := newDefaultMessage("mytopic", "message 1")
m2 := newDefaultMessage("mytopic", "message 2")
m2.Time = time.Now().Add(time.Hour).Unix()
m3 := newDefaultMessage("mytopic", "message 3")
m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
m4 := newDefaultMessage("mytopic2", "message 4")
m4.Time = time.Now().Add(time.Minute).Unix()
assert.Nil(t, c.AddMessage(m1))
assert.Nil(t, c.AddMessage(m2))
assert.Nil(t, c.AddMessage(m3))
messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
assert.Equal(t, 1, len(messages))
assert.Equal(t, "message 1", messages[0].Message)
messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
assert.Equal(t, 3, len(messages))
assert.Equal(t, "message 1", messages[0].Message)
assert.Equal(t, "message 3", messages[1].Message) // Order!
assert.Equal(t, "message 2", messages[2].Message)
messages, _ = c.MessagesDue()
assert.Empty(t, messages)
}

View File

@ -71,10 +71,6 @@ var (
sinceNoMessages = sinceTime(time.Unix(1, 0)) sinceNoMessages = sinceTime(time.Unix(1, 0))
) )
const (
messageLimit = 512
)
var ( var (
topicRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app! topicRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
jsonRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`) jsonRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
@ -180,7 +176,16 @@ func (s *Server) Run() error {
ticker := time.NewTicker(s.config.ManagerInterval) ticker := time.NewTicker(s.config.ManagerInterval)
for { for {
<-ticker.C <-ticker.C
s.updateStatsAndExpire() s.updateStatsAndPrune()
}
}()
go func() {
ticker := time.NewTicker(s.config.AtSenderInterval)
for {
<-ticker.C
if err := s.sendDelayedMessages(); err != nil {
log.Printf("error sending scheduled messages: %s", err.Error())
}
} }
}() }()
listenStr := fmt.Sprintf("%s/http", s.config.ListenHTTP) listenStr := fmt.Sprintf("%s/http", s.config.ListenHTTP)
@ -270,7 +275,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
if err != nil { if err != nil {
return err return err
} }
reader := io.LimitReader(r.Body, messageLimit) reader := io.LimitReader(r.Body, int64(s.config.MessageLimit))
b, err := io.ReadAll(reader) b, err := io.ReadAll(reader)
if err != nil { if err != nil {
return err return err
@ -279,14 +284,17 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
if m.Message == "" { if m.Message == "" {
return errHTTPBadRequest return errHTTPBadRequest
} }
title, priority, tags, cache, firebase := parseHeaders(r.Header) cache, firebase, err := s.parseHeaders(r.Header, m)
m.Title = title if err != nil {
m.Priority = priority
m.Tags = tags
if err := t.Publish(m); err != nil {
return err return err
} }
if s.firebase != nil && firebase { delayed := m.Time > time.Now().Unix()
if !delayed {
if err := t.Publish(m); err != nil {
return err
}
}
if s.firebase != nil && firebase && !delayed {
go func() { go func() {
if err := s.firebase(m); err != nil { if err := s.firebase(m); err != nil {
log.Printf("Unable to publish to Firebase: %v", err.Error()) log.Printf("Unable to publish to Firebase: %v", err.Error())
@ -308,35 +316,50 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
return nil return nil
} }
func parseHeaders(header http.Header) (title string, priority int, tags []string, cache bool, firebase bool) { func (s *Server) parseHeaders(header http.Header, m *message) (cache bool, firebase bool, err error) {
title = readHeader(header, "x-title", "title", "ti", "t") cache = readHeader(header, "x-cache", "cache") != "no"
firebase = readHeader(header, "x-firebase", "firebase") != "no"
m.Title = readHeader(header, "x-title", "title", "ti", "t")
priorityStr := readHeader(header, "x-priority", "priority", "prio", "p") priorityStr := readHeader(header, "x-priority", "priority", "prio", "p")
if priorityStr != "" { if priorityStr != "" {
switch strings.ToLower(priorityStr) { switch strings.ToLower(priorityStr) {
case "1", "min": case "1", "min":
priority = 1 m.Priority = 1
case "2", "low": case "2", "low":
priority = 2 m.Priority = 2
case "3", "default": case "3", "default":
priority = 3 m.Priority = 3
case "4", "high": case "4", "high":
priority = 4 m.Priority = 4
case "5", "max", "urgent": case "5", "max", "urgent":
priority = 5 m.Priority = 5
default: default:
priority = 0 return false, false, errHTTPBadRequest
} }
} }
tagsStr := readHeader(header, "x-tags", "tag", "tags", "ta") tagsStr := readHeader(header, "x-tags", "tag", "tags", "ta")
if tagsStr != "" { if tagsStr != "" {
tags = make([]string, 0) m.Tags = make([]string, 0)
for _, s := range strings.Split(tagsStr, ",") { for _, s := range strings.Split(tagsStr, ",") {
tags = append(tags, strings.TrimSpace(s)) m.Tags = append(m.Tags, strings.TrimSpace(s))
} }
} }
cache = readHeader(header, "x-cache", "cache") != "no" delayStr := readHeader(header, "x-delay", "delay", "x-at", "at", "x-in", "in")
firebase = readHeader(header, "x-firebase", "firebase") != "no" if delayStr != "" {
return title, priority, tags, cache, firebase if !cache {
return false, false, errHTTPBadRequest
}
delay, err := util.ParseFutureTime(delayStr, time.Now())
if err != nil {
return false, false, errHTTPBadRequest
} else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
return false, false, errHTTPBadRequest
} else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
return false, false, errHTTPBadRequest
}
m.Time = delay.Unix()
}
return cache, firebase, nil
} }
func readHeader(header http.Header, names ...string) string { func readHeader(header http.Header, names ...string) string {
@ -401,6 +424,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
} }
var wlock sync.Mutex var wlock sync.Mutex
poll := r.URL.Query().Has("poll") poll := r.URL.Query().Has("poll")
scheduled := r.URL.Query().Has("scheduled") || r.URL.Query().Has("sched")
sub := func(msg *message) error { sub := func(msg *message) error {
wlock.Lock() wlock.Lock()
defer wlock.Unlock() defer wlock.Unlock()
@ -419,7 +443,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset! w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
if poll { if poll {
return s.sendOldMessages(topics, since, sub) return s.sendOldMessages(topics, since, scheduled, sub)
} }
subscriberIDs := make([]int, 0) subscriberIDs := make([]int, 0)
for _, t := range topics { for _, t := range topics {
@ -433,7 +457,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message
return err return err
} }
if err := s.sendOldMessages(topics, since, sub); err != nil { if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil {
return err return err
} }
for { for {
@ -449,12 +473,12 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
} }
} }
func (s *Server) sendOldMessages(topics []*topic, since sinceTime, sub subscriber) error { func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error {
if since.IsNone() { if since.IsNone() {
return nil return nil
} }
for _, t := range topics { for _, t := range topics {
messages, err := s.cache.Messages(t.ID, since) messages, err := s.cache.Messages(t.ID, since, scheduled)
if err != nil { if err != nil {
return err return err
} }
@ -521,7 +545,7 @@ func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
return topics, nil return topics, nil
} }
func (s *Server) updateStatsAndExpire() { func (s *Server) updateStatsAndPrune() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -532,13 +556,13 @@ func (s *Server) updateStatsAndExpire() {
} }
} }
// Prune cache // Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration) olderThan := time.Now().Add(-1 * s.config.CacheDuration)
if err := s.cache.Prune(olderThan); err != nil { if err := s.cache.Prune(olderThan); err != nil {
log.Printf("error pruning cache: %s", err.Error()) log.Printf("error pruning cache: %s", err.Error())
} }
// Prune old messages, remove subscriptions without subscribers // Prune old topics, remove subscriptions without subscribers
var subscribers, messages int var subscribers, messages int
for _, t := range s.topics { for _, t := range s.topics {
subs := t.Subscribers() subs := t.Subscribers()
@ -560,6 +584,32 @@ func (s *Server) updateStatsAndExpire() {
s.messages, len(s.topics), subscribers, messages, len(s.visitors)) s.messages, len(s.topics), subscribers, messages, len(s.visitors))
} }
func (s *Server) sendDelayedMessages() error {
s.mu.Lock()
defer s.mu.Unlock()
messages, err := s.cache.MessagesDue()
if err != nil {
return err
}
for _, m := range messages {
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
if ok {
if err := t.Publish(m); err != nil {
log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
}
if s.firebase != nil {
if err := s.firebase(m); err != nil {
log.Printf("unable to publish to Firebase: %v", err.Error())
}
}
}
if err := s.cache.MarkPublished(m); err != nil {
return err
}
}
return nil
}
func (s *Server) withRateLimit(w http.ResponseWriter, r *http.Request, handler func(w http.ResponseWriter, r *http.Request, v *visitor) error) error { func (s *Server) withRateLimit(w http.ResponseWriter, r *http.Request, handler func(w http.ResponseWriter, r *http.Request, v *visitor) error) error {
v := s.visitor(r) v := s.visitor(r)
if err := v.RequestAllowed(); err != nil { if err := v.RequestAllowed(); err != nil {

View File

@ -4,7 +4,7 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require"
"heckel.io/ntfy/config" "heckel.io/ntfy/config"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -19,33 +19,33 @@ func TestServer_PublishAndPoll(t *testing.T) {
response1 := request(t, s, "PUT", "/mytopic", "my first message", nil) response1 := request(t, s, "PUT", "/mytopic", "my first message", nil)
msg1 := toMessage(t, response1.Body.String()) msg1 := toMessage(t, response1.Body.String())
assert.NotEmpty(t, msg1.ID) require.NotEmpty(t, msg1.ID)
assert.Equal(t, "my first message", msg1.Message) require.Equal(t, "my first message", msg1.Message)
response2 := request(t, s, "PUT", "/mytopic", "my second\n\nmessage", nil) response2 := request(t, s, "PUT", "/mytopic", "my second\n\nmessage", nil)
msg2 := toMessage(t, response2.Body.String()) msg2 := toMessage(t, response2.Body.String())
assert.NotEqual(t, msg1.ID, msg2.ID) require.NotEqual(t, msg1.ID, msg2.ID)
assert.NotEmpty(t, msg2.ID) require.NotEmpty(t, msg2.ID)
assert.Equal(t, "my second\n\nmessage", msg2.Message) require.Equal(t, "my second\n\nmessage", msg2.Message)
response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil) response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String()) messages := toMessages(t, response.Body.String())
assert.Equal(t, 2, len(messages)) require.Equal(t, 2, len(messages))
assert.Equal(t, "my first message", messages[0].Message) require.Equal(t, "my first message", messages[0].Message)
assert.Equal(t, "my second\n\nmessage", messages[1].Message) require.Equal(t, "my second\n\nmessage", messages[1].Message)
response = request(t, s, "GET", "/mytopic/sse?poll=1", "", nil) response = request(t, s, "GET", "/mytopic/sse?poll=1", "", nil)
lines := strings.Split(strings.TrimSpace(response.Body.String()), "\n") lines := strings.Split(strings.TrimSpace(response.Body.String()), "\n")
assert.Equal(t, 3, len(lines)) require.Equal(t, 3, len(lines))
assert.Equal(t, "my first message", toMessage(t, strings.TrimPrefix(lines[0], "data: ")).Message) require.Equal(t, "my first message", toMessage(t, strings.TrimPrefix(lines[0], "data: ")).Message)
assert.Equal(t, "", lines[1]) require.Equal(t, "", lines[1])
assert.Equal(t, "my second\n\nmessage", toMessage(t, strings.TrimPrefix(lines[2], "data: ")).Message) require.Equal(t, "my second\n\nmessage", toMessage(t, strings.TrimPrefix(lines[2], "data: ")).Message)
response = request(t, s, "GET", "/mytopic/raw?poll=1", "", nil) response = request(t, s, "GET", "/mytopic/raw?poll=1", "", nil)
lines = strings.Split(strings.TrimSpace(response.Body.String()), "\n") lines = strings.Split(strings.TrimSpace(response.Body.String()), "\n")
assert.Equal(t, 2, len(lines)) require.Equal(t, 2, len(lines))
assert.Equal(t, "my first message", lines[0]) require.Equal(t, "my first message", lines[0])
assert.Equal(t, "my second message", lines[1]) // \n -> " " require.Equal(t, "my second message", lines[1]) // \n -> " "
} }
func TestServer_SubscribeOpenAndKeepalive(t *testing.T) { func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
@ -69,21 +69,21 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
<-doneChan <-doneChan
messages := toMessages(t, rr.Body.String()) messages := toMessages(t, rr.Body.String())
assert.Equal(t, 2, len(messages)) require.Equal(t, 2, len(messages))
assert.Equal(t, openEvent, messages[0].Event) require.Equal(t, openEvent, messages[0].Event)
assert.Equal(t, "mytopic", messages[0].Topic) require.Equal(t, "mytopic", messages[0].Topic)
assert.Equal(t, "", messages[0].Message) require.Equal(t, "", messages[0].Message)
assert.Equal(t, "", messages[0].Title) require.Equal(t, "", messages[0].Title)
assert.Equal(t, 0, messages[0].Priority) require.Equal(t, 0, messages[0].Priority)
assert.Nil(t, messages[0].Tags) require.Nil(t, messages[0].Tags)
assert.Equal(t, keepaliveEvent, messages[1].Event) require.Equal(t, keepaliveEvent, messages[1].Event)
assert.Equal(t, "mytopic", messages[1].Topic) require.Equal(t, "mytopic", messages[1].Topic)
assert.Equal(t, "", messages[1].Message) require.Equal(t, "", messages[1].Message)
assert.Equal(t, "", messages[1].Title) require.Equal(t, "", messages[1].Title)
assert.Equal(t, 0, messages[1].Priority) require.Equal(t, 0, messages[1].Priority)
assert.Nil(t, messages[1].Tags) require.Nil(t, messages[1].Tags)
} }
func TestServer_PublishAndSubscribe(t *testing.T) { func TestServer_PublishAndSubscribe(t *testing.T) {
@ -93,63 +93,79 @@ func TestServer_PublishAndSubscribe(t *testing.T) {
subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR) subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR)
publishFirstRR := request(t, s, "PUT", "/mytopic", "my first message", nil) publishFirstRR := request(t, s, "PUT", "/mytopic", "my first message", nil)
assert.Equal(t, 200, publishFirstRR.Code) require.Equal(t, 200, publishFirstRR.Code)
publishSecondRR := request(t, s, "PUT", "/mytopic", "my other message", map[string]string{ publishSecondRR := request(t, s, "PUT", "/mytopic", "my other message", map[string]string{
"Title": " This is a title ", "Title": " This is a title ",
"X-Tags": "tag1,tag 2, tag3", "X-Tags": "tag1,tag 2, tag3",
"p": "1", "p": "1",
}) })
assert.Equal(t, 200, publishSecondRR.Code) require.Equal(t, 200, publishSecondRR.Code)
subscribeCancel() subscribeCancel()
messages := toMessages(t, subscribeRR.Body.String()) messages := toMessages(t, subscribeRR.Body.String())
assert.Equal(t, 3, len(messages)) require.Equal(t, 3, len(messages))
assert.Equal(t, openEvent, messages[0].Event) require.Equal(t, openEvent, messages[0].Event)
assert.Equal(t, messageEvent, messages[1].Event) require.Equal(t, messageEvent, messages[1].Event)
assert.Equal(t, "mytopic", messages[1].Topic) require.Equal(t, "mytopic", messages[1].Topic)
assert.Equal(t, "my first message", messages[1].Message) require.Equal(t, "my first message", messages[1].Message)
assert.Equal(t, "", messages[1].Title) require.Equal(t, "", messages[1].Title)
assert.Equal(t, 0, messages[1].Priority) require.Equal(t, 0, messages[1].Priority)
assert.Nil(t, messages[1].Tags) require.Nil(t, messages[1].Tags)
assert.Equal(t, messageEvent, messages[2].Event) require.Equal(t, messageEvent, messages[2].Event)
assert.Equal(t, "mytopic", messages[2].Topic) require.Equal(t, "mytopic", messages[2].Topic)
assert.Equal(t, "my other message", messages[2].Message) require.Equal(t, "my other message", messages[2].Message)
assert.Equal(t, "This is a title", messages[2].Title) require.Equal(t, "This is a title", messages[2].Title)
assert.Equal(t, 1, messages[2].Priority) require.Equal(t, 1, messages[2].Priority)
assert.Equal(t, []string{"tag1", "tag 2", "tag3"}, messages[2].Tags) require.Equal(t, []string{"tag1", "tag 2", "tag3"}, messages[2].Tags)
} }
func TestServer_StaticSites(t *testing.T) { func TestServer_StaticSites(t *testing.T) {
s := newTestServer(t, newTestConfig(t)) s := newTestServer(t, newTestConfig(t))
rr := request(t, s, "GET", "/", "", nil) rr := request(t, s, "GET", "/", "", nil)
assert.Equal(t, 200, rr.Code) require.Equal(t, 200, rr.Code)
assert.Contains(t, rr.Body.String(), "</html>") require.Contains(t, rr.Body.String(), "</html>")
rr = request(t, s, "HEAD", "/", "", nil) rr = request(t, s, "HEAD", "/", "", nil)
assert.Equal(t, 200, rr.Code) require.Equal(t, 200, rr.Code)
rr = request(t, s, "GET", "/does-not-exist.txt", "", nil) rr = request(t, s, "GET", "/does-not-exist.txt", "", nil)
assert.Equal(t, 404, rr.Code) require.Equal(t, 404, rr.Code)
rr = request(t, s, "GET", "/mytopic", "", nil) rr = request(t, s, "GET", "/mytopic", "", nil)
assert.Equal(t, 200, rr.Code) require.Equal(t, 200, rr.Code)
assert.Contains(t, rr.Body.String(), `<meta name="robots" content="noindex, nofollow" />`) require.Contains(t, rr.Body.String(), `<meta name="robots" content="noindex, nofollow" />`)
rr = request(t, s, "GET", "/static/css/app.css", "", nil) rr = request(t, s, "GET", "/static/css/app.css", "", nil)
assert.Equal(t, 200, rr.Code) require.Equal(t, 200, rr.Code)
assert.Contains(t, rr.Body.String(), `html, body {`) require.Contains(t, rr.Body.String(), `html, body {`)
rr = request(t, s, "GET", "/docs", "", nil) rr = request(t, s, "GET", "/docs", "", nil)
assert.Equal(t, 301, rr.Code) require.Equal(t, 301, rr.Code)
rr = request(t, s, "GET", "/docs/", "", nil) rr = request(t, s, "GET", "/docs/", "", nil)
assert.Equal(t, 200, rr.Code) require.Equal(t, 200, rr.Code)
assert.Contains(t, rr.Body.String(), `Made with ❤️ by Philipp C. Heckel`) require.Contains(t, rr.Body.String(), `Made with ❤️ by Philipp C. Heckel`)
assert.Contains(t, rr.Body.String(), `<script src=static/js/extra.js></script>`) require.Contains(t, rr.Body.String(), `<script src=static/js/extra.js></script>`)
}
func TestServer_PublishLargeMessage(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
body := strings.Repeat("this is a large message", 1000)
truncated := body[0:512]
response := request(t, s, "PUT", "/mytopic", body, nil)
msg := toMessage(t, response.Body.String())
require.NotEmpty(t, msg.ID)
require.Equal(t, truncated, msg.Message)
response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, truncated, messages[0].Message)
} }
func TestServer_PublishNoCache(t *testing.T) { func TestServer_PublishNoCache(t *testing.T) {
@ -159,12 +175,78 @@ func TestServer_PublishNoCache(t *testing.T) {
"Cache": "no", "Cache": "no",
}) })
msg := toMessage(t, response.Body.String()) msg := toMessage(t, response.Body.String())
assert.NotEmpty(t, msg.ID) require.NotEmpty(t, msg.ID)
assert.Equal(t, "this message is not cached", msg.Message) require.Equal(t, "this message is not cached", msg.Message)
response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil) response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String()) messages := toMessages(t, response.Body.String())
assert.Empty(t, messages) require.Empty(t, messages)
}
func TestServer_PublishAt(t *testing.T) {
c := newTestConfig(t)
c.MinDelay = time.Second
c.AtSenderInterval = 100 * time.Millisecond
s := newTestServer(t, c)
response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
"In": "1s",
})
require.Equal(t, 200, response.Code)
response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 0, len(messages))
time.Sleep(time.Second)
require.Nil(t, s.sendDelayedMessages())
response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, "a message", messages[0].Message)
}
func TestServer_PublishAtWithCacheError(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
"Cache": "no",
"In": "30 min",
})
require.Equal(t, 400, response.Code)
}
func TestServer_PublishAtTooShortDelay(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
"In": "1s",
})
require.Equal(t, 400, response.Code)
}
func TestServer_PublishAtTooLongDelay(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
"In": "99999999h",
})
require.Equal(t, 400, response.Code)
}
func TestServer_PublishAtAndPrune(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
"In": "1h",
})
require.Equal(t, 200, response.Code)
s.updateStatsAndPrune() // Fire pruning
response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages)) // Not affected by pruning
require.Equal(t, "a message", messages[0].Message)
} }
func TestServer_PublishAndMultiPoll(t *testing.T) { func TestServer_PublishAndMultiPoll(t *testing.T) {
@ -172,29 +254,29 @@ func TestServer_PublishAndMultiPoll(t *testing.T) {
response := request(t, s, "PUT", "/mytopic1", "message 1", nil) response := request(t, s, "PUT", "/mytopic1", "message 1", nil)
msg := toMessage(t, response.Body.String()) msg := toMessage(t, response.Body.String())
assert.NotEmpty(t, msg.ID) require.NotEmpty(t, msg.ID)
assert.Equal(t, "mytopic1", msg.Topic) require.Equal(t, "mytopic1", msg.Topic)
assert.Equal(t, "message 1", msg.Message) require.Equal(t, "message 1", msg.Message)
response = request(t, s, "PUT", "/mytopic2", "message 2", nil) response = request(t, s, "PUT", "/mytopic2", "message 2", nil)
msg = toMessage(t, response.Body.String()) msg = toMessage(t, response.Body.String())
assert.NotEmpty(t, msg.ID) require.NotEmpty(t, msg.ID)
assert.Equal(t, "mytopic2", msg.Topic) require.Equal(t, "mytopic2", msg.Topic)
assert.Equal(t, "message 2", msg.Message) require.Equal(t, "message 2", msg.Message)
response = request(t, s, "GET", "/mytopic1/json?poll=1", "", nil) response = request(t, s, "GET", "/mytopic1/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String()) messages := toMessages(t, response.Body.String())
assert.Equal(t, 1, len(messages)) require.Equal(t, 1, len(messages))
assert.Equal(t, "mytopic1", messages[0].Topic) require.Equal(t, "mytopic1", messages[0].Topic)
assert.Equal(t, "message 1", messages[0].Message) require.Equal(t, "message 1", messages[0].Message)
response = request(t, s, "GET", "/mytopic1,mytopic2/json?poll=1", "", nil) response = request(t, s, "GET", "/mytopic1,mytopic2/json?poll=1", "", nil)
messages = toMessages(t, response.Body.String()) messages = toMessages(t, response.Body.String())
assert.Equal(t, 2, len(messages)) require.Equal(t, 2, len(messages))
assert.Equal(t, "mytopic1", messages[0].Topic) require.Equal(t, "mytopic1", messages[0].Topic)
assert.Equal(t, "message 1", messages[0].Message) require.Equal(t, "message 1", messages[0].Message)
assert.Equal(t, "mytopic2", messages[1].Topic) require.Equal(t, "mytopic2", messages[1].Topic)
assert.Equal(t, "message 2", messages[1].Message) require.Equal(t, "message 2", messages[1].Message)
} }
func TestServer_PublishWithNopCache(t *testing.T) { func TestServer_PublishWithNopCache(t *testing.T) {
@ -206,18 +288,18 @@ func TestServer_PublishWithNopCache(t *testing.T) {
subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR) subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR)
publishRR := request(t, s, "PUT", "/mytopic", "my first message", nil) publishRR := request(t, s, "PUT", "/mytopic", "my first message", nil)
assert.Equal(t, 200, publishRR.Code) require.Equal(t, 200, publishRR.Code)
subscribeCancel() subscribeCancel()
messages := toMessages(t, subscribeRR.Body.String()) messages := toMessages(t, subscribeRR.Body.String())
assert.Equal(t, 2, len(messages)) require.Equal(t, 2, len(messages))
assert.Equal(t, openEvent, messages[0].Event) require.Equal(t, openEvent, messages[0].Event)
assert.Equal(t, messageEvent, messages[1].Event) require.Equal(t, messageEvent, messages[1].Event)
assert.Equal(t, "my first message", messages[1].Message) require.Equal(t, "my first message", messages[1].Message)
response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil) response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages = toMessages(t, response.Body.String()) messages = toMessages(t, response.Body.String())
assert.Empty(t, messages) require.Empty(t, messages)
} }
func newTestConfig(t *testing.T) *config.Config { func newTestConfig(t *testing.T) *config.Config {
@ -278,6 +360,6 @@ func toMessages(t *testing.T, s string) []*message {
func toMessage(t *testing.T, s string) *message { func toMessage(t *testing.T, s string) *message {
var m message var m message
assert.Nil(t, json.NewDecoder(strings.NewReader(s)).Decode(&m)) require.Nil(t, json.NewDecoder(strings.NewReader(s)).Decode(&m))
return &m return &m
} }

97
util/time.go Normal file
View File

@ -0,0 +1,97 @@
package util
import (
"errors"
"github.com/olebedev/when"
"regexp"
"strconv"
"strings"
"time"
)
var (
errUnparsableTime = errors.New("unable to parse time")
durationStrRegex = regexp.MustCompile(`(?i)^(\d+)\s*(d|days?|h|hours?|m|mins?|minutes?|s|secs?|seconds?)$`)
)
// ParseFutureTime parses a date/time string to a time.Time. It supports unix timestamps, durations
// and natural language dates
func ParseFutureTime(s string, now time.Time) (time.Time, error) {
s = strings.TrimSpace(s)
t, err := parseUnixTime(s, now)
if err == nil {
return t, nil
}
t, err = parseFromDuration(s, now)
if err == nil {
return t, nil
}
t, err = parseNaturalTime(s, now)
if err == nil {
return t, nil
}
return time.Time{}, errUnparsableTime
}
func parseFromDuration(s string, now time.Time) (time.Time, error) {
d, err := parseDuration(s)
if err == nil {
return now.Add(d), nil
}
return time.Time{}, errUnparsableTime
}
func parseDuration(s string) (time.Duration, error) {
d, err := time.ParseDuration(s)
if err == nil {
return d, nil
}
matches := durationStrRegex.FindStringSubmatch(s)
if matches != nil {
number, err := strconv.Atoi(matches[1])
if err != nil {
return 0, errUnparsableTime
}
switch unit := matches[2][0:1]; unit {
case "d":
return time.Duration(number) * 24 * time.Hour, nil
case "h":
return time.Duration(number) * time.Hour, nil
case "m":
return time.Duration(number) * time.Minute, nil
case "s":
return time.Duration(number) * time.Second, nil
default:
return 0, errUnparsableTime
}
}
return 0, errUnparsableTime
}
func parseUnixTime(s string, now time.Time) (time.Time, error) {
t, err := strconv.Atoi(s)
if err != nil {
return time.Time{}, err
} else if int64(t) < now.Unix() {
return time.Time{}, errUnparsableTime
}
return time.Unix(int64(t), 0).UTC(), nil
}
func parseNaturalTime(s string, now time.Time) (time.Time, error) {
r, err := when.EN.Parse(s, now) // returns "nil, nil" if no matches!
if err != nil || r == nil {
return time.Time{}, errUnparsableTime
} else if r.Time.After(now) {
return r.Time, nil
}
// Hack: If the time is parsable, but not in the future,
// simply append "tomorrow, " to it.
r, err = when.EN.Parse("tomorrow, "+s, now) // returns "nil, nil" if no matches!
if err != nil || r == nil {
return time.Time{}, errUnparsableTime
} else if r.Time.After(now) {
return r.Time, nil
}
return time.Time{}, errUnparsableTime
}

60
util/time_test.go Normal file
View File

@ -0,0 +1,60 @@
package util
import (
"github.com/stretchr/testify/require"
"testing"
"time"
)
var (
// 2021-12-10 10:17:23 (Friday)
base = time.Date(2021, 12, 10, 10, 17, 23, 0, time.UTC)
)
func TestParseFutureTime_11am_FutureTime(t *testing.T) {
d, err := ParseFutureTime("11am", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 10, 11, 0, 0, 0, time.UTC), d) // Same day
}
func TestParseFutureTime_9am_PastTime(t *testing.T) {
d, err := ParseFutureTime("9am", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 11, 9, 0, 0, 0, time.UTC), d) // Next day
}
func TestParseFutureTime_Monday_10_30pm_FutureTime(t *testing.T) {
d, err := ParseFutureTime("Monday, 10:30pm", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 13, 22, 30, 0, 0, time.UTC), d)
}
func TestParseFutureTime_30m(t *testing.T) {
d, err := ParseFutureTime("30m", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 10, 10, 47, 23, 0, time.UTC), d)
}
func TestParseFutureTime_30min(t *testing.T) {
d, err := ParseFutureTime("30min", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 10, 10, 47, 23, 0, time.UTC), d)
}
func TestParseFutureTime_3h(t *testing.T) {
d, err := ParseFutureTime("3h", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 10, 13, 17, 23, 0, time.UTC), d)
}
func TestParseFutureTime_1day(t *testing.T) {
d, err := ParseFutureTime("1 day", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 11, 10, 17, 23, 0, time.UTC), d)
}
func TestParseFutureTime_UnixTime(t *testing.T) {
d, err := ParseFutureTime("1639183911", base)
require.Nil(t, err)
require.Equal(t, time.Date(2021, 12, 11, 0, 51, 51, 0, time.UTC), d)
}