diff --git a/bin/memos/main.go b/bin/memos/main.go index f4ad21e4..0feec6b0 100644 --- a/bin/memos/main.go +++ b/bin/memos/main.go @@ -84,7 +84,7 @@ var ( printGreetings() - if err := s.Start(); err != nil { + if err := s.Start(ctx); err != nil { if err != http.ErrServerClosed { slog.Error("failed to start server", err) cancel() diff --git a/server/server.go b/server/server.go index 83ea92f8..eb4b668c 100644 --- a/server/server.go +++ b/server/server.go @@ -15,7 +15,6 @@ import ( "github.com/soheilhy/cmux" "google.golang.org/grpc" - "github.com/usememos/memos/internal/jobs" storepb "github.com/usememos/memos/proto/gen/store" "github.com/usememos/memos/server/profile" "github.com/usememos/memos/server/route/api/auth" @@ -23,6 +22,7 @@ import ( "github.com/usememos/memos/server/route/frontend" "github.com/usememos/memos/server/route/resource" "github.com/usememos/memos/server/route/rss" + resourcepresign "github.com/usememos/memos/server/service/resource_presign" versionchecker "github.com/usememos/memos/server/service/version_checker" "github.com/usememos/memos/store" ) @@ -104,7 +104,7 @@ func NewServer(ctx context.Context, profile *profile.Profile, store *store.Store return s, nil } -func (s *Server) Start() error { +func (s *Server) Start(ctx context.Context) error { address := fmt.Sprintf(":%d", s.Profile.Port) listener, err := net.Listen("tcp", address) if err != nil { @@ -125,6 +125,7 @@ func (s *Server) Start() error { slog.Error("failed to start echo server", err) } }() + s.StartBackgroundRunners(ctx) return muxServer.Serve() } @@ -146,8 +147,8 @@ func (s *Server) Shutdown(ctx context.Context) { fmt.Printf("memos stopped properly\n") } -func (s *Server) StartRunners(ctx context.Context) { - go jobs.RunPreSignLinks(ctx, s.Store) +func (s *Server) StartBackgroundRunners(ctx context.Context) { + go resourcepresign.RunPreSignLinks(ctx, s.Store) go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx) } diff --git a/internal/jobs/presign_link.go b/server/service/resource_presign/resource_presign.go similarity index 56% rename from internal/jobs/presign_link.go rename to server/service/resource_presign/resource_presign.go index 23868044..9186d2ef 100644 --- a/internal/jobs/presign_link.go +++ b/server/service/resource_presign/resource_presign.go @@ -1,4 +1,4 @@ -package jobs +package resourcepresign import ( "context" @@ -9,13 +9,11 @@ import ( "github.com/pkg/errors" "github.com/usememos/memos/plugin/storage/s3" - apiv2pb "github.com/usememos/memos/proto/gen/api/v2" storepb "github.com/usememos/memos/proto/gen/store" - apiv2 "github.com/usememos/memos/server/route/api/v2" "github.com/usememos/memos/store" ) -// RunPreSignLinks is a background job that pre-signs external links stored in the database. +// RunPreSignLinks is a background runner that pre-signs external links stored in the database. // It uses S3 client to generate presigned URLs and updates the corresponding resources in the store. func RunPreSignLinks(ctx context.Context, dataStore *store.Store) { for { @@ -33,8 +31,6 @@ func RunPreSignLinks(ctx context.Context, dataStore *store.Store) { } func signExternalLinks(ctx context.Context, dataStore *store.Store) error { - const pageSize = 32 - objectStore, err := findObjectStorage(ctx, dataStore) if err != nil { return errors.Wrapf(err, "find object storage") @@ -44,51 +40,40 @@ func signExternalLinks(ctx context.Context, dataStore *store.Store) error { return nil } - var offset int - var limit = pageSize - for { - resources, err := dataStore.ListResources(ctx, &store.FindResource{ - GetBlob: false, - Limit: &limit, - Offset: &offset, - }) + resources, err := dataStore.ListResources(ctx, &store.FindResource{ + GetBlob: false, + }) + if err != nil { + return errors.Wrapf(err, "list resources") + } + + for _, resource := range resources { + if resource.ExternalLink == "" { + // not for object store + continue + } + if strings.Contains(resource.ExternalLink, "?") && time.Since(time.Unix(resource.UpdatedTs, 0)) < s3.LinkLifetime/2 { + // resource not signed (hack for migration) + // resource was recently updated - skipping + continue + } + + newLink, err := objectStore.PreSignLink(ctx, resource.ExternalLink) if err != nil { - return errors.Wrapf(err, "list resources, offset %d", offset) + slog.Error("failed to pre-sign link", err) + continue } - for _, res := range resources { - if res.ExternalLink == "" { - // not for object store - continue - } - if strings.Contains(res.ExternalLink, "?") && time.Since(time.Unix(res.UpdatedTs, 0)) < s3.LinkLifetime/2 { - // resource not signed (hack for migration) - // resource was recently updated - skipping - continue - } - newLink, err := objectStore.PreSignLink(ctx, res.ExternalLink) - if err != nil { - slog.Error("failed to pre-sign link", err) - continue // do not fail - we may want update left over links too - } - now := time.Now().Unix() - // we may want to use here transaction and batch update in the future - _, err = dataStore.UpdateResource(ctx, &store.UpdateResource{ - ID: res.ID, - UpdatedTs: &now, - ExternalLink: &newLink, - }) - if err != nil { - // something with DB - better to stop here - return errors.Wrapf(err, "update resource %d link to %q", res.ID, newLink) - } - } - - offset += limit - if len(resources) < limit { - break + now := time.Now().Unix() + if _, err := dataStore.UpdateResource(ctx, &store.UpdateResource{ + ID: resource.ID, + UpdatedTs: &now, + ExternalLink: &newLink, + }); err != nil { + return errors.Wrapf(err, "update resource %d link to %q", resource.ID, newLink) } } + return nil } @@ -107,16 +92,11 @@ func findObjectStorage(ctx context.Context, dataStore *store.Store) (*s3.Client, if err != nil { return nil, errors.Wrap(err, "Failed to find storage") } - if storage == nil { + if storage == nil || storage.Type != storepb.Storage_S3 { return nil, nil } - storageMessage := apiv2.ConvertStorageFromStore(storage) - if storageMessage.Type != apiv2pb.Storage_S3 { - return nil, nil - } - - s3Config := storageMessage.Config.GetS3Config() + s3Config := storage.Config.GetS3Config() return s3.NewClient(ctx, &s3.Config{ AccessKey: s3Config.AccessKey, SecretKey: s3Config.SecretKey,