chore: update background services

This commit is contained in:
Steven 2024-04-18 23:34:35 +08:00
parent 2a93b8d720
commit e8dfd579c3
3 changed files with 39 additions and 58 deletions

View File

@ -84,7 +84,7 @@ var (
printGreetings() printGreetings()
if err := s.Start(); err != nil { if err := s.Start(ctx); err != nil {
if err != http.ErrServerClosed { if err != http.ErrServerClosed {
slog.Error("failed to start server", err) slog.Error("failed to start server", err)
cancel() cancel()

View File

@ -15,7 +15,6 @@ import (
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/usememos/memos/internal/jobs"
storepb "github.com/usememos/memos/proto/gen/store" storepb "github.com/usememos/memos/proto/gen/store"
"github.com/usememos/memos/server/profile" "github.com/usememos/memos/server/profile"
"github.com/usememos/memos/server/route/api/auth" "github.com/usememos/memos/server/route/api/auth"
@ -23,6 +22,7 @@ import (
"github.com/usememos/memos/server/route/frontend" "github.com/usememos/memos/server/route/frontend"
"github.com/usememos/memos/server/route/resource" "github.com/usememos/memos/server/route/resource"
"github.com/usememos/memos/server/route/rss" "github.com/usememos/memos/server/route/rss"
resourcepresign "github.com/usememos/memos/server/service/resource_presign"
versionchecker "github.com/usememos/memos/server/service/version_checker" versionchecker "github.com/usememos/memos/server/service/version_checker"
"github.com/usememos/memos/store" "github.com/usememos/memos/store"
) )
@ -104,7 +104,7 @@ func NewServer(ctx context.Context, profile *profile.Profile, store *store.Store
return s, nil return s, nil
} }
func (s *Server) Start() error { func (s *Server) Start(ctx context.Context) error {
address := fmt.Sprintf(":%d", s.Profile.Port) address := fmt.Sprintf(":%d", s.Profile.Port)
listener, err := net.Listen("tcp", address) listener, err := net.Listen("tcp", address)
if err != nil { if err != nil {
@ -125,6 +125,7 @@ func (s *Server) Start() error {
slog.Error("failed to start echo server", err) slog.Error("failed to start echo server", err)
} }
}() }()
s.StartBackgroundRunners(ctx)
return muxServer.Serve() return muxServer.Serve()
} }
@ -146,8 +147,8 @@ func (s *Server) Shutdown(ctx context.Context) {
fmt.Printf("memos stopped properly\n") fmt.Printf("memos stopped properly\n")
} }
func (s *Server) StartRunners(ctx context.Context) { func (s *Server) StartBackgroundRunners(ctx context.Context) {
go jobs.RunPreSignLinks(ctx, s.Store) go resourcepresign.RunPreSignLinks(ctx, s.Store)
go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx) go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx)
} }

View File

@ -1,4 +1,4 @@
package jobs package resourcepresign
import ( import (
"context" "context"
@ -9,13 +9,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/usememos/memos/plugin/storage/s3" "github.com/usememos/memos/plugin/storage/s3"
apiv2pb "github.com/usememos/memos/proto/gen/api/v2"
storepb "github.com/usememos/memos/proto/gen/store" storepb "github.com/usememos/memos/proto/gen/store"
apiv2 "github.com/usememos/memos/server/route/api/v2"
"github.com/usememos/memos/store" "github.com/usememos/memos/store"
) )
// RunPreSignLinks is a background job that pre-signs external links stored in the database. // RunPreSignLinks is a background runner that pre-signs external links stored in the database.
// It uses S3 client to generate presigned URLs and updates the corresponding resources in the store. // It uses S3 client to generate presigned URLs and updates the corresponding resources in the store.
func RunPreSignLinks(ctx context.Context, dataStore *store.Store) { func RunPreSignLinks(ctx context.Context, dataStore *store.Store) {
for { for {
@ -33,8 +31,6 @@ func RunPreSignLinks(ctx context.Context, dataStore *store.Store) {
} }
func signExternalLinks(ctx context.Context, dataStore *store.Store) error { func signExternalLinks(ctx context.Context, dataStore *store.Store) error {
const pageSize = 32
objectStore, err := findObjectStorage(ctx, dataStore) objectStore, err := findObjectStorage(ctx, dataStore)
if err != nil { if err != nil {
return errors.Wrapf(err, "find object storage") return errors.Wrapf(err, "find object storage")
@ -44,51 +40,40 @@ func signExternalLinks(ctx context.Context, dataStore *store.Store) error {
return nil return nil
} }
var offset int resources, err := dataStore.ListResources(ctx, &store.FindResource{
var limit = pageSize GetBlob: false,
for { })
resources, err := dataStore.ListResources(ctx, &store.FindResource{ if err != nil {
GetBlob: false, return errors.Wrapf(err, "list resources")
Limit: &limit, }
Offset: &offset,
}) for _, resource := range resources {
if resource.ExternalLink == "" {
// not for object store
continue
}
if strings.Contains(resource.ExternalLink, "?") && time.Since(time.Unix(resource.UpdatedTs, 0)) < s3.LinkLifetime/2 {
// resource not signed (hack for migration)
// resource was recently updated - skipping
continue
}
newLink, err := objectStore.PreSignLink(ctx, resource.ExternalLink)
if err != nil { if err != nil {
return errors.Wrapf(err, "list resources, offset %d", offset) slog.Error("failed to pre-sign link", err)
continue
} }
for _, res := range resources { now := time.Now().Unix()
if res.ExternalLink == "" { if _, err := dataStore.UpdateResource(ctx, &store.UpdateResource{
// not for object store ID: resource.ID,
continue UpdatedTs: &now,
} ExternalLink: &newLink,
if strings.Contains(res.ExternalLink, "?") && time.Since(time.Unix(res.UpdatedTs, 0)) < s3.LinkLifetime/2 { }); err != nil {
// resource not signed (hack for migration) return errors.Wrapf(err, "update resource %d link to %q", resource.ID, newLink)
// resource was recently updated - skipping
continue
}
newLink, err := objectStore.PreSignLink(ctx, res.ExternalLink)
if err != nil {
slog.Error("failed to pre-sign link", err)
continue // do not fail - we may want update left over links too
}
now := time.Now().Unix()
// we may want to use here transaction and batch update in the future
_, err = dataStore.UpdateResource(ctx, &store.UpdateResource{
ID: res.ID,
UpdatedTs: &now,
ExternalLink: &newLink,
})
if err != nil {
// something with DB - better to stop here
return errors.Wrapf(err, "update resource %d link to %q", res.ID, newLink)
}
}
offset += limit
if len(resources) < limit {
break
} }
} }
return nil return nil
} }
@ -107,16 +92,11 @@ func findObjectStorage(ctx context.Context, dataStore *store.Store) (*s3.Client,
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Failed to find storage") return nil, errors.Wrap(err, "Failed to find storage")
} }
if storage == nil { if storage == nil || storage.Type != storepb.Storage_S3 {
return nil, nil return nil, nil
} }
storageMessage := apiv2.ConvertStorageFromStore(storage) s3Config := storage.Config.GetS3Config()
if storageMessage.Type != apiv2pb.Storage_S3 {
return nil, nil
}
s3Config := storageMessage.Config.GetS3Config()
return s3.NewClient(ctx, &s3.Config{ return s3.NewClient(ctx, &s3.Config{
AccessKey: s3Config.AccessKey, AccessKey: s3Config.AccessKey,
SecretKey: s3Config.SecretKey, SecretKey: s3Config.SecretKey,